You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/09/04 18:27:38 UTC
[24/55] [abbrv] ignite git commit: IGNITE-1348: Moved GridGain's .Net
module to Ignite.
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryAbstractTest.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryAbstractTest.cs b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryAbstractTest.cs
new file mode 100644
index 0000000..55bc76c
--- /dev/null
+++ b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryAbstractTest.cs
@@ -0,0 +1,1181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests.Cache.Query.Continuous
+{
+ using System;
+ using System.Collections.Concurrent;
+ using System.Collections.Generic;
+ using System.Diagnostics.CodeAnalysis;
+ using System.Linq;
+ using System.Runtime.Serialization;
+ using System.Threading;
+ using Apache.Ignite.Core.Cache;
+ using Apache.Ignite.Core.Cache.Event;
+ using Apache.Ignite.Core.Cache.Query;
+ using Apache.Ignite.Core.Cache.Query.Continuous;
+ using Apache.Ignite.Core.Cluster;
+ using Apache.Ignite.Core.Common;
+ using Apache.Ignite.Core.Impl;
+ using Apache.Ignite.Core.Portable;
+ using Apache.Ignite.Core.Resource;
+ using NUnit.Framework;
+ using CQU = Apache.Ignite.Core.Impl.Cache.Query.Continuous.ContinuousQueryUtils;
+
+ /// <summary>
+ /// Tests for continuous query.
+ /// </summary>
+ [SuppressMessage("ReSharper", "InconsistentNaming")]
+ [SuppressMessage("ReSharper", "PossibleNullReferenceException")]
+ [SuppressMessage("ReSharper", "StaticMemberInGenericType")]
+ public abstract class ContinuousQueryAbstractTest
+ {
+ /** Cache name: ATOMIC, backup. */
+ protected const string CACHE_ATOMIC_BACKUP = "atomic_backup";
+
+ /** Cache name: ATOMIC, no backup. */
+ protected const string CACHE_ATOMIC_NO_BACKUP = "atomic_no_backup";
+
+ /** Cache name: TRANSACTIONAL, backup. */
+ protected const string CACHE_TX_BACKUP = "transactional_backup";
+
+ /** Cache name: TRANSACTIONAL, no backup. */
+ protected const string CACHE_TX_NO_BACKUP = "transactional_no_backup";
+
+ /** Listener events. */
+ public static BlockingCollection<CallbackEvent> CB_EVTS = new BlockingCollection<CallbackEvent>();
+
+ /** Filter events. */
+ public static BlockingCollection<FilterEvent> FILTER_EVTS = new BlockingCollection<FilterEvent>();
+
+ /** First node. */
+ private IIgnite grid1;
+
+ /** Second node. */
+ private IIgnite grid2;
+
+ /** Cache on the first node. */
+ private ICache<int, PortableEntry> cache1;
+
+ /** Cache on the second node. */
+ private ICache<int, PortableEntry> cache2;
+
+ /** Cache name. */
+ private readonly string cacheName;
+
+ /// <summary>
+ /// Constructor. Remembers the name of the cache that the fixture
+ /// set-up resolves on both started nodes.
+ /// </summary>
+ /// <param name="cacheName">Cache name.</param>
+ protected ContinuousQueryAbstractTest(string cacheName)
+ {
+ this.cacheName = cacheName;
+ }
+
+ /// <summary>
+ /// Fixture set-up: starts two Ignite nodes ("grid-1" and "grid-2") from one
+ /// shared configuration and resolves the cache under test on each of them.
+ /// </summary>
+ [TestFixtureSetUp]
+ public void SetUp()
+ {
+     GC.Collect();
+     TestUtils.JvmDebug = true;
+
+     var cfg = new IgniteConfigurationEx
+     {
+         PortableConfiguration = new PortableConfiguration
+         {
+             // Types registered with the portable marshaller for these tests.
+             TypeConfigurations = new List<PortableTypeConfiguration>
+             {
+                 new PortableTypeConfiguration(typeof(PortableEntry)),
+                 new PortableTypeConfiguration(typeof(PortableFilter)),
+                 new PortableTypeConfiguration(typeof(KeepPortableFilter))
+             }
+         },
+         JvmClasspath = TestUtils.CreateTestClasspath(),
+         JvmOptions = TestUtils.TestJavaOptions(),
+         SpringConfigUrl = "config\\cache-query-continuous.xml"
+     };
+
+     // The configuration object is reused; only the grid name changes between starts.
+     cfg.GridName = "grid-1";
+     grid1 = Ignition.Start(cfg);
+     cache1 = grid1.Cache<int, PortableEntry>(cacheName);
+
+     cfg.GridName = "grid-2";
+     grid2 = Ignition.Start(cfg);
+     cache2 = grid2.Cache<int, PortableEntry>(cacheName);
+ }
+
+ /// <summary>
+ /// Fixture tear-down: calls <c>Ignition.StopAll</c> once all tests of the
+ /// fixture have run.
+ /// </summary>
+ [TestFixtureTearDown]
+ public void TearDown()
+ {
+ Ignition.StopAll(true);
+ }
+
+ /// <summary>
+ /// Per-test preparation: installs fresh event queues, resets the filter
+ /// behavior switches, removes the well-known primary keys of both nodes and
+ /// verifies that both caches are empty.
+ /// </summary>
+ [SetUp]
+ public void BeforeTest()
+ {
+     // New queues so that events of a previous test cannot leak into this one.
+     CB_EVTS = new BlockingCollection<CallbackEvent>();
+     FILTER_EVTS = new BlockingCollection<FilterEvent>();
+
+     // Filters accept everything and throw nothing unless a test says otherwise.
+     AbstractFilter<PortableEntry>.res = true;
+     AbstractFilter<PortableEntry>.err = false;
+     AbstractFilter<PortableEntry>.marshErr = false;
+     AbstractFilter<PortableEntry>.unmarshErr = false;
+
+     // Both removals go through cache1; only the key owner differs.
+     foreach (var keyOwner in new[] { cache1, cache2 })
+     {
+         cache1.Remove(PrimaryKey(keyOwner));
+     }
+
+     Assert.AreEqual(0, cache1.Size());
+     Assert.AreEqual(0, cache2.Size());
+
+     Console.WriteLine("Test started: " + TestContext.CurrentContext.Test.Name);
+ }
+
+ /// <summary>
+ /// Starting a continuous query that was built with a null listener must be
+ /// rejected with an <see cref="ArgumentException"/>.
+ /// </summary>
+ [Test]
+ public void TestValidation()
+ {
+     TestDelegate startWithNullListener =
+         () => cache1.QueryContinuous(new ContinuousQuery<int, PortableEntry>(null));
+
+     Assert.Throws<ArgumentException>(startWithNullListener);
+ }
+
+ /// <summary>
+ /// Disposing a continuous query handle a second time, after the using block
+ /// has already disposed it, must not fail.
+ /// </summary>
+ [Test]
+ public void TestMultipleClose()
+ {
+     int locKey = PrimaryKey(cache1);
+     int rmtKey = PrimaryKey(cache2);
+
+     var qry = new ContinuousQuery<int, PortableEntry>(new Listener<PortableEntry>());
+
+     IDisposable qryHnd;
+
+     using (qryHnd = cache1.QueryContinuous(qry))
+     {
+         // Put from local node.
+         cache1.GetAndPut(locKey, Entry(locKey));
+         CheckCallbackSingle(locKey, null, Entry(locKey));
+
+         // Put from remote node.
+         cache2.GetAndPut(rmtKey, Entry(rmtKey));
+         CheckCallbackSingle(rmtKey, null, Entry(rmtKey));
+     }
+
+     // Second dispose on an already disposed handle.
+     qryHnd.Dispose();
+ }
+
+ /// <summary>
+ /// Runs the regular callback scenario for a non-local query.
+ /// </summary>
+ [Test]
+ public void TestCallback()
+ {
+     CheckCallback(loc: false);
+ }
+
+ /// <summary>
+ /// Verifies listener notifications for put/update/remove of a key that is
+ /// primary on the first node and of a key that is primary on the second node,
+ /// and verifies that nothing is delivered after the query is disposed.
+ /// </summary>
+ /// <param name="loc">Whether the query is created with the local flag; in that
+ /// case no callbacks are expected for the second node's key.</param>
+ protected void CheckCallback(bool loc)
+ {
+     int locKey = PrimaryKey(cache1);
+     int rmtKey = PrimaryKey(cache2);
+
+     ContinuousQuery<int, PortableEntry> qry;
+
+     if (loc)
+     {
+         qry = new ContinuousQuery<int, PortableEntry>(new Listener<PortableEntry>(), true);
+     }
+     else
+     {
+         qry = new ContinuousQuery<int, PortableEntry>(new Listener<PortableEntry>());
+     }
+
+     using (cache1.QueryContinuous(qry))
+     {
+         // Operations on the key owned by the local node.
+         cache1.GetAndPut(locKey, Entry(locKey));
+         CheckCallbackSingle(locKey, null, Entry(locKey));
+
+         cache1.GetAndPut(locKey, Entry(locKey + 1));
+         CheckCallbackSingle(locKey, Entry(locKey), Entry(locKey + 1));
+
+         cache1.Remove(locKey);
+         CheckCallbackSingle(locKey, Entry(locKey + 1), null);
+
+         // Operations on the key owned by the remote node.
+         cache2.GetAndPut(rmtKey, Entry(rmtKey));
+
+         if (loc)
+         {
+             CheckNoCallback(100);
+         }
+         else
+         {
+             CheckCallbackSingle(rmtKey, null, Entry(rmtKey));
+         }
+
+         cache1.GetAndPut(rmtKey, Entry(rmtKey + 1));
+
+         if (loc)
+         {
+             CheckNoCallback(100);
+         }
+         else
+         {
+             CheckCallbackSingle(rmtKey, Entry(rmtKey), Entry(rmtKey + 1));
+         }
+
+         cache1.Remove(rmtKey);
+
+         if (loc)
+         {
+             CheckNoCallback(100);
+         }
+         else
+         {
+             CheckCallbackSingle(rmtKey, Entry(rmtKey + 1), null);
+         }
+     }
+
+     // The query is closed now: further updates must stay silent.
+     cache1.Put(locKey, Entry(locKey));
+     CheckNoCallback(100);
+
+     cache1.Put(rmtKey, Entry(rmtKey));
+     CheckNoCallback(100);
+ }
+
+ /// <summary>
+ /// The listener's <c>ignite</c> resource field is null before the query is
+ /// started and must be populated while the query is running.
+ /// </summary>
+ [Test]
+ public void TestCallbackInjection()
+ {
+     var lsnr = new Listener<PortableEntry>();
+
+     Assert.IsNull(lsnr.ignite);
+
+     var qry = new ContinuousQuery<int, PortableEntry>(lsnr);
+
+     using (cache1.QueryContinuous(qry))
+     {
+         Assert.IsNotNull(lsnr.ignite);
+     }
+ }
+
+ /// <summary>
+ /// Runs the filter scenario with a portable filter on a non-local query.
+ /// </summary>
+ [Test]
+ public void TestFilterPortable()
+ {
+     CheckFilter(portable: true, loc: false);
+ }
+
+ /// <summary>
+ /// Runs the filter scenario with a serializable filter on a non-local query.
+ /// </summary>
+ [Test]
+ public void TestFilterSerializable()
+ {
+     CheckFilter(portable: false, loc: false);
+ }
+
+ /// <summary>
+ /// Verifies filter and listener interaction: while the filter accepts events
+ /// both the filter and the listener are notified; once the filter result is
+ /// switched to false the filter is still invoked but the listener stays silent.
+ /// </summary>
+ /// <param name="portable">Whether to use <see cref="PortableFilter"/> (true)
+ /// or <see cref="SerializableFilter"/> (false).</param>
+ /// <param name="loc">Whether the query is created with the local flag; in that
+ /// case nothing is expected for the key owned by the second node.</param>
+ protected void CheckFilter(bool portable, bool loc)
+ {
+     ICacheEntryEventListener<int, PortableEntry> lsnr = new Listener<PortableEntry>();
+     ICacheEntryEventFilter<int, PortableEntry> filter =
+         portable ? (AbstractFilter<PortableEntry>)new PortableFilter() : new SerializableFilter();
+
+     ContinuousQuery<int, PortableEntry> qry = loc ?
+         new ContinuousQuery<int, PortableEntry>(lsnr, filter, true) :
+         new ContinuousQuery<int, PortableEntry>(lsnr, filter);
+
+     using (cache1.QueryContinuous(qry))
+     {
+         // Put from local node.
+         int key1 = PrimaryKey(cache1);
+         cache1.GetAndPut(key1, Entry(key1));
+         CheckFilterSingle(key1, null, Entry(key1));
+         CheckCallbackSingle(key1, null, Entry(key1));
+
+         // Put from remote node.
+         int key2 = PrimaryKey(cache2);
+         cache1.GetAndPut(key2, Entry(key2));
+
+         if (loc)
+         {
+             // The argument is a timeout in milliseconds. Passing the key here
+             // would make the wait arbitrary (possibly zero) and the check vacuous.
+             CheckNoFilter(100);
+             CheckNoCallback(100);
+         }
+         else
+         {
+             CheckFilterSingle(key2, null, Entry(key2));
+             CheckCallbackSingle(key2, null, Entry(key2));
+         }
+
+         // From here on the filter rejects every event.
+         AbstractFilter<PortableEntry>.res = false;
+
+         // Ignored put from local node.
+         cache1.GetAndPut(key1, Entry(key1 + 1));
+         CheckFilterSingle(key1, Entry(key1), Entry(key1 + 1));
+         CheckNoCallback(100);
+
+         // Ignored put from remote node.
+         cache1.GetAndPut(key2, Entry(key2 + 1));
+
+         if (loc)
+         {
+             CheckNoFilter(100);
+         }
+         else
+         {
+             CheckFilterSingle(key2, Entry(key2), Entry(key2 + 1));
+         }
+
+         CheckNoCallback(100);
+     }
+ }
+
+ /// <summary>
+ /// Runs the filter invocation error scenario with a portable filter.
+ /// Currently ignored (IGNITE-521).
+ /// </summary>
+ [Ignore("IGNITE-521")]
+ [Test]
+ public void TestFilterInvokeErrorPortable()
+ {
+     CheckFilterInvokeError(portable: true);
+ }
+
+ /// <summary>
+ /// Runs the filter invocation error scenario with a serializable filter.
+ /// Currently ignored (IGNITE-521).
+ /// </summary>
+ [Ignore("IGNITE-521")]
+ [Test]
+ public void TestFilterInvokeErrorSerializable()
+ {
+     CheckFilterInvokeError(portable: false);
+ }
+
+ /// <summary>
+ /// Verifies that a filter throwing during evaluation makes the cache update
+ /// fail with an <see cref="IgniteException"/>, for a key owned by the local
+ /// node as well as for a key owned by the remote node.
+ /// </summary>
+ /// <param name="portable">Whether to use <see cref="PortableFilter"/> (true)
+ /// or <see cref="SerializableFilter"/> (false).</param>
+ private void CheckFilterInvokeError(bool portable)
+ {
+     AbstractFilter<PortableEntry>.err = true;
+
+     ICacheEntryEventListener<int, PortableEntry> lsnr = new Listener<PortableEntry>();
+     ICacheEntryEventFilter<int, PortableEntry> filter =
+         portable ? (AbstractFilter<PortableEntry>) new PortableFilter() : new SerializableFilter();
+
+     ContinuousQuery<int, PortableEntry> qry = new ContinuousQuery<int, PortableEntry>(lsnr, filter);
+
+     using (cache1.QueryContinuous(qry))
+     {
+         // Assert.Catch accepts IgniteException and its subclasses and reports
+         // the real outcome otherwise. A hand-written try/catch with a trailing
+         // catch (Exception) would also swallow the "not thrown" assertion
+         // failure and hide it behind a generic message.
+
+         // Put from local node.
+         Assert.Catch<IgniteException>(() => cache1.GetAndPut(PrimaryKey(cache1), Entry(1)));
+
+         // Put from remote node.
+         Assert.Catch<IgniteException>(() => cache1.GetAndPut(PrimaryKey(cache2), Entry(1)));
+     }
+ }
+
+ /// <summary>
+ /// Runs the filter marshalling error scenario with a portable filter.
+ /// </summary>
+ [Test]
+ public void TestFilterMarshalErrorPortable()
+ {
+     CheckFilterMarshalError(portable: true);
+ }
+
+ /// <summary>
+ /// Runs the filter marshalling error scenario with a serializable filter.
+ /// </summary>
+ [Test]
+ public void TestFilterMarshalErrorSerializable()
+ {
+     CheckFilterMarshalError(portable: false);
+ }
+
+ /// <summary>
+ /// Verifies that starting a query fails with an <see cref="Exception"/> when
+ /// the filter is switched to throw during marshalling.
+ /// </summary>
+ /// <param name="portable">Whether to use <see cref="PortableFilter"/> (true)
+ /// or <see cref="SerializableFilter"/> (false).</param>
+ private void CheckFilterMarshalError(bool portable)
+ {
+     AbstractFilter<PortableEntry>.marshErr = true;
+
+     ICacheEntryEventListener<int, PortableEntry> lsnr = new Listener<PortableEntry>();
+     ICacheEntryEventFilter<int, PortableEntry> filter;
+
+     if (portable)
+     {
+         filter = new PortableFilter();
+     }
+     else
+     {
+         filter = new SerializableFilter();
+     }
+
+     var qry = new ContinuousQuery<int, PortableEntry>(lsnr, filter);
+
+     TestDelegate startAndClose = () =>
+     {
+         using (cache1.QueryContinuous(qry))
+         {
+             // No-op.
+         }
+     };
+
+     Assert.Throws<Exception>(startAndClose);
+ }
+
+ /// <summary>
+ /// Runs the non-serializable filter scenario for a non-local query.
+ /// </summary>
+ [Test]
+ public void TestFilterNonSerializable()
+ {
+     CheckFilterNonSerializable(loc: false);
+ }
+
+ /// <summary>
+ /// Verifies behavior of a filter that cannot be serialized
+ /// (<see cref="LocalFilter"/>): a non-local query must fail to start with a
+ /// <see cref="SerializationException"/>, a local query must work.
+ /// </summary>
+ /// <param name="loc">Whether the query is created with the local flag.</param>
+ protected void CheckFilterNonSerializable(bool loc)
+ {
+     AbstractFilter<PortableEntry>.unmarshErr = true;
+
+     ICacheEntryEventListener<int, PortableEntry> lsnr = new Listener<PortableEntry>();
+     ICacheEntryEventFilter<int, PortableEntry> filter = new LocalFilter();
+
+     ContinuousQuery<int, PortableEntry> qry;
+
+     if (loc)
+     {
+         qry = new ContinuousQuery<int, PortableEntry>(lsnr, filter, true);
+     }
+     else
+     {
+         qry = new ContinuousQuery<int, PortableEntry>(lsnr, filter);
+     }
+
+     if (!loc)
+     {
+         // Non-local query: starting it is expected to fail.
+         Assert.Throws<SerializationException>(() =>
+         {
+             using (cache1.QueryContinuous(qry))
+             {
+                 // No-op.
+             }
+         });
+
+         return;
+     }
+
+     using (cache1.QueryContinuous(qry))
+     {
+         // Local put must be fine.
+         int locKey = PrimaryKey(cache1);
+
+         cache1.GetAndPut(locKey, Entry(locKey));
+         CheckFilterSingle(locKey, null, Entry(locKey));
+     }
+ }
+
+ /// <summary>
+ /// Runs the filter unmarshalling error scenario with a portable filter.
+ /// Currently ignored (IGNITE-521).
+ /// </summary>
+ [Ignore("IGNITE-521")]
+ [Test]
+ public void TestFilterUnmarshalErrorPortable()
+ {
+     CheckFilterUnmarshalError(portable: true);
+ }
+
+ /// <summary>
+ /// Runs the filter unmarshalling error scenario with a serializable filter.
+ /// Currently ignored (IGNITE-521).
+ /// </summary>
+ [Ignore("IGNITE-521")]
+ [Test]
+ public void TestFilterUnmarshalErrorSerializable()
+ {
+     CheckFilterUnmarshalError(portable: false);
+ }
+
+ /// <summary>
+ /// Verifies filter unmarshalling error handling: with the unmarshal error
+ /// switch set, an update of a key owned by the local node still reaches the
+ /// filter, while an update of a key owned by the remote node must fail with an
+ /// <see cref="IgniteException"/>.
+ /// </summary>
+ /// <param name="portable">Whether to use <see cref="PortableFilter"/> (true)
+ /// or <see cref="SerializableFilter"/> (false).</param>
+ private void CheckFilterUnmarshalError(bool portable)
+ {
+     AbstractFilter<PortableEntry>.unmarshErr = true;
+
+     ICacheEntryEventListener<int, PortableEntry> lsnr = new Listener<PortableEntry>();
+     ICacheEntryEventFilter<int, PortableEntry> filter =
+         portable ? (AbstractFilter<PortableEntry>)new PortableFilter() : new SerializableFilter();
+
+     ContinuousQuery<int, PortableEntry> qry = new ContinuousQuery<int, PortableEntry>(lsnr, filter);
+
+     using (cache1.QueryContinuous(qry))
+     {
+         // Local put must be fine.
+         int key1 = PrimaryKey(cache1);
+         cache1.GetAndPut(key1, Entry(key1));
+         CheckFilterSingle(key1, null, Entry(key1));
+
+         // Remote put must fail. Assert.Catch accepts IgniteException and its
+         // subclasses; unlike a try/catch with a trailing catch (Exception) it
+         // does not replace the "not thrown" failure with a generic message.
+         Assert.Catch<IgniteException>(() => cache1.GetAndPut(PrimaryKey(cache2), Entry(1)));
+     }
+ }
+
+ /// <summary>
+ /// The filter's <c>ignite</c> resource field must be populated on the local
+ /// filter instance once the query is started, and on the filter instance that
+ /// evaluates an update of a key owned by the second node.
+ /// </summary>
+ [Test]
+ public void TestFilterInjection()
+ {
+     var lsnr = new Listener<PortableEntry>();
+     var filter = new PortableFilter();
+
+     Assert.IsNull(filter.ignite);
+
+     var qry = new ContinuousQuery<int, PortableEntry>(lsnr, filter);
+
+     using (cache1.QueryContinuous(qry))
+     {
+         // Local injection.
+         Assert.IsNotNull(filter.ignite);
+
+         // Remote injection.
+         cache1.GetAndPut(PrimaryKey(cache2), Entry(1));
+
+         FilterEvent filterEvt;
+         bool taken = FILTER_EVTS.TryTake(out filterEvt, 500);
+
+         Assert.IsTrue(taken);
+         Assert.IsNotNull(filterEvt.ignite);
+     }
+ }
+
+
+ /// <summary>
+ /// Test "keep-portable" scenario: the query is started on a keep-portable view
+ /// of the cache, so filter and listener receive values as
+ /// <see cref="IPortableObject"/>, which are deserialized here for comparison.
+ /// </summary>
+ [Test]
+ public void TestKeepPortable()
+ {
+     var cache = cache1.WithKeepPortable<int, IPortableObject>();
+
+     ContinuousQuery<int, IPortableObject> qry = new ContinuousQuery<int, IPortableObject>(
+         new Listener<IPortableObject>(), new KeepPortableFilter());
+
+     // Resolve the keys once instead of re-scanning affinity for every assertion.
+     int locKey = PrimaryKey(cache1);
+     int rmtKey = PrimaryKey(cache2);
+
+     using (cache.QueryContinuous(qry))
+     {
+         CallbackEvent cbEvt;
+         FilterEvent filterEvt;
+
+         // 1. Local put.
+         cache1.GetAndPut(locKey, Entry(1));
+
+         Assert.IsTrue(FILTER_EVTS.TryTake(out filterEvt, 500));
+         Assert.AreEqual(locKey, filterEvt.entry.Key);
+         Assert.AreEqual(null, filterEvt.entry.OldValue);
+
+         // Direct casts: a value of an unexpected type fails with a descriptive
+         // InvalidCastException instead of a NullReferenceException.
+         Assert.AreEqual(Entry(1), ((IPortableObject)filterEvt.entry.Value).Deserialize<PortableEntry>());
+
+         Assert.IsTrue(CB_EVTS.TryTake(out cbEvt, 500));
+         Assert.AreEqual(1, cbEvt.entries.Count);
+
+         var locEntry = cbEvt.entries.First();
+
+         Assert.AreEqual(locKey, locEntry.Key);
+         Assert.AreEqual(null, locEntry.OldValue);
+         Assert.AreEqual(Entry(1), ((IPortableObject)locEntry.Value).Deserialize<PortableEntry>());
+
+         // 2. Remote put.
+         cache1.GetAndPut(rmtKey, Entry(2));
+
+         Assert.IsTrue(FILTER_EVTS.TryTake(out filterEvt, 500));
+         Assert.AreEqual(rmtKey, filterEvt.entry.Key);
+         Assert.AreEqual(null, filterEvt.entry.OldValue);
+         Assert.AreEqual(Entry(2), ((IPortableObject)filterEvt.entry.Value).Deserialize<PortableEntry>());
+
+         Assert.IsTrue(CB_EVTS.TryTake(out cbEvt, 500));
+         Assert.AreEqual(1, cbEvt.entries.Count);
+
+         var rmtEntry = cbEvt.entries.First();
+
+         Assert.AreEqual(rmtKey, rmtEntry.Key);
+         Assert.AreEqual(null, rmtEntry.OldValue);
+         Assert.AreEqual(Entry(2), ((IPortableObject)rmtEntry.Value).Deserialize<PortableEntry>());
+     }
+ }
+
+ /// <summary>
+ /// Test whether buffer size works fine: with a buffer size of 2 and a very
+ /// long time interval, the first update of a key owned by the second node must
+ /// not be delivered on its own; after the second update both events must
+ /// arrive in a single callback.
+ /// </summary>
+ [Test]
+ public void TestBufferSize()
+ {
+     // Pick two keys that are primary on the second node.
+     List<int> rmtKeys = PrimaryKeys(cache2, 2);
+
+     ContinuousQuery<int, PortableEntry> qry = new ContinuousQuery<int, PortableEntry>(new Listener<PortableEntry>());
+
+     qry.BufferSize = 2;
+     qry.TimeInterval = TimeSpan.FromMilliseconds(1000000);
+
+     try
+     {
+         using (cache1.QueryContinuous(qry))
+         {
+             cache1.GetAndPut(rmtKeys[0], Entry(rmtKeys[0]));
+
+             CheckNoCallback(100);
+
+             cache1.GetAndPut(rmtKeys[1], Entry(rmtKeys[1]));
+
+             CallbackEvent evt;
+
+             Assert.IsTrue(CB_EVTS.TryTake(out evt, 1000));
+
+             Assert.AreEqual(2, evt.entries.Count);
+
+             var entryRmt0 = evt.entries.Single(entry => entry.Key.Equals(rmtKeys[0]));
+             var entryRmt1 = evt.entries.Single(entry => entry.Key.Equals(rmtKeys[1]));
+
+             Assert.AreEqual(rmtKeys[0], entryRmt0.Key);
+             Assert.IsNull(entryRmt0.OldValue);
+             Assert.AreEqual(Entry(rmtKeys[0]), entryRmt0.Value);
+
+             Assert.AreEqual(rmtKeys[1], entryRmt1.Key);
+             Assert.IsNull(entryRmt1.OldValue);
+             Assert.AreEqual(Entry(rmtKeys[1]), entryRmt1.Value);
+         }
+     }
+     finally
+     {
+         // Always clean up: the per-test preparation removes only the first
+         // primary key of each node, so a leftover second key would make the
+         // empty-cache check of every following test fail.
+         cache1.Remove(rmtKeys[0]);
+         cache1.Remove(rmtKeys[1]);
+     }
+ }
+
+ /// <summary>
+ /// With a buffer size of 2 and a time interval of 500 ms, a single update of a
+ /// key owned by the second node must not be delivered within the first 100 ms
+ /// but must arrive within the following second.
+ /// </summary>
+ [Test]
+ public void TestTimeout()
+ {
+     int locKey = PrimaryKey(cache1);
+     int rmtKey = PrimaryKey(cache2);
+
+     var qry = new ContinuousQuery<int, PortableEntry>(new Listener<PortableEntry>());
+
+     qry.BufferSize = 2;
+     qry.TimeInterval = TimeSpan.FromMilliseconds(500);
+
+     using (cache1.QueryContinuous(qry))
+     {
+         // Put from local node.
+         cache1.GetAndPut(locKey, Entry(locKey));
+         CheckCallbackSingle(locKey, null, Entry(locKey));
+
+         // Put from remote node.
+         cache1.GetAndPut(rmtKey, Entry(rmtKey));
+         CheckNoCallback(100);
+         CheckCallbackSingle(rmtKey, null, Entry(rmtKey), 1000);
+     }
+ }
+
+ /// <summary>
+ /// Test whether nested Ignite API call from callback works fine: the listener
+ /// reads portable metadata of the event value and signals completion.
+ /// </summary>
+ [Test]
+ public void TestNestedCallFromCallback()
+ {
+     var cache = cache1.WithKeepPortable<int, IPortableObject>();
+
+     int key = PrimaryKey(cache1);
+
+     NestedCallListener cb = new NestedCallListener();
+
+     try
+     {
+         using (cache.QueryContinuous(new ContinuousQuery<int, IPortableObject>(cb)))
+         {
+             cache1.GetAndPut(key, Entry(key));
+
+             // Bounded wait: if the callback never signals (for example because
+             // it threw before reaching Signal), fail the test instead of
+             // blocking the whole test run forever.
+             Assert.IsTrue(cb.countDown.Wait(5000), "Listener was not invoked within the timeout.");
+         }
+     }
+     finally
+     {
+         cache.Remove(key);
+     }
+ }
+
+ /// <summary>
+ /// Runs the initial query scenario for scan, SQL and text initial queries,
+ /// reading the cursor both through GetAll and through enumeration, and checks
+ /// the error reported for an invalid text query.
+ /// </summary>
+ [Test]
+ public void TestInitialQuery()
+ {
+     // Two ways of draining the initial query cursor.
+     Func<IQueryCursor<ICacheEntry<int, PortableEntry>>, IEnumerable<ICacheEntry<int, PortableEntry>>>
+         viaGetAll = cur => cur.GetAll();
+
+     Func<IQueryCursor<ICacheEntry<int, PortableEntry>>, IEnumerable<ICacheEntry<int, PortableEntry>>>
+         viaIterator = cur => cur.ToList();
+
+     // Scan query
+     TestInitialQuery(new ScanQuery<int, PortableEntry>(new InitialQueryScanFilter()), viaGetAll);
+     TestInitialQuery(new ScanQuery<int, PortableEntry>(new InitialQueryScanFilter()), viaIterator);
+
+     // Sql query
+     TestInitialQuery(new SqlQuery(typeof(PortableEntry), "val < 33"), viaGetAll);
+     TestInitialQuery(new SqlQuery(typeof(PortableEntry), "val < 33"), viaIterator);
+
+     // Text query
+     TestInitialQuery(new TextQuery(typeof(PortableEntry), "1*"), viaGetAll);
+     TestInitialQuery(new TextQuery(typeof(PortableEntry), "1*"), viaIterator);
+
+     // Test exception: invalid initial query
+     TestDelegate invalidTextQuery =
+         () => TestInitialQuery(new TextQuery(typeof (PortableEntry), "*"), viaGetAll);
+
+     var ex = Assert.Throws<IgniteException>(invalidTextQuery);
+
+     Assert.AreEqual("Cannot parse '*': '*' or '?' not allowed as first character in WildcardQuery", ex.Message);
+ }
+
+ /// <summary>
+ /// Puts entries 11, 12 and 33, starts a continuous query with the given
+ /// initial query and checks that the initial cursor yields exactly entries 11
+ /// and 12, that the cursor can be requested only once, that later updates are
+ /// delivered to the listener, and that the handle rejects cursor access after
+ /// disposal. The cache is cleared afterwards.
+ /// </summary>
+ /// <param name="initialQry">Initial query.</param>
+ /// <param name="getAllFunc">Function that drains the initial query cursor.</param>
+ private void TestInitialQuery(QueryBase initialQry, Func<IQueryCursor<ICacheEntry<int, PortableEntry>>,
+     IEnumerable<ICacheEntry<int, PortableEntry>>> getAllFunc)
+ {
+     var qry = new ContinuousQuery<int, PortableEntry>(new Listener<PortableEntry>());
+
+     foreach (int seed in new[] { 11, 12, 33 })
+     {
+         cache1.Put(seed, Entry(seed));
+     }
+
+     try
+     {
+         IContinuousQueryHandle<ICacheEntry<int, PortableEntry>> contQry;
+
+         using (contQry = cache1.QueryContinuous(qry, initialQry))
+         {
+             // Check initial query
+             var cursor = contQry.GetInitialQueryCursor();
+             var initialEntries = getAllFunc(cursor).Distinct().OrderBy(x => x.Key).ToList();
+
+             // A second request for the cursor is not allowed.
+             Assert.Throws<InvalidOperationException>(() => contQry.GetInitialQueryCursor());
+
+             Assert.AreEqual(2, initialEntries.Count);
+
+             int expected = 11;
+
+             foreach (var entry in initialEntries)
+             {
+                 Assert.AreEqual(expected, entry.Key);
+                 Assert.AreEqual(expected, entry.Value.val);
+
+                 expected++;
+             }
+
+             // Check continuous query
+             cache1.Put(44, Entry(44));
+             CheckCallbackSingle(44, null, Entry(44));
+         }
+
+         Assert.Throws<ObjectDisposedException>(() => contQry.GetInitialQueryCursor());
+
+         contQry.Dispose(); // multiple dispose calls are ok
+     }
+     finally
+     {
+         cache1.Clear();
+     }
+ }
+
+ /// <summary>
+ /// Expects exactly one recorded filter event with the given content, waiting
+ /// up to 1000 ms for it.
+ /// </summary>
+ /// <param name="expKey">Expected key.</param>
+ /// <param name="expOldVal">Expected old value.</param>
+ /// <param name="expVal">Expected value.</param>
+ private void CheckFilterSingle(int expKey, PortableEntry expOldVal, PortableEntry expVal)
+ {
+     CheckFilterSingle(expKey, expOldVal, expVal, timeout: 1000);
+ }
+
+ /// <summary>
+ /// Takes the next recorded filter event, failing if none arrives within the
+ /// timeout, and compares its key, old value and value with the expectations.
+ /// </summary>
+ /// <param name="expKey">Expected key.</param>
+ /// <param name="expOldVal">Expected old value.</param>
+ /// <param name="expVal">Expected value.</param>
+ /// <param name="timeout">Timeout passed to the queue's TryTake.</param>
+ private void CheckFilterSingle(int expKey, PortableEntry expOldVal, PortableEntry expVal, int timeout)
+ {
+     FilterEvent filterEvt;
+     bool taken = FILTER_EVTS.TryTake(out filterEvt, timeout);
+
+     Assert.IsTrue(taken);
+
+     ICacheEntryEvent<object, object> actual = filterEvt.entry;
+
+     Assert.AreEqual(expKey, actual.Key);
+     Assert.AreEqual(expOldVal, actual.OldValue);
+     Assert.AreEqual(expVal, actual.Value);
+ }
+
+ /// <summary>
+ /// Asserts that no filter event is recorded within the given timeout.
+ /// </summary>
+ /// <param name="timeout">Timeout passed to the queue's TryTake.</param>
+ private void CheckNoFilter(int timeout)
+ {
+     FilterEvent ignored;
+     bool taken = FILTER_EVTS.TryTake(out ignored, timeout);
+
+     Assert.IsFalse(taken);
+ }
+
+ /// <summary>
+ /// Expects one callback carrying exactly one event with the given content,
+ /// waiting up to 1000 ms for it.
+ /// </summary>
+ /// <param name="expKey">Expected key.</param>
+ /// <param name="expOldVal">Expected old value.</param>
+ /// <param name="expVal">Expected new value.</param>
+ private void CheckCallbackSingle(int expKey, PortableEntry expOldVal, PortableEntry expVal)
+ {
+     CheckCallbackSingle(expKey, expOldVal, expVal, timeout: 1000);
+ }
+
+ /// <summary>
+ /// Takes the next recorded callback, failing if none arrives within the
+ /// timeout, requires it to carry exactly one event and compares that event's
+ /// key, old value and value with the expectations.
+ /// </summary>
+ /// <param name="expKey">Expected key.</param>
+ /// <param name="expOldVal">Expected old value.</param>
+ /// <param name="expVal">Expected new value.</param>
+ /// <param name="timeout">Timeout passed to the queue's TryTake.</param>
+ private void CheckCallbackSingle(int expKey, PortableEntry expOldVal, PortableEntry expVal, int timeout)
+ {
+     CallbackEvent cbEvt;
+     bool taken = CB_EVTS.TryTake(out cbEvt, timeout);
+
+     Assert.IsTrue(taken);
+
+     Assert.AreEqual(1, cbEvt.entries.Count);
+
+     ICacheEntryEvent<object, object> actual = cbEvt.entries.First();
+
+     Assert.AreEqual(expKey, actual.Key);
+     Assert.AreEqual(expOldVal, actual.OldValue);
+     Assert.AreEqual(expVal, actual.Value);
+ }
+
+ /// <summary>
+ /// Asserts that no callback is recorded within the given timeout.
+ /// </summary>
+ /// <param name="timeout">Timeout passed to the queue's TryTake.</param>
+ private void CheckNoCallback(int timeout)
+ {
+     CallbackEvent ignored;
+     bool taken = CB_EVTS.TryTake(out ignored, timeout);
+
+     Assert.IsFalse(taken);
+ }
+
+ /// <summary>
+ /// Create entry.
+ /// </summary>
+ /// <param name="val">Value.</param>
+ /// <returns>Entry.</returns>
+ private static PortableEntry Entry(int val)
+ {
+ return new PortableEntry(val);
+ }
+
+ /// <summary>
+ /// Returns the first key found by <c>PrimaryKeys</c> for the node that the
+ /// given cache instance belongs to.
+ /// </summary>
+ /// <param name="cache">Cache.</param>
+ /// <returns>Primary key.</returns>
+ private static int PrimaryKey<T>(ICache<int, T> cache)
+ {
+     List<int> singleKey = PrimaryKeys(cache, 1);
+
+     return singleKey[0];
+ }
+
+ /// <summary>
+ /// Scans up to 100000 consecutive integers, beginning at
+ /// <paramref name="startFrom"/>, and collects those for which the cache's
+ /// local node is primary according to the cache affinity.
+ /// </summary>
+ /// <param name="cache">Cache.</param>
+ /// <param name="cnt">Amount of keys.</param>
+ /// <param name="startFrom">Value to start from.</param>
+ /// <returns>The first <paramref name="cnt"/> matching keys in ascending order;
+ /// the test is failed if the scanned range does not contain enough of them.</returns>
+ private static List<int> PrimaryKeys<T>(ICache<int, T> cache, int cnt, int startFrom = 0)
+ {
+     IClusterNode locNode = cache.Ignite.Cluster.LocalNode;
+
+     ICacheAffinity affinity = cache.Ignite.Affinity(cache.Name);
+
+     var found = new List<int>(cnt);
+
+     for (int candidate = startFrom; candidate < startFrom + 100000; candidate++)
+     {
+         if (!affinity.IsPrimary(locNode, candidate))
+         {
+             continue;
+         }
+
+         found.Add(candidate);
+
+         if (found.Count == cnt)
+         {
+             return found;
+         }
+     }
+
+     Assert.Fail("Failed to find " + cnt + " primary keys.");
+
+     // Not reached when Assert.Fail throws; present to satisfy the compiler.
+     return null;
+ }
+
+ /// <summary>
+ /// Cache value used by the tests: wraps a single integer and compares by it.
+ /// </summary>
+ public class PortableEntry
+ {
+     /** Value. */
+     public readonly int val;
+
+     /// <summary>
+     /// Constructor.
+     /// </summary>
+     /// <param name="val">Value.</param>
+     public PortableEntry(int val)
+     {
+         this.val = val;
+     }
+
+     /** <inheritDoc /> */
+     public override bool Equals(object obj)
+     {
+         var other = obj as PortableEntry;
+
+         return other != null && other.val == val;
+     }
+
+     /** <inheritDoc /> */
+     public override int GetHashCode()
+     {
+         return val;
+     }
+ }
+
+ /// <summary>
+ /// Base class of the test filters. Every evaluated event is recorded in
+ /// FILTER_EVTS together with the filter's <c>ignite</c> field; the static
+ /// switches let a test control the filter result and provoke errors. The
+ /// switches are static members of a generic type, so each closed type
+ /// (for example <c>AbstractFilter&lt;PortableEntry&gt;</c>) has its own set.
+ /// </summary>
+ [Serializable]
+ public abstract class AbstractFilter<V> : ICacheEntryEventFilter<int, V>
+ {
+     /** Result returned by Evaluate. */
+     public static volatile bool res = true;
+
+     /** Throw error on invocation. */
+     public static volatile bool err;
+
+     /** Throw error during marshalling. */
+     public static volatile bool marshErr;
+
+     /** Throw error during unmarshalling. */
+     public static volatile bool unmarshErr;
+
+     /** Grid. */
+     [InstanceResource]
+     public IIgnite ignite;
+
+     /** <inheritDoc /> */
+     public bool Evaluate(ICacheEntryEvent<int, V> evt)
+     {
+         if (err)
+         {
+             throw new Exception("Filter error.");
+         }
+
+         // Read the current queue first; it is replaced before every test.
+         BlockingCollection<FilterEvent> sink = FILTER_EVTS;
+
+         var recorded = new FilterEvent(ignite,
+             CQU.CreateEvent<object, object>(evt.Key, evt.OldValue, evt.Value));
+
+         sink.Add(recorded);
+
+         return res;
+     }
+ }
+
+ /// <summary>
+ /// Filter which cannot be serialized: the class itself carries no
+ /// [Serializable] attribute and is not among the portable types registered
+ /// in the fixture set-up.
+ /// </summary>
+ public class LocalFilter : AbstractFilter<PortableEntry>
+ {
+ // No-op.
+ }
+
+ /// <summary>
+ /// Filter marshalled through <see cref="IPortableMarshalAware"/>. It writes and
+ /// reads no data; the hooks only throw when the corresponding static error
+ /// switch is set.
+ /// </summary>
+ public class PortableFilter : AbstractFilter<PortableEntry>, IPortableMarshalAware
+ {
+     /** <inheritDoc /> */
+     public void WritePortable(IPortableWriter writer)
+     {
+         if (!marshErr)
+         {
+             return;
+         }
+
+         throw new Exception("Filter marshalling error.");
+     }
+
+     /** <inheritDoc /> */
+     public void ReadPortable(IPortableReader reader)
+     {
+         if (!unmarshErr)
+         {
+             return;
+         }
+
+         throw new Exception("Filter unmarshalling error.");
+     }
+ }
+
+ /// <summary>
+ /// Filter marshalled through <see cref="ISerializable"/>. It stores and
+ /// restores no data; the serialization hooks only throw when the corresponding
+ /// static error switch is set.
+ /// </summary>
+ [Serializable]
+ public class SerializableFilter : AbstractFilter<PortableEntry>, ISerializable
+ {
+     /// <summary>
+     /// Constructor.
+     /// </summary>
+     public SerializableFilter()
+     {
+         // No-op.
+     }
+
+     /// <summary>
+     /// Serialization constructor.
+     /// </summary>
+     /// <param name="info">Info.</param>
+     /// <param name="context">Context.</param>
+     protected SerializableFilter(SerializationInfo info, StreamingContext context)
+     {
+         if (!unmarshErr)
+         {
+             return;
+         }
+
+         throw new Exception("Filter unmarshalling error.");
+     }
+
+     /** <inheritDoc /> */
+     public void GetObjectData(SerializationInfo info, StreamingContext context)
+     {
+         if (!marshErr)
+         {
+             return;
+         }
+
+         throw new Exception("Filter marshalling error.");
+     }
+ }
+
+ /// <summary>
+ /// Filter for "keep-portable" scenario: event values are typed as
+ /// IPortableObject rather than PortableEntry.
+ /// </summary>
+ public class KeepPortableFilter : AbstractFilter<IPortableObject>
+ {
+ // No-op.
+ }
+
+ /// <summary>
+ /// Listener that copies every received batch of events into a single
+ /// <see cref="CallbackEvent"/> and records it in CB_EVTS.
+ /// </summary>
+ public class Listener<V> : ICacheEntryEventListener<int, V>
+ {
+     /** Grid. */
+     [InstanceResource]
+     public IIgnite ignite;
+
+     /** <inheritDoc /> */
+     public void OnEvent(IEnumerable<ICacheEntryEvent<int, V>> evts)
+     {
+         var copies = new List<ICacheEntryEvent<object, object>>();
+
+         foreach (var received in evts)
+         {
+             // Re-create the event with object-typed key and values so that
+             // listeners of any value type share one queue.
+             copies.Add(CQU.CreateEvent<object, object>(received.Key, received.OldValue, received.Value));
+         }
+
+         ICollection<ICacheEntryEvent<object, object>> batch = copies;
+
+         CB_EVTS.Add(new CallbackEvent(batch));
+     }
+ }
+
+ /// <summary>
+ /// Listener that calls back into the Ignite API (portable metadata lookup)
+ /// for every received event and then signals a countdown event.
+ /// </summary>
+ public class NestedCallListener : ICacheEntryEventListener<int, IPortableObject>
+ {
+     /** Signalled once after a batch of events has been processed. */
+     public readonly CountdownEvent countDown = new CountdownEvent(1);
+
+     /** <inheritDoc /> */
+     public void OnEvent(IEnumerable<ICacheEntryEvent<int, IPortableObject>> evts)
+     {
+         string expTypeName = typeof(PortableEntry).Name;
+
+         foreach (var received in evts)
+         {
+             // Nested Ignite API call from within the callback.
+             IPortableMetadata meta = received.Value.Metadata();
+
+             Assert.AreEqual(expTypeName, meta.TypeName);
+         }
+
+         countDown.Signal();
+     }
+ }
+
+ /// <summary>
+ /// Filter event: one filter invocation recorded for later inspection by a
+ /// test, consisting of the filter's Ignite instance and the evaluated entry.
+ /// </summary>
+ public class FilterEvent
+ {
+ /** Grid. */
+ public IIgnite ignite;
+
+ /** Entry. */
+ public ICacheEntryEvent<object, object> entry;
+
+ /// <summary>
+ /// Constructor.
+ /// </summary>
+ /// <param name="ignite">Grid.</param>
+ /// <param name="entry">Entry.</param>
+ public FilterEvent(IIgnite ignite, ICacheEntryEvent<object, object> entry)
+ {
+ this.ignite = ignite;
+ this.entry = entry;
+ }
+ }
+
+ /// <summary>
+ /// Callback event: the entries passed to a listener in a single invocation.
+ /// </summary>
+ public class CallbackEvent
+ {
+ /** Entries. */
+ public ICollection<ICacheEntryEvent<object, object>> entries;
+
+ /// <summary>
+ /// Constructor.
+ /// </summary>
+ /// <param name="entries">Entries.</param>
+ public CallbackEvent(ICollection<ICacheEntryEvent<object, object>> entries)
+ {
+ this.entries = entries;
+ }
+ }
+
+ /// <summary>
+ /// ScanQuery filter for InitialQuery test: accepts entries whose key is
+ /// less than 33.
+ /// </summary>
+ [Serializable]
+ private class InitialQueryScanFilter : ICacheEntryFilter<int, PortableEntry>
+ {
+ /** <inheritdoc /> */
+ public bool Invoke(ICacheEntry<int, PortableEntry> entry)
+ {
+ // Keep only keys below 33.
+ return entry.Key < 33;
+ }
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryAtomicBackupTest.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryAtomicBackupTest.cs b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryAtomicBackupTest.cs
new file mode 100644
index 0000000..ac44f10
--- /dev/null
+++ b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryAtomicBackupTest.cs
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests.Cache.Query.Continuous
+{
+    /// <summary>
+    /// Continuous query tests executed against the ATOMIC cache with backups.
+    /// </summary>
+    /// <remarks>
+    /// NOTE(review): the class name is spelled "Atomicl" while the file is named
+    /// ContinuousQueryAtomicBackupTest.cs; the name is intentionally left unchanged here.
+    /// </remarks>
+    public class ContinuousQueryAtomiclBackupTest : ContinuousQueryAbstractTest
+    {
+        /// <summary>
+        /// Passes <c>CACHE_ATOMIC_BACKUP</c> to the base test class.
+        /// </summary>
+        public ContinuousQueryAtomiclBackupTest()
+            : base(CACHE_ATOMIC_BACKUP)
+        {
+            // Nothing else to initialize.
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryAtomicNoBackupTest.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryAtomicNoBackupTest.cs b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryAtomicNoBackupTest.cs
new file mode 100644
index 0000000..8e1a18f
--- /dev/null
+++ b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryAtomicNoBackupTest.cs
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests.Cache.Query.Continuous
+{
+    /// <summary>
+    /// Continuous query tests executed against the ATOMIC cache without backups.
+    /// </summary>
+    /// <remarks>
+    /// NOTE(review): the class name is spelled "Atomicl" while the file is named
+    /// ContinuousQueryAtomicNoBackupTest.cs; the name is intentionally left unchanged here.
+    /// </remarks>
+    public class ContinuousQueryAtomiclNoBackupTest : ContinuousQueryNoBackupAbstractTest
+    {
+        /// <summary>
+        /// Passes <c>CACHE_ATOMIC_NO_BACKUP</c> to the base test class.
+        /// </summary>
+        public ContinuousQueryAtomiclNoBackupTest() : base(CACHE_ATOMIC_NO_BACKUP)
+        {
+            // Nothing else to initialize.
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryNoBackupAbstractTest.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryNoBackupAbstractTest.cs b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryNoBackupAbstractTest.cs
new file mode 100644
index 0000000..aa7d627
--- /dev/null
+++ b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryNoBackupAbstractTest.cs
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests.Cache.Query.Continuous
+{
+ using NUnit.Framework;
+
+    /// <summary>
+    /// Tests for continuous query when there are no backups. Adds the "local" test
+    /// variants on top of the tests inherited from <see cref="ContinuousQueryAbstractTest"/>.
+    /// </summary>
+    public abstract class ContinuousQueryNoBackupAbstractTest : ContinuousQueryAbstractTest
+    {
+        /// <summary>
+        /// Forwards the cache name to the base test class.
+        /// </summary>
+        /// <param name="cacheName">Cache name.</param>
+        protected ContinuousQueryNoBackupAbstractTest(string cacheName)
+            : base(cacheName)
+        {
+            // Nothing else to initialize.
+        }
+
+        /// <summary>
+        /// Regular callback operations for a local query.
+        /// </summary>
+        [Test]
+        public void TestCallbackLocal()
+        {
+            CheckCallback(true);
+        }
+
+        /// <summary>
+        /// Portable filter logic for a local query.
+        /// </summary>
+        [Test]
+        public void TestFilterPortableLocal()
+        {
+            CheckFilter(true, true);
+        }
+
+        /// <summary>
+        /// Serializable filter logic for a local query.
+        /// </summary>
+        [Test]
+        public void TestFilterSerializableLocal()
+        {
+            CheckFilter(false, true);
+        }
+
+        /// <summary>
+        /// Non-serializable filter for a local query.
+        /// </summary>
+        [Test]
+        public void TestFilterNonSerializableLocal()
+        {
+            CheckFilterNonSerializable(true);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryTransactionalBackupTest.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryTransactionalBackupTest.cs b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryTransactionalBackupTest.cs
new file mode 100644
index 0000000..08ae88c
--- /dev/null
+++ b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryTransactionalBackupTest.cs
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests.Cache.Query.Continuous
+{
+    /// <summary>
+    /// Continuous query tests executed against the TRANSACTIONAL cache with backups.
+    /// </summary>
+    public class ContinuousQueryTransactionalBackupTest : ContinuousQueryAbstractTest
+    {
+        /// <summary>
+        /// Passes <c>CACHE_TX_BACKUP</c> to the base test class.
+        /// </summary>
+        public ContinuousQueryTransactionalBackupTest() : base(CACHE_TX_BACKUP)
+        {
+            // Nothing else to initialize.
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryTransactionalNoBackupTest.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryTransactionalNoBackupTest.cs b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryTransactionalNoBackupTest.cs
new file mode 100644
index 0000000..685f7b4
--- /dev/null
+++ b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryTransactionalNoBackupTest.cs
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests.Cache.Query.Continuous
+{
+    /// <summary>
+    /// Continuous query tests executed against the TRANSACTIONAL cache without backups.
+    /// </summary>
+    public class ContinuousQueryTransactionalNoBackupTest : ContinuousQueryNoBackupAbstractTest
+    {
+        /// <summary>
+        /// Passes <c>CACHE_TX_NO_BACKUP</c> to the base test class.
+        /// </summary>
+        public ContinuousQueryTransactionalNoBackupTest()
+            : base(CACHE_TX_NO_BACKUP)
+        {
+            // Nothing else to initialize.
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheParallelLoadStoreTest.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheParallelLoadStoreTest.cs b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheParallelLoadStoreTest.cs
new file mode 100644
index 0000000..33eec7b
--- /dev/null
+++ b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheParallelLoadStoreTest.cs
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests.Cache.Store
+{
+ using System;
+ using Apache.Ignite.Core.Cache;
+ using Apache.Ignite.Core.Portable;
+ using NUnit.Framework;
+
+ /// <summary>
+ /// Tests for CacheParallelLoadStoreAdapter.
+ /// </summary>
+ public class CacheParallelLoadStoreTest
+ {
+ // object store name
+ private const string ObjectStoreCacheName = "object_store_parallel";
+
+        /// <summary>
+        /// Fixture set-up: kills leftover processes, enables JVM debug and starts a grid
+        /// from the parallel-store Spring configuration with the store record type
+        /// registered as a portable type.
+        /// </summary>
+        [TestFixtureSetUp]
+        public virtual void BeforeTests()
+        {
+            TestUtils.KillProcesses();
+            TestUtils.JvmDebug = true;
+
+            var cfg = new IgniteConfiguration
+            {
+                JvmClasspath = TestUtils.CreateTestClasspath(),
+                JvmOptions = TestUtils.TestJavaOptions(),
+                SpringConfigUrl = "config\\native-client-test-cache-parallel-store.xml",
+                PortableConfiguration = new PortableConfiguration
+                {
+                    Types = new[] {typeof (CacheTestParallelLoadStore.Record).FullName}
+                }
+            };
+
+            Ignition.Start(cfg);
+        }
+
+        /// <summary>
+        /// Fixture tear-down: stops all started grids.
+        /// </summary>
+        [TestFixtureTearDown]
+        public virtual void AfterTests()
+        {
+            Ignition.StopAll(true);
+        }
+
+        /// <summary>
+        /// Per-test set-up: writes the name of the starting test to the console.
+        /// </summary>
+        [SetUp]
+        public void BeforeTest()
+        {
+            var testName = TestContext.CurrentContext.Test.Name;
+
+            Console.WriteLine("Test started: " + testName);
+        }
+
+        /// <summary>
+        /// Tests LocalLoadCache with a minimum id argument: checks the resulting cache size,
+        /// spot-checks loaded records by id, and checks that the number of unique threads
+        /// recorded by the store is at least the processor count.
+        /// </summary>
+        [Test]
+        public void TestLoadCache()
+        {
+            var cache = GetCache();
+
+            Assert.AreEqual(0, cache.Size());
+
+            const int minId = 113;
+            const int expectedItemCount = CacheTestParallelLoadStore.InputDataLength - minId;
+
+            CacheTestParallelLoadStore.ResetCounters();
+
+            cache.LocalLoadCache(null, minId);
+
+            Assert.AreEqual(expectedItemCount, cache.Size());
+
+            // Check items presence; increment by 100 to speed up the test.
+            // The loop is bounded by the id range (InputDataLength), not by the item count:
+            // bounding it by expectedItemCount would leave the highest ids unchecked.
+            // Assumes the store loads every record with Id >= minId keyed by its Id, which is
+            // what the expectedItemCount assertion above already relies on.
+            for (var id = minId; id < CacheTestParallelLoadStore.InputDataLength; id += 100)
+            {
+                var rec = cache.Get(id);
+
+                Assert.AreEqual(id, rec.Id);
+            }
+
+            // Check that items were processed in parallel.
+            Assert.GreaterOrEqual(CacheTestParallelLoadStore.UniqueThreadCount, Environment.ProcessorCount);
+        }
+
+        /// <summary>
+        /// Gets the cache named by <c>ObjectStoreCacheName</c> from the default grid.
+        /// </summary>
+        /// <returns>Cache of store records keyed by int.</returns>
+        private static ICache<int, CacheTestParallelLoadStore.Record> GetCache()
+        {
+            var ignite = Ignition.GetIgnite();
+
+            return ignite.Cache<int, CacheTestParallelLoadStore.Record>(ObjectStoreCacheName);
+        }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTest.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTest.cs b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTest.cs
new file mode 100644
index 0000000..bc55901
--- /dev/null
+++ b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTest.cs
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests.Cache.Store
+{
+ using System;
+ using System.Collections.Concurrent;
+ using System.Collections.Generic;
+ using System.Linq;
+ using Apache.Ignite.Core.Cache.Store;
+ using Apache.Ignite.Core.Impl;
+ using Apache.Ignite.Core.Resource;
+ using NUnit.Framework;
+
+ /// <summary>
+ /// Tests for store session.
+ /// </summary>
+ public class CacheStoreSessionTest
+ {
+ /** Grid name. */
+ private const string IgniteName = "grid";
+
+ /** Cache 1 name. */
+ private const string Cache1 = "cache1";
+
+ /** Cache 2 name. */
+ private const string Cache2 = "cache2";
+
+ /** Operations. */
+ private static ConcurrentBag<ICollection<Operation>> _dumps;
+
+        /// <summary>
+        /// Fixture set-up: kills leftover processes, enables JVM debug and starts the
+        /// grid named <c>IgniteName</c> from the store-session Spring configuration.
+        /// </summary>
+        [TestFixtureSetUp]
+        public virtual void BeforeTests()
+        {
+            TestUtils.KillProcesses();
+
+            TestUtils.JvmDebug = true;
+
+            var cfg = new IgniteConfigurationEx
+            {
+                GridName = IgniteName,
+                JvmClasspath = TestUtils.CreateTestClasspath(),
+                JvmOptions = TestUtils.TestJavaOptions(),
+                SpringConfigUrl = @"config\cache\store\cache-store-session.xml"
+            };
+
+            Ignition.Start(cfg);
+        }
+
+        /// <summary>
+        /// Fixture tear-down: stops all started grids.
+        /// </summary>
+        [TestFixtureTearDown]
+        public virtual void AfterTests()
+        {
+            Ignition.StopAll(true);
+        }
+
+        /// <summary>
+        /// Test basic session API: for a rolled-back transaction, a committed transaction
+        /// with puts and a committed transaction with removes spanning two caches, checks
+        /// that exactly one operation dump is produced and that it holds the expected
+        /// write/delete operations followed by a session-end record with the right commit flag.
+        /// </summary>
+        [Test]
+        public void TestSession()
+        {
+            _dumps = new ConcurrentBag<ICollection<Operation>>();
+
+            var ignite = Ignition.GetIgnite(IgniteName);
+
+            // Reuse the grid instance obtained above instead of looking it up again per cache.
+            var cache1 = ignite.Cache<int, int>(Cache1);
+            var cache2 = ignite.Cache<int, int>(Cache2);
+
+            // 1. Test rollback.
+            using (var tx = ignite.Transactions.TxStart())
+            {
+                cache1.Put(1, 1);
+                cache2.Put(2, 2);
+
+                tx.Rollback();
+            }
+
+            Assert.AreEqual(1, _dumps.Count);
+            var ops = _dumps.First();
+            Assert.AreEqual(1, ops.Count);
+
+            Assert.AreEqual(1, ops.Count(op => op.Type == OperationType.SesEnd && !op.Commit));
+
+            _dumps = new ConcurrentBag<ICollection<Operation>>();
+
+            // 2. Test puts.
+            using (var tx = ignite.Transactions.TxStart())
+            {
+                cache1.Put(1, 1);
+                cache2.Put(2, 2);
+
+                tx.Commit();
+            }
+
+            Assert.AreEqual(1, _dumps.Count);
+            ops = _dumps.First();
+            Assert.AreEqual(3, ops.Count);
+
+            Assert.AreEqual(1, ops.Count(op => op.Type == OperationType.Write && Cache1.Equals(op.CacheName) && 1.Equals(op.Key) && 1.Equals(op.Value)));
+            Assert.AreEqual(1, ops.Count(op => op.Type == OperationType.Write && Cache2.Equals(op.CacheName) && 2.Equals(op.Key) && 2.Equals(op.Value)));
+            Assert.AreEqual(1, ops.Count(op => op.Type == OperationType.SesEnd && op.Commit));
+
+            _dumps = new ConcurrentBag<ICollection<Operation>>();
+
+            // 3. Test removes.
+            using (var tx = ignite.Transactions.TxStart())
+            {
+                cache1.Remove(1);
+                cache2.Remove(2);
+
+                tx.Commit();
+            }
+
+            Assert.AreEqual(1, _dumps.Count);
+            ops = _dumps.First();
+            Assert.AreEqual(3, ops.Count);
+
+            Assert.AreEqual(1, ops.Count(op => op.Type == OperationType.Delete && Cache1.Equals(op.CacheName) && 1.Equals(op.Key)));
+            Assert.AreEqual(1, ops.Count(op => op.Type == OperationType.Delete && Cache2.Equals(op.CacheName) && 2.Equals(op.Key)));
+            Assert.AreEqual(1, ops.Count(op => op.Type == OperationType.SesEnd && op.Commit));
+        }
+
+        /// <summary>
+        /// Records a collection of session operations in the shared dump bag.
+        /// </summary>
+        /// <param name="dump">Operations to record.</param>
+        internal static void DumpOperations(ICollection<Operation> dump)
+        {
+            // NOTE(review): _dumps is only assigned inside TestSession - confirm this is never
+            // reached before that test has started.
+            _dumps.Add(dump);
+        }
+
+        /// <summary>
+        /// Test store implementation: logs write/delete/session-end operations into a list
+        /// kept in the store session properties and dumps that list when the session ends.
+        /// </summary>
+        public class Store : CacheStoreAdapter
+        {
+            /** Store session. */
+            [StoreSessionResource]
+#pragma warning disable 649
+            private ICacheStoreSession _ses;
+#pragma warning restore 649
+
+            /// <summary>
+            /// Loading is not supported by this store.
+            /// </summary>
+            /// <param name="key">Key (unused).</param>
+            /// <returns>Never returns.</returns>
+            /// <exception cref="NotImplementedException">Always.</exception>
+            public override object Load(object key)
+            {
+                throw new NotImplementedException();
+            }
+
+            /// <summary>
+            /// Logs a write operation for the session's cache.
+            /// </summary>
+            /// <param name="key">Key; cast to int.</param>
+            /// <param name="val">Value; cast to int.</param>
+            public override void Write(object key, object val)
+            {
+                var log = GetOperations();
+
+                log.Add(new Operation(_ses.CacheName, OperationType.Write, (int)key, (int)val));
+            }
+
+            /// <summary>
+            /// Logs a delete operation (with value 0) for the session's cache.
+            /// </summary>
+            /// <param name="key">Key; cast to int.</param>
+            public override void Delete(object key)
+            {
+                var log = GetOperations();
+
+                log.Add(new Operation(_ses.CacheName, OperationType.Delete, (int)key, 0));
+            }
+
+            /// <summary>
+            /// Logs a session-end operation carrying the commit flag and hands the whole
+            /// operation log over to <c>DumpOperations</c>.
+            /// </summary>
+            /// <param name="commit">Commit flag stored on the logged operation.</param>
+            public override void SessionEnd(bool commit)
+            {
+                var endOp = new Operation(_ses.CacheName, OperationType.SesEnd) { Commit = commit };
+
+                var log = GetOperations();
+
+                log.Add(endOp);
+
+                DumpOperations(log);
+            }
+
+            /// <summary>
+            /// Gets the operation log stored under the "ops" session property, creating
+            /// and storing an empty list when the property is absent.
+            /// </summary>
+            /// <returns>Operations.</returns>
+            private ICollection<Operation> GetOperations()
+            {
+                object stored;
+
+                if (_ses.Properties.TryGetValue("ops", out stored))
+                {
+                    return (ICollection<Operation>) stored;
+                }
+
+                var created = new List<Operation>();
+
+                _ses.Properties["ops"] = created;
+
+                return created;
+            }
+        }
+
+        /// <summary>
+        /// A single logged store operation.
+        /// </summary>
+        internal class Operation
+        {
+            /// <summary>
+            /// Creates an operation without key and value.
+            /// </summary>
+            /// <param name="cacheName">Cache name.</param>
+            /// <param name="type">Operation type.</param>
+            public Operation(string cacheName, OperationType type)
+            {
+                Type = type;
+                CacheName = cacheName;
+            }
+
+            /// <summary>
+            /// Creates an operation with key and value.
+            /// </summary>
+            /// <param name="cacheName">Cache name.</param>
+            /// <param name="type">Operation type.</param>
+            /// <param name="key">Key.</param>
+            /// <param name="val">Value.</param>
+            public Operation(string cacheName, OperationType type, int key, int val)
+                : this(cacheName, type)
+            {
+                Value = val;
+                Key = key;
+            }
+
+            /// <summary>
+            /// Name of the cache the operation was logged for.
+            /// </summary>
+            public string CacheName { get; set; }
+
+            /// <summary>
+            /// Kind of the operation.
+            /// </summary>
+            public OperationType Type { get; set; }
+
+            /// <summary>
+            /// Key; left at 0 when the two-argument constructor is used.
+            /// </summary>
+            public int Key { get; set; }
+
+            /// <summary>
+            /// Value; left at 0 when the two-argument constructor is used.
+            /// </summary>
+            public int Value { get; set; }
+
+            /// <summary>
+            /// Commit flag.
+            /// </summary>
+            public bool Commit { get; set; }
+        }
+
+        /// <summary>
+        /// Kinds of logged store operations.
+        /// </summary>
+        internal enum OperationType
+        {
+            /// <summary>Write.</summary>
+            Write = 0,
+
+            /// <summary>Delete.</summary>
+            Delete = 1,
+
+            /// <summary>Session end.</summary>
+            SesEnd = 2
+        }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs
new file mode 100644
index 0000000..4e5e050
--- /dev/null
+++ b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs
@@ -0,0 +1,510 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests.Cache.Store
+{
+ using System;
+ using System.Collections;
+ using System.Collections.Generic;
+ using Apache.Ignite.Core.Cache;
+ using Apache.Ignite.Core.Impl;
+ using Apache.Ignite.Core.Portable;
+ using NUnit.Framework;
+
+    /// <summary>
+    /// Test key wrapping an integer index; two keys of the same runtime type are equal
+    /// when their indexes are equal.
+    /// </summary>
+    class Key
+    {
+        /** Wrapped index. */
+        private readonly int _idx;
+
+        /// <summary>
+        /// Creates a key for the given index.
+        /// </summary>
+        /// <param name="idx">Index.</param>
+        public Key(int idx)
+        {
+            _idx = idx;
+        }
+
+        /// <summary>
+        /// Gets the wrapped index.
+        /// </summary>
+        /// <returns>Index.</returns>
+        public int Index()
+        {
+            return _idx;
+        }
+
+        /// <summary>
+        /// Compares by exact runtime type and index.
+        /// </summary>
+        /// <param name="obj">Object to compare with; may be null.</param>
+        /// <returns><c>true</c> when <paramref name="obj"/> has the same type and index.</returns>
+        public override bool Equals(object obj)
+        {
+            var other = obj as Key;
+
+            return other != null && other.GetType() == GetType() && other._idx == _idx;
+        }
+
+        /// <summary>
+        /// Uses the index itself as the hash code.
+        /// </summary>
+        /// <returns>Index.</returns>
+        public override int GetHashCode()
+        {
+            return _idx;
+        }
+    }
+
+    /// <summary>
+    /// Test value wrapping an integer index.
+    /// </summary>
+    class Value
+    {
+        /** Wrapped index. */
+        private int _idx;
+
+        /// <summary>
+        /// Creates a value for the given index.
+        /// </summary>
+        /// <param name="idx">Index.</param>
+        public Value(int idx)
+        {
+            _idx = idx;
+        }
+
+        /// <summary>
+        /// Gets the wrapped index.
+        /// </summary>
+        /// <returns>Index.</returns>
+        public int Index()
+        {
+            return _idx;
+        }
+    }
+
+    /// <summary>
+    /// Cache entry predicate accepting entries whose key is 105 or greater.
+    /// </summary>
+    [Serializable]
+    public class CacheEntryFilter : ICacheEntryFilter<int, string>
+    {
+        /// <summary>
+        /// Decides whether the entry passes the filter.
+        /// </summary>
+        /// <param name="entry">Cache entry to test.</param>
+        /// <returns><c>true</c> when the entry key is at least 105.</returns>
+        public bool Invoke(ICacheEntry<int, string> entry)
+        {
+            const int minKey = 105;
+
+            return entry.Key >= minKey;
+        }
+    }
+
+ /// <summary>
+ /// Tests cache interaction with the test cache store: loading, puts, removes and transactions.
+ /// </summary>
+ public class CacheStoreTest
+ {
+ /** */
+ private const string PortableStoreCacheName = "portable_store";
+
+ /** */
+ private const string ObjectStoreCacheName = "object_store";
+
+ /** */
+ private const string CustomStoreCacheName = "custom_store";
+
+ /** */
+ private const string TemplateStoreCacheName = "template_store*";
+
+        /// <summary>
+        /// Fixture set-up: kills leftover processes, enables JVM debug and starts a grid
+        /// (named by <see cref="GridName"/>) from the cache-store Spring configuration with
+        /// <see cref="Key"/> and <see cref="Value"/> registered as portable types.
+        /// </summary>
+        [TestFixtureSetUp]
+        public void BeforeTests()
+        {
+            TestUtils.KillProcesses();
+
+            TestUtils.JvmDebug = true;
+
+            var cfg = new IgniteConfigurationEx
+            {
+                GridName = GridName(),
+                JvmClasspath = TestUtils.CreateTestClasspath(),
+                JvmOptions = TestUtils.TestJavaOptions(),
+                SpringConfigUrl = "config\\native-client-test-cache-store.xml",
+                PortableConfiguration = new PortableConfiguration
+                {
+                    Types = new List<string> { typeof(Key).FullName, typeof(Value).FullName }
+                }
+            };
+
+            Ignition.Start(cfg);
+        }
+
+        /// <summary>
+        /// Fixture tear-down: stops all started grids.
+        /// </summary>
+        [TestFixtureTearDown]
+        public virtual void AfterTests()
+        {
+            Ignition.StopAll(true);
+        }
+
+        /// <summary>
+        /// Per-test set-up: writes the name of the starting test to the console.
+        /// </summary>
+        [SetUp]
+        public void BeforeTest()
+        {
+            var testName = TestContext.CurrentContext.Test.Name;
+
+            Console.WriteLine("Test started: " + testName);
+        }
+
+        /// <summary>
+        /// Per-test tear-down: clears the cache, checks that it is empty, resets the
+        /// static test store state and writes the name of the finished test to the console.
+        /// </summary>
+        [TearDown]
+        public void AfterTest()
+        {
+            var cache = Cache();
+
+            try
+            {
+                cache.Clear();
+
+                Assert.IsTrue(cache.IsEmpty, "Cache is not empty: " + cache.Size());
+            }
+            finally
+            {
+                // Always reset the static store state, even when the check above fails,
+                // so that one failing test does not leak store state into the next one.
+                CacheTestStore.Reset();
+            }
+
+            Console.WriteLine("Test finished: " + TestContext.CurrentContext.Test.Name);
+        }
+
+        /// <summary>
+        /// Loads the cache through LoadCache with a <see cref="CacheEntryFilter"/> and
+        /// checks that exactly keys 105..109 are present with values "val_" + key.
+        /// </summary>
+        [Test]
+        public void TestLoadCache()
+        {
+            var cache = Cache();
+
+            Assert.AreEqual(0, cache.Size());
+
+            // Loader arguments 100 and 10 are passed through to the store
+            // (presumably start key and entry count - confirm against CacheTestStore).
+            cache.LoadCache(new CacheEntryFilter(), 100, 10);
+
+            Assert.AreEqual(5, cache.Size());
+
+            for (var key = 105; key < 110; key++)
+            {
+                Assert.AreEqual("val_" + key, cache.Get(key));
+            }
+        }
+
+        /// <summary>
+        /// Loads the cache through LocalLoadCache with a <see cref="CacheEntryFilter"/> and
+        /// checks that exactly keys 105..109 are present with values "val_" + key.
+        /// </summary>
+        [Test]
+        public void TestLocalLoadCache()
+        {
+            var cache = Cache();
+
+            Assert.AreEqual(0, cache.Size());
+
+            // Loader arguments 100 and 10 are passed through to the store
+            // (presumably start key and entry count - confirm against CacheTestStore).
+            cache.LocalLoadCache(new CacheEntryFilter(), 100, 10);
+
+            Assert.AreEqual(5, cache.Size());
+
+            for (var key = 105; key < 110; key++)
+            {
+                Assert.AreEqual("val_" + key, cache.Get(key));
+            }
+        }
+
+        /// <summary>
+        /// Loads objects into the cache (store switched to object loading) and checks that
+        /// portable metadata of a loaded value is available and reports type name "Value".
+        /// </summary>
+        [Test]
+        public void TestLoadCacheMetadata()
+        {
+            CacheTestStore.LoadObjects = true;
+
+            var cache = Cache();
+
+            Assert.AreEqual(0, cache.Size());
+
+            cache.LocalLoadCache(null, 0, 3);
+
+            Assert.AreEqual(3, cache.Size());
+
+            var portableCache = cache.WithKeepPortable<Key, IPortableObject>();
+            var portableVal = portableCache.Get(new Key(0));
+            var meta = portableVal.Metadata();
+
+            Assert.NotNull(meta);
+
+            Assert.AreEqual("Value", meta.TypeName);
+        }
+
+        /// <summary>
+        /// Loads the cache through the async cache facade and checks, via futures, that
+        /// exactly keys 105..109 are present with values "val_" + key.
+        /// </summary>
+        [Test]
+        public void TestLoadCacheAsync()
+        {
+            var syncCache = Cache();
+
+            // The precondition is checked on the synchronous cache. Everywhere else in this
+            // test the result of an async call is read from GetFuture; the direct return value
+            // of Size() on the async facade is presumably a default placeholder, so asserting
+            // on it would not verify the actual size.
+            Assert.AreEqual(0, syncCache.Size());
+
+            var cache = syncCache.WithAsync();
+
+            cache.LocalLoadCache(new CacheEntryFilter(), 100, 10);
+
+            var fut = cache.GetFuture<object>();
+
+            fut.Get();
+
+            Assert.IsTrue(fut.IsDone);
+
+            cache.Size();
+            Assert.AreEqual(5, cache.GetFuture<int>().ToTask().Result);
+
+            for (int i = 105; i < 110; i++)
+            {
+                cache.Get(i);
+
+                Assert.AreEqual("val_" + i, cache.GetFuture<string>().ToTask().Result);
+            }
+        }
+
+        /// <summary>
+        /// Puts a value, checks that the store map received it, evicts the entry locally
+        /// and checks that a subsequent Get returns the value and repopulates the cache.
+        /// </summary>
+        [Test]
+        public void TestPutLoad()
+        {
+            var cache = Cache();
+
+            cache.Put(1, "val");
+
+            var storeMap = StoreMap();
+
+            Assert.AreEqual(1, storeMap.Count);
+
+            var evictedKeys = new[] { 1 };
+
+            cache.LocalEvict(evictedKeys);
+
+            Assert.AreEqual(0, cache.Size());
+
+            Assert.AreEqual("val", cache.Get(1));
+
+            Assert.AreEqual(1, cache.Size());
+        }
+
+        /// <summary>
+        /// Same scenario as the plain put/load test, on the portable store cache: the store
+        /// map is expected to hold the value as a portable object exposing field "_idx".
+        /// </summary>
+        [Test]
+        public void TestPutLoadPortables()
+        {
+            var cache = PortableStoreCache<int, Value>();
+
+            cache.Put(1, new Value(1));
+
+            var storeMap = StoreMap();
+
+            Assert.AreEqual(1, storeMap.Count);
+
+            var stored = (IPortableObject)storeMap[1];
+
+            Assert.AreEqual(1, stored.Field<int>("_idx"));
+
+            var evictedKeys = new[] { 1 };
+
+            cache.LocalEvict(evictedKeys);
+
+            Assert.AreEqual(0, cache.Size());
+
+            Assert.AreEqual(1, cache.Get(1).Index());
+
+            Assert.AreEqual(1, cache.Size());
+        }
+
+        /// <summary>
+        /// Same scenario as the plain put/load test, on the object store cache: the store
+        /// map is expected to hold the value as a <see cref="Value"/> instance.
+        /// </summary>
+        [Test]
+        public void TestPutLoadObjects()
+        {
+            var cache = ObjectStoreCache<int, Value>();
+
+            cache.Put(1, new Value(1));
+
+            var storeMap = StoreMap();
+
+            Assert.AreEqual(1, storeMap.Count);
+
+            var stored = (Value)storeMap[1];
+
+            Assert.AreEqual(1, stored.Index());
+
+            var evictedKeys = new[] { 1 };
+
+            cache.LocalEvict(evictedKeys);
+
+            Assert.AreEqual(0, cache.Size());
+
+            Assert.AreEqual(1, cache.Get(1).Index());
+
+            Assert.AreEqual(1, cache.Size());
+        }
+
+        /// <summary>
+        /// Puts ten entries with PutAll, checks that the store map received all of them,
+        /// clears the cache and checks that GetAll returns all ten values again.
+        /// </summary>
+        [Test]
+        public void TestPutLoadAll()
+        {
+            var putMap = new Dictionary<int, string>();
+            ICollection<int> keys = new List<int>();
+
+            for (var key = 0; key < 10; key++)
+            {
+                putMap.Add(key, "val_" + key);
+                keys.Add(key);
+            }
+
+            var cache = Cache();
+
+            cache.PutAll(putMap);
+
+            var storeMap = StoreMap();
+
+            Assert.AreEqual(10, storeMap.Count);
+
+            for (var key = 0; key < 10; key++)
+            {
+                Assert.AreEqual("val_" + key, storeMap[key]);
+            }
+
+            cache.Clear();
+
+            Assert.AreEqual(0, cache.Size());
+
+            IDictionary<int, string> loaded = cache.GetAll(keys);
+
+            Assert.AreEqual(10, loaded.Count);
+
+            for (var key = 0; key < 10; key++)
+            {
+                Assert.AreEqual("val_" + key, loaded[key]);
+            }
+
+            Assert.AreEqual(10, cache.Size());
+        }
+
+        /// <summary>
+        /// Puts ten entries, removes keys 0..4 one by one and checks that the store map
+        /// keeps only keys 5..9 with their original values.
+        /// </summary>
+        [Test]
+        public void TestRemove()
+        {
+            var cache = Cache();
+
+            for (var key = 0; key < 10; key++)
+            {
+                cache.Put(key, "val_" + key);
+            }
+
+            var storeMap = StoreMap();
+
+            Assert.AreEqual(10, storeMap.Count);
+
+            for (var key = 0; key < 5; key++)
+            {
+                cache.Remove(key);
+            }
+
+            Assert.AreEqual(5, storeMap.Count);
+
+            for (var key = 5; key < 10; key++)
+            {
+                Assert.AreEqual("val_" + key, storeMap[key]);
+            }
+        }
+
+        /// <summary>
+        /// Puts ten entries, removes keys 0..4 with a single RemoveAll call and checks that
+        /// the store map keeps only keys 5..9 with their original values.
+        /// </summary>
+        [Test]
+        public void TestRemoveAll()
+        {
+            var cache = Cache();
+
+            for (var key = 0; key < 10; key++)
+            {
+                cache.Put(key, "val_" + key);
+            }
+
+            var storeMap = StoreMap();
+
+            Assert.AreEqual(10, storeMap.Count);
+
+            var removedKeys = new List<int> { 0, 1, 2, 3, 4 };
+
+            cache.RemoveAll(removedKeys);
+
+            Assert.AreEqual(5, storeMap.Count);
+
+            for (var key = 5; key < 10; key++)
+            {
+                Assert.AreEqual("val_" + key, storeMap[key]);
+            }
+        }
+
+        /// <summary>
+        /// Puts a value inside a committed transaction (with transaction meta attached and
+        /// the store told to expect a commit) and checks that the store map received it.
+        /// </summary>
+        [Test]
+        public void TestTx()
+        {
+            var cache = Cache();
+
+            var transactions = cache.Ignite.Transactions;
+
+            using (var tx = transactions.TxStart())
+            {
+                CacheTestStore.ExpCommit = true;
+
+                tx.AddMeta("meta", 100);
+
+                cache.Put(1, "val");
+
+                tx.Commit();
+            }
+
+            var storeMap = StoreMap();
+
+            Assert.AreEqual(1, storeMap.Count);
+
+            Assert.AreEqual("val", storeMap[1]);
+        }
+
+        /// <summary>
+        /// Loads the cache with the store switched to multithreaded loading and checks
+        /// that keys 0..999 are present with values "val_" + key.
+        /// </summary>
+        [Test]
+        public void TestLoadCacheMultithreaded()
+        {
+            CacheTestStore.LoadMultithreaded = true;
+
+            var cache = Cache();
+
+            Assert.AreEqual(0, cache.Size());
+
+            cache.LocalLoadCache(null, 0, null);
+
+            Assert.AreEqual(1000, cache.Size());
+
+            for (var key = 0; key < 1000; key++)
+            {
+                Assert.AreEqual("val_" + key, cache.Get(key));
+            }
+        }
+
+        /// <summary>
+        /// Obtains the custom store cache and checks the static property values exposed
+        /// by the test store (42 and "String value").
+        /// </summary>
+        [Test]
+        public void TestCustomStoreProperties()
+        {
+            var customCache = CustomStoreCache();
+
+            Assert.IsNotNull(customCache);
+
+            Assert.AreEqual(42, CacheTestStore.intProperty);
+            Assert.AreEqual("String value", CacheTestStore.stringProperty);
+        }
+
+        /// <summary>
+        /// Creates a cache from the store template name, puts the cache name under key 1
+        /// and checks that the test store map holds that value.
+        /// </summary>
+        [Test]
+        public void TestDynamicStoreStart()
+        {
+            var templateCache = TemplateStoreCache();
+
+            Assert.IsNotNull(templateCache);
+
+            var cacheName = templateCache.Name;
+
+            templateCache.Put(1, cacheName);
+
+            Assert.AreEqual(templateCache.Name, CacheTestStore.Map[1]);
+        }
+
+        /// <summary>
+        /// Gets the grid name for this test; this base implementation returns null,
+        /// derived fixtures may override it.
+        /// </summary>
+        /// <returns>Grid name.</returns>
+        protected virtual string GridName()
+        {
+            return null;
+        }
+
+        /// <summary>
+        /// Gets the map exposed by the test store.
+        /// </summary>
+        /// <returns><c>CacheTestStore.Map</c>.</returns>
+        private IDictionary StoreMap()
+        {
+            var storeMap = CacheTestStore.Map;
+
+            return storeMap;
+        }
+
+        /// <summary>
+        /// Gets the portable store cache typed as int keys and string values.
+        /// </summary>
+        /// <returns>Cache.</returns>
+        private ICache<int, string> Cache()
+        {
+            var cache = PortableStoreCache<int, string>();
+
+            return cache;
+        }
+
+        /// <summary>
+        /// Gets the cache named by <c>PortableStoreCacheName</c> from this test's grid.
+        /// </summary>
+        /// <returns>Cache.</returns>
+        private ICache<TK, TV> PortableStoreCache<TK, TV>()
+        {
+            var ignite = Ignition.GetIgnite(GridName());
+
+            return ignite.Cache<TK, TV>(PortableStoreCacheName);
+        }
+
+        /// <summary>
+        /// Gets the cache named by <c>ObjectStoreCacheName</c> from this test's grid.
+        /// </summary>
+        /// <returns>Cache.</returns>
+        private ICache<TK, TV> ObjectStoreCache<TK, TV>()
+        {
+            var ignite = Ignition.GetIgnite(GridName());
+
+            return ignite.Cache<TK, TV>(ObjectStoreCacheName);
+        }
+
+        /// <summary>
+        /// Gets the cache named by <c>CustomStoreCacheName</c> from this test's grid.
+        /// </summary>
+        /// <returns>Cache.</returns>
+        private ICache<int, string> CustomStoreCache()
+        {
+            var ignite = Ignition.GetIgnite(GridName());
+
+            return ignite.Cache<int, string>(CustomStoreCacheName);
+        }
+
+        /// <summary>
+        /// Gets or creates a cache whose name is <c>TemplateStoreCacheName</c> with the
+        /// "*" replaced by a fresh GUID.
+        /// </summary>
+        /// <returns>Cache.</returns>
+        private ICache<int, string> TemplateStoreCache()
+        {
+            var uniqueSuffix = Guid.NewGuid().ToString();
+            var cacheName = TemplateStoreCacheName.Replace("*", uniqueSuffix);
+
+            var ignite = Ignition.GetIgnite(GridName());
+
+            return ignite.GetOrCreateCache<int, string>(cacheName);
+        }
+ }
+
+    /// <summary>
+    /// Runs the <see cref="CacheStoreTest"/> suite with the grid name "name".
+    /// </summary>
+    public class NamedNodeCacheStoreTest : CacheStoreTest
+    {
+        /// <summary>
+        /// Gets the grid name for this test.
+        /// </summary>
+        /// <returns>The string "name".</returns>
+        protected override string GridName()
+        {
+            const string gridName = "name";
+
+            return gridName;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestParallelLoadStore.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestParallelLoadStore.cs b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestParallelLoadStore.cs
new file mode 100644
index 0000000..770ca83
--- /dev/null
+++ b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestParallelLoadStore.cs
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests.Cache.Store
+{
+ using System.Collections;
+ using System.Collections.Concurrent;
+ using System.Collections.Generic;
+ using System.Linq;
+ using System.Threading;
+ using Apache.Ignite.Core.Cache.Store;
+
+ /// <summary>
+ /// Test cache store built on the parallel load adapter. It produces a fixed
+ /// sequence of records and remembers which threads executed <see cref="Parse"/>.
+ /// </summary>
+ public class CacheTestParallelLoadStore : CacheParallelLoadStoreAdapter
+ {
+     /** Number of records produced by GetInputData. */
+     public const int InputDataLength = 10000;
+
+     /** Managed ids of the threads that executed Parse; each id is stored as both key and value. */
+     private static readonly ConcurrentDictionary<int, int> ThreadIds = new ConcurrentDictionary<int, int>();
+
+     /// <summary>
+     /// Gets the number of distinct threads that have entered Parse since the counters were last reset.
+     /// </summary>
+     public static int UniqueThreadCount
+     {
+         get { return ThreadIds.Count; }
+     }
+
+     /// <summary>
+     /// Forgets all recorded thread ids.
+     /// </summary>
+     public static void ResetCounters()
+     {
+         ThreadIds.Clear();
+     }
+
+     /** <inheritdoc /> */
+     protected override IEnumerable GetInputData()
+     {
+         // Lazily yields records with ids 0 .. InputDataLength - 1.
+         return
+             from id in Enumerable.Range(0, InputDataLength)
+             select new Record {Id = id, Name = "Test Record " + id};
+     }
+
+     /** <inheritdoc /> */
+     protected override KeyValuePair<object, object>? Parse(object inputRecord, params object[] args)
+     {
+         // Record the calling thread so tests can count how many threads took part.
+         var currentThreadId = Thread.CurrentThread.ManagedThreadId;
+         ThreadIds.GetOrAdd(currentThreadId, currentThreadId);
+
+         // The first load argument is the smallest record id to keep.
+         var lowestAcceptedId = (int)args[0];
+
+         var record = (Record)inputRecord;
+
+         if (record.Id < lowestAcceptedId)
+         {
+             return null;
+         }
+
+         return new KeyValuePair<object, object>(record.Id, record);
+     }
+
+     /// <summary>
+     /// Record type produced by the store's input sequence.
+     /// </summary>
+     public class Record
+     {
+         /// <summary>
+         /// Gets or sets the record identifier.
+         /// </summary>
+         public int Id { get; set; }
+
+         /// <summary>
+         /// Gets or sets the record name.
+         /// </summary>
+         public string Name { get; set; }
+     }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestStore.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestStore.cs b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestStore.cs
new file mode 100644
index 0000000..9c381cb
--- /dev/null
+++ b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestStore.cs
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests.Cache.Store
+{
+ using System;
+ using System.Collections;
+ using System.Collections.Concurrent;
+ using System.Diagnostics;
+ using System.Diagnostics.CodeAnalysis;
+ using System.Linq;
+ using System.Threading;
+ using Apache.Ignite.Core.Cache.Store;
+ using Apache.Ignite.Core.Resource;
+
+ /// <summary>
+ /// Test <see cref="ICacheStore"/> implementation backed by a static in-memory map.
+ /// Static flags select how <see cref="LoadCache"/> generates its entries.
+ /// </summary>
+ [SuppressMessage("ReSharper", "FieldCanBeMadeReadOnly.Local")]
+ public class CacheTestStore : ICacheStore
+ {
+     /** Backing storage; static, so it is shared by every store instance. */
+     public static readonly IDictionary Map = new ConcurrentDictionary<object, object>();
+
+     /** Flag cleared by Reset; it is not read anywhere inside this class. */
+     public static bool ExpCommit;
+
+     /** When set, LoadCache generates its entries from several threads. */
+     public static bool LoadMultithreaded;
+
+     /** When set, the single-threaded LoadCache path emits Key/Value objects instead of int/string pairs. */
+     public static bool LoadObjects;
+
+     /** Grid instance; marked for resource injection and asserted non-null by every store operation. */
+     [InstanceResource]
+     private IIgnite _grid = null;
+
+     /** Store session; marked for resource injection and asserted non-null in SessionEnd. */
+     [StoreSessionResource]
+#pragma warning disable 649
+     private ICacheStoreSession _ses;
+#pragma warning restore 649
+
+     /** Static storage behind the IntProperty instance property. */
+     public static int intProperty;
+
+     /** Static storage behind the StringProperty instance property. */
+     public static string stringProperty;
+
+     /// <summary>
+     /// Empties the map and clears the ExpCommit, LoadMultithreaded and LoadObjects flags.
+     /// </summary>
+     public static void Reset()
+     {
+         Map.Clear();
+
+         ExpCommit = LoadMultithreaded = LoadObjects = false;
+     }
+
+     /// <summary>
+     /// Feeds generated entries to the supplied callback.
+     /// </summary>
+     /// <param name="act">Callback invoked once per generated key/value pair.</param>
+     /// <param name="args">
+     /// Only read when LoadMultithreaded is off: <c>args[0]</c> is the first id and
+     /// <c>args[1]</c> is the number of entries, both as <c>int</c>.
+     /// </param>
+     public void LoadCache(Action<object, object> act, params object[] args)
+     {
+         Debug.Assert(_grid != null);
+
+         if (LoadMultithreaded)
+         {
+             // Shared counter: every increment hands exactly one index in 0..999 to some worker.
+             int issued = 0;
+
+             // NOTE(review): the second argument is presumably the thread count - confirm against TestUtils.
+             TestUtils.RunMultiThreaded(() =>
+             {
+                 while (true)
+                 {
+                     int idx = Interlocked.Increment(ref issued) - 1;
+
+                     if (idx >= 1000)
+                     {
+                         break;
+                     }
+
+                     act(idx, "val_" + idx);
+                 }
+             }, 8);
+
+             return;
+         }
+
+         int first = (int)args[0];
+         int count = (int)args[1];
+         int end = first + count;
+
+         for (int id = first; id < end; id++)
+         {
+             // The flag is re-read on every iteration, exactly as a static switch would be.
+             if (LoadObjects)
+             {
+                 act(new Key(id), new Value(id));
+             }
+             else
+             {
+                 act(id, "val_" + id);
+             }
+         }
+     }
+
+     /// <summary>
+     /// Looks the key up in the map.
+     /// </summary>
+     /// <param name="key">Key to look up.</param>
+     /// <returns>Whatever the map's indexer yields for the key.</returns>
+     public object Load(object key)
+     {
+         Debug.Assert(_grid != null);
+
+         var stored = Map[key];
+
+         return stored;
+     }
+
+     /// <summary>
+     /// Loads every non-null key through <see cref="Load"/>.
+     /// </summary>
+     /// <param name="keys">Keys to load; null elements are skipped.</param>
+     /// <returns>Dictionary mapping each key to its loaded value.</returns>
+     public IDictionary LoadAll(ICollection keys)
+     {
+         Debug.Assert(_grid != null);
+
+         var nonNullKeys = keys.OfType<object>();
+
+         return nonNullKeys.ToDictionary(k => k, k => Load(k));
+     }
+
+     /// <summary>
+     /// Stores the value under the key, replacing any existing entry.
+     /// </summary>
+     /// <param name="key">Key.</param>
+     /// <param name="val">Value.</param>
+     public void Write(object key, object val)
+     {
+         Debug.Assert(_grid != null);
+
+         Map[key] = val;
+     }
+
+     /// <summary>
+     /// Copies every entry of the supplied dictionary into the map.
+     /// </summary>
+     /// <param name="map">Entries to store.</param>
+     public void WriteAll(IDictionary map)
+     {
+         Debug.Assert(_grid != null);
+
+         foreach (DictionaryEntry entry in map)
+         {
+             Map[entry.Key] = entry.Value;
+         }
+     }
+
+     /// <summary>
+     /// Removes the key from the map.
+     /// </summary>
+     /// <param name="key">Key to remove.</param>
+     public void Delete(object key)
+     {
+         Debug.Assert(_grid != null);
+
+         Map.Remove(key);
+     }
+
+     /// <summary>
+     /// Removes every supplied key from the map.
+     /// </summary>
+     /// <param name="keys">Keys to remove.</param>
+     public void DeleteAll(ICollection keys)
+     {
+         Debug.Assert(_grid != null);
+
+         foreach (object doomedKey in keys)
+         {
+             Map.Remove(doomedKey);
+         }
+     }
+
+     /// <summary>
+     /// Only verifies that the grid and the store session have been set.
+     /// </summary>
+     /// <param name="commit">Commit flag; not inspected by this implementation.</param>
+     public void SessionEnd(bool commit)
+     {
+         Debug.Assert(_grid != null);
+
+         Debug.Assert(_ses != null);
+     }
+
+     /// <summary>
+     /// Gets or sets the static <c>intProperty</c> field through an instance property.
+     /// </summary>
+     public int IntProperty
+     {
+         get { return intProperty; }
+         set { intProperty = value; }
+     }
+
+     /// <summary>
+     /// Gets or sets the static <c>stringProperty</c> field through an instance property.
+     /// </summary>
+     public string StringProperty
+     {
+         get { return stringProperty; }
+         set { stringProperty = value; }
+     }
+ }
+}