You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by jb...@apache.org on 2017/08/11 23:52:57 UTC
[41/52] [partial] geode-native git commit: GEODE-3165: Reorganized
sources relative to the root for better CMake IDE integration.
http://git-wip-us.apache.org/repos/asf/geode-native/blob/6cbd424f/clicache/integration-test/ThinClientCallbackArgN.cs
----------------------------------------------------------------------
diff --git a/clicache/integration-test/ThinClientCallbackArgN.cs b/clicache/integration-test/ThinClientCallbackArgN.cs
new file mode 100644
index 0000000..8d8ff5b
--- /dev/null
+++ b/clicache/integration-test/ThinClientCallbackArgN.cs
@@ -0,0 +1,726 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+
+namespace Apache.Geode.Client.UnitTests
+{
+ using NUnit.Framework;
+ using Apache.Geode.DUnitFramework;
+ using Apache.Geode.Client.Tests;
+
+ using Apache.Geode.Client;
+
+ using GIRegion = Apache.Geode.Client.IRegion<int, object>;
+ using System.Collections.Generic;
+
+ /// <summary>
+ /// Cache listener that tallies, per event type, how many events carried a
+ /// callback argument matching the one stored through SetCallbackArg.
+ /// </summary>
+ public class CallbackListener : CacheListenerAdapter<int, object>
+ {
+     // Tallies are plain fields because they are handed to check() by ref.
+     int m_creates;
+     int m_updates;
+     int m_invalidates;
+     int m_destroys;
+     int m_regionInvalidate;
+     int m_regionDestroy;
+     int m_regionClear;
+     // Callback argument that incoming events are compared against.
+     object m_callbackArg;
+
+     #region Getters
+
+     public int Creates { get { return m_creates; } }
+
+     public int Updates { get { return m_updates; } }
+
+     public int Invalidates { get { return m_invalidates; } }
+
+     public int Destroys { get { return m_destroys; } }
+
+     public int RegionInvalidates { get { return m_regionInvalidate; } }
+
+     public int RegionDestroys { get { return m_regionDestroy; } }
+
+     public int RegionClear { get { return m_regionClear; } }
+
+     public object CallbackArg { get { return m_callbackArg; } }
+
+     #endregion
+
+     /// <summary>Stores the callback argument that events are expected to carry.</summary>
+     public void SetCallbackArg(object callbackArg)
+     {
+         m_callbackArg = callbackArg;
+     }
+
+     /// <summary>
+     /// Increments <paramref name="updateCount"/> when the event's callback
+     /// argument matches the stored one. Strings are compared with ==;
+     /// Portfolio arguments match when Pkid is equal and the received object
+     /// has a null ArrayNull and an empty, non-null ArrayZeroSize.
+     /// </summary>
+     private void check(object eventCallback, ref int updateCount)
+     {
+         Log.Fine("check..");
+         if (eventCallback == null)
+         {
+             return;
+         }
+
+         string receivedText = eventCallback as string;
+         if (receivedText != null)
+         {
+             string expectedText = m_callbackArg as string;
+             if (expectedText == null)
+             {
+                 // Stored argument is not a string: nothing to compare, nothing logged.
+                 return;
+             }
+
+             if (receivedText == expectedText)
+             {
+                 Log.Fine("value matched");
+                 updateCount++;
+             }
+             else
+             {
+                 Log.Fine("value matched NOT");
+             }
+             return;
+         }
+
+         Log.Fine("Callbackarg is not cacheable string");
+         Portfolio receivedPortfolio = eventCallback as Portfolio;
+         if (receivedPortfolio == null)
+         {
+             return;
+         }
+
+         Portfolio expectedPortfolio = m_callbackArg as Portfolio;
+         if (expectedPortfolio == null)
+         {
+             return;
+         }
+
+         // Kept as one short-circuit expression so Length is only read when
+         // ArrayZeroSize is known to be non-null.
+         if (expectedPortfolio.Pkid == receivedPortfolio.Pkid && receivedPortfolio.ArrayNull == null
+           && receivedPortfolio.ArrayZeroSize != null && receivedPortfolio.ArrayZeroSize.Length == 0)
+         {
+             Log.Fine("value matched");
+             updateCount++;
+         }
+     }
+
+     #region CacheListener Members
+
+     public override void AfterCreate(EntryEvent<int, object> ev)
+     {
+         check(ev.CallbackArgument, ref m_creates);
+     }
+
+     public override void AfterUpdate(EntryEvent<int, object> ev)
+     {
+         check(ev.CallbackArgument, ref m_updates);
+     }
+
+     public override void AfterInvalidate(EntryEvent<int, object> ev)
+     {
+         check(ev.CallbackArgument, ref m_invalidates);
+     }
+
+     public override void AfterDestroy(EntryEvent<int, object> ev)
+     {
+         check(ev.CallbackArgument, ref m_destroys);
+     }
+
+     public override void AfterRegionInvalidate(RegionEvent<int, object> rev)
+     {
+         check(rev.CallbackArgument, ref m_regionInvalidate);
+     }
+
+     public override void AfterRegionDestroy(RegionEvent<int, object> rev)
+     {
+         check(rev.CallbackArgument, ref m_regionDestroy);
+     }
+
+     public override void AfterRegionClear(RegionEvent<int, object> rev)
+     {
+         check(rev.CallbackArgument, ref m_regionClear);
+     }
+
+     #endregion
+ }
+
+ [TestFixture]
+ [Category("generics")]
+ public class ThinClientCallbackArg : ThinClientRegionSteps
+ {
+ private TallyWriter<int, object> m_writer;
+ private TallyListener<int, object> m_listener;
+ private CallbackListener m_callbackListener;
+ RegionOperation o_region;
+ private UnitProcess m_client1, m_client2;
+ int key0 = 12;
+ object m_callbackarg = "Gemstone's Callback";
+
+ protected override ClientBase[] GetClients()
+ {
+     // Creates the two client processes used by this fixture and returns them.
+     m_client1 = new UnitProcess();
+     m_client2 = new UnitProcess();
+     ClientBase[] clients = { m_client1, m_client2 };
+     return clients;
+ }
+
+ public void CreateRegion(string locators,
+   bool caching, bool listener, bool writer)
+ {
+     // Creates RegionName on pool "__TESTPOOL1_", optionally attaching a
+     // TallyListener and/or TallyWriter whose callback argument is key0.
+     m_listener = listener ? new TallyListener<int, object>() : null;
+
+     GIRegion region = CacheHelper.CreateTCRegion_Pool<int, object>(RegionName, true, caching,
+       m_listener, locators, "__TESTPOOL1_", true);
+     if (listener)
+     {
+         m_listener.SetCallBackArg(key0);
+     }
+
+     if (!writer)
+     {
+         m_writer = null;
+         return;
+     }
+
+     // The writer is installed after region creation through the attributes mutator.
+     m_writer = new TallyWriter<int, object>();
+     region.AttributesMutator.SetCacheWriter(m_writer);
+     m_writer.SetCallBackArg(key0);
+ }
+
+ /// <summary>
+ /// Creates RegionName on pool "__TESTPOOL1_", optionally attaching a fresh
+ /// CallbackListener primed with m_callbackarg.
+ /// </summary>
+ /// <param name="locators">Locator list passed to the region/pool helper.</param>
+ /// <param name="caching">Forwarded to CacheHelper.CreateTCRegion_Pool.</param>
+ /// <param name="listener">When true a new CallbackListener is created and attached.</param>
+ /// <param name="writer">Unused; kept so the signature matches CreateRegion.</param>
+ public void CreateRegion2(string locators,
+   bool caching, bool listener, bool writer)
+ {
+     if (listener)
+     {
+         m_callbackListener = new CallbackListener();
+         m_callbackListener.SetCallbackArg(m_callbackarg);
+     }
+     else
+     {
+         // Previously only m_listener was reset here (copy-paste from
+         // CreateRegion), which left a stale m_callbackListener behind.
+         m_callbackListener = null;
+         m_listener = null;
+     }
+
+     CacheHelper.CreateTCRegion_Pool<int, object>(RegionName, true, caching,
+       m_callbackListener, locators, "__TESTPOOL1_", true);
+ }
+
+ public void ValidateLocalListenerWriterData() // Asserts the invoked / callback-arg flags on m_writer and m_listener, then prints their tallies.
+ {
+ Thread.Sleep(2000); // fixed 2 s pause before asserting (presumably lets pending callbacks arrive -- confirm)
+ Assert.AreEqual(true, m_writer.IsWriterInvoked, "Writer should be invoked");
+ Assert.AreEqual(true, m_listener.IsListenerInvoked, "Listener should be invoked");
+ Assert.AreEqual(true, m_writer.IsCallBackArgCalled, "Writer CallbackArg should be invoked");
+ Assert.AreEqual(true, m_listener.IsCallBackArgCalled, "Listener CallbackArg should be invoked");
+ m_listener.ShowTallies();
+ m_writer.ShowTallies();
+ }
+
+ /// <summary>
+ /// Asserts the event tallies recorded by m_writer and m_listener.
+ /// The failure messages now quote the same numbers the assertions check
+ /// (they previously said "10 creates" / "5 updates" while asserting 15).
+ /// </summary>
+ public void ValidateEvents()
+ {
+     Assert.AreEqual(15, m_writer.Creates, "Should be 15 creates");
+     Assert.AreEqual(15, m_listener.Creates, "Should be 15 creates");
+     Assert.AreEqual(15, m_writer.Updates, "Should be 15 updates");
+     Assert.AreEqual(15, m_listener.Updates, "Should be 15 updates");
+     Assert.AreEqual(0, m_writer.Invalidates, "Should be 0 invalidates");
+     Assert.AreEqual(5, m_listener.Invalidates, "Should be 5 invalidates");
+     Assert.AreEqual(10, m_writer.Destroys, "Should be 10 destroys"); // 5 destroys + 5 removes
+     Assert.AreEqual(10, m_listener.Destroys, "Should be 10 destroys"); // 5 destroys + 5 removes
+ }
+
+ public void CallOp() // Runs put, put, invalidate, destroy, put, remove through RegionOperation, each with arguments (5, key0) and a 1 s pause afterwards.
+ {
+ o_region = new RegionOperation(RegionName);
+ o_region.PutOp(5, key0);
+ Thread.Sleep(1000); // let the events reach at other end.
+ o_region.PutOp(5, key0);
+ Thread.Sleep(1000);
+ o_region.InvalidateOp(5, key0);
+ Thread.Sleep(1000);
+ o_region.DestroyOp(5, key0);
+ Thread.Sleep(1000); // let the events reach at other end.
+ o_region.PutOp(5, key0); // puts again after the destroy step, before the final remove step
+ Thread.Sleep(1000);
+ o_region.RemoveOp(5, key0);
+ Thread.Sleep(1000);
+ }
+
+ void RegisterPdxType8() // Registers the PdxTests.PdxTypes8 factory with Serializable.
+ {
+ Serializable.RegisterPdxType(PdxTests.PdxTypes8.CreateDeserializable);
+ }
+
+ void runCallbackArgTest() // Driver: starts locator + server, creates the region on both clients, runs CallOp on client1, validates on client1, then shuts everything down.
+ {
+ CacheHelper.SetupJavaServers(true, "cacheserver_notify_subscription.xml");
+ CacheHelper.StartJavaLocator(1, "GFELOC");
+ Util.Log("Locator started");
+ CacheHelper.StartJavaServerWithLocators(1, "GFECS1", 1);
+ Util.Log("Cacheserver 1 started.");
+ // Both clients create the region with caching, listener and writer all true, then call RegisterAllKeys.
+ Util.Log("Creating region in client1, no-ack, cache-enabled, with listener and writer");
+ m_client1.Call(CreateRegion, CacheHelper.Locators,
+ true, true, true);
+ m_client1.Call(RegisterAllKeys, new string[] { RegionName });
+
+ Util.Log("Creating region in client2 , no-ack, cache-enabled, with listener and writer");
+ m_client2.Call(CreateRegion, CacheHelper.Locators,
+ true, true, true);
+ m_client2.Call(RegisterAllKeys, new string[] { RegionName });
+
+ m_client2.Call(RegisterPdxType8);
+
+ m_client1.Call(CallOp);
+
+ // Validation runs on client1 only, i.e. the client that performed the operations.
+ m_client1.Call(ValidateLocalListenerWriterData);
+ m_client1.Call(ValidateEvents);
+ // Teardown below is not in a finally block, so it is skipped if a call above throws.
+ m_client1.Call(CacheHelper.Close);
+ m_client2.Call(CacheHelper.Close);
+
+ CacheHelper.StopJavaServer(1);
+ Util.Log("Cacheserver 1 stopped.");
+
+ CacheHelper.StopJavaLocator(1);
+ Util.Log("Locator stopped");
+
+ CacheHelper.ClearLocators();
+ CacheHelper.ClearEndpoints();
+ }
+
+ private bool m_isSet = false;
+ public void SetCallbackArg()
+ {
+     // One-shot: switches the callback argument to a Portfolio and registers
+     // the Portfolio/Position factories; later calls do nothing.
+     if (m_isSet)
+     {
+         return;
+     }
+
+     m_isSet = true;
+     m_callbackarg = new Portfolio(1, 1);
+     //TODO:;split
+     Serializable.RegisterTypeGeneric(Portfolio.CreateDeserializable, CacheHelper.DCache);
+     Serializable.RegisterTypeGeneric(Position.CreateDeserializable, CacheHelper.DCache);
+ }
+
+ public void TestCreatesAndUpdates() // Adds then overwrites "Key-1", passing m_callbackarg on both calls, then waits 10 s.
+ {
+ o_region = new RegionOperation(RegionName);
+ o_region.Region.Add("Key-1", "Val-1", m_callbackarg);
+ o_region.Region.Put("Key-1", "NewVal-1", m_callbackarg);
+ Thread.Sleep(10000); // fixed wait (presumably for event delivery to the other client -- confirm)
+ }
+
+ /// <summary>
+ /// Exercises local-view add/put/invalidate/remove with m_callbackarg, then
+ /// invalidates "Key-1" and the whole region through the region itself.
+ /// Assert.AreEqual is called as (expected, actual) so that a failure
+ /// reports the expected and actual values the right way round.
+ /// </summary>
+ public void TestInvalidates()
+ {
+     o_region = new RegionOperation(RegionName);
+     o_region.Region.GetLocalView().Add(1234, 1234, m_callbackarg);
+     o_region.Region.GetLocalView().Add(12345, 12345, m_callbackarg);
+     o_region.Region.GetLocalView().Add(12346, 12346, m_callbackarg);
+     o_region.Region.GetLocalView().Put(1234, "Val-1", m_callbackarg);
+     o_region.Region.GetLocalView().Invalidate(1234, m_callbackarg);
+     // Remove by key+value, then by key only; both entries were added locally above.
+     Assert.AreEqual(true, o_region.Region.GetLocalView().Remove(12345, 12345, m_callbackarg), "Result of remove should be true, as this value exists locally.");
+     Assert.AreEqual(false, o_region.Region.GetLocalView().ContainsKey(12345), "containsKey should be false");
+     Assert.AreEqual(true, o_region.Region.GetLocalView().Remove(12346, m_callbackarg), "Result of remove should be true, as this value exists locally.");
+     Assert.AreEqual(false, o_region.Region.GetLocalView().ContainsKey(12346), "containsKey should be false");
+     o_region.Region.Invalidate("Key-1", m_callbackarg);
+     o_region.Region.InvalidateRegion(m_callbackarg);
+ }
+
+ public void TestDestroy() // Removes "Key-1" with m_callbackarg; the region destroy is commented out here and performed in TestRemove.
+ {
+ o_region = new RegionOperation(RegionName);
+ o_region.Region.Remove("Key-1", m_callbackarg);
+ //o_region.Region.DestroyRegion(m_callbackarg);
+ }
+
+ public void TestRemove() // Removes "Key-1" by key and value ("NewVal-1"), then destroys the region; both calls pass m_callbackarg.
+ {
+ o_region = new RegionOperation(RegionName);
+ o_region.Region.Remove("Key-1", "NewVal-1", m_callbackarg);
+ o_region.Region.DestroyRegion(m_callbackarg);
+ }
+
+ public void TestlocalClear() // Clears the region's local view, passing m_callbackarg.
+ {
+ o_region = new RegionOperation(RegionName);
+ o_region.Region.GetLocalView().Clear(m_callbackarg);
+ }
+ public void TestValidate() // Asserts the per-event tallies accumulated by m_callbackListener.
+ {
+ Thread.Sleep(10000); // fixed 10 s wait before reading the tallies
+ Assert.AreEqual(5, m_callbackListener.Creates, "Should be 5 creates");
+ Assert.AreEqual(3, m_callbackListener.Updates, "Should be 3 update");
+ Assert.AreEqual(2, m_callbackListener.Invalidates, "Should be 2 invalidate");
+ Assert.AreEqual(4, m_callbackListener.Destroys, "Should be 4 destroy");
+ Assert.AreEqual(1, m_callbackListener.RegionInvalidates, "Should be 1 region invalidates");
+ Assert.AreEqual(1, m_callbackListener.RegionDestroys, "Should be 1 regiondestroy");
+ Assert.AreEqual(1, m_callbackListener.RegionClear, "Should be 1 RegionClear");
+ }
+
+ void runCallbackArgTest2(int callbackArgChange) // Driver for the CallbackListener scenario; callbackArgChange == 1 switches the callback argument to a Portfolio first.
+ {
+ if (callbackArgChange == 1)
+ {
+ //change now custom type
+ m_callbackarg = new Portfolio(1, 1);
+ m_client1.Call(SetCallbackArg);
+ m_client2.Call(SetCallbackArg);
+ }
+ // NOTE(review): this listener is created on the driver side; CreateRegion2 creates its own when listener is true.
+ m_callbackListener = new CallbackListener();
+ m_callbackListener.SetCallbackArg(m_callbackarg);
+ CacheHelper.SetupJavaServers(true, "cacheserver_notify_subscription5N.xml");
+ CacheHelper.StartJavaLocator(1, "GFELOC");
+ Util.Log("Locator started");
+ CacheHelper.StartJavaServerWithLocators(1, "GFECS1", 1);
+ Util.Log("Cacheserver 1 started.");
+ // client1 is created with listener=true, writer=false; client2 with listener=false, writer=false (the log text says otherwise).
+ Util.Log("Creating region in client1, no-ack, cache-enabled, with listener and writer");
+ m_client1.Call(CreateRegion2, CacheHelper.Locators,
+ true, true, false);
+ m_client1.Call(RegisterAllKeys, new string[] { RegionName });
+
+ Util.Log("Creating region in client2 , no-ack, cache-enabled, with listener and writer");
+ m_client2.Call(CreateRegion2, CacheHelper.Locators,
+ true, false, false);
+ // The order of the following steps determines the tallies asserted in TestValidate.
+ m_client2.Call(TestCreatesAndUpdates);
+ m_client1.Call(TestInvalidates);
+ m_client2.Call(TestDestroy);
+ m_client2.Call(TestCreatesAndUpdates);
+ m_client1.Call(TestlocalClear);
+ m_client2.Call(TestRemove);
+ m_client1.Call(TestValidate);
+
+ m_client1.Call(CacheHelper.Close);
+ m_client2.Call(CacheHelper.Close);
+
+ CacheHelper.StopJavaServer(1);
+ Util.Log("Cacheserver 1 stopped.");
+
+ CacheHelper.StopJavaLocator(1);
+ Util.Log("Locator stopped");
+
+ CacheHelper.ClearLocators();
+ CacheHelper.ClearEndpoints();
+ }
+
+ private bool isRegistered = false;
+ public void registerDefaultCacheableType()
+ {
+     // Registers the DefaultType factory once; repeat calls are no-ops.
+     if (isRegistered)
+     {
+         return;
+     }
+
+     Serializable.RegisterTypeGeneric(DefaultType.CreateDeserializable, CacheHelper.DCache);
+     // Flag is set only after registration returns, as before.
+     isRegistered = true;
+ }
+
+
+ public void CallOp2() // Puts a DefaultType built with new DefaultType(true) under "key-1" with a null callback argument.
+ {
+ o_region = new RegionOperation(RegionName);
+ DefaultType dc = new DefaultType(true);
+ o_region.Region.Put("key-1", dc, null);
+ Thread.Sleep(1000); // let the events reach at other end.
+ }
+
+ /// <summary>
+ /// Reads "key-1" back as a DefaultType and checks its fields.
+ /// Assert.AreEqual is called as (expected, actual) throughout so failure
+ /// output labels the two values correctly.
+ /// </summary>
+ public void ValidateData()
+ {
+     o_region = new RegionOperation(RegionName);
+     DefaultType dc = (DefaultType)o_region.Region.Get("key-1", null);
+
+     Assert.AreEqual(true, dc.CBool, "bool is not equal");
+     Assert.AreEqual(1000, dc.CInt, "int is not equal");
+
+     int[] cia = dc.CIntArray;
+     Assert.IsNotNull(cia, "Int array is null");
+     Assert.AreEqual(3, cia.Length, "Int array are not three");
+
+     string[] csa = dc.CStringArray;
+     Assert.IsNotNull(csa, "String array is null");
+     Assert.AreEqual(2, csa.Length, "String array length is not two");
+
+     Assert.AreEqual("geode.txt", dc.CFileName, "Cacheable filename is not equal");
+
+     /*
+     Assert.IsNotNull(dc.CHashSet, "hashset is null");
+     Assert.AreEqual(2, dc.CHashSet.Count, "hashset size is not two");
+      * */
+
+     Assert.IsNotNull(dc.CHashMap, "hashmap is null");
+     Assert.AreEqual(1, dc.CHashMap.Count, "hashmap size is not one");
+
+     //Assert.IsNotNull(dc.CDate, "Date is null");
+
+     Assert.IsNotNull(dc.CVector);
+     Assert.AreEqual(2, dc.CVector.Count, "Vector size is not two");
+
+     //Assert.IsNotNull(dc.CObject);
+     //Assert.AreEqual("key", ((CustomSerializableObject)dc.CObject).key, "Object key is not same");
+     //Assert.AreEqual("value", ((CustomSerializableObject)dc.CObject).value, "Object value is not same");
+ }
+
+ void runCallbackArgTest3() // Driver: client1 puts a DefaultType (CallOp2) and client2 reads it back and validates it (ValidateData).
+ {
+
+ CacheHelper.SetupJavaServers(true, "cacheserver_notify_subscription6.xml");
+ CacheHelper.StartJavaLocator(1, "GFELOC");
+ Util.Log("Locator started");
+ CacheHelper.StartJavaServerWithLocators(1, "GFECS1", 1);
+ Util.Log("Cacheserver 1 started.");
+ // Both regions are created with listener=false and writer=false (the log text says otherwise).
+ Util.Log("Creating region in client1, no-ack, cache-enabled, with listener and writer");
+ m_client1.Call(CreateRegion, CacheHelper.Locators,
+ true, false, false);
+ // m_client1.Call(RegisterAllKeys, new string[] { RegionName });
+
+ Util.Log("Creating region in client2 , no-ack, cache-enabled, with listener and writer");
+ m_client2.Call(CreateRegion, CacheHelper.Locators,
+ true, false, false);
+ // The DefaultType factory is registered on both clients before any data is written.
+ m_client1.Call(registerDefaultCacheableType);
+ m_client2.Call(registerDefaultCacheableType);
+
+ m_client2.Call(RegisterAllKeys, new string[] { RegionName });
+
+ m_client1.Call(CallOp2);
+
+ m_client2.Call(ValidateData);
+
+ m_client1.Call(CacheHelper.Close);
+ m_client2.Call(CacheHelper.Close);
+
+ CacheHelper.StopJavaServer(1);
+ Util.Log("Cacheserver 1 stopped.");
+
+ CacheHelper.StopJavaLocator(1);
+ Util.Log("Locator stopped");
+
+ CacheHelper.ClearLocators();
+ CacheHelper.ClearEndpoints();
+ }
+
+ public void TestRemoveAll()
+ {
+     // Writes "Key-0".."Key-9" and then removes them all in one RemoveAll
+     // call that carries m_callbackarg.
+     o_region = new RegionOperation(RegionName);
+     ICollection<object> removalKeys = new LinkedList<object>();
+     for (int index = 0; index < 10; index++)
+     {
+         string entryKey = "Key-" + index;
+         o_region.Region[entryKey] = "Value-" + index;
+         removalKeys.Add(entryKey);
+     }
+     o_region.Region.RemoveAll(removalKeys, m_callbackarg);
+ }
+
+ public void TestPutAll()
+ {
+     // Builds ten entries (int key -> its string form) and sends them in a
+     // single PutAll call with second argument 15 and m_callbackarg.
+     o_region = new RegionOperation(RegionName);
+     Dictionary<Object, Object> batch = new Dictionary<Object, Object>();
+     int next = 0;
+     while (next < 10)
+     {
+         batch.Add(next, next.ToString());
+         next++;
+     }
+     o_region.Region.PutAll(batch, 15, m_callbackarg);
+ }
+
+ public void TestGetAll()
+ {
+     // Requests keys 0..9 with GetAll (passing m_callbackarg) and logs every
+     // returned key/value pair.
+     o_region = new RegionOperation(RegionName);
+     List<Object> requested = new List<Object>();
+     for (int index = 0; index < 10; index++)
+     {
+         requested.Add(index);
+     }
+
+     Dictionary<Object, Object> fetched = new Dictionary<Object, Object>();
+     o_region.Region.GetAll(requested.ToArray(), fetched, null, true, m_callbackarg);
+
+     foreach (KeyValuePair<Object, Object> pair in fetched)
+     {
+         Util.Log("Values after getAll with CallBack Key = {0} Value = {1} ", pair.Key.ToString(), pair.Value.ToString());
+     }
+ }
+
+ public void TestValidateRemoveAllCallback() // After a 10 s wait, asserts m_callbackListener tallied 10 destroys.
+ {
+ Thread.Sleep(10000);
+ Assert.AreEqual(10, m_callbackListener.Destroys, "Should be 10 destroy");
+ }
+
+ public void TestValidatePutAllCallback() // After a 10 s wait, asserts 10 tallied creates and that the listener's stored callback argument is the default string.
+ {
+ Thread.Sleep(10000);
+ Assert.AreEqual(10, m_callbackListener.Creates, "Should be 10 creates");
+ Assert.AreEqual("Gemstone's Callback", m_callbackListener.CallbackArg, "CallBackArg for putAll should be same"); // CallbackArg is the value given to SetCallbackArg, not one read from an event
+ }
+
+ void runPutAllCallbackArgTest() // Driver: client2 performs PutAll/GetAll with a callback argument; client1's listener tallies are validated.
+ {
+ m_callbackListener = new CallbackListener();
+ m_callbackListener.SetCallbackArg(m_callbackarg);
+
+ CacheHelper.SetupJavaServers(true, "cacheserver_notify_subscription5N.xml");
+ CacheHelper.StartJavaLocator(1, "GFELOC");
+ Util.Log("Locator started");
+ CacheHelper.StartJavaServerWithLocators(1, "GFECS1", 1);
+ Util.Log("Cacheserver 1 started.");
+ // client1 is created with listener=true, writer=false; client2 with listener=false, writer=false.
+ Util.Log("Creating region in client1, no-ack, cache-enabled, with listener and writer");
+ m_client1.Call(CreateRegion2, CacheHelper.Locators,
+ true, true, false);
+ m_client1.Call(RegisterAllKeys, new string[] { RegionName });
+ Util.Log("RegisterAllKeys completed..");
+
+ Util.Log("Creating region in client2 , no-ack, cache-enabled, with listener and writer");
+ m_client2.Call(CreateRegion2, CacheHelper.Locators,
+ true, false, false);
+ Util.Log("CreateRegion2 completed..");
+
+ m_client2.Call(TestPutAll);
+ Util.Log("TestPutAll completed..");
+ m_client1.Call(TestValidatePutAllCallback);
+ Util.Log("TestValidatePutAllCallback completed..");
+ m_client2.Call(TestGetAll);
+ Util.Log("TestGetAll completed..");
+
+ m_client1.Call(CacheHelper.Close);
+ m_client2.Call(CacheHelper.Close);
+
+ CacheHelper.StopJavaServer(1);
+ Util.Log("Cacheserver 1 stopped.");
+
+ CacheHelper.StopJavaLocator(1);
+ Util.Log("Locator stopped");
+
+ CacheHelper.ClearLocators();
+ CacheHelper.ClearEndpoints();
+ }
+
+ void runRemoveAllCallbackArgTest() // Driver: client2 performs RemoveAll with a callback argument; client1's listener destroy tally is validated.
+ {
+
+ m_callbackListener = new CallbackListener();
+ m_callbackListener.SetCallbackArg(m_callbackarg);
+ CacheHelper.SetupJavaServers(true, "cacheserver_notify_subscription5N.xml");
+ CacheHelper.StartJavaLocator(1, "GFELOC");
+ Util.Log("Locator started");
+ CacheHelper.StartJavaServerWithLocators(1, "GFECS1", 1);
+ Util.Log("Cacheserver 1 started.");
+ // client1 is created with listener=true, writer=false; client2 with listener=false, writer=false.
+ Util.Log("Creating region in client1, no-ack, cache-enabled, with listener and writer");
+ m_client1.Call(CreateRegion2, CacheHelper.Locators,
+ true, true, false);
+ m_client1.Call(RegisterAllKeys, new string[] { RegionName });
+ Util.Log("RegisterAllKeys completed..");
+
+ Util.Log("Creating region in client2 , no-ack, cache-enabled, with listener and writer");
+ m_client2.Call(CreateRegion2,CacheHelper.Locators,
+ true, false, false);
+ Util.Log("CreateRegion2 completed..");
+
+ m_client2.Call(TestRemoveAll);
+ Util.Log("TestRemoveAll completed..");
+ m_client1.Call(TestValidateRemoveAllCallback);
+ Util.Log("TestValidateRemoveAllCallback completed..");
+
+ m_client1.Call(CacheHelper.Close);
+ m_client2.Call(CacheHelper.Close);
+
+ CacheHelper.StopJavaServer(1);
+ Util.Log("Cacheserver 1 stopped.");
+
+ CacheHelper.StopJavaLocator(1);
+ Util.Log("Locator stopped");
+
+ CacheHelper.ClearLocators();
+ CacheHelper.ClearEndpoints();
+ }
+
+ [TearDown]
+ public override void EndTest() // Per-test teardown; only delegates to the base class implementation.
+ {
+ base.EndTest();
+ }
+
+ [Test]
+ public void ThinClientCallbackArgTest() // NUnit entry point for runCallbackArgTest.
+ {
+ runCallbackArgTest();
+ }
+
+ [Test]
+ public void ThinClientCallbackArgTest2()
+ {
+     // First pass uses the default callback argument (0); the second pass
+     // (1) makes runCallbackArgTest2 switch to a Portfolio argument.
+     runCallbackArgTest2(0);
+     runCallbackArgTest2(1);
+ }
+
+ [Test]
+ public void ThinClientCallbackArgTest3() // NUnit entry point for runCallbackArgTest3.
+ {
+ runCallbackArgTest3();
+ }
+
+ [Test]
+ public void RemoveAllCallbackArgTest() // NUnit entry point for runRemoveAllCallbackArgTest.
+ {
+ runRemoveAllCallbackArgTest();
+ }
+
+ [Test]
+ public void PutAllCallbackArgTest() // NUnit entry point for runPutAllCallbackArgTest.
+ {
+ runPutAllCallbackArgTest();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/geode-native/blob/6cbd424f/clicache/integration-test/ThinClientConflationTestsN.cs
----------------------------------------------------------------------
diff --git a/clicache/integration-test/ThinClientConflationTestsN.cs b/clicache/integration-test/ThinClientConflationTestsN.cs
new file mode 100644
index 0000000..0672520
--- /dev/null
+++ b/clicache/integration-test/ThinClientConflationTestsN.cs
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Threading;
+
+namespace Apache.Geode.Client.UnitTests
+{
+ using NUnit.Framework;
+ using Apache.Geode.DUnitFramework;
+ using Apache.Geode.Client.Tests;
+ using Apache.Geode.Client;
+
+ #region Listener
+ /// <summary>
+ /// Cache listener used by the conflation tests: counts create/update
+ /// events and remembers the last value delivered, so a test can check how
+ /// many of the feeder's updates arrived.
+ /// </summary>
+ class ConflationListner<TKey, TValue> : ICacheListener<TKey, TValue>
+ {
+     #region Private members
+
+     // Number of create/update events seen so far.
+     private int m_events = 0;
+     // NewValue of the most recent create/update event.
+     private TValue m_value = default(TValue);
+     #endregion
+
+     /// <summary>Factory that logs the creation and returns a new listener.</summary>
+     public static ConflationListner<TKey, TValue> Create()
+     {
+         Util.Log(" ConflationListner Created");
+
+         return new ConflationListner<TKey, TValue>();
+     }
+
+     /// <summary>Records one event: bumps the counter, stores and logs the new value.</summary>
+     private void check(EntryEvent<TKey, TValue> ev)
+     {
+         m_events++;
+         TKey key = ev.Key;
+         TValue value = ev.NewValue;
+         m_value = value;
+         Util.Log("Region:{0}:: Key:{1}, Value:{2}", ev.Region.Name, key, value);
+     }
+
+     /// <summary>
+     /// Asserts the number of events received (2 when conflation is expected,
+     /// 5 otherwise) and that the last value seen is 5.
+     /// </summary>
+     public void validate(bool conflation)
+     {
+         if (conflation)
+         {
+             string msg1 = string.Format("Conflation On: Expected 2 events but got {0}", m_events);
+             Assert.AreEqual(2, m_events, msg1);
+         }
+         else
+         {
+             string msg2 = string.Format("Conflation Off: Expected 5 events but got {0}", m_events);
+             Assert.AreEqual(5, m_events, msg2);
+         }
+
+         string msg3 = string.Format("Expected Value =5, Actual = {0}", m_value);
+         Assert.AreEqual(5, m_value, msg3);
+     }
+
+     #region ICacheListener Members
+
+     public virtual void AfterCreate(EntryEvent<TKey, TValue> ev)
+     {
+         check(ev);
+     }
+
+     public virtual void AfterUpdate(EntryEvent<TKey, TValue> ev)
+     {
+         check(ev);
+     }
+
+     // The remaining entry/region callbacks are intentionally ignored.
+     public virtual void AfterDestroy(EntryEvent<TKey, TValue> ev) { }
+
+     public virtual void AfterInvalidate(EntryEvent<TKey, TValue> ev) { }
+
+     public virtual void AfterRegionDestroy(RegionEvent<TKey, TValue> ev) { }
+
+     public virtual void AfterRegionInvalidate(RegionEvent<TKey, TValue> ev) { }
+
+     public virtual void AfterRegionClear(RegionEvent<TKey, TValue> ev) { }
+
+     public virtual void AfterRegionLive(RegionEvent<TKey, TValue> ev)
+     {
+         // Log prefix corrected: it named "DurableListener", a different class.
+         Util.Log("ConflationListner: Received AfterRegionLive event of region: {0}", ev.Region.Name);
+     }
+
+     public virtual void Close(IRegion<TKey, TValue> region) { }
+     public virtual void AfterRegionDisconnected(IRegion<TKey, TValue> region) { }
+
+     #endregion
+ }
+ #endregion
+
+ [TestFixture]
+ [Category("group1")]
+ [Category("unicast_only")]
+ [Category("generics")]
+
+ public class ThinClientConflationTests : ThinClientRegionSteps
+ {
+ #region Private members
+
+ private UnitProcess m_client1, m_client2, m_feeder;
+ private string[] keys = { "Key-1", "Key-2", "Key-3", "Key-4", "Key-5" };
+
+ private static string[] DurableClientIds = { "DurableClientId1", "DurableClientId2" };
+
+ static string[] Regions = { "ConflatedRegion", "NonConflatedRegion" };
+
+ private static ConflationListner<object, object> m_listener1C1, m_listener2C1, m_listener1C2, m_listener2C2;
+
+ #endregion
+
+ protected override ClientBase[] GetClients()
+ {
+     // Two subscribing clients plus one feeder process.
+     m_client1 = new UnitProcess();
+     m_client2 = new UnitProcess();
+     m_feeder = new UnitProcess();
+     ClientBase[] clients = { m_client1, m_client2, m_feeder };
+     return clients;
+ }
+
+ [TearDown]
+ public override void EndTest() // Per-test teardown: closes the cache on all three processes and stops the Java servers.
+ {
+ try
+ {
+ m_client1.Call(CacheHelper.Close);
+ m_client2.Call(CacheHelper.Close);
+ m_feeder.Call(CacheHelper.Close);
+ CacheHelper.ClearEndpoints();
+ }
+ finally
+ {
+ CacheHelper.StopJavaServers(); // runs even when one of the Close calls above throws
+ }
+ base.EndTest(); // not inside the finally: skipped if the try block or StopJavaServers throws
+ }
+
+ #region Common Functions
+
+ public void InitFeeder(string locators, int redundancyLevel)
+ {
+     // Creates pool "__TESTPOOL1_" and then both test regions on it, without
+     // a listener. Region creation reads CacheHelper.Locators rather than the
+     // locators parameter.
+     CacheHelper.CreatePool<object, object>("__TESTPOOL1_", locators, (string)null, redundancyLevel, false);
+     for (int regionIdx = 0; regionIdx < 2; regionIdx++)
+     {
+         CacheHelper.CreateTCRegion_Pool<object, object>(Regions[regionIdx], false, false, null,
+           CacheHelper.Locators, "__TESTPOOL1_", false);
+     }
+ }
+
+ public void InitDurableClient(int client, string locators, string conflation)
+ {
+     // Looks up the durable id first, then creates one listener per region
+     // and stores the pair in the static slots for this client (client 1
+     // uses the C1 slots, any other value the C2 slots).
+     string durableId = ThinClientConflationTests.DurableClientIds[client - 1];
+
+     ConflationListner<object, object> firstChecker = ConflationListner<object, object>.Create();
+     ConflationListner<object, object> secondChecker = ConflationListner<object, object>.Create();
+     if (client == 1)
+     {
+         ThinClientConflationTests.m_listener1C1 = firstChecker;
+         ThinClientConflationTests.m_listener2C1 = secondChecker;
+     }
+     else
+     {
+         ThinClientConflationTests.m_listener1C2 = firstChecker;
+         ThinClientConflationTests.m_listener2C2 = secondChecker;
+     }
+
+     CacheHelper.InitConfigForConflation_Pool(locators, durableId, conflation);
+
+     ConflationListner<object, object>[] checkers = { firstChecker, secondChecker };
+     for (int regionIdx = 0; regionIdx < 2; regionIdx++)
+     {
+         CacheHelper.CreateTCRegion_Pool<object, object>(Regions[regionIdx], false, true, checkers[regionIdx],
+           CacheHelper.Locators, "__TESTPOOL1_", true);
+     }
+
+     //CacheHelper.DCache.ReadyForEvents();
+
+     // Register interest in all keys on both regions, passing true as the argument.
+     for (int regionIdx = 0; regionIdx < 2; regionIdx++)
+     {
+         IRegion<object, object> current = CacheHelper.GetVerifyRegion<object, object>(Regions[regionIdx]);
+         current.GetSubscriptionService().RegisterAllKeys(true);
+     }
+ }
+
+ public void ReadyForEvents() // Calls ReadyForEvents on CacheHelper.DCache.
+ {
+ CacheHelper.DCache.ReadyForEvents();
+ }
+
+ public void FeederUpdate(int keyIdx)
+ {
+     // Writes the values 1..5 in order to keys[keyIdx], first in Regions[0]
+     // and then in Regions[1]; the listeners' validate() expects 5 as the
+     // final value.
+     for (int regionIdx = 0; regionIdx < 2; regionIdx++)
+     {
+         IRegion<object, object> target = CacheHelper.GetVerifyRegion<object, object>(Regions[regionIdx]);
+         for (int value = 1; value <= 5; value++)
+         {
+             target[keys[keyIdx]] = value;
+         }
+     }
+ }
+
+ public void ClientDown() // Closes the cache through CacheHelper.Close.
+ {
+ CacheHelper.Close();
+ }
+
+
+ public void KillServer() // Stops Java cache server 1 and logs it.
+ {
+ CacheHelper.StopJavaServer(1);
+ Util.Log("Cacheserver 1 stopped.");
+ }
+
+ public delegate void KillServerDelegate();
+
+ #endregion
+
+
+ public void Validate(int client, int region, bool conflate)
+ {
+     // Picks the listener stored for (client, region) and validates it.
+     // client 1 selects the C1 listeners, any other value the C2 ones;
+     // region 1 selects the first listener, any other value the second.
+     bool firstRegion = (region == 1);
+     ConflationListner<object, object> checker;
+     if (client == 1)
+     {
+         checker = firstRegion
+           ? ThinClientConflationTests.m_listener1C1
+           : ThinClientConflationTests.m_listener2C1;
+     }
+     else
+     {
+         checker = firstRegion
+           ? ThinClientConflationTests.m_listener1C2
+           : ThinClientConflationTests.m_listener2C2;
+     }
+
+     if (checker == null)
+     {
+         Assert.Fail("Checker is NULL!");
+         return;
+     }
+
+     checker.validate(conflate);
+ }
+
+ /// <summary>
+ /// Driver for the conflation test. Runs two rounds: the clients are
+ /// initialised with conflation settings "true"/"false", then with
+ /// "server"/"" (not set). In each round the feeder writes five values per
+ /// region and each client's listeners are validated after it signals
+ /// ReadyForEvents. The log line before validating client 2 / region 2 in
+ /// the first round now names region 2 (it previously repeated "Region 1").
+ /// </summary>
+ void runConflationBasic()
+ {
+     CacheHelper.SetupJavaServers(true, "cacheserver_conflation.xml");
+
+     CacheHelper.StartJavaLocator(1, "GFELOC");
+     Util.Log("Locator started");
+     CacheHelper.StartJavaServerWithLocators(1, "GFECS1", 1);
+     Util.Log("Cacheserver 1 started.");
+
+     m_feeder.Call(InitFeeder, CacheHelper.Locators, 0);
+     Util.Log("Feeder initialized.");
+
+     //Test "true" and "false" settings
+     m_client1.Call(InitDurableClient, 1, CacheHelper.Locators, "true");
+     m_client2.Call(InitDurableClient, 2, CacheHelper.Locators, "false");
+     Util.Log("Clients initialized for first time.");
+
+     m_feeder.Call(FeederUpdate, 0);
+     Util.Log("Feeder performed first update.");
+
+     Util.Log("Client1 sending readyForEvents().");
+     m_client1.Call(ReadyForEvents);
+     Thread.Sleep(5000);
+     Util.Log("Validating Client 1 Region 1.");
+     m_client1.Call(Validate, 1, 1, true);
+     Util.Log("Validating Client 1 Region 2.");
+     m_client1.Call(Validate, 1, 2, true);
+
+     Util.Log("Client2 sending readyForEvents().");
+     m_client2.Call(ReadyForEvents);
+     Thread.Sleep(5000);
+     Util.Log("Validating Client 2 Region 1.");
+     m_client2.Call(Validate, 2, 1, false);
+     Util.Log("Validating Client 2 Region 2.");
+     m_client2.Call(Validate, 2, 2, false);
+
+     //Close Clients.
+     m_client1.Call(ClientDown);
+     m_client2.Call(ClientDown);
+     Util.Log("First step complete, tested true/false options.");
+
+     //Test "server" and not set settings
+     m_client1.Call(InitDurableClient, 1, CacheHelper.Locators, "server");
+     m_client2.Call(InitDurableClient, 2, CacheHelper.Locators, "");
+     Util.Log("Clients initialized second times.");
+
+     m_feeder.Call(FeederUpdate, 1);
+     Util.Log("Feeder performed second update.");
+
+     Util.Log("Client1 sending readyForEvents().");
+     m_client1.Call(ReadyForEvents);
+     Thread.Sleep(5000);
+     Util.Log("Validating Client 1 Region 1.");
+     m_client1.Call(Validate, 1, 1, true);
+     Util.Log("Validating Client 1 Region 2.");
+     m_client1.Call(Validate, 1, 2, false);
+
+     Util.Log("Client2 sending readyForEvents().");
+     m_client2.Call(ReadyForEvents);
+     Thread.Sleep(5000);
+     Util.Log("Validating Client 2 Region 1.");
+     m_client2.Call(Validate, 2, 1, true);
+     Util.Log("Validating Client 2 Region 2.");
+     m_client2.Call(Validate, 2, 2, false);
+
+     //Close Clients.
+     m_client1.Call(ClientDown);
+     m_client2.Call(ClientDown);
+     m_feeder.Call(ClientDown);
+     Util.Log("Feeder and Clients closed.");
+
+     CacheHelper.StopJavaServer(1);
+     Util.Log("Cacheserver 1 stopped.");
+
+     CacheHelper.StopJavaLocator(1);
+     Util.Log("Locator stopped");
+
+     CacheHelper.ClearLocators();
+     CacheHelper.ClearEndpoints();
+ }
+
+ [Test]
+ public void ConflationBasic() // NUnit entry point for runConflationBasic.
+ {
+ runConflationBasic();
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/geode-native/blob/6cbd424f/clicache/integration-test/ThinClientCqIRTestsN.cs
----------------------------------------------------------------------
diff --git a/clicache/integration-test/ThinClientCqIRTestsN.cs b/clicache/integration-test/ThinClientCqIRTestsN.cs
new file mode 100644
index 0000000..ea90850
--- /dev/null
+++ b/clicache/integration-test/ThinClientCqIRTestsN.cs
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Threading;
+
+namespace Apache.Geode.Client.UnitTests
+{
+ using NUnit.Framework;
+ using Apache.Geode.DUnitFramework;
+ using Apache.Geode.Client.Tests;
+ using Apache.Geode.Client;
+
+
+ [TestFixture]
+ [Category("group2")]
+ [Category("unicast_only")]
+ [Category("generics")]
+
+ public class ThinClientCqIRTests : ThinClientRegionSteps
+ {
+ #region Private members
+
+ private UnitProcess m_client1;
+ private UnitProcess m_client2;
+ private static string[] QueryRegionNames = { "Portfolios", "Positions", "Portfolios2",
+ "Portfolios3" };
+ private static string QERegionName = "Portfolios";
+ private static string CqName = "MyCq";
+
+ #endregion
+
+ protected override ClientBase[] GetClients()
+ {
+ m_client1 = new UnitProcess();
+ m_client2 = new UnitProcess();
+ return new ClientBase[] { m_client1, m_client2 };
+ }
+
+ [TestFixtureSetUp]
+ public override void InitTests()
+ {
+ base.InitTests();
+ m_client1.Call(InitClient);
+ m_client2.Call(InitClient);
+ }
+
+ [TearDown]
+ public override void EndTest()
+ {
+ CacheHelper.StopJavaServers();
+ base.EndTest();
+ }
+
+
+ public void InitClient()
+ {
+ CacheHelper.Init();
+ try
+ {
+ Serializable.RegisterTypeGeneric(Portfolio.CreateDeserializable, CacheHelper.DCache);
+ Serializable.RegisterTypeGeneric(Position.CreateDeserializable, CacheHelper.DCache);
+ }
+ catch (IllegalStateException)
+ {
+ // ignore since we run multiple iterations for pool and non pool configs
+ }
+ }
+ public void StepOne(string locators)
+ {
+ CacheHelper.CreateTCRegion_Pool<object, object>(QueryRegionNames[0], true, true,
+ null, locators, "__TESTPOOL1_", true);
+ CacheHelper.CreateTCRegion_Pool<object, object>(QueryRegionNames[1], true, true,
+ null, locators, "__TESTPOOL1_", true);
+ CacheHelper.CreateTCRegion_Pool<object, object>(QueryRegionNames[2], true, true,
+ null, locators, "__TESTPOOL1_", true);
+ CacheHelper.CreateTCRegion_Pool<object, object>(QueryRegionNames[3], true, true,
+ null, locators, "__TESTPOOL1_", true);
+
+ IRegion<object, object> region = CacheHelper.GetRegion<object, object>(QueryRegionNames[0]);
+ Apache.Geode.Client.RegionAttributes<object, object> regattrs = region.Attributes;
+ region.CreateSubRegion(QueryRegionNames[1], regattrs);
+ }
+
+ public void StepTwo()
+ {
+ IRegion<object, object> region0 = CacheHelper.GetRegion<object, object>(QueryRegionNames[0]);
+ IRegion<object, object> subRegion0 = region0.GetSubRegion(QueryRegionNames[1]);
+ IRegion<object, object> region1 = CacheHelper.GetRegion<object, object>(QueryRegionNames[1]);
+ IRegion<object, object> region2 = CacheHelper.GetRegion<object, object>(QueryRegionNames[2]);
+ IRegion<object, object> region3 = CacheHelper.GetRegion<object, object>(QueryRegionNames[3]);
+
+ QueryHelper<object, object> qh = QueryHelper<object, object>.GetHelper(CacheHelper.DCache);
+ Util.Log("SetSize {0}, NumSets {1}.", qh.PortfolioSetSize,
+ qh.PortfolioNumSets);
+
+ qh.PopulatePortfolioData(region0, qh.PortfolioSetSize,
+ qh.PortfolioNumSets);
+ qh.PopulatePositionData(subRegion0, qh.PortfolioSetSize,
+ qh.PortfolioNumSets);
+ qh.PopulatePositionData(region1, qh.PortfolioSetSize,
+ qh.PortfolioNumSets);
+ qh.PopulatePortfolioData(region2, qh.PortfolioSetSize,
+ qh.PortfolioNumSets);
+ qh.PopulatePortfolioData(region3, qh.PortfolioSetSize,
+ qh.PortfolioNumSets);
+ }
+
+ public void StepTwoQT()
+ {
+ IRegion<object, object> region0 = CacheHelper.GetRegion<object, object>(QueryRegionNames[0]);
+ IRegion<object, object> subRegion0 = region0.GetSubRegion(QueryRegionNames[1]);
+
+ QueryHelper<object, object> qh = QueryHelper<object, object>.GetHelper(CacheHelper.DCache);
+
+ qh.PopulatePortfolioData(region0, 100, 20, 100);
+ qh.PopulatePositionData(subRegion0, 100, 20);
+ }
+
+ public void StepOneQE(string locators)
+ {
+ CacheHelper.CreateTCRegion_Pool<object, object>(QERegionName, true, true,
+ null, locators, "__TESTPOOL1_", true);
+ // Seeds four portfolios, runs a CQ with initial results, updates the entries, then inspects the results.
+ IRegion<object, object> region = CacheHelper.GetVerifyRegion<object, object>(QERegionName);
+ Portfolio p1 = new Portfolio(1, 100);
+ Portfolio p2 = new Portfolio(2, 100);
+ Portfolio p3 = new Portfolio(3, 100);
+ Portfolio p4 = new Portfolio(4, 100);
+
+ region["1"] = p1;
+ region["2"] = p2;
+ region["3"] = p3;
+ region["4"] = p4;
+
+ QueryService<object, object> qs = null;
+ qs = CacheHelper.DCache.GetPoolManager().Find("__TESTPOOL1_").GetQueryService<object, object>();
+ CqAttributesFactory<object, object> cqFac = new CqAttributesFactory<object, object>();
+ ICqListener<object, object> cqLstner = new MyCqListener<object, object>();
+ cqFac.AddCqListener(cqLstner);
+ CqAttributes<object, object> cqAttr = cqFac.Create();
+ CqQuery<object, object> qry = qs.NewCq(CqName, "select * from /" + QERegionName + " p where p.ID!=2", cqAttr, false);
+ ICqResults<object> results = qry.ExecuteWithInitialResults();
+ Thread.Sleep(18000); // wait 18 s (0.3 min) to allow the server-side CQ to complete
+ region["4"] = p1;
+ region["3"] = p2;
+ region["2"] = p3;
+ region["1"] = p4;
+ Thread.Sleep(18000); // wait 18 s (0.3 min) to allow the server-side CQ to complete
+ Util.Log("Results size {0}.", results.Size);
+
+ SelectResultsIterator<object> iter = results.GetIterator();
+
+ while (iter.HasNext)
+ {
+ object item = iter.Next();
+ if (item != null)
+ {
+ Struct st = item as Struct;
+ Assert.IsNotNull(st, "result item is not a Struct"); // fail readably instead of with a NullReferenceException below
+ string key = st["key"] as string;
+
+ Assert.IsNotNull(key, "key is null");
+
+ Portfolio port = st["value"] as Portfolio;
+
+ if (port == null)
+ {
+ Position pos = st["value"] as Position;
+ if (pos == null)
+ {
+ string cs = item as string;
+
+ if (cs == null)
+ {
+ Util.Log("Query got other/unknown object."); // log first: Assert.Fail throws, so nothing after it runs
+ Assert.Fail("value is null");
+ }
+ else
+ {
+ Util.Log("Query got string : {0}.", cs);
+ }
+ }
+ else
+ {
+ Util.Log("Query got Position object with secId {0}, shares {1}.", pos.SecId, pos.SharesOutstanding);
+ }
+ }
+ else
+ {
+ Util.Log("Query got Portfolio object with ID {0}, pkid {1}.", port.ID, port.Pkid);
+ }
+ }
+ }
+ qry = qs.GetCq(CqName);
+ qry.Stop();
+ qry.Close();
+ // Bring down the region
+ region.GetLocalView().DestroyRegion();
+ }
+
+ void runCqQueryIRTest()
+ {
+ CacheHelper.SetupJavaServers(true, "remotequeryN.xml");
+ CacheHelper.StartJavaLocator(1, "GFELOC");
+ Util.Log("Locator started");
+ CacheHelper.StartJavaServerWithLocators(1, "GFECS1", 1);
+ Util.Log("Cacheserver 1 started.");
+ // Every client-side step of this test runs on client 1 only.
+ m_client1.Call(StepOne, CacheHelper.Locators);
+ Util.Log("StepOne complete.");
+
+ m_client1.Call(StepTwo);
+ Util.Log("StepTwo complete.");
+
+ m_client1.Call(StepOneQE, CacheHelper.Locators);
+ Util.Log("StepOneQE complete."); // distinct from the StepOne message above so the log identifies the step
+
+ m_client1.Call(Close);
+
+ CacheHelper.StopJavaServer(1);
+ Util.Log("Cacheserver 1 stopped.");
+
+ CacheHelper.StopJavaLocator(1);
+ Util.Log("Locator stopped");
+ }
+
+ [Test]
+ public void CqQueryIRTest()
+ {
+ runCqQueryIRTest();
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/geode-native/blob/6cbd424f/clicache/integration-test/ThinClientCqTestsN.cs
----------------------------------------------------------------------
diff --git a/clicache/integration-test/ThinClientCqTestsN.cs b/clicache/integration-test/ThinClientCqTestsN.cs
new file mode 100644
index 0000000..2dd8118
--- /dev/null
+++ b/clicache/integration-test/ThinClientCqTestsN.cs
@@ -0,0 +1,1025 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Threading;
+
+namespace Apache.Geode.Client.UnitTests
+{
+ using NUnit.Framework;
+ using Apache.Geode.DUnitFramework;
+ using Apache.Geode.Client.Tests;
+ using Apache.Geode.Client;
+
+ public class MyCqListener<TKey, TResult> : ICqListener<TKey, TResult>
+ {
+ #region Private members
+ private bool m_failedOver = false;
+ private UInt32 m_eventCountBefore = 0;
+ private UInt32 m_errorCountBefore = 0;
+ private UInt32 m_eventCountAfter = 0;
+ private UInt32 m_errorCountAfter = 0;
+
+ #endregion
+
+ #region Public accessors
+
+ public void failedOver()
+ {
+ m_failedOver = true;
+ }
+ public UInt32 getEventCountBefore()
+ {
+ return m_eventCountBefore;
+ }
+ public UInt32 getErrorCountBefore()
+ {
+ return m_errorCountBefore;
+ }
+ public UInt32 getEventCountAfter()
+ {
+ return m_eventCountAfter;
+ }
+ public UInt32 getErrorCountAfter()
+ {
+ return m_errorCountAfter;
+ }
+ #endregion
+
+ public virtual void OnEvent(CqEvent<TKey, TResult> ev)
+ {
+ Util.Log("MyCqListener::OnEvent called");
+ if (m_failedOver == true)
+ m_eventCountAfter++;
+ else
+ m_eventCountBefore++;
+
+ //IGeodeSerializable val = ev.getNewValue();
+ //ICacheableKey key = ev.getKey();
+
+ TResult val = (TResult)ev.getNewValue();
+ /*ICacheableKey*/
+ TKey key = ev.getKey();
+
+ CqOperationType opType = ev.getQueryOperation();
+ //CacheableString keyS = key as CacheableString;
+ string keyS = key.ToString(); //as string;
+ Portfolio pval = val as Portfolio;
+ PortfolioPdx pPdxVal = val as PortfolioPdx;
+ Assert.IsTrue((pPdxVal != null) || (pval != null));
+ //string opStr = "DESTROY";
+ /*if (opType == CqOperationType.OP_TYPE_CREATE)
+ opStr = "CREATE";
+ else if (opType == CqOperationType.OP_TYPE_UPDATE)
+ opStr = "UPDATE";*/
+
+ //Util.Log("key {0}, value ({1},{2}), op {3}.", keyS,
+ // pval.ID, pval.Pkid, opStr);
+ }
+ public virtual void OnError(CqEvent<TKey, TResult> ev)
+ {
+ Util.Log("MyCqListener::OnError called");
+ if (m_failedOver == true)
+ m_errorCountAfter++;
+ else
+ m_errorCountBefore++;
+ }
+ public virtual void Close()
+ {
+ Util.Log("MyCqListener::close called");
+ }
+ public virtual void Clear()
+ {
+ Util.Log("MyCqListener::Clear called");
+ m_eventCountBefore = 0;
+ m_errorCountBefore = 0;
+ m_eventCountAfter = 0;
+ m_errorCountAfter = 0;
+ }
+ }
+
+ public class MyCqListener1<TKey, TResult> : ICqListener<TKey, TResult>
+ {
+ public static UInt32 m_cntEvents = 0; // incremented once per OnEvent call; static, so shared by all instances
+
+ public virtual void OnEvent(CqEvent<TKey, TResult> ev)
+ {
+ m_cntEvents++;
+ Util.Log("MyCqListener1::OnEvent called");
+ Object rawValue = (Object)ev.getNewValue();
+ Object rawKey = (Object)ev.getKey();
+ int eventValue = (int)rawValue; // unboxing cast: throws unless the boxed value is an int
+ int eventKey = (int)rawKey;
+ CqOperationType operation = ev.getQueryOperation();
+ // Only CREATE and UPDATE get their own label; any other operation is logged as "Default".
+ bool isCreate = operation == CqOperationType.OP_TYPE_CREATE;
+ bool isUpdate = operation == CqOperationType.OP_TYPE_UPDATE;
+ String label = isCreate ? "CREATE" : (isUpdate ? "UPDATE" : "Default");
+
+
+ Util.Log("MyCqListener1::OnEvent called with {0} , key = {1}, value = {2} ",
+ label, eventKey, eventValue);
+ }
+ public virtual void OnError(CqEvent<TKey, TResult> ev)
+ {
+ Util.Log("MyCqListener1::OnError called");
+ }
+ public virtual void Close()
+ {
+ Util.Log("MyCqListener1::close called");
+ }
+ }
+
+
+
+ public class MyCqStatusListener<TKey, TResult> : ICqStatusListener<TKey, TResult>
+ {
+ #region Private members
+ private bool m_failedOver = false;
+ private UInt32 m_eventCountBefore = 0;
+ private UInt32 m_errorCountBefore = 0;
+ private UInt32 m_eventCountAfter = 0;
+ private UInt32 m_errorCountAfter = 0;
+ private UInt32 m_CqConnectedCount = 0;
+ private UInt32 m_CqDisConnectedCount = 0;
+
+ #endregion
+
+ #region Public accessors
+
+ public MyCqStatusListener(int id)
+ {
+ }
+
+ public void failedOver()
+ {
+ m_failedOver = true;
+ }
+ public UInt32 getEventCountBefore()
+ {
+ return m_eventCountBefore;
+ }
+ public UInt32 getErrorCountBefore()
+ {
+ return m_errorCountBefore;
+ }
+ public UInt32 getEventCountAfter()
+ {
+ return m_eventCountAfter;
+ }
+ public UInt32 getErrorCountAfter()
+ {
+ return m_errorCountAfter;
+ }
+ public UInt32 getCqConnectedCount()
+ {
+ return m_CqConnectedCount;
+ }
+ public UInt32 getCqDisConnectedCount()
+ {
+ return m_CqDisConnectedCount;
+ }
+ #endregion
+
+ public virtual void OnEvent(CqEvent<TKey, TResult> ev)
+ {
+ Util.Log("MyCqStatusListener::OnEvent called");
+ if (m_failedOver == true)
+ m_eventCountAfter++;
+ else
+ m_eventCountBefore++;
+
+ TResult val = (TResult)ev.getNewValue();
+ TKey key = ev.getKey();
+
+ CqOperationType opType = ev.getQueryOperation();
+ string keyS = key.ToString(); //as string;
+ }
+ public virtual void OnError(CqEvent<TKey, TResult> ev)
+ {
+ Util.Log("MyCqStatusListener::OnError called");
+ if (m_failedOver == true)
+ m_errorCountAfter++;
+ else
+ m_errorCountBefore++;
+ }
+ public virtual void Close()
+ {
+ Util.Log("MyCqStatusListener::close called");
+ }
+ public virtual void OnCqConnected()
+ {
+ m_CqConnectedCount++;
+ Util.Log("MyCqStatusListener::OnCqConnected called");
+ }
+ public virtual void OnCqDisconnected()
+ {
+ m_CqDisConnectedCount++;
+ Util.Log("MyCqStatusListener::OnCqDisconnected called");
+ }
+
+ public virtual void Clear()
+ {
+ Util.Log("MyCqStatusListener::Clear called");
+ m_eventCountBefore = 0;
+ m_errorCountBefore = 0;
+ m_eventCountAfter = 0;
+ m_errorCountAfter = 0;
+ m_CqConnectedCount = 0;
+ m_CqDisConnectedCount = 0;
+ }
+ }
+
+ [TestFixture]
+ [Category("group3")]
+ [Category("unicast_only")]
+ [Category("generics")]
+
+ public class ThinClientCqTests : ThinClientRegionSteps
+ {
+ #region Private members
+ private static bool m_usePdxObjects = false;
+ private UnitProcess m_client1;
+ private UnitProcess m_client2;
+ private static string[] QueryRegionNames = { "Portfolios", "Positions", "Portfolios2",
+ "Portfolios3" };
+ private static string QERegionName = "Portfolios";
+ private static string CqName = "MyCq";
+
+ private static string CqName1 = "testCQAllServersLeave";
+ private static string CqName2 = "testCQAllServersLeave1";
+
+ private static string CqQuery1 = "select * from /DistRegionAck";
+ private static string CqQuery2 = "select * from /DistRegionAck1";
+ //private static string CqName1 = "MyCq1";
+
+ #endregion
+
+ protected override ClientBase[] GetClients()
+ {
+ m_client1 = new UnitProcess();
+ m_client2 = new UnitProcess();
+ return new ClientBase[] { m_client1, m_client2 };
+ }
+
+ [TestFixtureSetUp]
+ public override void InitTests()
+ {
+ base.InitTests();
+ m_client1.Call(InitClient);
+ m_client2.Call(InitClient);
+ }
+
+ [TearDown]
+ public override void EndTest()
+ {
+ CacheHelper.StopJavaServers();
+ base.EndTest();
+ }
+
+
+ public void InitClient()
+ {
+ CacheHelper.Init();
+ try
+ {
+ Serializable.RegisterTypeGeneric(Portfolio.CreateDeserializable, CacheHelper.DCache);
+ Serializable.RegisterTypeGeneric(Position.CreateDeserializable, CacheHelper.DCache);
+ }
+ catch (IllegalStateException)
+ {
+ // ignore since we run multiple iterations for pool and non pool configs
+ }
+ }
+ public void StepOne(string locators)
+ {
+ CacheHelper.CreateTCRegion_Pool<object, object>(QueryRegionNames[0], true, true,
+ null, locators, "__TESTPOOL1_", true);
+ CacheHelper.CreateTCRegion_Pool<object, object>(QueryRegionNames[1], true, true,
+ null, locators, "__TESTPOOL1_", true);
+ CacheHelper.CreateTCRegion_Pool<object, object>(QueryRegionNames[2], true, true,
+ null, locators, "__TESTPOOL1_", true);
+ CacheHelper.CreateTCRegion_Pool<object, object>(QueryRegionNames[3], true, true,
+ null, locators, "__TESTPOOL1_", true);
+ CacheHelper.CreateTCRegion_Pool<object, object>("DistRegionAck", true, true,
+ null, locators, "__TESTPOOL1_", true);
+ IRegion<object, object> region = CacheHelper.GetRegion<object, object>(QueryRegionNames[0]);
+ Apache.Geode.Client.RegionAttributes<object, object> regattrs = region.Attributes;
+ region.CreateSubRegion(QueryRegionNames[1], regattrs);
+ }
+
+ public void StepTwo(bool usePdxObject)
+ {
+ IRegion<object, object> region0 = CacheHelper.GetRegion<object, object>(QueryRegionNames[0]);
+ IRegion<object, object> subRegion0 = region0.GetSubRegion(QueryRegionNames[1]);
+ IRegion<object, object> region1 = CacheHelper.GetRegion<object, object>(QueryRegionNames[1]);
+ IRegion<object, object> region2 = CacheHelper.GetRegion<object, object>(QueryRegionNames[2]);
+ IRegion<object, object> region3 = CacheHelper.GetRegion<object, object>(QueryRegionNames[3]);
+ // Populates the query regions with plain or PDX test data, depending on usePdxObject.
+ QueryHelper<object, object> qh = QueryHelper<object, object>.GetHelper(CacheHelper.DCache);
+ Util.Log("Object type is pdx = " + usePdxObject);
+ // The line above logs the argument because it, not the static m_usePdxObjects, selects the branch below.
+ Util.Log("SetSize {0}, NumSets {1}.", qh.PortfolioSetSize,
+ qh.PortfolioNumSets);
+
+ if (!usePdxObject)
+ {
+ qh.PopulatePortfolioData(region0, qh.PortfolioSetSize,
+ qh.PortfolioNumSets);
+ qh.PopulatePositionData(subRegion0, qh.PortfolioSetSize,
+ qh.PortfolioNumSets);
+ qh.PopulatePositionData(region1, qh.PortfolioSetSize,
+ qh.PortfolioNumSets);
+ qh.PopulatePortfolioData(region2, qh.PortfolioSetSize,
+ qh.PortfolioNumSets);
+ qh.PopulatePortfolioData(region3, qh.PortfolioSetSize,
+ qh.PortfolioNumSets);
+ }
+ else
+ {
+ Serializable.RegisterPdxType(PortfolioPdx.CreateDeserializable);
+ Serializable.RegisterPdxType(PositionPdx.CreateDeserializable);
+ qh.PopulatePortfolioPdxData(region0, qh.PortfolioSetSize,
+ qh.PortfolioNumSets);
+ qh.PopulatePortfolioPdxData(subRegion0, qh.PortfolioSetSize, // NOTE(review): non-PDX branch puts Position data here; confirm Portfolio data is intended
+ qh.PortfolioNumSets);
+ qh.PopulatePortfolioPdxData(region1, qh.PortfolioSetSize, // NOTE(review): same question as for subRegion0 above
+ qh.PortfolioNumSets);
+ qh.PopulatePortfolioPdxData(region2, qh.PortfolioSetSize,
+ qh.PortfolioNumSets);
+ qh.PopulatePortfolioPdxData(region3, qh.PortfolioSetSize,
+ qh.PortfolioNumSets);
+ }
+ }
+
+ public void StepTwoQT()
+ {
+ IRegion<object, object> region0 = CacheHelper.GetRegion<object, object>(QueryRegionNames[0]);
+ IRegion<object, object> subRegion0 = region0.GetSubRegion(QueryRegionNames[1]);
+
+ QueryHelper<object, object> qh = QueryHelper<object, object>.GetHelper(CacheHelper.DCache);
+
+ qh.PopulatePortfolioData(region0, 100, 20, 100);
+ qh.PopulatePositionData(subRegion0, 100, 20);
+ }
+
+ public void StepOneQE(string locators)
+ {
+ CacheHelper.CreateTCRegion_Pool<object, object>(QERegionName, true, true,
+ null, locators, "__TESTPOOL1_", true);
+ IRegion<object, object> region = CacheHelper.GetVerifyRegion<object, object>(QERegionName);
+ Portfolio p1 = new Portfolio(1, 100);
+ Portfolio p2 = new Portfolio(2, 100);
+ Portfolio p3 = new Portfolio(3, 100);
+ Portfolio p4 = new Portfolio(4, 100);
+
+ region["1"] = p1;
+ region["2"] = p2;
+ region["3"] = p3;
+ region["4"] = p4;
+
+ QueryService<object, object> qs = null;
+
+ qs = CacheHelper.DCache.GetPoolManager().Find("__TESTPOOL1_").GetQueryService<object, object>();
+ CqAttributesFactory<object, object> cqFac = new CqAttributesFactory<object, object>();
+ ICqListener<object, object> cqLstner = new MyCqListener<object, object>();
+ cqFac.AddCqListener(cqLstner);
+ CqAttributes<object, object> cqAttr = cqFac.Create();
+ CqQuery<object, object> qry = qs.NewCq(CqName, "select * from /" + QERegionName + " p where p.ID!=2", cqAttr, false);
+ qry.Execute();
+ Thread.Sleep(18000); // sleep 0.3min to allow server c query to complete
+ region["4"] = p1;
+ region["3"] = p2;
+ region["2"] = p3;
+ region["1"] = p4;
+ Thread.Sleep(18000); // sleep 0.3min to allow server c query to complete
+
+ qry = qs.GetCq(CqName);
+
+ CqServiceStatistics cqSvcStats = qs.GetCqStatistics();
+ Assert.AreEqual(1, cqSvcStats.numCqsActive());
+ Assert.AreEqual(1, cqSvcStats.numCqsCreated());
+ Assert.AreEqual(1, cqSvcStats.numCqsOnClient());
+
+ cqAttr = qry.GetCqAttributes();
+ ICqListener<object, object>[] vl = cqAttr.getCqListeners();
+ Assert.IsNotNull(vl);
+ Assert.AreEqual(1, vl.Length);
+ cqLstner = vl[0];
+ Assert.IsNotNull(cqLstner);
+ MyCqListener<object, object> myLisner = (MyCqListener<object, object>)cqLstner;// as MyCqListener<object, object>;
+ Util.Log("event count:{0}, error count {1}.", myLisner.getEventCountBefore(), myLisner.getErrorCountBefore());
+
+ CqStatistics cqStats = qry.GetStatistics();
+ Assert.AreEqual(cqStats.numEvents(), myLisner.getEventCountBefore());
+ if (myLisner.getEventCountBefore() + myLisner.getErrorCountBefore() == 0)
+ {
+ Assert.Fail("cq before count zero");
+ }
+ qry.Stop();
+ Assert.AreEqual(1, cqSvcStats.numCqsStopped());
+ qry.Close();
+ Assert.AreEqual(1, cqSvcStats.numCqsClosed());
+ // Bring down the region
+ region.GetLocalView().DestroyRegion();
+ }
+
+ public void StepOnePdxQE(string locators)
+ {
+ CacheHelper.CreateTCRegion_Pool<object, object>(QERegionName, true, true,
+ null, locators, "__TESTPOOL1_", true);
+ IRegion<object, object> region = CacheHelper.GetVerifyRegion<object, object>(QERegionName);
+ PortfolioPdx p1 = new PortfolioPdx(1, 100);
+ PortfolioPdx p2 = new PortfolioPdx(2, 100);
+ PortfolioPdx p3 = new PortfolioPdx(3, 100);
+ PortfolioPdx p4 = new PortfolioPdx(4, 100);
+
+ region["1"] = p1;
+ region["2"] = p2;
+ region["3"] = p3;
+ region["4"] = p4;
+
+ QueryService<object, object> qs = null;
+
+ qs = CacheHelper.DCache.GetPoolManager().Find("__TESTPOOL1_").GetQueryService<object, object>();
+ CqAttributesFactory<object, object> cqFac = new CqAttributesFactory<object, object>();
+ ICqListener<object, object> cqLstner = new MyCqListener<object, object>();
+ cqFac.AddCqListener(cqLstner);
+ CqAttributes<object, object> cqAttr = cqFac.Create();
+ CqQuery<object, object> qry = qs.NewCq(CqName, "select * from /" + QERegionName + " p where p.ID!=2", cqAttr, false);
+ qry.Execute();
+ Thread.Sleep(18000); // sleep 0.3min to allow server c query to complete
+ region["4"] = p1;
+ region["3"] = p2;
+ region["2"] = p3;
+ region["1"] = p4;
+ Thread.Sleep(18000); // sleep 0.3min to allow server c query to complete
+
+ qry = qs.GetCq(CqName);
+
+ CqServiceStatistics cqSvcStats = qs.GetCqStatistics();
+ Assert.AreEqual(1, cqSvcStats.numCqsActive());
+ Assert.AreEqual(1, cqSvcStats.numCqsCreated());
+ Assert.AreEqual(1, cqSvcStats.numCqsOnClient());
+
+ cqAttr = qry.GetCqAttributes();
+ ICqListener<object, object>[] vl = cqAttr.getCqListeners();
+ Assert.IsNotNull(vl);
+ Assert.AreEqual(1, vl.Length);
+ cqLstner = vl[0];
+ Assert.IsNotNull(cqLstner);
+ MyCqListener<object, object> myLisner = (MyCqListener<object, object>)cqLstner;// as MyCqListener<object, object>;
+ Util.Log("event count:{0}, error count {1}.", myLisner.getEventCountBefore(), myLisner.getErrorCountBefore());
+
+ CqStatistics cqStats = qry.GetStatistics();
+ Assert.AreEqual(cqStats.numEvents(), myLisner.getEventCountBefore());
+ if (myLisner.getEventCountBefore() + myLisner.getErrorCountBefore() == 0)
+ {
+ Assert.Fail("cq before count zero");
+ }
+ qry.Stop();
+ Assert.AreEqual(1, cqSvcStats.numCqsStopped());
+ qry.Close();
+ Assert.AreEqual(1, cqSvcStats.numCqsClosed());
+ // Bring down the region
+ region.GetLocalView().DestroyRegion();
+ }
+ public void KillServer()
+ {
+ CacheHelper.StopJavaServer(1);
+ Util.Log("Cacheserver 1 stopped.");
+ }
+
+ public delegate void KillServerDelegate();
+
+ /*
+ public void StepOneFailover()
+ {
+ // This is here so that Client1 registers information of the cacheserver
+ // that has been already started
+ CacheHelper.SetupJavaServers("remotequery.xml",
+ "cqqueryfailover.xml");
+ CacheHelper.StartJavaServer(1, "GFECS1");
+ Util.Log("Cacheserver 1 started.");
+
+ CacheHelper.CreateTCRegion(QueryRegionNames[0], true, true, null, true);
+
+ Region region = CacheHelper.GetVerifyRegion(QueryRegionNames[0]);
+ Portfolio p1 = new Portfolio(1, 100);
+ Portfolio p2 = new Portfolio(2, 200);
+ Portfolio p3 = new Portfolio(3, 300);
+ Portfolio p4 = new Portfolio(4, 400);
+
+ region.Put("1", p1);
+ region.Put("2", p2);
+ region.Put("3", p3);
+ region.Put("4", p4);
+ }
+ */
+ /*
+ public void StepTwoFailover()
+ {
+ CacheHelper.StartJavaServer(2, "GFECS2");
+ Util.Log("Cacheserver 2 started.");
+
+ IAsyncResult killRes = null;
+ KillServerDelegate ksd = new KillServerDelegate(KillServer);
+ CacheHelper.CreateTCRegion(QueryRegionNames[0], true, true, null, true);
+
+ IRegion<object, object> region = CacheHelper.GetVerifyRegion<object, object>(QueryRegionNames[0]);
+
+ QueryService qs = CacheHelper.DCache.GetQueryService();
+ CqAttributesFactory cqFac = new CqAttributesFactory();
+ ICqListener cqLstner = new MyCqListener();
+ cqFac.AddCqListener(cqLstner);
+ CqAttributes cqAttr = cqFac.Create();
+ CqQuery qry = qs.NewCq(CqName1, "select * from /" + QERegionName + " p where p.ID!<4", cqAttr, true);
+ qry.Execute();
+ Thread.Sleep(18000); // sleep 0.3min to allow server c query to complete
+ qry = qs.GetCq(CqName1);
+ cqAttr = qry.GetCqAttributes();
+ ICqListener[] vl = cqAttr.getCqListeners();
+ Assert.IsNotNull(vl);
+ Assert.AreEqual(1, vl.Length);
+ cqLstner = vl[0];
+ Assert.IsNotNull(cqLstner);
+ MyCqListener myLisner = cqLstner as MyCqListener;
+ if (myLisner.getEventCountAfter() + myLisner.getErrorCountAfter() != 0)
+ {
+ Assert.Fail("cq after count not zero");
+ }
+
+ killRes = ksd.BeginInvoke(null, null);
+ Thread.Sleep(18000); // sleep 0.3min to allow failover complete
+ myLisner.failedOver();
+
+ Portfolio p1 = new Portfolio(1, 100);
+ Portfolio p2 = new Portfolio(2, 200);
+ Portfolio p3 = new Portfolio(3, 300);
+ Portfolio p4 = new Portfolio(4, 400);
+
+ region.Put("4", p1);
+ region.Put("3", p2);
+ region.Put("2", p3);
+ region.Put("1", p4);
+ Thread.Sleep(18000); // sleep 0.3min to allow server c query to complete
+
+ qry = qs.GetCq(CqName1);
+ cqAttr = qry.GetCqAttributes();
+ vl = cqAttr.getCqListeners();
+ cqLstner = vl[0];
+ Assert.IsNotNull(vl);
+ Assert.AreEqual(1, vl.Length);
+ cqLstner = vl[0];
+ Assert.IsNotNull(cqLstner);
+ myLisner = cqLstner as MyCqListener;
+ if (myLisner.getEventCountAfter() + myLisner.getErrorCountAfter() == 0)
+ {
+ Assert.Fail("no cq after failover");
+ }
+
+ killRes.AsyncWaitHandle.WaitOne();
+ ksd.EndInvoke(killRes);
+ qry.Stop();
+ qry.Close();
+ }
+ */
+
+ public void ProcessCQ(string locators)
+ {
+ CacheHelper.CreateTCRegion_Pool<object, object>(QERegionName, true, true,
+ null, locators, "__TESTPOOL1_", true);
+ // Exercises installing, removing and re-installing CQ listeners on a single CQ named "CQ1".
+ IRegion<object, object> region = CacheHelper.GetVerifyRegion<object, object>(QERegionName);
+ Portfolio p1 = new Portfolio(1, 100);
+ Portfolio p2 = new Portfolio(2, 100);
+ Portfolio p3 = new Portfolio(3, 100);
+ Portfolio p4 = new Portfolio(4, 100);
+
+ region["1"] = p1;
+ region["2"] = p2;
+ region["3"] = p3;
+ region["4"] = p4;
+
+ QueryService<object, object> qs = null;
+
+ qs = CacheHelper.DCache.GetPoolManager().Find("__TESTPOOL1_").GetQueryService<object, object>();
+
+ CqAttributesFactory<object, object> cqFac = new CqAttributesFactory<object, object>();
+ ICqListener<object, object> cqLstner = new MyCqListener<object, object>();
+ ICqStatusListener<object, object> cqStatusLstner = new MyCqStatusListener<object, object>(1);
+
+ ICqListener<object, object>[] v = new ICqListener<object, object>[2];
+ cqFac.AddCqListener(cqLstner);
+ v[0] = cqLstner;
+ v[1] = cqStatusLstner;
+ cqFac.InitCqListeners(v);
+ Util.Log("InitCqListeners called");
+ CqAttributes<object, object> cqAttr = cqFac.Create();
+ CqQuery<object, object> qry1 = qs.NewCq("CQ1", "select * from /" + QERegionName + " p where p.ID >= 1", cqAttr, false);
+ qry1.Execute();
+
+ Thread.Sleep(18000); // sleep 0.3min to allow server c query to complete
+ region["4"] = p1;
+ region["3"] = p2;
+ region["2"] = p3;
+ region["1"] = p4;
+ Thread.Sleep(18000); // sleep 0.3min to allow server c query to complete
+
+ qry1 = qs.GetCq("CQ1");
+ cqAttr = qry1.GetCqAttributes();
+ ICqListener<object, object>[] vl = cqAttr.getCqListeners();
+ Assert.IsNotNull(vl);
+ Assert.AreEqual(2, vl.Length);
+ cqLstner = vl[0];
+ Assert.IsNotNull(cqLstner);
+ MyCqListener<object, object> myLisner = (MyCqListener<object, object>)cqLstner;// as MyCqListener<object, object>;
+ Util.Log("event count:{0}, error count {1}.", myLisner.getEventCountBefore(), myLisner.getErrorCountBefore());
+ Assert.AreEqual(4, myLisner.getEventCountBefore());
+
+ cqStatusLstner = (ICqStatusListener<object, object>)vl[1];
+ Assert.IsNotNull(cqStatusLstner);
+ MyCqStatusListener<object, object> myStatLisner = (MyCqStatusListener<object, object>)cqStatusLstner;// as MyCqStatusListener<object, object>;
+ Util.Log("event count:{0}, error count {1}.", myStatLisner.getEventCountBefore(), myStatLisner.getErrorCountBefore());
+ Assert.AreEqual(1, myStatLisner.getCqConnectedCount());
+ Assert.AreEqual(4, myStatLisner.getEventCountBefore());
+ // Remove both listeners one at a time and check the listener count drops to 1, then 0.
+ CqAttributesMutator<object, object> mutator = qry1.GetCqAttributesMutator();
+ mutator.RemoveCqListener(cqLstner);
+ cqAttr = qry1.GetCqAttributes();
+ Util.Log("cqAttr.getCqListeners().Length = {0}", cqAttr.getCqListeners().Length);
+ Assert.AreEqual(1, cqAttr.getCqListeners().Length);
+
+ mutator.RemoveCqListener(cqStatusLstner);
+ cqAttr = qry1.GetCqAttributes();
+ Util.Log("1 cqAttr.getCqListeners().Length = {0}", cqAttr.getCqListeners().Length);
+ Assert.AreEqual(0, cqAttr.getCqListeners().Length);
+ // Reset the counters on both listeners and re-install them through SetCqListeners.
+ ICqListener<object, object>[] v2 = new ICqListener<object, object>[2];
+ v2[0] = cqLstner;
+ v2[1] = cqStatusLstner;
+ MyCqListener<object, object> myLisner2 = (MyCqListener<object, object>)cqLstner;
+ myLisner2.Clear();
+ MyCqStatusListener<object, object> myStatLisner2 = (MyCqStatusListener<object, object>)cqStatusLstner;
+ myStatLisner2.Clear();
+ mutator.SetCqListeners(v2);
+ cqAttr = qry1.GetCqAttributes();
+ Assert.AreEqual(2, cqAttr.getCqListeners().Length);
+
+ region["4"] = p1;
+ region["3"] = p2;
+ region["2"] = p3;
+ region["1"] = p4;
+ Thread.Sleep(18000); // sleep 0.3min to allow server c query to complete
+ // Re-read the listeners from the CQ; the counts below cover only the four updates made after Clear().
+ qry1 = qs.GetCq("CQ1");
+ cqAttr = qry1.GetCqAttributes();
+ ICqListener<object, object>[] v3 = cqAttr.getCqListeners();
+ Assert.IsNotNull(v3);
+ Assert.AreEqual(2, v3.Length); // check the freshly fetched array, not the stale vl from the first phase
+ cqLstner = v3[0];
+ Assert.IsNotNull(cqLstner);
+ myLisner2 = (MyCqListener<object, object>)cqLstner;// as MyCqListener<object, object>;
+ Util.Log("event count:{0}, error count {1}.", myLisner2.getEventCountBefore(), myLisner2.getErrorCountBefore());
+ Assert.AreEqual(4, myLisner2.getEventCountBefore());
+
+ cqStatusLstner = (ICqStatusListener<object, object>)v3[1];
+ Assert.IsNotNull(cqStatusLstner);
+ myStatLisner2 = (MyCqStatusListener<object, object>)cqStatusLstner;// as MyCqStatusListener<object, object>;
+ Util.Log("event count:{0}, error count {1}.", myStatLisner2.getEventCountBefore(), myStatLisner2.getErrorCountBefore());
+ Assert.AreEqual(0, myStatLisner2.getCqConnectedCount());
+ Assert.AreEqual(4, myStatLisner2.getEventCountBefore());
+
+ mutator = qry1.GetCqAttributesMutator();
+ mutator.RemoveCqListener(cqLstner);
+ cqAttr = qry1.GetCqAttributes();
+ Util.Log("cqAttr.getCqListeners().Length = {0}", cqAttr.getCqListeners().Length);
+ Assert.AreEqual(1, cqAttr.getCqListeners().Length);
+
+ mutator.RemoveCqListener(cqStatusLstner);
+ cqAttr = qry1.GetCqAttributes();
+ Util.Log("1 cqAttr.getCqListeners().Length = {0}", cqAttr.getCqListeners().Length);
+ Assert.AreEqual(0, cqAttr.getCqListeners().Length);
+ // With no listeners installed, update the entries again and confirm the listener list stays empty.
+ region["4"] = p1;
+ region["3"] = p2;
+ region["2"] = p3;
+ region["1"] = p4;
+ Thread.Sleep(18000); // sleep 0.3min to allow server c query to complete
+
+ qry1 = qs.GetCq("CQ1");
+ cqAttr = qry1.GetCqAttributes();
+ ICqListener<object, object>[] v4 = cqAttr.getCqListeners();
+ Assert.IsNotNull(v4);
+ Assert.AreEqual(0, v4.Length);
+ Util.Log("cqAttr.getCqListeners() done");
+ }
+
+ /// <summary>
+ /// Creates a continuous query with a single <c>MyCqStatusListener</c> on the
+ /// query service of the named pool, executes it, and then waits 18 seconds.
+ /// </summary>
+ /// <param name="poolName">Name of the pool whose query service is used.</param>
+ /// <param name="cqName">Name under which the CQ is registered.</param>
+ /// <param name="cqQuery">Query string of the CQ.</param>
+ /// <param name="id">Value handed to the status listener's constructor.</param>
+ public void CreateAndExecuteCQ_StatusListener(string poolName, string cqName, string cqQuery, int id)
+ {
+   QueryService<object, object> queryService =
+     CacheHelper.DCache.GetPoolManager().Find(poolName).GetQueryService<object, object>();
+
+   CqAttributesFactory<object, object> attributesFactory = new CqAttributesFactory<object, object>();
+   attributesFactory.AddCqListener(new MyCqStatusListener<object, object>(id));
+   CqAttributes<object, object> attributes = attributesFactory.Create();
+
+   CqQuery<object, object> statusQuery = queryService.NewCq(cqName, cqQuery, attributes, false);
+   statusQuery.Execute();
+
+   // Pause 18 s (0.3 min) so the server-side CQ can complete.
+   Thread.Sleep(18000);
+ }
+
+ /// <summary>
+ /// Creates a continuous query with a single plain <c>MyCqListener</c> on the
+ /// query service of the named pool, executes it, and then waits 18 seconds.
+ /// </summary>
+ /// <param name="poolName">Name of the pool whose query service is used.</param>
+ /// <param name="cqName">Name under which the CQ is registered.</param>
+ /// <param name="cqQuery">Query string of the CQ.</param>
+ /// <param name="id">Not used: the listener is built with its parameterless constructor.</param>
+ public void CreateAndExecuteCQ_Listener(string poolName, string cqName, string cqQuery, int id)
+ {
+   QueryService<object, object> queryService =
+     CacheHelper.DCache.GetPoolManager().Find(poolName).GetQueryService<object, object>();
+
+   CqAttributesFactory<object, object> attributesFactory = new CqAttributesFactory<object, object>();
+   attributesFactory.AddCqListener(new MyCqListener<object, object>());
+   CqAttributes<object, object> attributes = attributesFactory.Create();
+
+   CqQuery<object, object> listenerQuery = queryService.NewCq(cqName, cqQuery, attributes, false);
+   listenerQuery.Execute();
+
+   // Pause 18 s (0.3 min) so the server-side CQ can complete.
+   Thread.Sleep(18000);
+ }
+
+ /// <summary>
+ /// Asserts that the first listener of the named CQ, cast to
+ /// <c>MyCqStatusListener</c>, reports the expected connected count.
+ /// </summary>
+ /// <param name="poolName">Name of the pool whose query service owns the CQ.</param>
+ /// <param name="cqName">Name of the CQ to look up.</param>
+ /// <param name="onCqStatusConnect">Expected value of <c>getCqConnectedCount()</c>.</param>
+ public void CheckCQStatusOnConnect(string poolName, string cqName, int onCqStatusConnect)
+ {
+   QueryService<object, object> queryService =
+     CacheHelper.DCache.GetPoolManager().Find(poolName).GetQueryService<object, object>();
+   CqQuery<object, object> registeredCq = queryService.GetCq(cqName);
+   ICqListener<object, object>[] listeners = registeredCq.GetCqAttributes().getCqListeners();
+
+   // The status listener is taken from slot 0 of the CQ's listener array.
+   MyCqStatusListener<object, object> statusListener = (MyCqStatusListener<object, object>)listeners[0];
+
+   Util.Log("CheckCQStatusOnConnect = {0} ", statusListener.getCqConnectedCount());
+   Assert.AreEqual(onCqStatusConnect, statusListener.getCqConnectedCount());
+ }
+
+ /// <summary>
+ /// Asserts that the first listener of the named CQ, cast to
+ /// <c>MyCqStatusListener</c>, reports the expected disconnected count.
+ /// </summary>
+ /// <param name="poolName">Name of the pool whose query service owns the CQ.</param>
+ /// <param name="cqName">Name of the CQ to look up.</param>
+ /// <param name="onCqStatusDisConnect">Expected value of <c>getCqDisConnectedCount()</c>.</param>
+ public void CheckCQStatusOnDisConnect(string poolName, string cqName, int onCqStatusDisConnect)
+ {
+   QueryService<object, object> queryService =
+     CacheHelper.DCache.GetPoolManager().Find(poolName).GetQueryService<object, object>();
+   CqQuery<object, object> registeredCq = queryService.GetCq(cqName);
+   ICqListener<object, object>[] listeners = registeredCq.GetCqAttributes().getCqListeners();
+
+   // The status listener is taken from slot 0 of the CQ's listener array.
+   MyCqStatusListener<object, object> statusListener = (MyCqStatusListener<object, object>)listeners[0];
+
+   Util.Log("CheckCQStatusOnDisConnect = {0} ", statusListener.getCqDisConnectedCount());
+   Assert.AreEqual(onCqStatusDisConnect, statusListener.getCqDisConnectedCount());
+ }
+
+ /// <summary>
+ /// Puts ten entries ("key-1" .. "key-10" mapped to "val-1" .. "val-10") into
+ /// the named region and then waits 18 seconds.
+ /// </summary>
+ /// <param name="regionName">Name of the region that receives the entries.</param>
+ public void PutEntries(string regionName)
+ {
+   IRegion<object, object> target = CacheHelper.GetVerifyRegion<object, object>(regionName);
+
+   int index = 1;
+   while (index <= 10)
+   {
+     target["key-" + index] = "val-" + index;
+     index++;
+   }
+
+   // Pause 18 s (0.3 min) so the server-side CQ can complete.
+   Thread.Sleep(18000);
+ }
+
+ /// <summary>
+ /// Asserts that the first listener of the named CQ, cast to
+ /// <c>MyCqStatusListener</c>, reports the expected event count.
+ /// </summary>
+ /// <param name="poolName">Name of the pool whose query service owns the CQ.</param>
+ /// <param name="cqName">Name of the CQ to look up.</param>
+ /// <param name="onCreateCount">Expected value of <c>getEventCountBefore()</c>.</param>
+ public void CheckCQStatusOnPutEvent(string poolName, string cqName, int onCreateCount)
+ {
+   QueryService<object, object> queryService =
+     CacheHelper.DCache.GetPoolManager().Find(poolName).GetQueryService<object, object>();
+   CqQuery<object, object> registeredCq = queryService.GetCq(cqName);
+   ICqListener<object, object>[] listeners = registeredCq.GetCqAttributes().getCqListeners();
+
+   // The status listener is taken from slot 0 of the CQ's listener array.
+   MyCqStatusListener<object, object> statusListener = (MyCqStatusListener<object, object>)listeners[0];
+
+   Util.Log("CheckCQStatusOnPutEvent = {0} ", statusListener.getEventCountBefore());
+   Assert.AreEqual(onCreateCount, statusListener.getEventCountBefore());
+ }
+
+ /// <summary>
+ /// Forwards to <c>CacheHelper.CreateTCRegion_Pool&lt;object, object&gt;</c> with the
+ /// given region, locators, pool and server group; the remaining arguments are
+ /// the fixed values <c>true, true, null</c> and <c>true</c>.
+ /// </summary>
+ /// <param name="locators">Locator list passed through to the helper.</param>
+ /// <param name="servergroup">Server group passed through to the helper.</param>
+ /// <param name="regionName">Name of the region to create.</param>
+ /// <param name="poolName">Name of the pool passed through to the helper.</param>
+ public void CreateRegion(string locators, string servergroup, string regionName, string poolName)
+ {
+   CacheHelper.CreateTCRegion_Pool<object, object>(
+     regionName,
+     true,
+     true,
+     null,
+     locators,
+     poolName,
+     true,
+     servergroup);
+ }
+
+ /// <summary>
+ /// Runs the CQ query scenario against one locator and one cache server
+ /// configured from "remotequeryN.xml": StepOne, StepTwo, then the
+ /// query-execution step that matches <c>m_usePdxObjects</c>, followed by
+ /// client close and server/locator shutdown.
+ /// </summary>
+ void runCqQueryTest()
+ {
+   CacheHelper.SetupJavaServers(true, "remotequeryN.xml");
+   CacheHelper.StartJavaLocator(1, "GFELOC");
+   Util.Log("Locator started");
+   CacheHelper.StartJavaServerWithLocators(1, "GFECS1", 1);
+   Util.Log("Cacheserver 1 started.");
+
+   m_client1.Call(StepOne, CacheHelper.Locators);
+   Util.Log("StepOne complete.");
+
+   m_client1.Call(StepTwo, m_usePdxObjects);
+   Util.Log("StepTwo complete.");
+
+   // Run the query-execution step for the object flavour under test and log
+   // the step that actually ran, so the log is not mistaken for a second StepOne.
+   if (m_usePdxObjects)
+   {
+     m_client1.Call(StepOnePdxQE, CacheHelper.Locators);
+     Util.Log("StepOnePdxQE complete.");
+   }
+   else
+   {
+     m_client1.Call(StepOneQE, CacheHelper.Locators);
+     Util.Log("StepOneQE complete.");
+   }
+
+   m_client1.Call(Close);
+
+   CacheHelper.StopJavaServer(1);
+   Util.Log("Cacheserver 1 stopped.");
+
+   CacheHelper.StopJavaLocator(1);
+   Util.Log("Locator stopped");
+ }
+
+ /// <summary>
+ /// Runs the CQ status-listener scenario on pool "__TESTPOOL1_": registers a
+ /// status CQ, checks the connect and event counts, then starts and stops
+ /// cache servers 1 and 2 and checks the listener's connect/disconnect counts
+ /// after each change.
+ /// </summary>
+ void runCqQueryStatusTest()
+ {
+   const string statusPool = "__TESTPOOL1_";
+   const int settleMillis = 20000; // wait applied after each server start/stop
+
+   CacheHelper.SetupJavaServers(true, "cacheserver.xml", "cacheserver2.xml");
+   CacheHelper.StartJavaLocator(1, "GFELOC");
+   Util.Log("Locator started");
+   CacheHelper.StartJavaServerWithLocators(1, "GFECS1", 1);
+   Util.Log("Cacheserver 1 started.");
+
+   m_client1.Call(StepOne, CacheHelper.Locators);
+   Util.Log("StepOne complete.");
+
+   // Register the status CQ; one connect notification is expected.
+   m_client1.Call(CreateAndExecuteCQ_StatusListener, statusPool, CqName1, CqQuery1, 100);
+   Util.Log("CreateAndExecuteCQ complete.");
+
+   m_client1.Call(CheckCQStatusOnConnect, statusPool, CqName1, 1);
+   Util.Log("CheckCQStatusOnConnect complete.");
+
+   // The puts are expected to produce ten CQ events.
+   m_client1.Call(PutEntries, "DistRegionAck");
+   Util.Log("PutEntries complete.");
+
+   m_client1.Call(CheckCQStatusOnPutEvent, statusPool, CqName1, 10);
+   Util.Log("CheckCQStatusOnPutEvent complete.");
+
+   // Start server 2, then stop server 1: no disconnect is expected yet.
+   CacheHelper.SetupJavaServers(true, "cacheserver.xml", "cacheserver2.xml");
+   CacheHelper.StartJavaServerWithLocators(2, "GFECS2", 1);
+   Util.Log("start server 2 complete.");
+
+   Thread.Sleep(settleMillis);
+   CacheHelper.StopJavaServer(1);
+   Util.Log("Cacheserver 1 stopped.");
+   Thread.Sleep(settleMillis);
+   m_client1.Call(CheckCQStatusOnDisConnect, statusPool, CqName1, 0);
+   Util.Log("CheckCQStatusOnDisConnect complete.");
+
+   // Stop server 2 as well: the first disconnect is expected.
+   CacheHelper.StopJavaServer(2);
+   Util.Log("Cacheserver 2 stopped.");
+   Thread.Sleep(settleMillis);
+   m_client1.Call(CheckCQStatusOnDisConnect, statusPool, CqName1, 1);
+   Util.Log("CheckCQStatusOnDisConnect complete.");
+
+   // Restart server 1: a second connect is expected.
+   CacheHelper.SetupJavaServers(true, "cacheserver.xml", "cacheserver2.xml");
+   CacheHelper.StartJavaServerWithLocators(1, "GFECS1", 1);
+   Util.Log("Cacheserver 1 started.");
+   Thread.Sleep(settleMillis);
+
+   m_client1.Call(CheckCQStatusOnConnect, statusPool, CqName1, 2);
+   Util.Log("CheckCQStatusOnConnect complete.");
+
+   // Stop server 1 again: a second disconnect is expected.
+   CacheHelper.StopJavaServer(1);
+   Util.Log("Cacheserver 1 stopped.");
+   Thread.Sleep(settleMillis);
+
+   m_client1.Call(CheckCQStatusOnDisConnect, statusPool, CqName1, 2);
+   Util.Log("CheckCQStatusOnDisConnect complete.");
+
+   m_client1.Call(Close);
+
+   CacheHelper.StopJavaLocator(1);
+   Util.Log("Locator stopped");
+ }
+
+ /// <summary>
+ /// Runs the CQ status-listener scenario with two pools: "__TESTPOOL1_" is
+ /// created with server group "group1" and "__TESTPOOL2_" with "group2". A
+ /// status CQ is registered on each pool, then cache servers 1 and 2 are
+ /// stopped in turn and the disconnect count is checked on pool 1 and pool 2
+ /// respectively.
+ /// </summary>
+ void runCqQueryStatusTest2()
+ {
+   const string firstPool = "__TESTPOOL1_";
+   const string secondPool = "__TESTPOOL2_";
+   const string firstRegion = "DistRegionAck";
+   const string secondRegion = "DistRegionAck1";
+   const int settleMillis = 20000; // wait applied after each server stop
+
+   CacheHelper.SetupJavaServers(true, "cacheserver_servergroup.xml", "cacheserver_servergroup2.xml");
+   CacheHelper.StartJavaLocator(1, "GFELOC");
+   Util.Log("Locator started");
+   CacheHelper.StartJavaServerWithLocators(1, "GFECS1", 1);
+   Util.Log("start server 1 complete.");
+   CacheHelper.StartJavaServerWithLocators(2, "GFECS2", 1);
+   Util.Log("start server 2 complete.");
+
+   // One region per pool, each pool tied to its own server group.
+   m_client1.Call(CreateRegion, CacheHelper.Locators, "group1", firstRegion, firstPool);
+   Util.Log("CreateRegion DistRegionAck complete.");
+
+   m_client1.Call(CreateRegion, CacheHelper.Locators, "group2", secondRegion, secondPool);
+   Util.Log("CreateRegion DistRegionAck1 complete.");
+
+   // One status CQ per pool; each should see a single connect.
+   m_client1.Call(CreateAndExecuteCQ_StatusListener, firstPool, CqName1, CqQuery1, 100);
+   Util.Log("CreateAndExecuteCQ1 complete.");
+
+   m_client1.Call(CreateAndExecuteCQ_StatusListener, secondPool, CqName2, CqQuery2, 101);
+   Util.Log("CreateAndExecuteCQ2 complete.");
+
+   m_client1.Call(CheckCQStatusOnConnect, firstPool, CqName1, 1);
+   Util.Log("CheckCQStatusOnConnect1 complete.");
+
+   m_client1.Call(CheckCQStatusOnConnect, secondPool, CqName2, 1);
+   Util.Log("CheckCQStatusOnConnect2 complete.");
+
+   m_client1.Call(PutEntries, firstRegion);
+   Util.Log("PutEntries1 complete.");
+
+   m_client1.Call(PutEntries, secondRegion);
+   Util.Log("PutEntries2 complete.");
+
+   // Only the first pool's CQ event count is verified here.
+   m_client1.Call(CheckCQStatusOnPutEvent, firstPool, CqName1, 10);
+   Util.Log("CheckCQStatusOnPutEvent complete.");
+
+   // After server 1 stops, the CQ on the first pool should report one disconnect.
+   CacheHelper.StopJavaServer(1);
+   Util.Log("Cacheserver 1 stopped.");
+   Thread.Sleep(settleMillis);
+
+   m_client1.Call(CheckCQStatusOnDisConnect, firstPool, CqName1, 1);
+   Util.Log("CheckCQStatusOnDisConnect complete.");
+
+   // After server 2 stops, the CQ on the second pool should report one disconnect.
+   CacheHelper.StopJavaServer(2);
+   Util.Log("Cacheserver 2 stopped.");
+   Thread.Sleep(settleMillis);
+
+   m_client1.Call(CheckCQStatusOnDisConnect, secondPool, CqName2, 1);
+   Util.Log("CheckCQStatusOnDisConnect complete.");
+
+   m_client1.Call(Close);
+
+   CacheHelper.StopJavaLocator(1);
+   Util.Log("Locator stopped");
+ }
+
+ /// <summary>
+ /// Starts one locator and one cache server configured from
+ /// "remotequeryN.xml", runs <c>ProcessCQ</c> on client 1, then closes the
+ /// client and stops the server and locator.
+ /// </summary>
+ void runCqQueryStatusTest3()
+ {
+   // Bring up the locator and a single cache server.
+   CacheHelper.SetupJavaServers(true, "remotequeryN.xml");
+   CacheHelper.StartJavaLocator(1, "GFELOC");
+   Util.Log("Locator started");
+   CacheHelper.StartJavaServerWithLocators(1, "GFECS1", 1);
+   Util.Log("Cacheserver 1 started.");
+
+   // Run the CQ processing step on the client.
+   m_client1.Call(ProcessCQ, CacheHelper.Locators);
+   Util.Log("ProcessCQ complete.");
+
+   // Shut down: client, then server, then locator.
+   m_client1.Call(Close);
+
+   CacheHelper.StopJavaServer(1);
+   Util.Log("Cacheserver 1 stopped.");
+
+   CacheHelper.StopJavaLocator(1);
+   Util.Log("Locator stopped");
+ }
+
+ /// <summary>
+ /// NUnit entry point for the CQ query scenario; it only calls
+ /// <c>runCqQueryTest</c> and does not touch <c>m_usePdxObjects</c>.
+ /// </summary>
+ [Test]
+ public void CqQueryTest()
+ {
+   runCqQueryTest();
+ }
+
+ /// <summary>
+ /// NUnit entry point that runs the CQ query scenario with
+ /// <c>m_usePdxObjects</c> set to true for the duration of the run.
+ /// </summary>
+ [Test]
+ public void CqQueryPdxTest()
+ {
+   m_usePdxObjects = true;
+   try
+   {
+     runCqQueryTest();
+   }
+   finally
+   {
+     // Reset the flag even when the scenario throws, so a failure here
+     // cannot leave later tests on this fixture running with PDX objects.
+     m_usePdxObjects = false;
+   }
+ }
+
+ // [Test]
+ // public void CqFailover()
+ // {
+ // try
+ // {
+ // m_client1.Call(StepOneFailover);
+ // Util.Log("StepOneFailover complete.");
+ //
+ // m_client1.Call(StepTwoFailover);
+ // Util.Log("StepTwoFailover complete.");
+ // }
+ // finally
+ // {
+ // m_client1.Call(CacheHelper.StopJavaServers);
+ // }
+ // }
+
+ /// <summary>
+ /// NUnit entry point for the single-pool CQ status-listener scenario; it only
+ /// calls <c>runCqQueryStatusTest</c>.
+ /// </summary>
+ [Test]
+ public void CqQueryStatusTest()
+ {
+   runCqQueryStatusTest();
+ }
+
+ /// <summary>
+ /// NUnit entry point for the two-pool CQ status-listener scenario; it only
+ /// calls <c>runCqQueryStatusTest2</c>.
+ /// </summary>
+ [Test]
+ public void CqQueryStatusTest2()
+ {
+   runCqQueryStatusTest2();
+ }
+
+ /// <summary>
+ /// NUnit entry point for the <c>ProcessCQ</c> scenario; it only calls
+ /// <c>runCqQueryStatusTest3</c>.
+ /// </summary>
+ [Test]
+ public void CqQueryStatusTest3()
+ {
+   runCqQueryStatusTest3();
+ }
+
+ }
+}