You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/03/30 11:49:52 UTC
[23/50] [abbrv] ignite git commit: IGNITE-4102 .NET: Generify
ICacheStore
IGNITE-4102 .NET: Generify ICacheStore
This closes #1670
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/eab8334b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/eab8334b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/eab8334b
Branch: refs/heads/ignite-3477-master
Commit: eab8334bb49ceda249e742246d26f72539f9fa4c
Parents: 1308927
Author: Pavel Tupitsyn <pt...@apache.org>
Authored: Mon Mar 27 16:02:47 2017 +0300
Committer: Pavel Tupitsyn <pt...@apache.org>
Committed: Mon Mar 27 16:02:47 2017 +0300
----------------------------------------------------------------------
.../dotnet/PlatformDotNetCacheStore.java | 12 +-
.../Cache/CacheConfigurationTest.cs | 2 +-
.../Cache/Store/CacheStoreAdapterTest.cs | 14 +-
.../Cache/Store/CacheStoreSessionTest.cs | 2 +-
.../Cache/Store/CacheTestParallelLoadStore.cs | 16 +-
.../Cache/Store/CacheTestStore.cs | 13 +-
.../Apache.Ignite.Core.csproj | 4 +-
.../Cache/Configuration/CacheConfiguration.cs | 2 +-
.../dotnet/Apache.Ignite.Core/Cache/ICache.cs | 12 +-
.../Store/CacheParallelLoadStoreAdapter.cs | 38 ++-
.../Cache/Store/CacheStoreAdapter.cs | 30 +-
.../Cache/Store/ICacheStore.cs | 39 ++-
.../Cache/Store/ICacheStoreSession.cs | 2 +-
.../Datastream/IDataStreamer.cs | 2 +-
.../Impl/Cache/Store/CacheStore.cs | 233 ++++-----------
.../Impl/Cache/Store/CacheStoreInternal.cs | 285 +++++++++++++++++++
.../Impl/Cache/Store/ICacheStoreInternal.cs | 43 +++
.../Datagrid/StoreExample.cs | 1 +
.../Datagrid/EmployeeStore.cs | 27 +-
19 files changed, 497 insertions(+), 280 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/eab8334b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java
index c2f6001..dd61a54 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java
@@ -201,7 +201,11 @@ public class PlatformDotNetCacheStore<K, V> implements CacheStore<K, V>, Platfor
writer.writeByte(OP_LOAD_ALL);
writer.writeLong(session());
writer.writeString(ses.cacheName());
- writer.writeCollection(keys0);
+
+ writer.writeInt(keys0.size());
+
+ for (Object o : keys0)
+ writer.writeObject(o);
}
}, new IgniteInClosureX<BinaryRawReaderEx>() {
@Override public void applyx(BinaryRawReaderEx reader) {
@@ -311,7 +315,11 @@ public class PlatformDotNetCacheStore<K, V> implements CacheStore<K, V>, Platfor
writer.writeByte(OP_RMV_ALL);
writer.writeLong(session());
writer.writeString(ses.cacheName());
- writer.writeCollection(keys);
+
+ writer.writeInt(keys.size());
+
+ for (Object o : keys)
+ writer.writeObject(o);
}
}, null);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/eab8334b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheConfigurationTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheConfigurationTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheConfigurationTest.cs
index 02c0fc3..7a30780 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheConfigurationTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheConfigurationTest.cs
@@ -703,7 +703,7 @@ namespace Apache.Ignite.Core.Tests.Cache
/// <summary>
/// Test store.
/// </summary>
- private class CacheStoreTest : CacheStoreAdapter
+ private class CacheStoreTest : CacheStoreAdapter<object, object>
{
/** <inheritdoc /> */
public override object Load(object key)
http://git-wip-us.apache.org/repos/asf/ignite/blob/eab8334b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreAdapterTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreAdapterTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreAdapterTest.cs
index 6690584..02da750 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreAdapterTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreAdapterTest.cs
@@ -23,7 +23,7 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
using NUnit.Framework;
/// <summary>
- /// Tests for <see cref="CacheStoreAdapter"/>.
+ /// Tests for <see cref="CacheStoreAdapter{K, V}"/>.
/// </summary>
public class CacheStoreAdapterTest
{
@@ -62,26 +62,26 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
/// <summary>
/// Test store.
/// </summary>
- private class Store : CacheStoreAdapter
+ private class Store : CacheStoreAdapter<int, string>
{
/** */
- public readonly Dictionary<object, object> Map = new Dictionary<object, object>();
+ public readonly Dictionary<int, string> Map = new Dictionary<int, string>();
/** <inheritdoc /> */
- public override object Load(object key)
+ public override string Load(int key)
{
- object res;
+ string res;
return Map.TryGetValue(key, out res) ? res : null;
}
/** <inheritdoc /> */
- public override void Write(object key, object val)
+ public override void Write(int key, string val)
{
Map[key] = val;
}
/** <inheritdoc /> */
- public override void Delete(object key)
+ public override void Delete(int key)
{
Map.Remove(key);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/eab8334b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTest.cs
index d01726a..6f9d791 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTest.cs
@@ -155,7 +155,7 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
/// Test store implementation.
/// </summary>
// ReSharper disable once UnusedMember.Global
- public class Store : CacheStoreAdapter
+ public class Store : CacheStoreAdapter<object, object>
{
/** Store session. */
[StoreSessionResource]
http://git-wip-us.apache.org/repos/asf/ignite/blob/eab8334b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestParallelLoadStore.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestParallelLoadStore.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestParallelLoadStore.cs
index 81b4697..4786032 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestParallelLoadStore.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestParallelLoadStore.cs
@@ -17,8 +17,6 @@
namespace Apache.Ignite.Core.Tests.Cache.Store
{
- using System;
- using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
@@ -28,7 +26,8 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
/// <summary>
/// Test cache store with parallel load.
/// </summary>
- public class CacheTestParallelLoadStore : CacheParallelLoadStoreAdapter
+ public class CacheTestParallelLoadStore :
+ CacheParallelLoadStoreAdapter<object, object, CacheTestParallelLoadStore.Record>
{
/** Length of input data sequence */
public const int InputDataLength = 10000;
@@ -61,23 +60,21 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
}
/** <inheritdoc /> */
- protected override IEnumerable GetInputData()
+ protected override IEnumerable<Record> GetInputData()
{
return Enumerable.Range(0, InputDataLength).Select(x => new Record {Id = x, Name = "Test Record " + x});
}
/** <inheritdoc /> */
- protected override KeyValuePair<object, object>? Parse(object inputRecord, params object[] args)
+ protected override KeyValuePair<object, object>? Parse(Record inputRecord, params object[] args)
{
var threadId = Thread.CurrentThread.ManagedThreadId;
ThreadIds.GetOrAdd(threadId, threadId);
var minId = (int)args[0];
- var rec = (Record)inputRecord;
-
- return rec.Id >= minId
- ? new KeyValuePair<object, object>(rec.Id, rec)
+ return inputRecord.Id >= minId
+ ? new KeyValuePair<object, object>(inputRecord.Id, inputRecord)
: (KeyValuePair<object, object>?) null;
}
@@ -94,6 +91,7 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
/// <summary>
/// Gets or sets the name.
/// </summary>
+ // ReSharper disable once UnusedAutoPropertyAccessor.Global
public string Name { get; set; }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/eab8334b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestStore.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestStore.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestStore.cs
index f80f5ce..36b190f 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestStore.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestStore.cs
@@ -20,6 +20,7 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
using System;
using System.Collections;
using System.Collections.Concurrent;
+ using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
@@ -29,7 +30,7 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
using Apache.Ignite.Core.Resource;
[SuppressMessage("ReSharper", "FieldCanBeMadeReadOnly.Local")]
- public class CacheTestStore : ICacheStore
+ public class CacheTestStore : ICacheStore<object, object>
{
public static readonly IDictionary Map = new ConcurrentDictionary<object, object>();
@@ -115,13 +116,13 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
return Map[key];
}
- public IDictionary LoadAll(ICollection keys)
+ public IEnumerable<KeyValuePair<object, object>> LoadAll(IEnumerable<object> keys)
{
ThrowIfNeeded();
Debug.Assert(_grid != null);
- return keys.OfType<object>().ToDictionary(key => key, key => "val_" + key);
+ return keys.ToDictionary(key => key, key =>(object)( "val_" + key));
}
public void Write(object key, object val)
@@ -133,13 +134,13 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
Map[key] = val;
}
- public void WriteAll(IDictionary map)
+ public void WriteAll(IEnumerable<KeyValuePair<object, object>> map)
{
ThrowIfNeeded();
Debug.Assert(_grid != null);
- foreach (DictionaryEntry e in map)
+ foreach (var e in map)
Map[e.Key] = e.Value;
}
@@ -152,7 +153,7 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
Map.Remove(key);
}
- public void DeleteAll(ICollection keys)
+ public void DeleteAll(IEnumerable<object> keys)
{
ThrowIfNeeded();
http://git-wip-us.apache.org/repos/asf/ignite/blob/eab8334b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
index 58002db..eab0bb5 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
@@ -191,6 +191,8 @@
<Compile Include="Impl\Binary\SerializableSerializer.cs" />
<Compile Include="Impl\Binary\BinaryWriterExtensions.cs" />
<Compile Include="Impl\Cache\Affinity\AffinityFunctionBase.cs" />
+ <Compile Include="Impl\Cache\Store\CacheStore.cs" />
+ <Compile Include="Impl\Cache\Store\ICacheStoreInternal.cs" />
<Compile Include="Impl\Transactions\CacheTransactionManager.cs" />
<Compile Include="Impl\Cache\Expiry\ExpiryPolicyFactory.cs" />
<Compile Include="Impl\Cache\Expiry\ExpiryPolicySerializer.cs" />
@@ -299,7 +301,7 @@
<Compile Include="Impl\Cache\Query\Continuous\ContinuousQueryUtils.cs" />
<Compile Include="Impl\Cache\Query\FieldsQueryCursor.cs" />
<Compile Include="Impl\Cache\Query\QueryCursor.cs" />
- <Compile Include="Impl\Cache\Store\CacheStore.cs" />
+ <Compile Include="Impl\Cache\Store\CacheStoreInternal.cs" />
<Compile Include="Impl\Cache\Store\CacheStoreSession.cs" />
<Compile Include="Impl\Cache\Store\CacheStoreSessionProxy.cs" />
<Compile Include="Impl\Cluster\ClusterGroupImpl.cs" />
http://git-wip-us.apache.org/repos/asf/ignite/blob/eab8334b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Configuration/CacheConfiguration.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Configuration/CacheConfiguration.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Configuration/CacheConfiguration.cs
index ebf412d..29d2ee3 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Configuration/CacheConfiguration.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Configuration/CacheConfiguration.cs
@@ -602,7 +602,7 @@ namespace Apache.Ignite.Core.Cache.Configuration
/// <summary>
/// Maximum batch size for write-behind cache store operations.
/// Store operations (get or remove) are combined in a batch of this size to be passed to
- /// <see cref="ICacheStore.WriteAll"/> or <see cref="ICacheStore.DeleteAll"/> methods.
+ /// <see cref="ICacheStore{K, V}.WriteAll"/> or <see cref="ICacheStore{K, V}.DeleteAll"/> methods.
/// </summary>
[DefaultValue(DefaultWriteBehindBatchSize)]
public int WriteBehindBatchSize { get; set; }
http://git-wip-us.apache.org/repos/asf/ignite/blob/eab8334b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICache.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICache.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICache.cs
index 50938e1..77e47c7 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICache.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICache.cs
@@ -118,7 +118,7 @@ namespace Apache.Ignite.Core.Cache
/// Optional predicate. If provided, will be used to filter values to be put into cache.
/// </param>
/// <param name="args">
- /// Optional user arguments to be passed into <see cref="ICacheStore.LoadCache" />.
+ /// Optional user arguments to be passed into <see cref="ICacheStore{K, V}.LoadCache" />.
/// </param>
void LoadCache(ICacheEntryFilter<TK, TV> p, params object[] args);
@@ -129,12 +129,12 @@ namespace Apache.Ignite.Core.Cache
/// Optional predicate. If provided, will be used to filter values to be put into cache.
/// </param>
/// <param name="args">
- /// Optional user arguments to be passed into <see cref="ICacheStore.LoadCache" />.
+ /// Optional user arguments to be passed into <see cref="ICacheStore{K, V}.LoadCache" />.
/// </param>
Task LoadCacheAsync(ICacheEntryFilter<TK, TV> p, params object[] args);
/// <summary>
- /// Delegates to <see cref="ICacheStore.LoadCache" /> method to load state
+ /// Delegates to <see cref="ICacheStore{K, V}.LoadCache" /> method to load state
/// from the underlying persistent storage. The loaded values will then be given
/// to the optionally passed in predicate, and, if the predicate returns true,
/// will be stored in cache. If predicate is null, then all loaded values will be stored in cache.
@@ -143,12 +143,12 @@ namespace Apache.Ignite.Core.Cache
/// Optional predicate. If provided, will be used to filter values to be put into cache.
/// </param>
/// <param name="args">
- /// Optional user arguments to be passed into <see cref="ICacheStore.LoadCache" />.
+ /// Optional user arguments to be passed into <see cref="ICacheStore{K, V}.LoadCache" />.
/// </param>
void LocalLoadCache(ICacheEntryFilter<TK, TV> p, params object[] args);
/// <summary>
- /// Delegates to <see cref="ICacheStore.LoadCache" /> method to load state
+ /// Delegates to <see cref="ICacheStore{K, V}.LoadCache" /> method to load state
/// from the underlying persistent storage. The loaded values will then be given
/// to the optionally passed in predicate, and, if the predicate returns true,
/// will be stored in cache. If predicate is null, then all loaded values will be stored in cache.
@@ -157,7 +157,7 @@ namespace Apache.Ignite.Core.Cache
/// Optional predicate. If provided, will be used to filter values to be put into cache.
/// </param>
/// <param name="args">
- /// Optional user arguments to be passed into <see cref="ICacheStore.LoadCache" />.
+ /// Optional user arguments to be passed into <see cref="ICacheStore{K, V}.LoadCache" />.
/// </param>
Task LocalLoadCacheAsync(ICacheEntryFilter<TK, TV> p, params object[] args);
http://git-wip-us.apache.org/repos/asf/ignite/blob/eab8334b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Store/CacheParallelLoadStoreAdapter.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Store/CacheParallelLoadStoreAdapter.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Store/CacheParallelLoadStoreAdapter.cs
index c506838..467b246 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Store/CacheParallelLoadStoreAdapter.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Store/CacheParallelLoadStoreAdapter.cs
@@ -18,10 +18,8 @@
namespace Apache.Ignite.Core.Cache.Store
{
using System;
- using System.Collections;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
- using System.Linq;
using System.Threading.Tasks;
/// <summary>
@@ -32,19 +30,17 @@ namespace Apache.Ignite.Core.Cache.Store
/// GetInputData().GetEnumerator() result will be disposed if it implements IDisposable.
/// Any additional post-LoadCache steps can be performed by overriding LoadCache method.
/// </remarks>
- public abstract class CacheParallelLoadStoreAdapter : ICacheStore
+ /// <typeparam name="TK">Key type.</typeparam>
+ /// <typeparam name="TV">Value type.</typeparam>
+ /// <typeparam name="TData">Custom data entry type.</typeparam>
+ public abstract class CacheParallelLoadStoreAdapter<TK, TV, TData> : ICacheStore<TK, TV>
{
/// <summary>
- /// Default number of working threads (equal to the number of available processors).
- /// </summary>
- public static readonly int DefaultThreadsCount = Environment.ProcessorCount;
-
- /// <summary>
/// Constructor.
/// </summary>
protected CacheParallelLoadStoreAdapter()
{
- MaxDegreeOfParallelism = DefaultThreadsCount;
+ MaxDegreeOfParallelism = Environment.ProcessorCount;
}
/// <summary>
@@ -62,7 +58,7 @@ namespace Apache.Ignite.Core.Cache.Store
/// <param name="act">Action for loaded values.</param>
/// <param name="args">Optional arguemnts passed to <see cref="ICache{K,V}.LocalLoadCache" /> method.</param>
/// <exception cref="CacheStoreException" />
- public virtual void LoadCache(Action<object, object> act, params object[] args)
+ public virtual void LoadCache(Action<TK, TV> act, params object[] args)
{
if (MaxDegreeOfParallelism == 0 || MaxDegreeOfParallelism < -1)
throw new ArgumentOutOfRangeException("MaxDegreeOfParallelism must be either positive or -1: " +
@@ -70,7 +66,7 @@ namespace Apache.Ignite.Core.Cache.Store
var options = new ParallelOptions {MaxDegreeOfParallelism = MaxDegreeOfParallelism};
- Parallel.ForEach(GetInputData().OfType<object>(), options, item =>
+ Parallel.ForEach(GetInputData(), options, item =>
{
var cacheEntry = Parse(item, args);
@@ -83,19 +79,19 @@ namespace Apache.Ignite.Core.Cache.Store
/// Gets the input data sequence to be used in LoadCache.
/// </summary>
[SuppressMessage("Microsoft.Design", "CA1024:UsePropertiesWhereAppropriate", Justification = "Semantics.")]
- protected abstract IEnumerable GetInputData();
+ protected abstract IEnumerable<TData> GetInputData();
/// <summary>
/// This method should transform raw data records from GetInputData
/// into valid key-value pairs to be stored into cache.
/// </summary>
- protected abstract KeyValuePair<object, object>? Parse(object inputRecord, params object[] args);
+ protected abstract KeyValuePair<TK, TV>? Parse(TData inputRecord, params object[] args);
/// <summary>
/// Gets or sets the maximum degree of parallelism to use in LoadCache.
/// Must be either positive or -1 for unlimited amount of threads.
/// <para />
- /// Defaults to <see cref="DefaultThreadsCount"/>.
+ /// Defaults to <see cref="Environment.ProcessorCount"/>.
/// </summary>
public int MaxDegreeOfParallelism { get; set; }
@@ -111,9 +107,9 @@ namespace Apache.Ignite.Core.Cache.Store
/// or <c>null</c> if the object can't be loaded
/// </returns>
[ExcludeFromCodeCoverage]
- public virtual object Load(object key)
+ public virtual TV Load(TK key)
{
- return null;
+ return default(TV);
}
/// <summary>
@@ -126,7 +122,7 @@ namespace Apache.Ignite.Core.Cache.Store
/// A map of key, values to be stored in the cache.
/// </returns>
[ExcludeFromCodeCoverage]
- public virtual IDictionary LoadAll(ICollection keys)
+ public virtual IEnumerable<KeyValuePair<TK, TV>> LoadAll(IEnumerable<TK> keys)
{
return null;
}
@@ -139,7 +135,7 @@ namespace Apache.Ignite.Core.Cache.Store
/// <param name="key">Key to write.</param>
/// <param name="val">Value to write.</param>
[ExcludeFromCodeCoverage]
- public virtual void Write(object key, object val)
+ public virtual void Write(TK key, TV val)
{
// No-op.
}
@@ -158,7 +154,7 @@ namespace Apache.Ignite.Core.Cache.Store
/// to write for write-through. Upon return the collection must only contain entries
/// that were not successfully written. (see partial success above).</param>
[ExcludeFromCodeCoverage]
- public virtual void WriteAll(IDictionary entries)
+ public virtual void WriteAll(IEnumerable<KeyValuePair<TK, TV>> entries)
{
// No-op.
}
@@ -172,7 +168,7 @@ namespace Apache.Ignite.Core.Cache.Store
/// </summary>
/// <param name="key">The key that is used for the delete operation.</param>
[ExcludeFromCodeCoverage]
- public virtual void Delete(object key)
+ public virtual void Delete(TK key)
{
// No-op.
}
@@ -195,7 +191,7 @@ namespace Apache.Ignite.Core.Cache.Store
/// it contains the keys to delete for write-through. Upon return the collection must only contain
/// the keys that were not successfully deleted.</param>
[ExcludeFromCodeCoverage]
- public virtual void DeleteAll(ICollection keys)
+ public virtual void DeleteAll(IEnumerable<TK> keys)
{
// No-op.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/eab8334b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Store/CacheStoreAdapter.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Store/CacheStoreAdapter.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Store/CacheStoreAdapter.cs
index a38678d..769c4c2 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Store/CacheStoreAdapter.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Store/CacheStoreAdapter.cs
@@ -18,7 +18,7 @@
namespace Apache.Ignite.Core.Cache.Store
{
using System;
- using System.Collections;
+ using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
@@ -34,7 +34,9 @@ namespace Apache.Ignite.Core.Cache.Store
/// Note that <c>LoadCache</c> method has empty implementation because it is
/// essentially up to the user to invoke it with specific arguments.
/// </summary>
- public abstract class CacheStoreAdapter : ICacheStore
+ /// <typeparam name="TK">Key type.</typeparam>
+ /// <typeparam name="TV">Value type.</typeparam>
+ public abstract class CacheStoreAdapter<TK, TV> : ICacheStore<TK, TV>
{
/// <summary>
/// Loads all values from underlying persistent storage. Note that keys are
@@ -50,11 +52,11 @@ namespace Apache.Ignite.Core.Cache.Store
/// </summary>
/// <param name="act">Action for loaded values.</param>
/// <param name="args">Optional arguemnts passed to <see cref="ICache{K,V}.LocalLoadCache" /> method.</param>
- public virtual void LoadCache(Action<object, object> act, params object[] args)
+ public virtual void LoadCache(Action<TK, TV> act, params object[] args)
{
// No-op.
}
-
+
/// <summary>
/// Loads multiple objects. Application developers should implement this method to customize
/// the loading of cache entries. This method is called when the requested object is not in the cache.
@@ -64,19 +66,19 @@ namespace Apache.Ignite.Core.Cache.Store
/// <returns>
/// A map of key, values to be stored in the cache.
/// </returns>
- public virtual IDictionary LoadAll(ICollection keys)
+ public virtual IEnumerable<KeyValuePair<TK, TV>> LoadAll(IEnumerable<TK> keys)
{
- return keys.OfType<object>().ToDictionary(key => key, Load);
+ return keys.ToDictionary(key => key, Load);
}
-
+
/// <summary>
/// Writes all.
/// </summary>
/// <param name="entries">The map.</param>
[SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods")]
- public virtual void WriteAll(IDictionary entries)
+ public virtual void WriteAll(IEnumerable<KeyValuePair<TK, TV>> entries)
{
- foreach (DictionaryEntry entry in entries)
+ foreach (var entry in entries)
Write(entry.Key, entry.Value);
}
@@ -98,9 +100,9 @@ namespace Apache.Ignite.Core.Cache.Store
/// it contains the keys to delete for write-through. Upon return the collection must only contain
/// the keys that were not successfully deleted.</param>
[SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods")]
- public virtual void DeleteAll(ICollection keys)
+ public virtual void DeleteAll(IEnumerable<TK> keys)
{
- foreach (object key in keys)
+ foreach (var key in keys)
Delete(key);
}
@@ -125,7 +127,7 @@ namespace Apache.Ignite.Core.Cache.Store
/// The value for the entry that is to be stored in the cache
/// or <c>null</c> if the object can't be loaded
/// </returns>
- public abstract object Load(object key);
+ public abstract TV Load(TK key);
/// <summary>
/// Write the specified value under the specified key to the external resource.
@@ -134,7 +136,7 @@ namespace Apache.Ignite.Core.Cache.Store
/// </summary>
/// <param name="key">Key to write.</param>
/// <param name="val">Value to write.</param>
- public abstract void Write(object key, object val);
+ public abstract void Write(TK key, TV val);
/// <summary>
/// Delete the cache entry from the external resource.
@@ -144,6 +146,6 @@ namespace Apache.Ignite.Core.Cache.Store
/// This method is invoked even if no mapping for the key exists.
/// </summary>
/// <param name="key">The key that is used for the delete operation.</param>
- public abstract void Delete(object key);
+ public abstract void Delete(TK key);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/eab8334b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Store/ICacheStore.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Store/ICacheStore.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Store/ICacheStore.cs
index d6e4f80..044784a 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Store/ICacheStore.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Store/ICacheStore.cs
@@ -18,12 +18,19 @@
namespace Apache.Ignite.Core.Cache.Store
{
using System;
- using System.Collections;
+ using System.Collections.Generic;
+ using System.Diagnostics.CodeAnalysis;
+ using Apache.Ignite.Core.Binary;
+ using Apache.Ignite.Core.Cache.Configuration;
using Apache.Ignite.Core.Transactions;
/// <summary>
/// API for cache persistent storage for read-through and write-through behavior.
- ///
+ /// <para />
+ /// Generic argument types depend on <see cref="CacheConfiguration.KeepBinaryInStore"/> property.
+ /// When <c>true</c> (default), cache store operates on <see cref="IBinaryObject"/> instances.
+ /// Otherwise, generic arguments should be the same as in corresponding <see cref="ICache{TK, TV}"/>.
+ /// <para />
/// Persistent store is configured in Ignite's Spring XML configuration file via
/// <c>CacheConfiguration.setStore()</c> property. If you have an implementation
/// of cache store in .NET, you should use special Java wrapper which accepts assembly name and
@@ -75,7 +82,9 @@ namespace Apache.Ignite.Core.Cache.Store
/// </code>
/// </example>
/// </summary>
- public interface ICacheStore
+ /// <typeparam name="TK">Key type.</typeparam>
+ /// <typeparam name="TV">Value type.</typeparam>
+ public interface ICacheStore<TK, TV> : ICacheStore
{
/// <summary>
/// Loads all values from underlying persistent storage. Note that keys are
@@ -92,7 +101,7 @@ namespace Apache.Ignite.Core.Cache.Store
/// <param name="act">Action for loaded values.</param>
/// <param name="args">Optional arguemnts passed to <see cref="ICache{K,V}.LocalLoadCache"/> method.</param>
/// <exception cref="CacheStoreException" />
- void LoadCache(Action<object, object> act, params object[] args);
+ void LoadCache(Action<TK, TV> act, params object[] args);
/// <summary>
/// Loads an object. Application developers should implement this method to customize the loading
@@ -104,7 +113,7 @@ namespace Apache.Ignite.Core.Cache.Store
/// <returns>The value for the entry that is to be stored in the cache
/// or <c>null</c> if the object can't be loaded</returns>
/// <exception cref="CacheStoreException" />
- object Load(object key);
+ TV Load(TK key);
/// <summary>
/// Loads multiple objects. Application developers should implement this method to customize
@@ -114,7 +123,7 @@ namespace Apache.Ignite.Core.Cache.Store
/// <param name="keys">Keys identifying the values to be loaded.</param>
/// <returns>A map of key, values to be stored in the cache.</returns>
/// <exception cref="CacheStoreException" />
- IDictionary LoadAll(ICollection keys);
+ IEnumerable<KeyValuePair<TK, TV>> LoadAll(IEnumerable<TK> keys);
/// <summary>
/// Write the specified value under the specified key to the external resource.
@@ -124,7 +133,7 @@ namespace Apache.Ignite.Core.Cache.Store
/// <param name="key">Key to write.</param>
/// <param name="val">Value to write.</param>
/// <exception cref="CacheStoreException" />
- void Write(object key, object val);
+ void Write(TK key, TV val);
/// <summary>
/// Write the specified entries to the external resource.
@@ -140,7 +149,7 @@ namespace Apache.Ignite.Core.Cache.Store
/// to write for write-through. Upon return the collection must only contain entries
/// that were not successfully written. (see partial success above).</param>
/// <exception cref="CacheStoreException" />
- void WriteAll(IDictionary entries);
+ void WriteAll(IEnumerable<KeyValuePair<TK, TV>> entries);
/// <summary>
/// Delete the cache entry from the external resource.
@@ -151,7 +160,7 @@ namespace Apache.Ignite.Core.Cache.Store
/// </summary>
/// <param name="key">The key that is used for the delete operation.</param>
/// <exception cref="CacheStoreException" />
- void Delete(object key);
+ void Delete(TK key);
/// <summary>
/// Remove data and keys from the external resource for the given collection of keys, if present.
@@ -171,7 +180,7 @@ namespace Apache.Ignite.Core.Cache.Store
/// it contains the keys to delete for write-through. Upon return the collection must only contain
/// the keys that were not successfully deleted.</param>
/// <exception cref="CacheStoreException" />
- void DeleteAll(ICollection keys);
+ void DeleteAll(IEnumerable<TK> keys);
/// <summary>
/// Tells store to commit or rollback a transaction depending on the value of the
@@ -181,4 +190,14 @@ namespace Apache.Ignite.Core.Cache.Store
/// <exception cref="CacheStoreException" />
void SessionEnd(bool commit);
}
+
+ /// <summary>
+ /// Non-generic base type for <see cref="ICacheStore{TK,TV}"/>, used only for configuration property.
+ /// Users should implement generic <see cref="ICacheStore{TK,TV}"/>.
+ /// </summary>
+ [SuppressMessage("Microsoft.Design", "CA1040:AvoidEmptyInterfaces")]
+ public interface ICacheStore
+ {
+ // No-op.
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/eab8334b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Store/ICacheStoreSession.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Store/ICacheStoreSession.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Store/ICacheStoreSession.cs
index e20a660..bd9ccdf 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Store/ICacheStoreSession.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Store/ICacheStoreSession.cs
@@ -23,7 +23,7 @@ namespace Apache.Ignite.Core.Cache.Store
/// Session for the cache store operations. The main purpose of cache store session
/// is to hold context between multiple store invocations whenever in transaction. For example,
/// you can save current database connection in the session <see cref="Properties"/> map. You can then
- /// commit this connection in the <see cref="ICacheStore.SessionEnd(bool)"/> method.
+ /// commit this connection in the <see cref="ICacheStore{K,V}.SessionEnd(bool)"/> method.
/// </summary>
public interface ICacheStoreSession
{
http://git-wip-us.apache.org/repos/asf/ignite/blob/eab8334b/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/IDataStreamer.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/IDataStreamer.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/IDataStreamer.cs
index 64c0f9e..d18040f 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/IDataStreamer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/IDataStreamer.cs
@@ -34,7 +34,7 @@ namespace Apache.Ignite.Core.Datastream
/// <para />
/// Also note that <c>IDataStreamer</c> is not the only way to load data into cache.
/// Alternatively you can use
- /// <see cref="ICacheStore.LoadCache(Action{object, object}, object[])"/>
+ /// <see cref="ICacheStore{K, V}.LoadCache(Action{K, V}, object[])"/>
/// method to load data from underlying data store. You can also use standard cache
/// <c>put</c> and <c>putAll</c> operations as well, but they most likely will not perform
/// as well as this class for loading data. And finally, data can be loaded from underlying
http://git-wip-us.apache.org/repos/asf/ignite/blob/eab8334b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Store/CacheStore.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Store/CacheStore.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Store/CacheStore.cs
index befe72b..f728e2b 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Store/CacheStore.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Store/CacheStore.cs
@@ -17,74 +17,37 @@
namespace Apache.Ignite.Core.Impl.Cache.Store
{
- using System.Collections;
+ using System;
using System.Diagnostics;
- using System.IO;
- using Apache.Ignite.Core.Binary;
+ using System.Globalization;
+ using System.Linq;
using Apache.Ignite.Core.Cache.Store;
using Apache.Ignite.Core.Common;
using Apache.Ignite.Core.Impl.Binary;
- using Apache.Ignite.Core.Impl.Binary.IO;
using Apache.Ignite.Core.Impl.Handle;
- using Apache.Ignite.Core.Impl.Resource;
+ using Apache.Ignite.Core.Impl.Memory;
/// <summary>
- /// Interop cache store.
+ /// Interop cache store, delegates to generic <see cref="CacheStoreInternal{TK,TV}"/> wrapper.
/// </summary>
internal class CacheStore
{
- /** */
- private const byte OpLoadCache = 0;
-
- /** */
- private const byte OpLoad = 1;
-
- /** */
- private const byte OpLoadAll = 2;
-
- /** */
- private const byte OpPut = 3;
-
- /** */
- private const byte OpPutAll = 4;
-
- /** */
- private const byte OpRmv = 5;
-
- /** */
- private const byte OpRmvAll = 6;
-
- /** */
- private const byte OpSesEnd = 7;
-
- /** */
- private readonly bool _convertBinary;
-
/** Store. */
- private readonly ICacheStore _store;
-
- /** Session. */
- private readonly CacheStoreSessionProxy _sesProxy;
+ private readonly ICacheStoreInternal _store;
/** */
private readonly long _handle;
-
+
/// <summary>
/// Initializes a new instance of the <see cref="CacheStore" /> class.
/// </summary>
/// <param name="store">Store.</param>
- /// <param name="convertBinary">Whether to convert binary objects.</param>
/// <param name="registry">The handle registry.</param>
- private CacheStore(ICacheStore store, bool convertBinary, HandleRegistry registry)
+ private CacheStore(ICacheStoreInternal store, HandleRegistry registry)
{
Debug.Assert(store != null);
_store = store;
- _convertBinary = convertBinary;
-
- _sesProxy = new CacheStoreSessionProxy();
-
- ResourceProcessor.InjectStoreSession(store, _sesProxy);
_handle = registry.AllocateCritical(this);
}
@@ -97,7 +60,7 @@ namespace Apache.Ignite.Core.Impl.Cache.Store
/// <returns>
/// Interop cache store.
/// </returns>
- internal static CacheStore CreateInstance(long memPtr, HandleRegistry registry)
+ public static CacheStore CreateInstance(long memPtr, HandleRegistry registry)
{
using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
{
@@ -109,7 +72,14 @@ namespace Apache.Ignite.Core.Impl.Cache.Store
ICacheStore store;
if (factory != null)
+ {
store = factory.CreateInstance();
+
+ if (store == null)
+ {
+ throw new IgniteException("Cache store factory should not return null: " + factory.GetType());
+ }
+ }
else
{
var className = reader.ReadString();
@@ -118,8 +88,13 @@ namespace Apache.Ignite.Core.Impl.Cache.Store
store = IgniteUtils.CreateInstance<ICacheStore>(className, propertyMap);
}
+ var iface = GetCacheStoreInterface(store);
- return new CacheStore(store, convertBinary, registry);
+ var storeType = typeof(CacheStoreInternal<,>).MakeGenericType(iface.GetGenericArguments());
+
+ var storeInt = (ICacheStoreInternal)Activator.CreateInstance(storeType, store, convertBinary);
+
+ return new CacheStore(storeInt, registry);
}
}
@@ -137,7 +112,7 @@ namespace Apache.Ignite.Core.Impl.Cache.Store
/// <param name="grid">Grid.</param>
public void Init(Ignite grid)
{
- ResourceProcessor.Inject(_store, grid);
+ _store.Init(grid);
}
/// <summary>
@@ -147,148 +122,36 @@ namespace Apache.Ignite.Core.Impl.Cache.Store
/// <param name="grid">Grid.</param>
/// <returns>Invocation result.</returns>
/// <exception cref="IgniteException">Invalid operation type: + opType</exception>
- public int Invoke(IBinaryStream stream, Ignite grid)
+ public long Invoke(PlatformMemoryStream stream, Ignite grid)
{
- IBinaryReader reader = grid.Marshaller.StartUnmarshal(stream,
- _convertBinary ? BinaryMode.Deserialize : BinaryMode.ForceBinary);
-
- IBinaryRawReader rawReader = reader.GetRawReader();
-
- int opType = rawReader.ReadByte();
-
- // Setup cache session for this invocation.
- long sesId = rawReader.ReadLong();
-
- CacheStoreSession ses = grid.HandleRegistry.Get<CacheStoreSession>(sesId, true);
-
- ses.CacheName = rawReader.ReadString();
-
- _sesProxy.SetSession(ses);
+ return _store.Invoke(stream, grid);
+ }
+
+ /// <summary>
+ /// Gets the generic <see cref="ICacheStore{TK,TV}"/> interface type.
+ /// </summary>
+ private static Type GetCacheStoreInterface(ICacheStore store)
+ {
+ var ifaces = store.GetType().GetInterfaces()
+ .Where(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(ICacheStore<,>))
+ .ToArray();
- try
+ if (ifaces.Length == 0)
{
- // Perform operation.
- switch (opType)
- {
- case OpLoadCache:
- {
- var args = rawReader.ReadArray<object>();
-
- stream.Seek(0, SeekOrigin.Begin);
-
- int cnt = 0;
- stream.WriteInt(cnt); // Reserve space for count.
-
- var writer = grid.Marshaller.StartMarshal(stream);
-
- _store.LoadCache((k, v) =>
- {
- lock (writer) // User-defined store can be multithreaded.
- {
- writer.WithDetach(w =>
- {
- w.WriteObject(k);
- w.WriteObject(v);
- });
-
- cnt++;
- }
- }, args);
-
- stream.WriteInt(0, cnt);
-
- grid.Marshaller.FinishMarshal(writer);
-
- break;
- }
-
- case OpLoad:
- {
- var val = _store.Load(rawReader.ReadObject<object>());
-
- stream.Seek(0, SeekOrigin.Begin);
-
- var writer = grid.Marshaller.StartMarshal(stream);
-
- writer.WriteObject(val);
-
- grid.Marshaller.FinishMarshal(writer);
-
- break;
- }
-
- case OpLoadAll:
- {
- var keys = rawReader.ReadCollection();
-
- var result = _store.LoadAll(keys);
-
- stream.Seek(0, SeekOrigin.Begin);
-
- stream.WriteInt(result.Count);
-
- var writer = grid.Marshaller.StartMarshal(stream);
-
- foreach (DictionaryEntry entry in result)
- {
- var entry0 = entry; // Copy modified closure.
-
- writer.WithDetach(w =>
- {
- w.WriteObject(entry0.Key);
- w.WriteObject(entry0.Value);
- });
- }
-
- grid.Marshaller.FinishMarshal(writer);
-
- break;
- }
-
- case OpPut:
- _store.Write(rawReader.ReadObject<object>(), rawReader.ReadObject<object>());
-
- break;
-
- case OpPutAll:
- var size = rawReader.ReadInt();
-
- var dict = new Hashtable(size);
-
- for (int i = 0; i < size; i++)
- dict[rawReader.ReadObject<object>()] = rawReader.ReadObject<object>();
-
- _store.WriteAll(dict);
-
- break;
-
- case OpRmv:
- _store.Delete(rawReader.ReadObject<object>());
-
- break;
-
- case OpRmvAll:
- _store.DeleteAll(rawReader.ReadCollection());
-
- break;
-
- case OpSesEnd:
- grid.HandleRegistry.Release(sesId);
-
- _store.SessionEnd(rawReader.ReadBoolean());
-
- break;
-
- default:
- throw new IgniteException("Invalid operation type: " + opType);
- }
-
- return 0;
+ throw new IgniteException(string.Format(
+ CultureInfo.InvariantCulture, "Cache store should implement generic {0} interface: {1}",
+ typeof(ICacheStore<,>), store.GetType()));
}
- finally
+
+ if (ifaces.Length > 1)
{
- _sesProxy.ClearSession();
+ throw new IgniteException(string.Format(
+ CultureInfo.InvariantCulture, "Cache store should not implement generic {0} " +
+ "interface more than once: {1}",
+ typeof(ICacheStore<,>), store.GetType()));
}
+
+ return ifaces[0];
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/eab8334b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Store/CacheStoreInternal.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Store/CacheStoreInternal.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Store/CacheStoreInternal.cs
new file mode 100644
index 0000000..f147579
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Store/CacheStoreInternal.cs
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Cache.Store
+{
+ using System.Collections.Generic;
+ using System.Diagnostics;
+ using System.IO;
+ using Apache.Ignite.Core.Binary;
+ using Apache.Ignite.Core.Cache.Store;
+ using Apache.Ignite.Core.Common;
+ using Apache.Ignite.Core.Impl.Binary;
+ using Apache.Ignite.Core.Impl.Binary.IO;
+ using Apache.Ignite.Core.Impl.Resource;
+
+ /// <summary>
+ /// Generic cache store wrapper.
+ /// </summary>
+ internal class CacheStoreInternal<TK, TV> : ICacheStoreInternal
+ {
+ /** */
+ private const byte OpLoadCache = 0;
+
+ /** */
+ private const byte OpLoad = 1;
+
+ /** */
+ private const byte OpLoadAll = 2;
+
+ /** */
+ private const byte OpPut = 3;
+
+ /** */
+ private const byte OpPutAll = 4;
+
+ /** */
+ private const byte OpRmv = 5;
+
+ /** */
+ private const byte OpRmvAll = 6;
+
+ /** */
+ private const byte OpSesEnd = 7;
+
+ /** */
+ private readonly bool _convertBinary;
+
+ /** User store. */
+ private readonly ICacheStore<TK, TV> _store;
+
+ /** Session. */
+ private readonly CacheStoreSessionProxy _sesProxy;
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="CacheStoreInternal{TK,TV}"/> class.
+ /// </summary>
+ public CacheStoreInternal(ICacheStore<TK, TV> store, bool convertBinary)
+ {
+ Debug.Assert(store != null);
+
+ _store = store;
+
+ _convertBinary = convertBinary;
+
+ _sesProxy = new CacheStoreSessionProxy();
+
+ ResourceProcessor.InjectStoreSession(store, _sesProxy);
+ }
+
+ /// <summary>
+ /// Initializes this instance with a grid.
+ /// </summary>
+ /// <param name="grid">Grid.</param>
+ public void Init(Ignite grid)
+ {
+ ResourceProcessor.Inject(_store, grid);
+ }
+
+ /// <summary>
+ /// Invokes a store operation.
+ /// </summary>
+ /// <param name="stream">Input stream.</param>
+ /// <param name="grid">Grid.</param>
+ /// <returns>Invocation result.</returns>
+ /// <exception cref="IgniteException">Invalid operation type: + opType</exception>
+ public int Invoke(IBinaryStream stream, Ignite grid)
+ {
+ IBinaryReader reader = grid.Marshaller.StartUnmarshal(stream,
+ _convertBinary ? BinaryMode.Deserialize : BinaryMode.ForceBinary);
+
+ IBinaryRawReader rawReader = reader.GetRawReader();
+
+ int opType = rawReader.ReadByte();
+
+ // Setup cache session for this invocation.
+ long sesId = rawReader.ReadLong();
+
+ CacheStoreSession ses = grid.HandleRegistry.Get<CacheStoreSession>(sesId, true);
+
+ ses.CacheName = rawReader.ReadString();
+
+ _sesProxy.SetSession(ses);
+
+ try
+ {
+ // Perform operation.
+ switch (opType)
+ {
+ case OpLoadCache:
+ {
+ var args = rawReader.ReadArray<object>();
+
+ stream.Seek(0, SeekOrigin.Begin);
+
+ int cnt = 0;
+ stream.WriteInt(cnt); // Reserve space for count.
+
+ var writer = grid.Marshaller.StartMarshal(stream);
+
+ _store.LoadCache((k, v) =>
+ {
+ lock (writer) // User-defined store can be multithreaded.
+ {
+ writer.WithDetach(w =>
+ {
+ w.WriteObject(k);
+ w.WriteObject(v);
+ });
+
+ cnt++;
+ }
+ }, args);
+
+ stream.WriteInt(0, cnt);
+
+ grid.Marshaller.FinishMarshal(writer);
+
+ break;
+ }
+
+ case OpLoad:
+ {
+ var val = _store.Load(rawReader.ReadObject<TK>());
+
+ stream.Seek(0, SeekOrigin.Begin);
+
+ var writer = grid.Marshaller.StartMarshal(stream);
+
+ writer.WriteObject(val);
+
+ grid.Marshaller.FinishMarshal(writer);
+
+ break;
+ }
+
+ case OpLoadAll:
+ {
+ // We can't do both read and write lazily because stream is reused.
+ // Read keys non-lazily, write result lazily.
+ var keys = ReadAllKeys(rawReader);
+
+ var result = _store.LoadAll(keys);
+
+ stream.Seek(0, SeekOrigin.Begin);
+
+ int cnt = 0;
+ stream.WriteInt(cnt); // Reserve space for count.
+
+ var writer = grid.Marshaller.StartMarshal(stream);
+
+ foreach (var entry in result)
+ {
+ var entry0 = entry; // Copy modified closure.
+
+ writer.WithDetach(w =>
+ {
+ w.WriteObject(entry0.Key);
+ w.WriteObject(entry0.Value);
+ });
+
+ cnt++;
+ }
+
+ stream.WriteInt(0, cnt);
+
+ grid.Marshaller.FinishMarshal(writer);
+
+ break;
+ }
+
+ case OpPut:
+ _store.Write(rawReader.ReadObject<TK>(), rawReader.ReadObject<TV>());
+
+ break;
+
+ case OpPutAll:
+ _store.WriteAll(ReadPairs(rawReader));
+
+ break;
+
+ case OpRmv:
+ _store.Delete(rawReader.ReadObject<TK>());
+
+ break;
+
+ case OpRmvAll:
+ _store.DeleteAll(ReadKeys(rawReader));
+
+ break;
+
+ case OpSesEnd:
+ grid.HandleRegistry.Release(sesId);
+
+ _store.SessionEnd(rawReader.ReadBoolean());
+
+ break;
+
+ default:
+ throw new IgniteException("Invalid operation type: " + opType);
+ }
+
+ return 0;
+ }
+ finally
+ {
+ _sesProxy.ClearSession();
+ }
+ }
+
+ /// <summary>
+ /// Reads key-value pairs.
+ /// </summary>
+ private static IEnumerable<KeyValuePair<TK, TV>> ReadPairs(IBinaryRawReader rawReader)
+ {
+ var size = rawReader.ReadInt();
+
+ for (var i = 0; i < size; i++)
+ {
+ yield return new KeyValuePair<TK, TV>(rawReader.ReadObject<TK>(), rawReader.ReadObject<TV>());
+ }
+ }
+
+ /// <summary>
+ /// Reads the keys.
+ /// </summary>
+ private static IEnumerable<TK> ReadKeys(IBinaryRawReader reader)
+ {
+ var cnt = reader.ReadInt();
+
+ for (var i = 0; i < cnt; i++)
+ {
+ yield return reader.ReadObject<TK>();
+ }
+ }
+ /// <summary>
+ /// Reads the keys.
+ /// </summary>
+ private static ICollection<TK> ReadAllKeys(IBinaryRawReader reader)
+ {
+ var cnt = reader.ReadInt();
+ var res = new List<TK>(cnt);
+
+ for (var i = 0; i < cnt; i++)
+ {
+ res.Add(reader.ReadObject<TK>());
+ }
+
+ return res;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/eab8334b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Store/ICacheStoreInternal.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Store/ICacheStoreInternal.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Store/ICacheStoreInternal.cs
new file mode 100644
index 0000000..7ec44d9
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Store/ICacheStoreInternal.cs
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Cache.Store
+{
+ using Apache.Ignite.Core.Common;
+ using Apache.Ignite.Core.Impl.Binary.IO;
+
+ /// <summary>
+ /// Provides a non-generic way to work with <see cref="CacheStoreInternal{TK, TV}"/>.
+ /// </summary>
+ internal interface ICacheStoreInternal
+ {
+ /// <summary>
+ /// Invokes a store operation.
+ /// </summary>
+ /// <param name="stream">Input stream.</param>
+ /// <param name="grid">Grid.</param>
+ /// <returns>Invocation result.</returns>
+ /// <exception cref="IgniteException">Invalid operation type: + opType</exception>
+ int Invoke(IBinaryStream stream, Ignite grid);
+
+ /// <summary>
+ /// Initializes this instance with a grid.
+ /// </summary>
+ /// <param name="grid">Grid.</param>
+ void Init(Ignite grid);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/eab8334b/modules/platforms/dotnet/examples/Apache.Ignite.Examples/Datagrid/StoreExample.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/examples/Apache.Ignite.Examples/Datagrid/StoreExample.cs b/modules/platforms/dotnet/examples/Apache.Ignite.Examples/Datagrid/StoreExample.cs
index 62da647..6915d79 100644
--- a/modules/platforms/dotnet/examples/Apache.Ignite.Examples/Datagrid/StoreExample.cs
+++ b/modules/platforms/dotnet/examples/Apache.Ignite.Examples/Datagrid/StoreExample.cs
@@ -59,6 +59,7 @@ namespace Apache.Ignite.Examples.Datagrid
Name = CacheName,
ReadThrough = true,
WriteThrough = true,
+ KeepBinaryInStore = false, // Cache store works with deserialized data.
CacheStoreFactory = new EmployeeStoreFactory()
});
http://git-wip-us.apache.org/repos/asf/ignite/blob/eab8334b/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Datagrid/EmployeeStore.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Datagrid/EmployeeStore.cs b/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Datagrid/EmployeeStore.cs
index 7049011..9eb6539 100644
--- a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Datagrid/EmployeeStore.cs
+++ b/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Datagrid/EmployeeStore.cs
@@ -18,7 +18,6 @@
namespace Apache.Ignite.ExamplesDll.Datagrid
{
using System;
- using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using Apache.Ignite.Core.Cache;
@@ -28,22 +27,22 @@ namespace Apache.Ignite.ExamplesDll.Datagrid
/// <summary>
/// Example cache store implementation.
/// </summary>
- public class EmployeeStore : CacheStoreAdapter
+ public class EmployeeStore : CacheStoreAdapter<int, Employee>
{
/// <summary>
/// Dictionary representing the store.
/// </summary>
- private readonly ConcurrentDictionary<object, object> _db = new ConcurrentDictionary<object, object>(
- new List<KeyValuePair<object, object>>
+ private readonly ConcurrentDictionary<int, Employee> _db = new ConcurrentDictionary<int, Employee>(
+ new List<KeyValuePair<int, Employee>>
{
- new KeyValuePair<object, object>(1, new Employee(
+ new KeyValuePair<int, Employee>(1, new Employee(
"Allison Mathis",
25300,
new Address("2702 Freedom Lane, San Francisco, CA", 94109),
new List<string> {"Development"}
)),
- new KeyValuePair<object, object>(2, new Employee(
+ new KeyValuePair<int, Employee>(2, new Employee(
"Breana Robbin",
6500,
new Address("3960 Sundown Lane, Austin, TX", 78130),
@@ -57,7 +56,7 @@ namespace Apache.Ignite.ExamplesDll.Datagrid
/// </summary>
/// <param name="act">Action that loads a cache entry.</param>
/// <param name="args">Optional arguments.</param>
- public override void LoadCache(Action<object, object> act, params object[] args)
+ public override void LoadCache(Action<int, Employee> act, params object[] args)
{
// Iterate over whole underlying store and call act on each entry to load it into the cache.
foreach (var entry in _db)
@@ -72,9 +71,9 @@ namespace Apache.Ignite.ExamplesDll.Datagrid
/// <returns>
/// A map of key, values to be stored in the cache.
/// </returns>
- public override IDictionary LoadAll(ICollection keys)
+ public override IEnumerable<KeyValuePair<int, Employee>> LoadAll(IEnumerable<int> keys)
{
- var result = new Dictionary<object, object>();
+ var result = new Dictionary<int, Employee>();
foreach (var key in keys)
result[key] = Load(key);
@@ -88,9 +87,9 @@ namespace Apache.Ignite.ExamplesDll.Datagrid
/// </summary>
/// <param name="key">Key to load.</param>
/// <returns>Loaded value</returns>
- public override object Load(object key)
+ public override Employee Load(int key)
{
- object val;
+ Employee val;
_db.TryGetValue(key, out val);
@@ -102,7 +101,7 @@ namespace Apache.Ignite.ExamplesDll.Datagrid
/// </summary>
/// <param name="key">Key to write.</param>
/// <param name="val">Value to write.</param>
- public override void Write(object key, object val)
+ public override void Write(int key, Employee val)
{
_db[key] = val;
}
@@ -111,9 +110,9 @@ namespace Apache.Ignite.ExamplesDll.Datagrid
/// Delete cache entry form store.
/// </summary>
/// <param name="key">Key to delete.</param>
- public override void Delete(object key)
+ public override void Delete(int key)
{
- object val;
+ Employee val;
_db.TryRemove(key, out val);
}