You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by jb...@apache.org on 2017/08/11 23:52:56 UTC
[40/52] [partial] geode-native git commit: GEODE-3165: Reorganized
sources relative to the root for better CMake IDE integration.
http://git-wip-us.apache.org/repos/asf/geode-native/blob/6cbd424f/clicache/integration-test/ThinClientDeltaTestN.cs
----------------------------------------------------------------------
diff --git a/clicache/integration-test/ThinClientDeltaTestN.cs b/clicache/integration-test/ThinClientDeltaTestN.cs
new file mode 100644
index 0000000..a7ee6b3
--- /dev/null
+++ b/clicache/integration-test/ThinClientDeltaTestN.cs
@@ -0,0 +1,914 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Threading;
+
+#pragma warning disable 618
+
+namespace Apache.Geode.Client.UnitTests
+{
+ using NUnit.Framework;
+ using Apache.Geode.DUnitFramework;
+ using Apache.Geode.Client.Tests;
+ using Apache.Geode.Client;
+ using DeltaEx = Apache.Geode.Client.Tests.DeltaEx;
+
+ // Continuous-query listener used by the CQ delta test. For each CQ event it
+ // decodes the event's delta bytes into a DeltaTestImpl and inspects the
+ // event's full new value, counting how many of each report an intVar of 5.
+ public class CqDeltaListener<TKey, TResult> : ICqListener<TKey, TResult>
+ {
+ // intVar value that marks an event as one the listener should count.
+ private const int ExpectedIntVar = 5;
+
+ private int m_deltaCount;
+ private int m_valueCount;
+
+ public CqDeltaListener()
+ {
+ m_deltaCount = 0;
+ m_valueCount = 0;
+ }
+
+ // Called for every CQ event. Increments the delta counter when the decoded
+ // delta has intVar == 5, and the value counter when the full new value does.
+ public void OnEvent(CqEvent<TKey, TResult> aCqEvent)
+ {
+ // Decode only when delta bytes are present. Handing a null or empty buffer
+ // to CreateDataInput/FromDelta would presumably throw inside the callback
+ // instead of letting the test report a plain count mismatch.
+ byte[] deltaValue = aCqEvent.getDeltaValue();
+ if (deltaValue != null && deltaValue.Length > 0)
+ {
+ DeltaTestImpl fromDelta = new DeltaTestImpl();
+ DataInput input = CacheHelper.DCache.CreateDataInput(deltaValue);
+ fromDelta.FromDelta(input);
+ if (fromDelta.GetIntVar() == ExpectedIntVar)
+ {
+ m_deltaCount++;
+ }
+ }
+
+ // A null new value is skipped rather than dereferenced. A non-null value of
+ // an unexpected type still fails the cast, exactly as before.
+ object newValue = aCqEvent.getNewValue();
+ if (newValue != null)
+ {
+ DeltaTestImpl fullObject = (DeltaTestImpl)newValue;
+ if (fullObject.GetIntVar() == ExpectedIntVar)
+ {
+ m_valueCount++;
+ }
+ }
+ }
+
+ // Error events are ignored by this listener.
+ public void OnError(CqEvent<TKey, TResult> aCqEvent)
+ {
+ }
+
+ // Nothing to release.
+ public void Close()
+ {
+ }
+
+ // Number of events whose decoded delta had intVar == 5.
+ public int GetDeltaCount()
+ {
+ return m_deltaCount;
+ }
+
+ // Number of events whose full new value had intVar == 5.
+ public int GetValueCount()
+ {
+ return m_valueCount;
+ }
+ }
+
+ // Test value type that supports both full serialization and delta
+ // propagation. Only the integer counter travels in a delta; the string
+ // payload is written and read only by the full ToData/FromData path.
+ public class DeltaTestAD : IGeodeDelta, IGeodeSerializable
+ {
+ // Counter carried by deltas; starts at 1 and is bumped by every HasDelta call.
+ private int _deltaUpdate = 1;
+ // Payload that is only ever sent as part of the full object.
+ private string _staticData = "Data which don't get updated";
+
+ public DeltaTestAD()
+ {
+ }
+
+ // Factory method handed to Serializable.RegisterTypeGeneric.
+ public static DeltaTestAD Create()
+ {
+ return new DeltaTestAD();
+ }
+
+ // Current value of the delta counter.
+ public int DeltaUpdate
+ {
+ get { return _deltaUpdate; }
+ set { _deltaUpdate = value; }
+ }
+
+ #region IGeodeDelta Members
+
+ // NOTE: this has a side effect. Each call increments the counter first and
+ // then reports a delta only when the new counter value is odd.
+ public bool HasDelta()
+ {
+ _deltaUpdate++;
+ bool isDelta = (_deltaUpdate % 2) == 1;
+ Util.Log("In DeltaTestAD.HasDelta _deltaUpdate:" + _deltaUpdate + " : isDelta:" + isDelta);
+ return isDelta;
+ }
+
+ // Writes the counter as the whole delta.
+ public void ToDelta(DataOutput output)
+ {
+ output.WriteInt32(_deltaUpdate);
+ }
+
+ // Replaces the counter with the value read from the delta.
+ public void FromDelta(DataInput input)
+ {
+ _deltaUpdate = input.ReadInt32();
+ }
+
+ #endregion
+
+ #region IGeodeSerializable Members
+
+ // Type id under which this class is registered.
+ public uint ClassId
+ {
+ get { return 151; }
+ }
+
+ // Size estimate: the 4-byte counter plus the character count of the payload.
+ public uint ObjectSize
+ {
+ get { return (uint)(sizeof(int) + _staticData.Length); }
+ }
+
+ // Full serialization: counter first, then the payload string.
+ public void ToData(DataOutput output)
+ {
+ output.WriteInt32(_deltaUpdate);
+ output.WriteUTF(_staticData);
+ }
+
+ // Full deserialization; reads fields in the order ToData wrote them and
+ // returns this instance.
+ public IGeodeSerializable FromData(DataInput input)
+ {
+ _deltaUpdate = input.ReadInt32();
+ _staticData = input.ReadUTF();
+ return this;
+ }
+
+ #endregion
+ }
+
+ [TestFixture]
+ [Category("group1")]
+ [Category("unicast_only")]
+ [Category("generics")]
+ public class ThinClientDeltaTest : ThinClientRegionSteps
+ {
+ #region Private members
+
+ private UnitProcess m_client1, m_client2;
+ private CqDeltaListener<object, DeltaTestImpl> myCqListener;
+
+ #endregion
+
+ // Creates the two client processes used by this fixture and returns them
+ // in order (client 1, client 2).
+ protected override ClientBase[] GetClients()
+ {
+ ClientBase[] clients = new ClientBase[2];
+ clients[0] = m_client1 = new UnitProcess();
+ clients[1] = m_client2 = new UnitProcess();
+ return clients;
+ }
+
+ // Fixture-level teardown: stops the Java servers and then runs the base
+ // class teardown. The base call sits in a finally block so it still runs
+ // when stopping the servers throws.
+ [TestFixtureTearDown]
+ public override void EndTests()
+ {
+ try
+ {
+ CacheHelper.StopJavaServers();
+ }
+ finally
+ {
+ base.EndTests();
+ }
+ }
+
+ // Per-test teardown: clears the recorded endpoints/locators, stops any Java
+ // servers and locators, then runs the base class teardown. Each stage is
+ // wrapped in try/finally so a failure in an earlier stage cannot prevent
+ // the later ones (in particular base.EndTest) from running.
+ [TearDown]
+ public override void EndTest()
+ {
+ try
+ {
+ try
+ {
+ CacheHelper.ClearEndpoints();
+ CacheHelper.ClearLocators();
+ }
+ finally
+ {
+ try
+ {
+ CacheHelper.StopJavaServers();
+ }
+ finally
+ {
+ CacheHelper.StopJavaLocators();
+ }
+ }
+ }
+ finally
+ {
+ base.EndTest();
+ }
+ }
+
+ // Creates the region regionName attached to the pool poolName by delegating
+ // to CacheHelper.CreateLRUTCRegion_Pool.
+ // NOTE(review): the trailing 3 is presumably the LRU entries limit - confirm
+ // against the CacheHelper signature.
+ public void createLRURegionAndAttachPool(string regionName, string poolName)
+ {
+ CacheHelper.CreateLRUTCRegion_Pool<object, object>(regionName, true, true, null, null, poolName, false, 3);
+ }
+
+ // Convenience overload: creates the region attached to the pool with
+ // cloning disabled.
+ public void createRegionAndAttachPool(string regionName, string poolName)
+ {
+ createRegionAndAttachPool(regionName, poolName, false);
+ }
+
+ // Creates the region regionName attached to the pool poolName by delegating
+ // to CacheHelper.CreateTCRegion_Pool, passing cloningEnabled as the last
+ // argument. The meaning of the other fixed boolean/null arguments is defined
+ // by CacheHelper and is not visible here.
+ public void createRegionAndAttachPool(string regionName, string poolName, bool cloningEnabled)
+ {
+ CacheHelper.CreateTCRegion_Pool<object, object>(regionName, true, true, null, null, poolName, false,
+ false, cloningEnabled);
+ }
+
+ //public void createPooledRegion(string regionName, string poolName, string endpoints, string locators)
+ //{
+ // CacheHelper.CreateTCRegion_Pool(regionName, true, true, null, endpoints, locators, poolName, false);
+ //}
+
+ // Creates a pool with the given name, locator list, server group,
+ // redundancy level and subscription flag by forwarding every argument
+ // unchanged to CacheHelper.CreatePool.
+ public void createPool(string name, string locators, string serverGroup,
+ int redundancy, bool subscription)
+ {
+ CacheHelper.CreatePool<object, object>(name, locators, serverGroup, redundancy, subscription);
+ }
+
+ // Creates an expiration region named name on the pool poolName using the
+ // LocalInvalidate expiration action and a timeout argument of 5.
+ // NOTE(review): the 5 is presumably seconds - confirm against CacheHelper.
+ // The created region is not needed by callers, so the result is discarded.
+ public void createExpirationRegion(string name, string poolName)
+ {
+ CacheHelper.CreateExpirationRegion<object, object>(name,
+ poolName, ExpirationAction.LocalInvalidate, 5);
+ }
+
+ // Convenience overload: creates the expiration region with a null pool name.
+ public void createExpirationRegion(string name)
+ {
+ createExpirationRegion(name, null);
+ }
+
+ // Convenience overload: creates the region with enableNotification = false.
+ public void CreateRegion(string name)
+ {
+ CreateRegion(name, false);
+ }
+
+ // Convenience overload: creates the region with cloning disabled.
+ public void CreateRegion(string name, bool enableNotification)
+ {
+ CreateRegion(name, enableNotification, false);
+ }
+ // Creates a region named name whose attributes carry a SimpleCacheListener
+ // and the requested cloning setting.
+ // NOTE(review): enableNotification is accepted but never used in this body.
+ public void CreateRegion(string name, bool enableNotification, bool cloningEnabled)
+ {
+ AttributesFactory<object, object> factory = new AttributesFactory<object, object>();
+ factory.SetCacheListener(new SimpleCacheListener<object, object>());
+ factory.SetCloningEnabled(cloningEnabled);
+ CacheHelper.CreateRegion<object, object>(name, factory.CreateRegionAttributes());
+ }
+
+ //public void CreateOverflowRegion(string name, uint entriesLimit)
+ //{
+ // AttributesFactory af = new AttributesFactory();
+ // af.SetScope(ScopeType.DistributedAck);
+ // af.SetCachingEnabled(true);
+ // af.SetClientNotificationEnabled(true);
+ // af.SetLruEntriesLimit(entriesLimit);// LRU Entry limit set to 3
+
+ // af.SetDiskPolicy(DiskPolicyType.Overflows);
+ // Properties bdbProperties = Properties.Create();
+ // bdbProperties.Insert("CacheSizeGb", "0");
+ // bdbProperties.Insert("CacheSizeMb", "512");
+ // bdbProperties.Insert("PageSize", "65536");
+ // bdbProperties.Insert("MaxFileSize", "512000000");
+ // String wdPath = Directory.GetCurrentDirectory();
+ // String absPersistenceDir = wdPath + "/absBDB";
+ // String absEnvDir = wdPath + "/absBDBEnv";
+ // bdbProperties.Insert("PersistenceDirectory", absPersistenceDir);
+ // bdbProperties.Insert("EnvironmentDirectory", absEnvDir);
+ // af.SetPersistenceManager("BDBImpl", "createBDBInstance", bdbProperties);
+
+ // CacheHelper.CreateRegion(name, af.CreateRegionAttributes());
+ //}
+
+ // Puts two DeltaEx values under the first test key, each once as a full
+ // object and once with its delta flag set, then checks how many times
+ // ToDelta and ToData were invoked before resetting all DeltaEx counters.
+ void DoPutWithDelta()
+ {
+ try
+ {
+ Serializable.RegisterTypeGeneric(DeltaEx.create, CacheHelper.DCache);
+ }
+ catch (IllegalStateException)
+ {
+ // Deliberately ignored (presumably the type is already registered).
+ }
+
+ string key = m_keys[0];
+ DeltaEx first = new DeltaEx();
+ IRegion<object, object> region = CacheHelper.GetRegion<object, object>("DistRegionAck");
+
+ // Full put followed by a delta put of the same object.
+ region[key] = first;
+ first.SetDelta(true);
+ region[key] = first;
+
+ // In this case the Java side will throw an invalid-delta exception.
+ DeltaEx second = new DeltaEx(0);
+ region[key] = second;
+ second.SetDelta(true);
+ region[key] = second;
+
+ if (DeltaEx.ToDeltaCount != 2)
+ {
+ Util.Log("DeltaEx.ToDataCount = " + DeltaEx.ToDataCount);
+ Assert.Fail(" Delta count should have been 2, is " + DeltaEx.ToDeltaCount);
+ }
+ if (DeltaEx.ToDataCount != 3)
+ {
+ Assert.Fail("Data count should have been 3, is " + DeltaEx.ToDataCount);
+ }
+
+ // Reset the shared static counters for whatever runs next in this process.
+ DeltaEx.ToDeltaCount = 0;
+ DeltaEx.ToDataCount = 0;
+ DeltaEx.FromDataCount = 0;
+ DeltaEx.FromDeltaCount = 0;
+ }
+
+ // Repeats the full/delta put sequence and counter checks of the plain put
+ // scenario, then exercises Contains (key/value pair) and Remove (by key) on
+ // the region and verifies the entry is gone afterwards.
+ void Do_Put_Contains_Remove_WithDelta()
+ {
+ try
+ {
+ Serializable.RegisterTypeGeneric(DeltaEx.create, CacheHelper.DCache);
+ }
+ catch (IllegalStateException)
+ {
+ // Deliberately ignored (presumably the type is already registered).
+ }
+
+ string key = m_keys[0];
+ DeltaEx first = new DeltaEx();
+ IRegion<object, object> region = CacheHelper.GetRegion<object, object>("DistRegionAck");
+
+ // Full put followed by a delta put of the same object.
+ region[key] = first;
+ first.SetDelta(true);
+ region[key] = first;
+
+ // In this case the Java side will throw an invalid-delta exception.
+ DeltaEx second = new DeltaEx(0);
+ region[key] = second;
+ second.SetDelta(true);
+ region[key] = second;
+
+ if (DeltaEx.ToDeltaCount != 2)
+ {
+ Util.Log("DeltaEx.ToDataCount = " + DeltaEx.ToDataCount);
+ Assert.Fail(" Delta count should have been 2, is " + DeltaEx.ToDeltaCount);
+ }
+ if (DeltaEx.ToDataCount != 3)
+ {
+ Assert.Fail("Data count should have been 3, is " + DeltaEx.ToDataCount);
+ }
+
+ DeltaEx.ToDeltaCount = 0;
+ DeltaEx.ToDataCount = 0;
+ DeltaEx.FromDataCount = 0;
+ DeltaEx.FromDeltaCount = 0;
+
+ // Contains with a key/value pair that is present must succeed.
+ KeyValuePair<object, object> entry = new KeyValuePair<object, object>(key, second);
+ Assert.IsTrue(region.Contains(entry), "Result should be true as key & value are present");
+
+ // Remove by key (only the key is passed) must report success.
+ Assert.IsTrue(region.Remove(key), "Result should be true as key & value are present");
+
+ // After the removal the same pair must no longer be found.
+ Assert.IsFalse(region.Contains(entry), "Result should be false as key & value are removed");
+ }
+
+ // For each of the first two test keys: puts a fresh DeltaEx as a full
+ // object, sets its delta flag and puts it again. Finally resets the
+ // ToDelta/ToData counters.
+ void DoNotificationWithDelta()
+ {
+ try
+ {
+ Serializable.RegisterTypeGeneric(DeltaEx.create, CacheHelper.DCache);
+ }
+ catch (IllegalStateException)
+ {
+ // Deliberately ignored (presumably the type is already registered).
+ }
+
+ IRegion<object, object> region = CacheHelper.GetRegion<object, object>("DistRegionAck");
+ for (int keyIndex = 0; keyIndex < 2; keyIndex++)
+ {
+ string key = m_keys[keyIndex];
+ DeltaEx value = new DeltaEx();
+ region[key] = value;
+ value.SetDelta(true);
+ region[key] = value;
+ }
+
+ DeltaEx.ToDeltaCount = 0;
+ DeltaEx.ToDataCount = 0;
+ }
+
+ // Puts a DeltaTestImpl under the first test key, changes its intVar to 2
+ // with the delta flag set and puts it again; then puts the same PdxDelta
+ // instance ten times under "pdxdelta".
+ void DoNotificationWithDefaultCloning()
+ {
+ string key = m_keys[0];
+ DeltaTestImpl value = new DeltaTestImpl();
+ IRegion<object, object> region = CacheHelper.GetRegion<object, object>("DistRegionAck");
+
+ region[key] = value;
+ value.SetIntVar(2);
+ value.SetDelta(true);
+ region[key] = value;
+
+ javaobject.PdxDelta pdxValue = new javaobject.PdxDelta(1001);
+ int remaining = 10;
+ while (remaining > 0)
+ {
+ region["pdxdelta"] = pdxValue;
+ remaining--;
+ }
+ }
+
+ // Puts the same DeltaEx under six keys ("key1".."key6"), then puts a second
+ // DeltaEx with its delta flag set under "key1", and resets the
+ // ToDelta/ToData counters.
+ void DoNotificationWithDeltaLRU()
+ {
+ try
+ {
+ Serializable.RegisterTypeGeneric(DeltaEx.create, CacheHelper.DCache);
+ }
+ catch (IllegalStateException)
+ {
+ // Deliberately ignored (presumably the type is already registered).
+ }
+
+ string[] keys = { "key1", "key2", "key3", "key4", "key5", "key6" };
+ DeltaEx fullValue = new DeltaEx();
+ DeltaEx deltaValue = new DeltaEx();
+ IRegion<object, object> region = CacheHelper.GetRegion<object, object>("DistRegionAck");
+
+ foreach (string key in keys)
+ {
+ region[key] = fullValue;
+ }
+
+ deltaValue.SetDelta(true);
+ region[keys[0]] = deltaValue;
+
+ DeltaEx.ToDeltaCount = 0;
+ DeltaEx.ToDataCount = 0;
+ }
+
+ // Puts a DeltaEx under key 1, waits ten seconds, then puts it again with
+ // the delta flag set and resets the ToDelta/ToData counters.
+ void DoExpirationWithDelta()
+ {
+ try
+ {
+ Serializable.RegisterTypeGeneric(DeltaEx.create, CacheHelper.DCache);
+ }
+ catch (IllegalStateException)
+ {
+ // Deliberately ignored (presumably the type is already registered).
+ }
+
+ DeltaEx value = new DeltaEx();
+ IRegion<object, object> region = CacheHelper.GetRegion<object, object>("DistRegionAck");
+ region[1] = value;
+
+ // Give the entry time to expire in client 2 before sending the delta.
+ Thread.Sleep(TimeSpan.FromSeconds(10));
+
+ value.SetDelta(true);
+ region[1] = value;
+
+ DeltaEx.ToDeltaCount = 0;
+ DeltaEx.ToDataCount = 0;
+ }
+
+ // Puts a DeltaTestImpl under "key1", then sets its intVar to 5 with the
+ // delta flag set and puts it again.
+ void DoCqWithDelta()
+ {
+ const string key = "key1";
+ IRegion<object, object> region = CacheHelper.GetRegion<object, object>("DistRegionAck");
+ DeltaTestImpl payload = new DeltaTestImpl();
+
+ region[key] = payload;
+
+ payload.SetIntVar(5);
+ payload.SetDelta(true);
+ region[key] = payload;
+ }
+
+ // Registers the DeltaTestAD type with the cache's serialization registry.
+ // An IllegalStateException from the registration call is ignored.
+ void initializeDeltaClientAD()
+ {
+ try
+ {
+ Serializable.RegisterTypeGeneric(DeltaTestAD.Create, CacheHelper.DCache);
+ }
+ catch (IllegalStateException)
+ {
+ // Deliberately ignored (presumably the type is already registered).
+ }
+ }
+
+ // Client 1, step 1: registers interest in all keys of the region and puts a
+ // fresh DeltaTestAD under key 1.
+ void DoDeltaAD_C1_1()
+ {
+ DeltaTestAD initial = new DeltaTestAD();
+ IRegion<object, object> region = CacheHelper.GetRegion<object, object>("DistRegionAck");
+
+ region.GetSubscriptionService().RegisterAllKeys();
+
+ Util.Log("clientAD1 put");
+ region[1] = initial;
+ Util.Log("clientAD1 put done");
+ }
+
+ // Client 2, step 1: fetches the DeltaTestAD stored under key 1, checks that
+ // its counter is 2, puts it back, and then puts the same PdxDelta instance
+ // ten times under "pdxdelta".
+ void DoDeltaAD_C2_1()
+ {
+ IRegion<object, object> reg = CacheHelper.GetRegion<object, object>("DistRegionAck");
+
+ Util.Log("clientAD2 get");
+ DeltaTestAD val = (DeltaTestAD)reg[1];
+
+ // Fail with a readable assertion instead of a NullReferenceException when
+ // the fetched value is null.
+ Assert.IsNotNull(val, "Value fetched for key 1 is null");
+ Assert.AreEqual(2, val.DeltaUpdate);
+ Util.Log("clientAD2 get done");
+ reg[1] = val;
+ Util.Log("clientAD2 put done");
+
+ javaobject.PdxDelta pd = new javaobject.PdxDelta(1001);
+ for (int i = 0; i < 10; i++)
+ {
+ reg["pdxdelta"] = pd;
+ }
+ }
+
+ // Client 1, final step: after a fixed wait, reads the entry for key 1 and
+ // checks that its counter is 3, then checks the Delta property of the
+ // PdxDelta held in the local view under "pdxdelta".
+ void DoDeltaAD_C1_afterC2Put()
+ {
+ // Fixed wait, presumably so client 2's update can reach this client.
+ Thread.Sleep(15000);
+ IRegion<object, object> reg = CacheHelper.GetRegion<object, object>("DistRegionAck");
+ Util.Log("client fetching entry from local cache");
+
+ // Check the entry itself before touching .Value, so a missing entry shows
+ // up as an assertion failure rather than a NullReferenceException.
+ var entry = reg.GetEntry(1);
+ Assert.IsNotNull(entry, "No entry found for key 1");
+ DeltaTestAD val = (DeltaTestAD)entry.Value;
+ Assert.IsNotNull(val);
+ Assert.AreEqual(3, val.DeltaUpdate);
+ Util.Log("done");
+
+ Thread.Sleep(5000);
+ //Assert.Greater(javaobject.PdxDelta.GotDelta, 7, "this should have recieve delta");
+ javaobject.PdxDelta pd = (javaobject.PdxDelta)(reg.GetLocalView()["pdxdelta"]);
+ Assert.IsNotNull(pd, "No PdxDelta value found in the local view");
+ Assert.Greater(pd.Delta, 7, "this should have recieve delta");
+ }
+
+ // Drives the DeltaTestAD scenario end to end: starts one locator and one
+ // server configured from cacheserver_with_deltaAD.xml, creates the region on
+ // both clients with the requested cloning setting, registers the type on
+ // both, runs the put / get-and-put / verify steps in order, then closes the
+ // clients and stops the server and locator.
+ void runDeltaWithAppdomian(bool cloningenable)
+ {
+ CacheHelper.SetupJavaServers(true, "cacheserver_with_deltaAD.xml");
+ CacheHelper.StartJavaLocator(1, "GFELOC1");
+ CacheHelper.StartJavaServerWithLocators(1, "GFECS5", 1);
+ string regionName = "DistRegionAck";
+ // if (usePools)
+ {
+ //CacheHelper.CreateTCRegion_Pool_AD("DistRegionAck", false, false, null, null, CacheHelper.Locators, "__TEST_POOL1__", false, false, false);
+ // The two calls differ only in the second-to-last argument (true for
+ // client 1, false for client 2); its meaning is defined by CacheHelper.
+ m_client1.Call(CacheHelper.CreateTCRegion_Pool_AD1, regionName, false, true, CacheHelper.Locators, (string)"__TEST_POOL1__", true, cloningenable);
+ m_client2.Call(CacheHelper.CreateTCRegion_Pool_AD1, regionName, false, true, CacheHelper.Locators, (string)"__TEST_POOL1__", false, cloningenable);
+
+ // m_client1.Call(createPool, "__TEST_POOL1__", CacheHelper.Locators, (string)null, 0, false);
+ // m_client1.Call(createRegionAndAttachPool, "DistRegionAck", "__TEST_POOL1__");
+ }
+
+
+ m_client1.Call(initializeDeltaClientAD);
+ m_client2.Call(initializeDeltaClientAD);
+
+ // Order matters: client 1 puts, client 2 reads and re-puts, client 1 verifies.
+ m_client1.Call(DoDeltaAD_C1_1);
+ m_client2.Call(DoDeltaAD_C2_1);
+ m_client1.Call(DoDeltaAD_C1_afterC2Put);
+ m_client1.Call(Close);
+ m_client2.Call(Close);
+
+ CacheHelper.StopJavaServer(1);
+ CacheHelper.StopJavaLocator(1);
+ CacheHelper.ClearEndpoints();
+ CacheHelper.ClearLocators();
+ }
+
+ // Drives the single-client put scenario: starts one locator and one server
+ // configured from cacheserver_with_delta.xml, creates the pool (subscription
+ // flag false) and region on client 1, runs DoPutWithDelta there, then closes
+ // the client and stops the server and locator.
+ void runPutWithDelta()
+ {
+ CacheHelper.SetupJavaServers(true, "cacheserver_with_delta.xml");
+ CacheHelper.StartJavaLocator(1, "GFELOC1");
+ CacheHelper.StartJavaServerWithLocators(1, "GFECS5", 1);
+ m_client1.Call(createPool, "__TEST_POOL1__", CacheHelper.Locators, (string)null, 0, false);
+ m_client1.Call(createRegionAndAttachPool, "DistRegionAck", "__TEST_POOL1__");
+
+ m_client1.Call(DoPutWithDelta);
+ m_client1.Call(Close);
+
+ CacheHelper.StopJavaServer(1);
+ CacheHelper.StopJavaLocator(1);
+ CacheHelper.ClearEndpoints();
+ CacheHelper.ClearLocators();
+ }
+
+ // Same setup and teardown as the plain put scenario, but the step executed
+ // on client 1 is Do_Put_Contains_Remove_WithDelta.
+ void runPut_Contains_Remove_WithDelta()
+ {
+ CacheHelper.SetupJavaServers(true, "cacheserver_with_delta.xml");
+ CacheHelper.StartJavaLocator(1, "GFELOC1");
+ CacheHelper.StartJavaServerWithLocators(1, "GFECS5", 1);
+ m_client1.Call(createPool, "__TEST_POOL1__", CacheHelper.Locators, (string)null, 0, false);
+ m_client1.Call(createRegionAndAttachPool, "DistRegionAck", "__TEST_POOL1__");
+
+ m_client1.Call(Do_Put_Contains_Remove_WithDelta);
+ m_client1.Call(Close);
+
+ CacheHelper.StopJavaServer(1);
+ CacheHelper.StopJavaLocator(1);
+ CacheHelper.ClearEndpoints();
+ CacheHelper.ClearLocators();
+ }
+
+ // Prepares the receiving client: registers the DeltaEx type, registers
+ // interest in every key via the ".*" regex, and installs a
+ // SimpleCacheListener on the region through its attributes mutator.
+ void registerClassCl2()
+ {
+ try
+ {
+ Serializable.RegisterTypeGeneric(DeltaEx.create, CacheHelper.DCache);
+ }
+ catch (IllegalStateException)
+ {
+ // Deliberately ignored (presumably the type is already registered).
+ }
+
+ IRegion<object, object> region = CacheHelper.GetRegion<object, object>("DistRegionAck");
+ region.GetSubscriptionService().RegisterRegex(".*");
+
+ SimpleCacheListener<object, object> listener = new SimpleCacheListener<object, object>();
+ region.AttributesMutator.SetCacheListener(listener);
+ }
+
+ // Registers the DeltaTestImpl type, resets its data counters, waits two
+ // seconds and then tries to register interest in every key (".*") on the
+ // region. Interest registration is best effort: a failure is logged and
+ // otherwise ignored.
+ void registerClassDeltaTestImpl()
+ {
+ try
+ {
+ Serializable.RegisterTypeGeneric(DeltaTestImpl.CreateDeserializable, CacheHelper.DCache);
+ }
+ catch (IllegalStateException)
+ {
+ // Ignore the exception caused by type re-registration.
+ }
+ DeltaTestImpl.ResetDataCount();
+
+ Thread.Sleep(2000);
+ IRegion<object, object> reg = CacheHelper.GetRegion<object, object>("DistRegionAck");
+ try
+ {
+ reg.GetSubscriptionService().RegisterRegex(".*");
+ }
+ catch (Exception ex)
+ {
+ // Still ignored (e.g. regex registration without a notification channel),
+ // but logged so an unexpected failure is visible in the test output.
+ Util.Log("registerClassDeltaTestImpl: RegisterRegex failed and was ignored: " + ex.Message);
+ }
+ }
+
+ // Creates and executes a continuous query on /DistRegionAck selecting
+ // entries whose intVar is greater than 4, with a new CqDeltaListener
+ // (kept in myCqListener) attached to it.
+ void registerCq()
+ {
+ Pool pool = CacheHelper.DCache.GetPoolManager().Find("__TEST_POOL1__");
+ QueryService<object, DeltaTestImpl> queryService = pool.GetQueryService<object, DeltaTestImpl>();
+
+ CqAttributesFactory<object, DeltaTestImpl> cqFactory = new CqAttributesFactory<object, DeltaTestImpl>();
+ myCqListener = new CqDeltaListener<object, DeltaTestImpl>();
+ cqFactory.AddCqListener(myCqListener);
+ CqAttributes<object, DeltaTestImpl> cqAttributes = cqFactory.Create();
+
+ CqQuery<object, DeltaTestImpl> query =
+ queryService.NewCq("select * from /DistRegionAck d where d.intVar > 4", cqAttributes, false);
+ query.Execute();
+ }
+
+ // Verifies on the receiving client, after a one second wait, that FromData
+ // was called 3 times, FromDelta 2 times, the cache listener reported success
+ // and Clone was called 2 times. Resets the listener flag and the DeltaEx
+ // counters as it goes.
+ void VerifyDeltaCount()
+ {
+ Thread.Sleep(1000);
+ Util.Log("Total Data count" + DeltaEx.FromDataCount);
+ // Label corrected: this line reports the delta counter, not the data counter.
+ Util.Log("Total Delta count" + DeltaEx.FromDeltaCount);
+ if (DeltaEx.FromDataCount != 3)
+ Assert.Fail("Count of fromData called should be 3 ");
+ if (DeltaEx.FromDeltaCount != 2)
+ Assert.Fail("Count of fromDelta called should be 2 ");
+ if (SimpleCacheListener<object, object>.isSuccess == false)
+ Assert.Fail("Listener failure");
+ SimpleCacheListener<object, object>.isSuccess = false;
+ if (DeltaEx.CloneCount != 2)
+ Assert.Fail("Clone count should be 2, is " + DeltaEx.CloneCount);
+
+ DeltaEx.FromDataCount = 0;
+ DeltaEx.FromDeltaCount = 0;
+ DeltaEx.CloneCount = 0;
+ }
+
+ // Verifies on the receiving client that the value under the first test key
+ // has intVar 2, that DeltaTestImpl recorded 2 FromData and 1 ToData calls,
+ // and, after a further wait, that the PdxDelta in the local view reports a
+ // Delta greater than 7.
+ void VerifyCloning()
+ {
+ Thread.Sleep(1000);
+ string cKey = m_keys[0];
+ IRegion<object, object> reg = CacheHelper.GetRegion<object, object>("DistRegionAck");
+ DeltaTestImpl val = reg[cKey] as DeltaTestImpl;
+
+ // "as" yields null for a null or wrongly typed value; report that as an
+ // assertion failure instead of a NullReferenceException on the next line.
+ Assert.IsNotNull(val, "Value for key " + cKey + " is missing or is not a DeltaTestImpl");
+
+ if (val.GetIntVar() != 2)
+ Assert.Fail("Int value after cloning should be 2, is " + val.GetIntVar());
+ if (DeltaTestImpl.GetFromDataCount() != 2)
+ Assert.Fail("After cloning, fromDataCount should have been 2, is " + DeltaTestImpl.GetFromDataCount());
+ if (DeltaTestImpl.GetToDataCount() != 1)
+ Assert.Fail("After cloning, toDataCount should have been 1, is " + DeltaTestImpl.GetToDataCount());
+
+ Thread.Sleep(5000);
+ //Assert.Greater(javaobject.PdxDelta.GotDelta, 7, "this should have recieve delta");
+ javaobject.PdxDelta pd = (javaobject.PdxDelta)(reg.GetLocalView()["pdxdelta"]);
+ Assert.IsNotNull(pd, "No PdxDelta value found in the local view");
+ Assert.Greater(pd.Delta, 7, "this should have recieve delta");
+ }
+
+ // Verifies on the receiving client, after a one second wait, that FromData
+ // was called 8 times and FromDelta once, logging the actual counters before
+ // failing. Resets both counters when the checks pass.
+ void VerifyDeltaCountLRU()
+ {
+ Thread.Sleep(1000);
+
+ bool dataCountOk = DeltaEx.FromDataCount == 8;
+ if (!dataCountOk)
+ {
+ Util.Log("DeltaEx.FromDataCount = " + DeltaEx.FromDataCount);
+ Util.Log("DeltaEx.FromDeltaCount = " + DeltaEx.FromDeltaCount);
+ Assert.Fail("Count should have been 8. 6 for common put and two when pulled from database and deserialized");
+ }
+
+ bool deltaCountOk = DeltaEx.FromDeltaCount == 1;
+ if (!deltaCountOk)
+ {
+ Util.Log("DeltaEx.FromDeltaCount = " + DeltaEx.FromDeltaCount);
+ Assert.Fail("Count should have been 1");
+ }
+
+ DeltaEx.FromDeltaCount = 0;
+ DeltaEx.FromDataCount = 0;
+ }
+
+ // Verifies that the CQ listener counted exactly one matching delta and
+ // exactly one matching full value.
+ void VerifyCqDeltaCount()
+ {
+ // Give the listener time to process the CQ event.
+ Thread.Sleep(1000);
+
+ int deltaEvents = myCqListener.GetDeltaCount();
+ if (deltaEvents != 1)
+ {
+ Assert.Fail("Delta from CQ event does not have expected value");
+ }
+
+ int valueEvents = myCqListener.GetValueCount();
+ if (valueEvents != 1)
+ {
+ Assert.Fail("Value from CQ event is incorrect");
+ }
+ }
+ // Verifies on the receiving client, after a one second wait, that FromData
+ // was called twice and FromDelta never, then resets both counters.
+ void VerifyExpirationDeltaCount()
+ {
+ Thread.Sleep(1000);
+
+ if (DeltaEx.FromDataCount != 2)
+ {
+ Assert.Fail("Count should have been 2.");
+ }
+ if (DeltaEx.FromDeltaCount != 0)
+ {
+ Assert.Fail("Count should have been 0.");
+ }
+
+ DeltaEx.FromDeltaCount = 0;
+ DeltaEx.FromDataCount = 0;
+ }
+
+ // Drives the two-client notification scenario: starts one locator and one
+ // server configured from cacheserver_with_delta.xml, creates a
+ // subscription-enabled pool and a cloning-enabled region on both clients,
+ // prepares client 2 as the receiver, lets client 1 do the puts and client 2
+ // verify the counters, then closes both clients and stops the server and
+ // locator.
+ void runNotificationWithDelta()
+ {
+ CacheHelper.SetupJavaServers(true, "cacheserver_with_delta.xml");
+ CacheHelper.StartJavaLocator(1, "GFELOC1");
+ CacheHelper.StartJavaServerWithLocators(1, "GFECS5", 1);
+
+ m_client1.Call(createPool, "__TEST_POOL1__", CacheHelper.Locators, (string)null, 0, true);
+ m_client1.Call(createRegionAndAttachPool, "DistRegionAck", "__TEST_POOL1__", true);
+
+ m_client2.Call(createPool, "__TEST_POOL1__", CacheHelper.Locators, (string)null, 0, true);
+ m_client2.Call(createRegionAndAttachPool, "DistRegionAck", "__TEST_POOL1__", true);
+
+ m_client2.Call(registerClassCl2);
+
+ // Client 1 produces the updates; client 2 checks what it received.
+ m_client1.Call(DoNotificationWithDelta);
+ m_client2.Call(VerifyDeltaCount);
+ m_client1.Call(Close);
+ m_client2.Call(Close);
+
+ CacheHelper.StopJavaServer(1);
+ CacheHelper.StopJavaLocator(1);
+ CacheHelper.ClearEndpoints();
+ CacheHelper.ClearLocators();
+ }
+
+ // Drives the cloning scenario: starts one locator and one server configured
+ // from cacheserver_with_delta_test_impl.xml, creates a subscription-enabled
+ // pool and a cloning-enabled region on both clients, registers
+ // DeltaTestImpl on both, lets client 1 do the puts and client 2 verify the
+ // result, then closes both clients and stops the server and locator.
+ void runNotificationWithDefaultCloning()
+ {
+ CacheHelper.SetupJavaServers(true, "cacheserver_with_delta_test_impl.xml");
+ CacheHelper.StartJavaLocator(1, "GFELOC1");
+ CacheHelper.StartJavaServerWithLocators(1, "GFECS5", 1);
+
+ m_client1.Call(createPool, "__TEST_POOL1__", CacheHelper.Locators, (string)null, 0, true);
+ m_client1.Call(createRegionAndAttachPool, "DistRegionAck", "__TEST_POOL1__", true);
+
+ m_client2.Call(createPool, "__TEST_POOL1__", CacheHelper.Locators, (string)null, 0, true);
+ m_client2.Call(createRegionAndAttachPool, "DistRegionAck", "__TEST_POOL1__", true);
+
+ m_client1.Call(registerClassDeltaTestImpl);
+ m_client2.Call(registerClassDeltaTestImpl);
+
+ // Client 1 produces the updates; client 2 checks what it received.
+ m_client1.Call(DoNotificationWithDefaultCloning);
+ m_client2.Call(VerifyCloning);
+ m_client1.Call(Close);
+ m_client2.Call(Close);
+
+ CacheHelper.StopJavaServer(1);
+ CacheHelper.StopJavaLocator(1);
+ CacheHelper.ClearEndpoints();
+ CacheHelper.ClearLocators();
+ }
+
+ // Drives the LRU scenario: starts one locator and one server (started as
+ // "GFECS1", unlike the other scenarios) configured from
+ // cacheserver_with_delta.xml, creates a subscription-enabled pool and an LRU
+ // region on both clients, prepares client 2 as the receiver, lets client 1
+ // do the puts and client 2 verify the counters, then closes both clients and
+ // stops the server and locator.
+ void runNotificationWithDeltaWithOverFlow()
+ {
+ CacheHelper.SetupJavaServers(true, "cacheserver_with_delta.xml");
+ CacheHelper.StartJavaLocator(1, "GFELOC1");
+ CacheHelper.StartJavaServerWithLocators(1, "GFECS1", 1);
+
+ m_client1.Call(createPool, "__TEST_POOL1__", CacheHelper.Locators, (string)null, 0, true);
+ m_client1.Call(createLRURegionAndAttachPool, "DistRegionAck", "__TEST_POOL1__");
+
+ m_client2.Call(createPool, "__TEST_POOL1__", CacheHelper.Locators, (string)null, 0, true);
+ m_client2.Call(createLRURegionAndAttachPool, "DistRegionAck", "__TEST_POOL1__");
+
+ m_client2.Call(registerClassCl2);
+
+ // Client 1 produces the updates; client 2 checks what it received.
+ m_client1.Call(DoNotificationWithDeltaLRU);
+ m_client2.Call(VerifyDeltaCountLRU);
+ m_client1.Call(Close);
+ m_client2.Call(Close);
+ CacheHelper.StopJavaServer(1);
+ CacheHelper.StopJavaLocator(1);
+ CacheHelper.ClearEndpoints();
+ CacheHelper.ClearLocators();
+ }
+
+ // Drives the continuous-query scenario: starts one locator and one server
+ // configured from cacheserver_with_delta_test_impl.xml, creates a
+ // subscription-enabled pool and the region on both clients, registers
+ // DeltaTestImpl on both and the CQ on client 2, lets client 1 do the puts
+ // and client 2 verify the listener counts, then closes both clients and
+ // stops the server and locator.
+ void runCqWithDelta()
+ {
+ CacheHelper.SetupJavaServers(true, "cacheserver_with_delta_test_impl.xml");
+ CacheHelper.StartJavaLocator(1, "GFELOC1");
+ CacheHelper.StartJavaServerWithLocators(1, "GFECS5", 1);
+
+ m_client1.Call(createPool, "__TEST_POOL1__", CacheHelper.Locators, (string)null, 0, true);
+ m_client1.Call(createRegionAndAttachPool, "DistRegionAck", "__TEST_POOL1__");
+
+ m_client2.Call(createPool, "__TEST_POOL1__", CacheHelper.Locators, (string)null, 0, true);
+ m_client2.Call(createRegionAndAttachPool, "DistRegionAck", "__TEST_POOL1__");
+
+ m_client1.Call(registerClassDeltaTestImpl);
+ m_client2.Call(registerClassDeltaTestImpl);
+ // The CQ must be registered before client 1 starts putting.
+ m_client2.Call(registerCq);
+
+ m_client1.Call(DoCqWithDelta);
+ m_client2.Call(VerifyCqDeltaCount);
+ m_client1.Call(Close);
+ m_client2.Call(Close);
+
+ CacheHelper.StopJavaServer(1);
+ CacheHelper.StopJavaLocator(1);
+ CacheHelper.ClearEndpoints();
+ CacheHelper.ClearLocators();
+ }
+
+ // Drives the expiration scenario: starts one locator and one server
+ // configured from cacheserver_with_delta.xml, creates a subscription-enabled
+ // pool on both clients, a regular region on client 1 and an expiration
+ // region on client 2, prepares client 2 as the receiver, lets client 1 do
+ // the puts and client 2 verify the counters, then closes both clients and
+ // stops the server and locator.
+ void runExpirationWithDelta()
+ {
+ CacheHelper.SetupJavaServers(true, "cacheserver_with_delta.xml");
+ CacheHelper.StartJavaLocator(1, "GFELOC1");
+ CacheHelper.StartJavaServerWithLocators(1, "GFECS5", 1);
+
+ m_client1.Call(createPool, "__TEST_POOL1__", CacheHelper.Locators, (string)null, 0, true);
+ m_client1.Call(createRegionAndAttachPool, "DistRegionAck", "__TEST_POOL1__");
+
+ m_client2.Call(createPool, "__TEST_POOL1__", CacheHelper.Locators, (string)null, 0, true);
+ m_client2.Call(createExpirationRegion, "DistRegionAck", "__TEST_POOL1__");
+
+ m_client2.Call(registerClassCl2);
+
+ // Client 1 produces the updates; client 2 checks what it received.
+ m_client1.Call(DoExpirationWithDelta);
+ m_client2.Call(VerifyExpirationDeltaCount);
+ m_client1.Call(Close);
+ m_client2.Call(Close);
+
+ CacheHelper.StopJavaServer(1);
+ CacheHelper.StopJavaLocator(1);
+ CacheHelper.ClearEndpoints();
+ CacheHelper.ClearLocators();
+ }
+
+ //#region Tests
+
+ // Runs the DeltaTestAD scenario twice: first with cloning disabled, then
+ // with cloning enabled.
+ [Test]
+ public void PutWithDeltaAD()
+ {
+ runDeltaWithAppdomian(false);
+ runDeltaWithAppdomian(true);//cloning enable
+ }
+
+ // Runs the single-client put-with-delta scenario.
+ [Test]
+ public void PutWithDelta()
+ {
+ runPutWithDelta();
+ }
+
+ // Runs the single-client put / contains / remove scenario.
+ [Test]
+ public void Put_Contains_Remove_WithDelta()
+ {
+ runPut_Contains_Remove_WithDelta();
+ }
+
+ // Runs the two-client delta notification scenario.
+ [Test]
+ public void NotificationWithDelta()
+ {
+ runNotificationWithDelta();
+ }
+
+ // Runs the two-client notification scenario that verifies cloning.
+ [Test]
+ public void NotificationWithDefaultCloning()
+ {
+ runNotificationWithDefaultCloning();
+ }
+
+ // Runs the two-client notification scenario on LRU regions.
+ [Test]
+ public void NotificationWithDeltaWithOverFlow()
+ {
+ runNotificationWithDeltaWithOverFlow();
+ }
+
+ // Runs the continuous-query-with-delta scenario.
+ [Test]
+ public void CqWithDelta()
+ {
+ runCqWithDelta();
+ }
+
+ // Runs the expiration-with-delta scenario.
+ [Test]
+ public void ExpirationWithDelta()
+ {
+ runExpirationWithDelta();
+ }
+
+ //#endregion
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/geode-native/blob/6cbd424f/clicache/integration-test/ThinClientDurableCqTestsN.cs
----------------------------------------------------------------------
diff --git a/clicache/integration-test/ThinClientDurableCqTestsN.cs b/clicache/integration-test/ThinClientDurableCqTestsN.cs
new file mode 100644
index 0000000..b6b0a01
--- /dev/null
+++ b/clicache/integration-test/ThinClientDurableCqTestsN.cs
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using NUnit.Framework;
+using Apache.Geode.DUnitFramework;
+using Apache.Geode.Client.Tests;
+using Apache.Geode.Client;
+using System;
+
+namespace Apache.Geode.Client.UnitTests
+{
+
+ [TestFixture]
+ [Category("group3")]
+ [Category("unicast_only")]
+ [Category("generics")]
+ public class ThinClientDurableCqTests : ThinClientRegionSteps
+ {
+ #region Private Members
+ private UnitProcess m_client1 = null;
+ private UnitProcess m_client2 = null;
+ private string[] m_client1DurableCqNames = { "client1DurableCQ1", "client1DurableCQ2", "client1DurableCQ3", "client1DurableCQ4", "client1DurableCQ5", "client1DurableCQ6", "client1DurableCQ7", "client1DurableCQ8" };
+ private string[] m_client2DurableCqNames = { "client2DurableCQ1", "client2DurableCQ2", "client2DurableCQ3", "client2DurableCQ4", "client2DurableCQ5", "client2DurableCQ6", "client2DurableCQ7", "client2DurableCQ8" };
+ private static string[] QueryRegionNames = { "ListDurableCqs" };
+ private static int m_NumberOfCqs = 110;
+ #endregion
+
+ #region Test helper methods
+
+ // Creates the two UnitProcess clients used by this fixture, remembers them in
+ // m_client1 / m_client2 and returns them as the fixture's client array.
+ protected override ClientBase[] GetClients()
+ {
+ UnitProcess first = new UnitProcess();
+ UnitProcess second = new UnitProcess();
+ m_client1 = first;
+ m_client2 = second;
+ return new ClientBase[] { first, second };
+ }
+
+ // Configures the cache as a durable client and creates the query region on
+ // pool "__TESTPOOL1_" without a cache listener.
+ public void InitDurableClient(string locators, int redundancyLevel,
+ string durableClientId, int durableTimeout)
+ {
+ CacheHelper.InitConfigForDurable_Pool(locators, redundancyLevel, durableClientId, durableTimeout);
+
+ // Typed null so that the generic arguments of CreateTCRegion_Pool can still be inferred.
+ ICacheListener<object, object> noListener = null;
+ string regionName = QueryRegionNames[0];
+ CacheHelper.CreateTCRegion_Pool(regionName, true, true, noListener, CacheHelper.Locators, "__TESTPOOL1_", true);
+ }
+
+
+ // Creates and executes four CQs for client1 on the query region.
+ // isRecycle selects which half of m_client1DurableCqNames is used:
+ // indices 0-3 when false, indices 4-7 when true.
+ // The first two CQs pass true as the last NewCq argument, the last two pass false.
+ public void RegisterCqsClient1(bool isRecycle)
+ {
+ Util.Log("Registering Cqs for client1.");
+ CqAttributesFactory<object, object> factory = new CqAttributesFactory<object, object>();
+ CqAttributes<object, object> attributes = factory.Create();
+ QueryService<object, object> qs = CacheHelper.DCache.GetPoolManager().Find("__TESTPOOL1_").GetQueryService<object, object>();
+
+ int first = isRecycle ? 4 : 0;
+ string selectAll = "Select * From /" + QueryRegionNames[0];
+
+ qs.NewCq(m_client1DurableCqNames[first], selectAll + " where id = 1", attributes, true).ExecuteWithInitialResults();
+ qs.NewCq(m_client1DurableCqNames[first + 1], selectAll + " where id = 10", attributes, true).ExecuteWithInitialResults();
+ qs.NewCq(m_client1DurableCqNames[first + 2], selectAll, attributes, false).ExecuteWithInitialResults();
+ qs.NewCq(m_client1DurableCqNames[first + 3], selectAll + " where id = 3", attributes, false).ExecuteWithInitialResults();
+ }
+
+ // Creates and executes m_NumberOfCqs CQs named "MyCq_0", "MyCq_1", ... for client1,
+ // all with the same query and with true as the last NewCq argument.
+ public void RegisterCqsClient1MultipleChunks()
+ {
+ Util.Log("Registering Cqs for client1 for multiple chunks.");
+ CqAttributesFactory<object, object> factory = new CqAttributesFactory<object, object>();
+ CqAttributes<object, object> attributes = factory.Create();
+ QueryService<object, object> qs = CacheHelper.DCache.GetPoolManager().Find("__TESTPOOL1_").GetQueryService<object, object>();
+
+ string query = "Select * From /" + QueryRegionNames[0] + " where id = 1";
+ int index = 0;
+ while (index < m_NumberOfCqs)
+ {
+ string cqName = "MyCq_" + index.ToString();
+ qs.NewCq(cqName, query, attributes, true).ExecuteWithInitialResults();
+ index++;
+ }
+ }
+
+ // Creates and executes four CQs for client2 on the query region, all with
+ // true as the last NewCq argument.
+ // isRecycle selects which half of m_client2DurableCqNames is used:
+ // indices 0-3 when false, indices 4-7 when true.
+ public void RegisterCqsClient2(bool isRecycle)
+ {
+ Util.Log("Registering Cqs for client2.");
+ CqAttributesFactory<object, object> factory = new CqAttributesFactory<object, object>();
+ CqAttributes<object, object> attributes = factory.Create();
+ QueryService<object, object> qs = CacheHelper.DCache.GetPoolManager().Find("__TESTPOOL1_").GetQueryService<object, object>();
+
+ int first = isRecycle ? 4 : 0;
+ string selectAll = "Select * From /" + QueryRegionNames[0];
+ string[] queries =
+ {
+ selectAll + " where id = 1",
+ selectAll + " where id = 10",
+ selectAll,
+ selectAll + " where id = 3"
+ };
+
+ for (int offset = 0; offset < queries.Length; offset++)
+ {
+ qs.NewCq(m_client2DurableCqNames[first + offset], queries[offset], attributes, true).ExecuteWithInitialResults();
+ }
+ }
+
+ // Asks the server for client1's durable CQ list and asserts that it is
+ // non-null and contains exactly m_NumberOfCqs entries.
+ public void VerifyDurableCqListClient1MultipleChunks()
+ {
+ Util.Log("Verifying durable Cqs for client1.");
+ QueryService<object, object> qs = CacheHelper.DCache.GetPoolManager().Find("__TESTPOOL1_").GetQueryService<object, object>();
+ System.Collections.Generic.List<string> durableCqList = qs.GetAllDurableCqsFromServer();
+ Assert.AreNotEqual(null, durableCqList);
+
+ // NUnit formats the message with composite formatting, so the placeholder
+ // must be {0}; a printf-style %d would be printed literally.
+ Assert.AreEqual(m_NumberOfCqs, durableCqList.Count, "Durable CQ count should be {0}", m_NumberOfCqs);
+
+ Util.Log("Completed verifying durable Cqs for client1.");
+ }
+
+ // Asks the server for client1's durable CQ list and checks its contents.
+ // isRecycle == false: expects exactly the CQs named by indices 0 and 1 of m_client1DurableCqNames.
+ // isRecycle == true: expects exactly four entries, indices 0, 1, 4 and 5.
+ public void VerifyDurableCqListClient1(bool isRecycle)
+ {
+ Util.Log("Verifying durable Cqs for client1.");
+ QueryService<object, object> qs = CacheHelper.DCache.GetPoolManager().Find("__TESTPOOL1_").GetQueryService<object, object>();
+ System.Collections.Generic.List<string> durableCqList = qs.GetAllDurableCqsFromServer();
+ Assert.AreNotEqual(null, durableCqList);
+
+ int[] expectedIndexes;
+ if (isRecycle)
+ {
+ Assert.AreEqual(4, durableCqList.Count, "Durable CQ count sholuld be 4");
+ expectedIndexes = new int[] { 0, 1, 4, 5 };
+ }
+ else
+ {
+ Assert.AreEqual(2, durableCqList.Count, "Durable CQ count sholuld be 2");
+ expectedIndexes = new int[] { 0, 1 };
+ }
+
+ foreach (int index in expectedIndexes)
+ {
+ Assert.AreEqual(true, durableCqList.Contains(m_client1DurableCqNames[index]));
+ }
+ Util.Log("Completed verifying durable Cqs for client1.");
+ }
+
+ // Asks the server for client2's durable CQ list and checks its contents.
+ // Expects the first four names of m_client2DurableCqNames when isRecycle is
+ // false and the first eight when it is true, with a matching list size.
+ public void VerifyDurableCqListClient2(bool isRecycle)
+ {
+ Util.Log("Verifying durable Cqs for client2.");
+ QueryService<object, object> qs = CacheHelper.DCache.GetPoolManager().Find("__TESTPOOL1_").GetQueryService<object, object>();
+ System.Collections.Generic.List<string> durableCqList = qs.GetAllDurableCqsFromServer();
+ Assert.AreNotEqual(null, durableCqList);
+
+ int expectedCount = isRecycle ? 8 : 4;
+ Assert.AreEqual(expectedCount, durableCqList.Count, "Durable CQ count sholuld be " + expectedCount.ToString());
+
+ for (int index = 0; index < expectedCount; index++)
+ {
+ Assert.AreEqual(true, durableCqList.Contains(m_client2DurableCqNames[index]));
+ }
+ }
+
+ // Asks the server for the durable CQ list and asserts that it is non-null and empty.
+ public void VerifyEmptyDurableCqListClient1()
+ {
+ Util.Log("Verifying empty durable Cqs for client1.");
+ System.Collections.Generic.List<string> durableCqList =
+ CacheHelper.DCache.GetPoolManager().Find("__TESTPOOL1_").GetQueryService<object, object>().GetAllDurableCqsFromServer();
+
+ Assert.AreNotEqual(null, durableCqList);
+ int actualCount = durableCqList.Count;
+ Assert.AreEqual(0, actualCount, "Durable CQ list sholuld be empty");
+ }
+
+
+ // Scenario: start one locator and one cache server, initialize both clients as
+ // durable clients, register their CQs (non-recycle set) and verify the durable
+ // CQ list each client gets back from the server.
+ // The finally block closes both clients and stops the server and locator even
+ // when a step above throws.
+ private void RunTestGetDurableCqsFromServer()
+ {
+ try
+ {
+ CacheHelper.SetupJavaServers(true, "cacheserverDurableCqs.xml");
+ CacheHelper.StartJavaLocator(1, "GFELOC");
+ Util.Log("Locator started");
+ CacheHelper.StartJavaServerWithLocators(1, "GFECS1", 1);
+ Util.Log("Cache server 1 started");
+
+ // Both clients use redundancy level 0 and a durable timeout argument of 300.
+ m_client1.Call(InitDurableClient, CacheHelper.Locators, 0, "DurableClient1", 300);
+ m_client2.Call(InitDurableClient, CacheHelper.Locators, 0, "DurableClient2", 300);
+ Util.Log("client initialization done.");
+
+ m_client1.Call(RegisterCqsClient1, false);
+ m_client2.Call(RegisterCqsClient2, false);
+ Util.Log("Registered DurableCQs.");
+
+ m_client1.Call(VerifyDurableCqListClient1, false);
+ m_client2.Call(VerifyDurableCqListClient2, false);
+
+ Util.Log("Verified DurableCQ List.");
+ }
+ finally
+ {
+ m_client1.Call(CacheHelper.Close);
+ m_client2.Call(CacheHelper.Close);
+ CacheHelper.StopJavaServer(1);
+ CacheHelper.StopJavaLocator(1);
+ }
+
+ }
+
+ // Scenario: both durable clients register CQs, disconnect with keep-alive,
+ // reconnect under the same durable ids, register a second set of CQs and
+ // verify that the server reports the CQs from both sessions.
+ // The finally block closes both clients and stops the server and locator even
+ // when a step above throws.
+ private void RunTestGetDurableCqsFromServerCyclicClients()
+ {
+ try
+ {
+ CacheHelper.SetupJavaServers(true, "cacheserverDurableCqs.xml");
+ CacheHelper.StartJavaLocator(1, "GFELOC");
+ Util.Log("Locator started");
+ CacheHelper.StartJavaServerWithLocators(1, "GFECS1", 1);
+ Util.Log("Cache server 1 started");
+
+ m_client1.Call(InitDurableClient, CacheHelper.Locators, 0, "DurableClient1", 300);
+ m_client2.Call(InitDurableClient, CacheHelper.Locators, 0, "DurableClient2", 300);
+ Util.Log("client initialization done.");
+
+ m_client1.Call(RegisterCqsClient1, false);
+ m_client2.Call(RegisterCqsClient2, false);
+ Util.Log("Registered DurableCQs.");
+
+ // Verify each client against its own expectations. Previously client1 was
+ // verified twice and client2 was never checked.
+ m_client1.Call(VerifyDurableCqListClient1, false);
+ m_client2.Call(VerifyDurableCqListClient2, false);
+ Util.Log("Verified DurableCQ List.");
+
+
+ // Disconnect while keeping the durable state alive on the server.
+ m_client1.Call(CacheHelper.CloseKeepAlive);
+ m_client2.Call(CacheHelper.CloseKeepAlive);
+
+
+ m_client1.Call(InitDurableClient, CacheHelper.Locators, 0, "DurableClient1", 300);
+ m_client2.Call(InitDurableClient, CacheHelper.Locators, 0, "DurableClient2", 300);
+ Util.Log("client re-initialization done.");
+
+ m_client1.Call(RegisterCqsClient1, true);
+ m_client2.Call(RegisterCqsClient2, true);
+ Util.Log("Registered DurableCQs.");
+
+ // After recycling, each client must see the CQs of both sessions.
+ m_client1.Call(VerifyDurableCqListClient1, true);
+ m_client2.Call(VerifyDurableCqListClient2, true);
+
+ Util.Log("Verified DurableCQ List.");
+ }
+ finally
+ {
+ m_client1.Call(CacheHelper.Close);
+ m_client2.Call(CacheHelper.Close);
+
+ CacheHelper.StopJavaServer(1);
+ CacheHelper.StopJavaLocator(1);
+ }
+ }
+
+ [TestFixtureSetUp]
+ public override void InitTests()
+ {
+ base.InitTests();
+ }
+
+ // Fixture-level teardown: calls Exit on both client processes before
+ // delegating to the base class teardown.
+ [TestFixtureTearDown]
+ public override void EndTests()
+ {
+ m_client1.Exit();
+ m_client2.Exit();
+ base.EndTests();
+ }
+
+ [SetUp]
+ public override void InitTest()
+ {
+ base.InitTest();
+ }
+
+ // Per-test teardown: closes the cache in both client processes, then runs
+ // the base class teardown.
+ [TearDown]
+ public override void EndTest()
+ {
+ m_client1.Call(CacheHelper.Close);
+ m_client2.Call(CacheHelper.Close);
+ base.EndTest();
+ }
+
+
+ #endregion
+
+ #region Tests
+
+ [Test]
+ public void TestGetDurableCqsFromServerWithLocator()
+ {
+ RunTestGetDurableCqsFromServer();
+ }
+
+ [Test]
+ public void TestGetDurableCqsFromServerCyclicClientsWithLocator()
+ {
+ RunTestGetDurableCqsFromServerCyclicClients();
+ }
+
+ #endregion
+
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/geode-native/blob/6cbd424f/clicache/integration-test/ThinClientDurableTestsN.cs
----------------------------------------------------------------------
diff --git a/clicache/integration-test/ThinClientDurableTestsN.cs b/clicache/integration-test/ThinClientDurableTestsN.cs
new file mode 100644
index 0000000..e75856e
--- /dev/null
+++ b/clicache/integration-test/ThinClientDurableTestsN.cs
@@ -0,0 +1,982 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Threading;
+
+namespace Apache.Geode.Client.UnitTests
+{
+ using NUnit.Framework;
+ using Apache.Geode.DUnitFramework;
+ using Apache.Geode.Client;
+
+
+ using AssertionException = Apache.Geode.Client.AssertionException;
+ [TestFixture]
+ [Category("group2")]
+ [Category("unicast_only")]
+ [Category("generics")]
+ public class ThinClientDurableTests : ThinClientRegionSteps
+ {
+ #region Private members
+
+ private UnitProcess m_client1, m_client2, m_feeder;
+ private string[] m_regexes = { "D-Key-.*", "Key-.*" };
+ private string[] m_mixKeys = { "Key-1", "D-Key-1", "L-Key", "LD-Key" };
+ private string[] keys = { "Key-1", "Key-2", "Key-3", "Key-4", "Key-5" };
+
+ private static string DurableClientId1 = "DurableClientId1";
+ private static string DurableClientId2 = "DurableClientId2";
+
+ private static DurableListener<object, object> m_checker1, m_checker2;
+
+ #endregion
+
+ // Creates the three UnitProcess instances used by this fixture (two clients
+ // and one feeder), stores them in the fields and returns them as an array.
+ protected override ClientBase[] GetClients()
+ {
+ UnitProcess firstClient = new UnitProcess();
+ UnitProcess secondClient = new UnitProcess();
+ UnitProcess feeder = new UnitProcess();
+
+ m_client1 = firstClient;
+ m_client2 = secondClient;
+ m_feeder = feeder;
+
+ return new ClientBase[] { firstClient, secondClient, feeder };
+ }
+
+ [TestFixtureTearDown]
+ public override void EndTests()
+ {
+ CacheHelper.StopJavaServers();
+ base.EndTests();
+ }
+
+ // Per-test teardown: closes the cache in both clients and the feeder and
+ // clears the recorded endpoints and locators. The Java servers are stopped in
+ // the finally block, so they are stopped even when one of the close calls throws.
+ // NOTE(review): base.EndTest() sits after the try/finally, so it is skipped
+ // when an exception propagates out of the try block - confirm this is intended.
+ [TearDown]
+ public override void EndTest()
+ {
+ try
+ {
+ m_client1.Call(CacheHelper.Close);
+ m_client2.Call(CacheHelper.Close);
+ m_feeder.Call(CacheHelper.Close);
+ CacheHelper.ClearEndpoints();
+ CacheHelper.ClearLocators();
+ }
+ finally
+ {
+ CacheHelper.StopJavaServers();
+ }
+ base.EndTest();
+ }
+
+ #region Common Functions
+
+ // Creates pool "__TESTPOOL1_" for the feeder and a region named RegionNames[0]
+ // attached to that pool, with no cache listener.
+ public void InitFeeder(string locators, int redundancyLevel)
+ {
+ const string poolName = "__TESTPOOL1_";
+ string regionName = RegionNames[0];
+
+ CacheHelper.CreatePool<object, object>(poolName, locators, (string)null, redundancyLevel, false);
+ CacheHelper.CreateTCRegion_Pool<object, object>(regionName, false, true, null, locators, poolName, false);
+ }
+
+ // Creates two pools for the feeder, "__TESTPOOL1_" and "__TESTPOOL2_", and
+ // attaches RegionNames[0] to the first and RegionNames[1] to the second.
+ public void InitFeeder2(string locators, int redundancyLevel)
+ {
+ string[] poolNames = { "__TESTPOOL1_", "__TESTPOOL2_" };
+
+ // Pool i is created before region i; the pairs are processed in order.
+ for (int i = 0; i < poolNames.Length; i++)
+ {
+ CacheHelper.CreatePool<object, object>(poolNames[i], locators, (string)null, redundancyLevel, false);
+ CacheHelper.CreateTCRegion_Pool<object, object>(RegionNames[i], false, true, null,
+ locators, poolNames[i], false);
+ }
+ }
+
+ // Brings up a durable client with two pools ("__TESTPOOL1_" and "__TESTPOOL2_"),
+ // one region per pool, registers all keys on both regions and asserts the
+ // pending event count reported by each pool before signalling ReadyForEvents.
+ // expectedQ0 / expectedQ1: expected PendingEventCount of the pool behind
+ // RegionNames[0] / RegionNames[1].
+ // The cache is closed with Close(true) at the end.
+ public void InitDurableClientWithTwoPools(string locators,
+ int redundancyLevel, string durableClientId, int durableTimeout, int expectedQ0, int expectedQ1)
+ {
+ // No listener is attached; the typed null lets the generic arguments be inferred.
+ DurableListener<object, object> checker = null;
+ CacheHelper.InitConfigForDurable_Pool2(locators, redundancyLevel,
+ durableClientId, durableTimeout, 35000, "__TESTPOOL1_");
+ CacheHelper.CreateTCRegion_Pool(RegionNames[0], false, true, checker,
+ CacheHelper.Locators, "__TESTPOOL1_", true);
+
+ CacheHelper.InitConfigForDurable_Pool2(locators, redundancyLevel,
+ durableClientId, durableTimeout, 35000, "__TESTPOOL2_");
+ CacheHelper.CreateTCRegion_Pool(RegionNames[1], false, true, checker,
+ CacheHelper.Locators, "__TESTPOOL2_", true);
+
+ IRegion<object, object> region0 = CacheHelper.GetVerifyRegion<object, object>(RegionNames[0]);
+ IRegion<object, object> region1 = CacheHelper.GetVerifyRegion<object, object>(RegionNames[1]);
+
+ try
+ {
+ region0.GetSubscriptionService().RegisterAllKeys(true);
+ region1.GetSubscriptionService().RegisterAllKeys(true);
+ }
+ catch (Exception other)
+ {
+ Assert.Fail("RegisterAllKeys threw unexpected exception: {0}", other.Message);
+ }
+
+ // The pending counts are read before ReadyForEvents is called below.
+ Pool pool0 = CacheHelper.DCache.GetPoolManager().Find(region0.Attributes.PoolName);
+ int pendingEventCount0 = pool0.PendingEventCount;
+ Util.Log("pendingEventCount0 for pool = {0} {1} ", pendingEventCount0, region0.Attributes.PoolName);
+ string msg = string.Format("Expected Value ={0}, Actual = {1}", expectedQ0, pendingEventCount0);
+ Assert.AreEqual(expectedQ0, pendingEventCount0, msg);
+
+ Pool pool1 = CacheHelper.DCache.GetPoolManager().Find(region1.Attributes.PoolName);
+ int pendingEventCount1 = pool1.PendingEventCount;
+ Util.Log("pendingEventCount1 for pool = {0} {1} ", pendingEventCount1, region1.Attributes.PoolName);
+ string msg1 = string.Format("Expected Value ={0}, Actual = {1}", expectedQ1, pendingEventCount1);
+ Assert.AreEqual(expectedQ1, pendingEventCount1, msg1);
+
+ CacheHelper.DCache.ReadyForEvents();
+ // Wait 10 seconds after ReadyForEvents before closing
+ // (presumably to let queued events be delivered - TODO confirm).
+ Thread.Sleep(10000);
+
+ CacheHelper.DCache.Close(true);
+ }
+
+ // Drops the cached listener of the given client: m_checker1 for client 1,
+ // m_checker2 for any other value.
+ public void ClearChecker(int client)
+ {
+ if (client != 1)
+ {
+ // client == 2
+ m_checker2 = null;
+ return;
+ }
+
+ m_checker1 = null;
+ }
+
+ // Brings up a durable client on pool "__TESTPOOL1_" with the per-client static
+ // listener attached, signals ReadyForEvents and registers interest:
+ // - regex m_regexes[0] with flag true and regex m_regexes[1] with flag false,
+ // - key m_mixKeys[3] with flag true and key m_mixKeys[2] with flag false.
+ // (The flag is presumably "durable"; verify against the subscription service API.)
+ // client: 1 selects m_checker1, any other value selects m_checker2.
+ public void InitDurableClient(int client, string locators, int redundancyLevel,
+ string durableClientId, int durableTimeout)
+ {
+ // Create the listener on first use and reuse the same instance afterwards.
+ bool isFirstClient = (client == 1);
+ if (isFirstClient && m_checker1 == null)
+ {
+ m_checker1 = DurableListener<object, object>.Create();
+ }
+ else if (!isFirstClient && m_checker2 == null)
+ {
+ m_checker2 = DurableListener<object, object>.Create();
+ }
+ DurableListener<object, object> listener = isFirstClient ? m_checker1 : m_checker2;
+
+ CacheHelper.InitConfigForDurable_Pool(locators, redundancyLevel,
+ durableClientId, durableTimeout);
+ CacheHelper.CreateTCRegion_Pool<object, object>(RegionNames[0], false, true, listener,
+ CacheHelper.Locators, "__TESTPOOL1_", true);
+
+ CacheHelper.DCache.ReadyForEvents();
+ IRegion<object, object> region = CacheHelper.GetVerifyRegion<object, object>(RegionNames[0]);
+ region.GetSubscriptionService().RegisterRegex(m_regexes[0], true);
+ region.GetSubscriptionService().RegisterRegex(m_regexes[1], false);
+
+ // Key list registered with flag true.
+ ICollection<object> flaggedKeys = new List<object>();
+ flaggedKeys.Add((object)m_mixKeys[3]);
+ region.GetSubscriptionService().RegisterKeys(flaggedKeys, true, false);
+
+ // Key list registered with flag false.
+ ICollection<object> unflaggedKeys = new List<object>();
+ unflaggedKeys.Add((object)m_mixKeys[2]);
+ region.GetSubscriptionService().RegisterKeys(unflaggedKeys, false, false);
+ }
+
+ public void InitClientXml(string cacheXml)
+ {
+ CacheHelper.InitConfig(cacheXml);
+ }
+
+ public void ReadyForEvents()
+ {
+ CacheHelper.DCache.ReadyForEvents();
+ }
+
+ // Checks the pending event count reported by the pool that backs a region.
+ // region: its Attributes.PoolName selects the pool to inspect.
+ // expectedPendingQSize: value PendingEventCount must equal when exception is false.
+ // exception: when true, reading PendingEventCount must throw IllegalStateException;
+ // the test fails if it does not.
+ // When the region has no pool name nothing is checked.
+ public void PendingEventCount(IRegion<object, object> region, int expectedPendingQSize, bool exception)
+ {
+ // Log the region's name; logging the region object itself would not
+ // produce the name that the message label promises.
+ Util.Log("PendingEventCount regionName = {0} ", region.Name);
+ string poolName = region.Attributes.PoolName;
+ if (poolName == null)
+ {
+ return;
+ }
+
+ Util.Log("PendingEventCount poolName = {0} ", poolName);
+ Pool pool = CacheHelper.DCache.GetPoolManager().Find(poolName);
+ if (exception)
+ {
+ try
+ {
+ // Only the property read matters here; it is expected to throw.
+ int pendingEventCount = pool.PendingEventCount;
+ Util.Log("PendingEventCount Should have got exception ");
+ Assert.Fail("PendingEventCount Should have got exception");
+ }
+ catch (IllegalStateException ex)
+ {
+ Util.Log("Got expected exception for PendingEventCount {0} ", ex.Message);
+ }
+ }
+ else
+ {
+ int pendingEventCount = pool.PendingEventCount;
+ Util.Log("pendingEventCount = {0} ", pendingEventCount);
+ string msg = string.Format("Expected Value ={0}, Actual = {1}", expectedPendingQSize, pendingEventCount);
+ Assert.AreEqual(expectedPendingQSize, pendingEventCount, msg);
+ }
+ }
+
+ // Puts value under each of the four m_mixKeys entries and then removes each of
+ // them again, sleeping for the given number of milliseconds after every operation.
+ public void FeederUpdate(int value, int sleep)
+ {
+ IRegion<object, object> region = CacheHelper.GetVerifyRegion<object, object>(RegionNames[0]);
+
+ // m_mixKeys holds exactly four keys; all puts happen before the first remove.
+ foreach (string key in m_mixKeys)
+ {
+ region[key] = value;
+ Thread.Sleep(sleep);
+ }
+
+ foreach (string key in m_mixKeys)
+ {
+ region.Remove(key);
+ Thread.Sleep(sleep);
+ }
+ }
+
+ // Writes integer entries i -> i into two regions: pool1 entries into
+ // RegionNames[0] and pool2 entries into RegionNames[1].
+ public void FeederUpdate2(int pool1, int pool2)
+ {
+ // Both regions are looked up before any entry is written.
+ IRegion<object, object> firstRegion = CacheHelper.GetVerifyRegion<object, object>(RegionNames[0]);
+ IRegion<object, object> secondRegion = CacheHelper.GetVerifyRegion<object, object>(RegionNames[1]);
+
+ int entry = 0;
+ while (entry < pool1)
+ {
+ firstRegion[entry] = entry;
+ entry++;
+ }
+
+ entry = 0;
+ while (entry < pool2)
+ {
+ secondRegion[entry] = entry;
+ entry++;
+ }
+ }
+
+ // Closes the cache: via CacheHelper.CloseKeepAlive when keepalive is true,
+ // via CacheHelper.Close otherwise.
+ public void ClientDown(bool keepalive)
+ {
+ if (!keepalive)
+ {
+ CacheHelper.Close();
+ return;
+ }
+
+ CacheHelper.CloseKeepAlive();
+ }
+
+ public void CrashClient()
+ {
+ // TODO: crash client here.
+ }
+
+ public void KillServer()
+ {
+ CacheHelper.StopJavaServer(1);
+ Util.Log("Cacheserver 1 stopped.");
+ }
+
+ public delegate void KillServerDelegate();
+
+ #endregion
+
+
+ // Validates the totals recorded by the listener of the given client
+ // (m_checker1 for client 1, m_checker2 otherwise). Fails the test when that
+ // listener has not been created.
+ public void VerifyTotal(int client, int keys, int total)
+ {
+ DurableListener<object, object> listener = (client == 1) ? m_checker1 : m_checker2;
+
+ if (listener == null)
+ {
+ Assert.Fail("Checker is NULL!");
+ }
+ else
+ {
+ listener.validate(keys, total);
+ }
+ }
+
+ // Runs validateBasic on the listener of the given client (m_checker1 for
+ // client 1, m_checker2 otherwise) with the expected key count, event count and
+ // durable / non-durable values. Fails the test when the listener has not been
+ // created. An AssertionException from the listener is logged and rethrown.
+ public void VerifyBasic(int client, int keyCount, int eventCount, int durableValue, int nonDurableValue)
+ {//1 4 8 1 1
+ DurableListener<object, object> checker = null;
+ if (client == 1)
+ {
+ checker = ThinClientDurableTests.m_checker1;
+ }
+ else // client == 2
+ {
+ checker = ThinClientDurableTests.m_checker2;
+ }
+
+ if (checker != null)
+ {
+ try
+ {
+ checker.validateBasic(keyCount, eventCount, durableValue, nonDurableValue);//4 8 1 1
+ }
+ catch (AssertionException e)
+ {
+ Util.Log("VERIFICATION FAILED for client {0}: {1} ", client, e);
+ // Bare rethrow keeps the original stack trace; "throw e" would reset it.
+ throw;
+ }
+ }
+ else
+ {
+ Assert.Fail("Checker is NULL!");
+ }
+ }
+
+ #region Basic Durable Test
+
+
+ // Drives the basic durable-client scenario for every combination of
+ // redundancy (0, 1), closeType (1 = keep-alive close, 2 = plain close) and
+ // downtime (0 = no feeder update while the clients are down, 1 = one update).
+ // Each iteration starts the server(s), brings up the feeder and both clients,
+ // performs a feeder update, takes the clients down, optionally updates again,
+ // reconnects the clients and verifies the events seen by each listener.
+ // The locator is started once before the loops and stopped once after them.
+ void runDurableAndNonDurableBasic()
+ {
+ CacheHelper.SetupJavaServers(true,
+ "cacheserver_notify_subscription.xml", "cacheserver_notify_subscription2.xml");
+ CacheHelper.StartJavaLocator(1, "GFELOC");
+
+ for (int redundancy = 0; redundancy <= 1; redundancy++)
+ {
+ for (int closeType = 1; closeType <= 2; closeType++)
+ {
+ for (int downtime = 0; downtime <= 1; downtime++) // downtime updates
+ {
+ Util.Log("Starting loop with closeType = {0}, redundancy = {1}, downtime = {2} ", closeType, redundancy, downtime);
+
+ CacheHelper.StartJavaServerWithLocators(1, "GFECS1", 1);
+ Util.Log("Cacheserver 1 started.");
+
+ // A second server is only needed when redundancy is requested.
+ if (redundancy == 1)
+ {
+ CacheHelper.StartJavaServerWithLocators(2, "GFECS2", 1);
+ Util.Log("Cacheserver 2 started.");
+ }
+
+ m_feeder.Call(InitFeeder, CacheHelper.Locators, 0);
+ Util.Log("Feeder initialized.");
+
+ // Start every iteration with fresh listeners.
+ m_client1.Call(ClearChecker, 1);
+ m_client2.Call(ClearChecker, 2);
+
+ // client1 gets a durable timeout argument of 300, client2 of 3.
+ m_client1.Call(InitDurableClient, 1, CacheHelper.Locators, redundancy, DurableClientId1, 300);
+ m_client2.Call(InitDurableClient, 2, CacheHelper.Locators, redundancy, DurableClientId2, 3);
+
+ Util.Log("Clients initialized.");
+
+ m_feeder.Call(FeederUpdate, 1, 10);
+
+ Util.Log("Feeder performed first update.");
+ Thread.Sleep(45000); // wait for HA Q to drain and notify ack to go out.
+
+ switch (closeType)
+ {
+ case 1:
+
+ m_client1.Call(ClientDown, true);
+ m_client2.Call(ClientDown, true);
+
+ Util.Log("Clients downed with keepalive true.");
+ break;
+ case 2:
+
+ m_client1.Call(ClientDown, false);
+ m_client2.Call(ClientDown, false);
+
+ Util.Log("Clients downed with keepalive false.");
+ break;
+ // Not reachable with the current loop bounds (closeType is 1 or 2),
+ // and CrashClient is an empty TODO stub.
+ case 3:
+
+ m_client1.Call(CrashClient);
+
+ m_client2.Call(CrashClient);
+
+ Util.Log("Clients downed as crash.");
+ break;
+ default:
+ break;
+ }
+
+ if (downtime == 1)
+ {
+ m_feeder.Call(FeederUpdate, 2, 10);
+
+ Util.Log("Feeder performed update during downtime.");
+ Thread.Sleep(20000); // wait for HA Q to drain and notify ack to go out.
+ }
+
+ m_client1.Call(InitDurableClient, 1, CacheHelper.Locators, redundancy, DurableClientId1, 300);
+
+ // Sleep for 45 seconds before reconnecting client2 so that its durable timeout elapses.
+ // NOTE(review): the original comment spoke of a 30 second timeout, but client2 was
+ // initialized above with a timeout argument of 3 (30 is only used on the reconnect
+ // below) - confirm which value is intended.
+ Thread.Sleep(45000);
+
+ m_client2.Call(InitDurableClient, 2, CacheHelper.Locators, redundancy, DurableClientId2, 30);
+
+ Util.Log("Clients brought back up.");
+
+ // Only a keep-alive close combined with a downtime update is expected to give
+ // client1 the extra events (12 instead of 8); client2 expects 8 in every case.
+ if (closeType != 2 && downtime == 1)
+ {
+ m_client1.Call(VerifyBasic, 1, 4, 12, 2, 1);
+
+ m_client2.Call(VerifyBasic, 2, 4, 8, 1, 1);
+
+ }
+ else
+ {
+
+ m_client1.Call(VerifyBasic, 1, 4, 8, 1, 1);
+
+ m_client2.Call(VerifyBasic, 2, 4, 8, 1, 1);
+
+ }
+
+ Util.Log("Verification completed.");
+
+ m_feeder.Call(ClientDown, false);
+
+ m_client1.Call(ClientDown, false);
+
+ m_client2.Call(ClientDown, false);
+
+ Util.Log("Feeder and Clients closed.");
+
+ CacheHelper.StopJavaServer(1);
+ Util.Log("Cacheserver 1 stopped.");
+
+ if (redundancy == 1)
+ {
+ CacheHelper.StopJavaServer(2);
+ Util.Log("Cacheserver 2 stopped.");
+ }
+
+ Util.Log("Completed loop with closeType = {0}, redundancy = {1}, downtime = {2} ", closeType, redundancy, downtime);
+
+ } // end for int downtime
+ } // end for int closeType
+ } // end for int redundancy
+ CacheHelper.StopJavaLocator(1);
+ }
+
+ // Basic durable test to check durable event receiving for different combinations
+ // of close type (keep-alive = true / false), intermediate update and redundancy.
+
+ [Test]
+ public void DurableAndNonDurableBasic()
+ {
+ runDurableAndNonDurableBasic();
+ } // end [Test] DurableAndNonDurableBasic
+
+ #endregion
+
+ #region Durable Intrest Test
+
+ // Brings up a durable client on pool "__TESTPOOL1_" with the per-client static
+ // listener attached, then registers and immediately unregisters interest in
+ // regex m_regexes[0] and in key m_mixKeys[3].
+ // client: 1 selects m_checker1, any other value selects m_checker2.
+ public void InitDurableClientRemoveInterest(int client, string locators,
+ int redundancyLevel, string durableClientId, int durableTimeout)
+ {
+ // The client registered durable interest on two keys. We need to unregister them all here.
+
+ // Create the listener on first use and reuse the same instance afterwards.
+ DurableListener<object, object> checker = null;
+ if (client == 1)
+ {
+ if (ThinClientDurableTests.m_checker1 == null)
+ {
+ ThinClientDurableTests.m_checker1 = DurableListener<object, object>.Create();
+ }
+ checker = ThinClientDurableTests.m_checker1;
+ }
+ else // client == 2
+ {
+ if (ThinClientDurableTests.m_checker2 == null)
+ {
+ ThinClientDurableTests.m_checker2 = DurableListener<object, object>.Create();
+ }
+ checker = ThinClientDurableTests.m_checker2;
+ }
+ CacheHelper.InitConfigForDurable_Pool(locators, redundancyLevel,
+ durableClientId, durableTimeout);
+ CacheHelper.CreateTCRegion_Pool(RegionNames[0], false, true, checker,
+ CacheHelper.Locators, "__TESTPOOL1_", true);
+
+ CacheHelper.DCache.ReadyForEvents();
+ IRegion<object, object> region1 = CacheHelper.GetVerifyRegion<object, object>(RegionNames[0]);
+
+ // Register the regex with flag true, then unregister it again.
+ region1.GetSubscriptionService().RegisterRegex(m_regexes[0], true);
+ region1.GetSubscriptionService().UnregisterRegex(m_regexes[0]);
+
+ // Register the key list with flag true, then unregister it again.
+ string[] ldkeys = new string[] { m_mixKeys[3] };
+ region1.GetSubscriptionService().RegisterKeys(ldkeys, true, false);
+ region1.GetSubscriptionService().UnregisterKeys(ldkeys);
+ }
+
+ // Brings up a durable client on pool "__TESTPOOL1_" with the per-client static
+ // listener attached and signals ReadyForEvents, without registering any interest.
+ // client: 1 selects m_checker1, any other value selects m_checker2.
+ public void InitDurableClientNoInterest(int client, string locators,
+ int redundancyLevel, string durableClientId, int durableTimeout)
+ {
+ // Create the listener on first use; a later initialization of the same
+ // client reuses the existing instance.
+ bool isFirstClient = (client == 1);
+ if (isFirstClient && m_checker1 == null)
+ {
+ m_checker1 = DurableListener<object, object>.Create();
+ }
+ else if (!isFirstClient && m_checker2 == null)
+ {
+ m_checker2 = DurableListener<object, object>.Create();
+ }
+ DurableListener<object, object> listener = isFirstClient ? m_checker1 : m_checker2;
+
+ CacheHelper.InitConfigForDurable_Pool(locators, redundancyLevel,
+ durableClientId, durableTimeout);
+ CacheHelper.CreateTCRegion_Pool(RegionNames[0], false, true, listener,
+ CacheHelper.Locators, "__TESTPOOL1_", true);
+ CacheHelper.DCache.ReadyForEvents();
+ }
+
+ // Scenario: both clients register interest and receive a first feeder update,
+ // go down with keep-alive, and come back differently: client1 registers no
+ // interest at all, client2 registers and unregisters its interest. After a
+ // second feeder update the events seen by each listener are verified.
+ void runDurableInterest()
+ {
+ CacheHelper.SetupJavaServers(true, "cacheserver_notify_subscription.xml");
+ CacheHelper.StartJavaLocator(1, "GFELOC");
+ Util.Log("Locator started");
+ CacheHelper.StartJavaServerWithLocators(1, "GFECS1", 1);
+ Util.Log("Cacheserver 1 started.");
+
+ m_feeder.Call(InitFeeder, CacheHelper.Locators, 0);
+ Util.Log("Feeder started.");
+
+ // Start with fresh listeners in both client processes.
+ m_client1.Call(ClearChecker, 1);
+ m_client2.Call(ClearChecker, 2);
+ m_client1.Call(InitDurableClient, 1, CacheHelper.Locators,
+ 0, DurableClientId1, 60);
+ m_client2.Call(InitDurableClient, 2, CacheHelper.Locators,
+ 0, DurableClientId2, 60);
+ Util.Log("Clients started.");
+
+ m_feeder.Call(FeederUpdate, 1, 10);
+ Util.Log("Feeder performed first update.");
+
+ Thread.Sleep(15000);
+
+ m_client1.Call(ClientDown, true);
+ m_client2.Call(ClientDown, true);
+ Util.Log("Clients downed with keepalive true.");
+
+ m_client1.Call(InitDurableClientNoInterest, 1, CacheHelper.Locators,
+ 0, DurableClientId1, 60);
+ Util.Log("Client 1 started with no interest.");
+
+ m_client2.Call(InitDurableClientRemoveInterest, 2, CacheHelper.Locators,
+ 0, DurableClientId2, 60);
+ Util.Log("Client 2 started with remove interest.");
+
+ m_feeder.Call(FeederUpdate, 2, 10);
+ Util.Log("Feeder performed second update.");
+
+ Thread.Sleep(10000);
+
+ // Only the durable interest will remain.
+ m_client1.Call(VerifyBasic, 1, 4, 12, 2, 1);
+
+ // No second update should be received.
+ m_client2.Call(VerifyBasic, 2, 4, 8, 1, 1);
+ Util.Log("Verification completed.");
+
+ m_feeder.Call(ClientDown, false);
+ m_client1.Call(ClientDown, false);
+ m_client2.Call(ClientDown, false);
+ Util.Log("Feeder and Clients closed.");
+
+ CacheHelper.StopJavaServer(1);
+ Util.Log("Cacheserver 1 stopped.");
+
+ CacheHelper.StopJavaLocator(1);
+ Util.Log("Locator stopped");
+
+ CacheHelper.ClearEndpoints();
+ CacheHelper.ClearLocators();
+ }
+
+ // This tests that durably registered interests remain after a reconnect, and
+ // that unregistering interest works on reconnect.
+
+ /// <summary>
+ /// NUnit entry point for the durable-interest scenario; all of the work is
+ /// done by <c>runDurableInterest</c>.
+ /// </summary>
+ [Test]
+ public void DurableInterest()
+ {
+   runDurableInterest();
+ }
+ #endregion
+
+ #region Durable Failover Test
+
+
+ /// <summary>
+ /// Sets up a durable client for the failover test: configures the durable
+ /// pool, creates the test region with a <c>DurableListener</c> attached,
+ /// signals ready-for-events and registers the two test regexes.
+ /// </summary>
+ /// <param name="client">
+ /// Selects the shared listener: 1 uses <c>m_checker1</c>, any other value
+ /// uses <c>m_checker2</c>.
+ /// </param>
+ /// <param name="locators">Locator string passed to the durable pool configuration.</param>
+ /// <param name="redundancyLevel">Redundancy level passed to the durable pool configuration.</param>
+ /// <param name="durableClientId">Durable client id passed to the durable pool configuration.</param>
+ /// <param name="durableTimeout">Durable timeout passed to the durable pool configuration.</param>
+ public void InitDurableClientForFailover(int client, string locators,
+   int redundancyLevel, string durableClientId, int durableTimeout)
+ {
+   // "client" decides which listener to use. A listener is created only the
+   // first time; when the client is initialized again the existing one is
+   // reused.
+   DurableListener<object, object> checker = null;
+   if (client == 1)
+   {
+     if (ThinClientDurableTests.m_checker1 == null)
+     {
+       ThinClientDurableTests.m_checker1 = DurableListener<object, object>.Create();
+     }
+     checker = ThinClientDurableTests.m_checker1;
+   }
+   else // client == 2
+   {
+     if (ThinClientDurableTests.m_checker2 == null)
+     {
+       ThinClientDurableTests.m_checker2 = DurableListener<object, object>.Create();
+     }
+     checker = ThinClientDurableTests.m_checker2;
+   }
+
+   // NOTE(review): the trailing 35000 presumably is a redundancy-monitor
+   // interval in milliseconds (the failover test sleeps 35000 ms "for
+   // redundancy thread to detect") - confirm against CacheHelper.
+   CacheHelper.InitConfigForDurable_Pool(locators, redundancyLevel,
+     durableClientId, durableTimeout, 35000);
+   // NOTE(review): the region is created with CacheHelper.Locators rather than
+   // the "locators" argument - confirm this is intended.
+   CacheHelper.CreateTCRegion_Pool(RegionNames[0], false, true, checker,
+     CacheHelper.Locators, "__TESTPOOL1_", true);
+   CacheHelper.DCache.ReadyForEvents();
+   IRegion<object, object> region1 = CacheHelper.GetVerifyRegion<object, object>(RegionNames[0]);
+
+   try
+   {
+     // The second argument differs between the two registrations (true for
+     // the first regex, false for the second).
+     region1.GetSubscriptionService().RegisterRegex(m_regexes[0], true);
+     region1.GetSubscriptionService().RegisterRegex(m_regexes[1], false);
+   }
+   catch (Exception other)
+   {
+     // Name the call that actually failed and include the whole exception
+     // (type, message and stack trace) rather than only its message.
+     Assert.Fail("RegisterRegex threw unexpected exception: {0}", other);
+   }
+ }
+
+ /// <summary>
+ /// Writes <paramref name="value"/> to the first two entries of
+ /// <c>m_mixKeys</c> in the given region, pausing after each put.
+ /// </summary>
+ /// <param name="region">Name of the region to update.</param>
+ /// <param name="value">Value stored under each of the two keys.</param>
+ /// <param name="sleep">Pause after each put, passed to <c>Thread.Sleep</c>.</param>
+ public void FeederUpdateForFailover(string region, int value, int sleep)
+ {
+   IRegion<object, object> target = CacheHelper.GetVerifyRegion<object, object>(region);
+
+   // Only the first two keys are updated.
+   for (int keyIndex = 0; keyIndex < 2; keyIndex++)
+   {
+     target[m_mixKeys[keyIndex]] = value;
+     Thread.Sleep(sleep);
+   }
+ }
+
+ // Runs the durable failover scenario for every combination of
+ // clientDown (0/1) and redundancy (0/1). Each iteration starts server 1,
+ // connects a feeder and a durable client, starts server 2, stops server 1 to
+ // force failover, and then verifies what the client's listener observed.
+ void runDurableFailover()
+ {
+   CacheHelper.SetupJavaServers(true,
+     "cacheserver_notify_subscription.xml", "cacheserver_notify_subscription2.xml");
+
+   CacheHelper.StartJavaLocator(1, "GFELOC");
+   Util.Log("Locator started");
+
+   for (int clientDown = 0; clientDown <= 1; clientDown++)
+   {
+     // When set, the client is taken down (keepalive) before server 1 stops
+     // and is brought back after the second round of feeder updates.
+     bool bounceClient = (clientDown == 1);
+
+     for (int redundancy = 0; redundancy <= 1; redundancy++)
+     {
+       Util.Log("Starting loop with clientDown = {0}, redundancy = {1}", clientDown, redundancy);
+
+       CacheHelper.StartJavaServerWithLocators(1, "GFECS1", 1);
+       Util.Log("Cacheserver 1 started.");
+
+       m_feeder.Call(InitFeeder, CacheHelper.Locators, 0);
+       Util.Log("Feeder started with redundancy level as 0.");
+
+       m_client1.Call(ClearChecker, 1);
+       m_client1.Call(InitDurableClientForFailover, 1, CacheHelper.Locators,
+         redundancy, DurableClientId1, 300);
+       Util.Log("Client started with redundancy level as {0}.", redundancy);
+
+       m_feeder.Call(FeederUpdateForFailover, RegionNames[0], 1, 10);
+       Util.Log("Feeder updates 1 completed.");
+
+       CacheHelper.StartJavaServerWithLocators(2, "GFECS2", 1);
+       Util.Log("Cacheserver 2 started.");
+
+       // Give the redundancy thread time to detect the new server.
+       Thread.Sleep(35000);
+
+       if (bounceClient)
+       {
+         m_client1.Call(ClientDown, true);
+       }
+
+       CacheHelper.StopJavaServer(1);
+       Util.Log("Cacheserver 1 stopped.");
+
+       // Give failover time to happen.
+       Thread.Sleep(5000);
+
+       m_feeder.Call(FeederUpdateForFailover, RegionNames[0], 2, 10);
+       Util.Log("Feeder updates 2 completed.");
+
+       // Last three VerifyBasic arguments for this combination.
+       int[] expected;
+       if (!bounceClient)
+       {
+         // Normal failover: all events should be received.
+         expected = new int[] { 4, 2, 2 };
+       }
+       else
+       {
+         // Restart the client before verifying.
+         m_client1.Call(InitDurableClientForFailover, 1, CacheHelper.Locators,
+           redundancy, DurableClientId1, 300);
+         Util.Log("Client Restarted with redundancy level as {0}.", redundancy);
+
+         if (redundancy == 0)
+         {
+           // Events were missed.
+           expected = new int[] { 2, 1, 1 };
+         }
+         else
+         {
+           // redundancy == 1: only durable events should be received.
+           expected = new int[] { 3, 2, 1 };
+         }
+       }
+
+       m_client1.Call(VerifyBasic, 1, 2, expected[0], expected[1], expected[2]);
+       Util.Log("Verification completed.");
+
+       m_feeder.Call(ClientDown, false);
+       m_client1.Call(ClientDown, false);
+       Util.Log("Feeder and Client closed.");
+
+       CacheHelper.StopJavaServer(2);
+       Util.Log("Cacheserver 2 stopped.");
+
+       Util.Log("Completed loop with clientDown = {0}, redundancy = {1}", clientDown, redundancy);
+     } // for redundancy
+   } // for clientDown
+
+   CacheHelper.StopJavaLocator(1);
+   Util.Log("Locator stopped");
+
+   CacheHelper.ClearEndpoints();
+   CacheHelper.ClearLocators();
+ }
+
+ // One run of the durable CQ client: creates a cache with durable properties,
+ // creates the "DistRegionAck" region, registers and executes the CQ "MyCq",
+ // checks the pending event count before and after ReadyForEvents, and closes
+ // the cache with keepalive.
+ //
+ // expectedPendingQSize: value handed to PendingEventCount before
+ // ReadyForEvents is sent.
+ void RunDurableClient(int expectedPendingQSize)
+ {
+   const string cqText = "select * from /DistRegionAck";
+
+   Properties<string, string> durableProps = Properties<string, string>.Create<string, string>();
+   durableProps.Insert("durable-client-id", "DurableClientId");
+   durableProps.Insert("durable-timeout", "30");
+
+   CacheFactory factory = CacheFactory.CreateCacheFactory(durableProps);
+   Cache cache = factory.Create();
+   cache.GetPoolFactory().SetSubscriptionEnabled(true);
+   cache.GetPoolFactory().SetSubscriptionAckInterval(5000);
+   cache.GetPoolFactory().SetSubscriptionMessageTrackingTimeout(5000);
+   Util.Log("Created the Geode Cache Programmatically");
+
+   RegionFactory ackRegionFactory = cache.CreateRegionFactory(RegionShortcut.CACHING_PROXY);
+   IRegion<object, object> region = ackRegionFactory.Create<object, object>("DistRegionAck");
+   Util.Log("Created the DistRegionAck Region Programmatically");
+
+   QueryService<object, object> queryService = cache.GetQueryService<object, object>();
+   CqAttributesFactory<object, object> attributesFactory = new CqAttributesFactory<object, object>();
+
+   ICqListener<object, object> listener = new MyCqListener1<object, object>();
+   attributesFactory.AddCqListener(listener);
+   CqAttributes<object, object> attributes = attributesFactory.Create();
+   Util.Log("Attached CqListener");
+
+   CqQuery<object, object> cq = queryService.NewCq("MyCq", cqText, attributes, true);
+   Util.Log("Created new CqQuery");
+
+   cq.Execute();
+   Util.Log("Executed new CqQuery");
+   Thread.Sleep(10000);
+
+   PendingEventCount(region, expectedPendingQSize, false);
+
+   // Send the ready-for-events message to the server (durable clients only).
+   // The server sends its queued events to the client after receiving it.
+   cache.ReadyForEvents();
+
+   Util.Log("Sent ReadyForEvents message to server");
+   Thread.Sleep(10000);
+
+   PendingEventCount(region, 0, true);
+
+   // Close the Geode cache with keepalive = true. The server queues events for
+   // durable registered keys and delivers them when the client reconnects
+   // within the timeout period and sends "readyForEvents()".
+   cache.Close(true);
+
+   Util.Log("Closed the Geode Cache with keepalive as true");
+ }
+
+ // Durable client with two pools: the same durable client is initialized
+ // repeatedly, each time with the pending-queue sizes expected at that point
+ // (before any update, after each feeder update, and after the durable timeout
+ // has elapsed).
+ void runDurableClientWithTwoPools()
+ {
+   const int redundancyLevel = 0;
+   const int durableTimeout = 30;
+
+   CacheHelper.SetupJavaServers(true, "cacheserver_notify_subscription.xml");
+   CacheHelper.StartJavaLocator(1, "GFELOC");
+   Util.Log("Locator started");
+   CacheHelper.StartJavaServerWithLocators(1, "GFECS1", 1);
+   Util.Log("Cacheserver 1 started.");
+
+   string locators = CacheHelper.Locators;
+
+   m_feeder.Call(InitFeeder2, locators, redundancyLevel);
+   Util.Log("Feeder started.");
+
+   m_client1.Call(InitDurableClientWithTwoPools, locators, redundancyLevel,
+     DurableClientId1, durableTimeout, -2, -2);
+   Util.Log("DurableClient with Two Pools Initialized");
+
+   m_feeder.Call(FeederUpdate2, 5, 10);
+   Util.Log("Feeder performed first update.");
+   Thread.Sleep(15000);
+
+   // +1 for the marker, so 5+1 and 10+1.
+   m_client1.Call(InitDurableClientWithTwoPools, locators, redundancyLevel,
+     DurableClientId1, durableTimeout, 6, 11);
+   Util.Log("DurableClient with Two Pools after first update");
+
+   m_feeder.Call(FeederUpdate2, 10, 5);
+   Util.Log("Feeder performed second update.");
+   Thread.Sleep(15000);
+
+   m_client1.Call(InitDurableClientWithTwoPools, locators, redundancyLevel,
+     DurableClientId1, durableTimeout, 16, 16);
+   Util.Log("DurableClient with Two Pools after second update");
+
+   // Wait 45 secs, longer than the 30 secs passed as the durable timeout.
+   Thread.Sleep(45000);
+   m_client1.Call(InitDurableClientWithTwoPools, locators, redundancyLevel,
+     DurableClientId1, durableTimeout, -1, -1);
+   Util.Log("DurableClient with Two Pools after timeout");
+
+   m_feeder.Call(ClientDown, false);
+   Util.Log("Feeder and Clients closed.");
+
+   CacheHelper.StopJavaServer(1);
+   Util.Log("Cacheserver 1 stopped.");
+
+   CacheHelper.StopJavaLocator(1);
+   Util.Log("Locator stopped");
+
+   CacheHelper.ClearEndpoints();
+   CacheHelper.ClearLocators();
+ }
+
+ // Feeder pass that puts the integer keys 0..9 (value == key) into the
+ // "DistRegionAck" region.
+ void RunFeeder()
+ {
+   PutIntKeysAsFeeder(0, 10);
+ }
+
+ // Feeder pass that puts the integer keys 10..19 (value == key) into the
+ // "DistRegionAck" region.
+ void RunFeeder1()
+ {
+   PutIntKeysAsFeeder(10, 20);
+ }
+
+ // Shared body of RunFeeder and RunFeeder1: creates a cache and a PROXY
+ // "DistRegionAck" region, checks the pending event count, puts the keys
+ // firstKey (inclusive) .. endKey (exclusive) with value == key, waits, and
+ // closes the cache.
+ //
+ // firstKey: first key to put.
+ // endKey: one past the last key to put.
+ private void PutIntKeysAsFeeder(int firstKey, int endKey)
+ {
+   CacheFactory cacheFactory = CacheFactory.CreateCacheFactory();
+   Util.Log("Feeder connected to the Geode Distributed System");
+
+   Cache cache = cacheFactory.Create();
+   Util.Log("Created the Geode Cache");
+
+   // Close the cache even when an assertion or a put fails, so a failed run
+   // does not leave an open cache behind in this process.
+   try
+   {
+     RegionFactory regionFactory = cache.CreateRegionFactory(RegionShortcut.PROXY);
+     Util.Log("Created the RegionFactory");
+
+     // Create the Region Programmatically.
+     IRegion<object, object> region = regionFactory.Create<object, object>("DistRegionAck");
+     Util.Log("Created the Region Programmatically.");
+
+     PendingEventCount(region, 0, true);
+
+     for (int i = firstKey; i < endKey; i++)
+     {
+       region[i] = i;
+     }
+     Thread.Sleep(10000);
+     // Produces the same text as before: "put on 0-10 keys done." and
+     // "put on 10-20 keys done.".
+     Util.Log("put on {0}-{1} keys done.", firstKey, endKey);
+   }
+   finally
+   {
+     // Close the Geode Cache
+     cache.Close();
+     Util.Log("Closed the Geode Cache");
+   }
+ }
+
+ // Asserts that MyCqListener1 has counted exactly 20 events.
+ void VerifyEvents()
+ {
+   Util.Log("MyCqListener1.m_cntEvents = {0} ", MyCqListener1<object, object>.m_cntEvents);
+   // NUnit's Assert.AreEqual takes (expected, actual); passing them in that
+   // order keeps the "Expected/But was" lines of a failure report correct.
+   Assert.AreEqual(20, MyCqListener1<object, object>.m_cntEvents, "Incorrect events, expected 20");
+ }
+
+ // Durable CQ scenario: alternates durable-client runs on client 1 with feeder
+ // runs on client 2, verifies the listener's event count, and then reconnects
+ // once more after waiting longer than the 30 sec durable timeout.
+ void runCQDurable()
+ {
+   const int pastDurableTimeoutMs = 45 * 1000;
+
+   CacheHelper.SetupJavaServers(false, "serverDurableClient.xml");
+   CacheHelper.StartJavaServer(1, "GFECS1");
+
+   // First run: there is no queue yet, hence -2 as the PendingEventCount.
+   m_client1.Call(RunDurableClient, -2);
+
+   m_client2.Call(RunFeeder);
+   m_client1.Call(RunDurableClient, 10);
+
+   m_client2.Call(RunFeeder1);
+   m_client1.Call(RunDurableClient, 10);
+
+   m_client1.Call(VerifyEvents);
+
+   // Sleep 45 secs > 30 secs, then check -1 as the PendingEventCount.
+   Thread.Sleep(pastDurableTimeoutMs);
+   m_client1.Call(RunDurableClient, -1);
+
+   CacheHelper.StopJavaServer(1);
+ }
+
+ /// <summary>
+ /// NUnit entry point for the durable failover scenario; all of the work is
+ /// done by <c>runDurableFailover</c>.
+ /// </summary>
+ [Test]
+ public void DurableFailover()
+ {
+   runDurableFailover();
+ }
+
+ /// <summary>
+ /// NUnit entry point that runs the durable CQ scenario and then the
+ /// durable-client-with-two-pools scenario, in that order.
+ /// </summary>
+ [Test]
+ public void CQDurable()
+ {
+   // Durable CQ first.
+   runCQDurable();
+
+   // Then the two-pool durable client.
+   runDurableClientWithTwoPools();
+ }
+ #endregion
+ }
+}