You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by hi...@apache.org on 2016/09/13 22:44:29 UTC
[41/61] [abbrv] incubator-geode git commit: GEODE-37 change package
name from com.gemstone.gemfire (for
./geode-wan/src/test/java/com/gemstone/gemfire) to org.apache.geode (for
./geode-wan/src/test/java/org/apache/geode)
GEODE-37 change package name from com.gemstone.gemfire (for ./geode-wan/src/test/java/com/gemstone/gemfire) to org.apache.geode (for ./geode-wan/src/test/java/org/apache/geode)
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/f39e2394
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/f39e2394
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/f39e2394
Branch: refs/heads/feature/GEODE-37_2
Commit: f39e2394ba9af9b5ceb0268281dcc339ecb0b1ef
Parents: 701c686
Author: Hitesh Khamesra <hk...@pivotal.io>
Authored: Tue Sep 13 15:43:20 2016 -0700
Committer: Hitesh Khamesra <hk...@pivotal.io>
Committed: Tue Sep 13 15:43:20 2016 -0700
----------------------------------------------------------------------
.../cache/CacheXml70GatewayDUnitTest.java | 255 --
.../cache/CacheXml80GatewayDUnitTest.java | 150 -
.../AnalyzeWANSerializablesJUnitTest.java | 90 -
.../internal/cache/UpdateVersionDUnitTest.java | 962 -----
.../cache/wan/CacheClientNotifierDUnitTest.java | 276 --
.../cache/wan/Simple2CacheServerDUnitTest.java | 185 -
.../gemfire/internal/cache/wan/WANTestBase.java | 3765 ------------------
...oncurrentParallelGatewaySenderDUnitTest.java | 737 ----
...ntParallelGatewaySenderOffHeapDUnitTest.java | 42 -
...allelGatewaySenderOperation_1_DUnitTest.java | 796 ----
...allelGatewaySenderOperation_2_DUnitTest.java | 625 ---
...tSerialGatewaySenderOperationsDUnitTest.java | 120 -
...GatewaySenderOperationsOffHeapDUnitTest.java | 42 -
.../ConcurrentWANPropagation_1_DUnitTest.java | 568 ---
.../ConcurrentWANPropagation_2_DUnitTest.java | 448 ---
.../cache/wan/disttx/DistTXWANDUnitTest.java | 182 -
.../CommonParallelGatewaySenderDUnitTest.java | 460 ---
...onParallelGatewaySenderOffHeapDUnitTest.java | 42 -
...wWANConcurrencyCheckForDestroyDUnitTest.java | 532 ---
.../wan/misc/NewWanAuthenticationDUnitTest.java | 469 ---
.../cache/wan/misc/PDXNewWanDUnitTest.java | 767 ----
...dRegion_ParallelWANPersistenceDUnitTest.java | 670 ----
...dRegion_ParallelWANPropagationDUnitTest.java | 1063 -----
.../SenderWithTransportFilterDUnitTest.java | 228 --
...downAllPersistentGatewaySenderDUnitTest.java | 206 -
.../wan/misc/WANConfigurationJUnitTest.java | 601 ---
.../wan/misc/WANLocatorServerDUnitTest.java | 192 -
.../cache/wan/misc/WANSSLDUnitTest.java | 160 -
.../wan/misc/WanAutoDiscoveryDUnitTest.java | 561 ---
.../cache/wan/misc/WanValidationsDUnitTest.java | 1507 -------
...tewaySenderOperation_2_OffHeapDUnitTest.java | 42 -
...tewaySenderOperation_2_OffHeapDUnitTest.java | 42 -
...GatewaySenderOperationsOffHeapDUnitTest.java | 44 -
...ewaySenderQueueOverflowOffHeapDUnitTest.java | 44 -
.../ParallelWANConflationOffHeapDUnitTest.java | 44 -
...nceEnabledGatewaySenderOffHeapDUnitTest.java | 44 -
...ropagationConcurrentOpsOffHeapDUnitTest.java | 44 -
.../ParallelWANPropagationOffHeapDUnitTest.java | 43 -
...erialGatewaySenderQueueOffHeapDUnitTest.java | 44 -
...nceEnabledGatewaySenderOffHeapDUnitTest.java | 44 -
.../SerialWANPropagationOffHeapDUnitTest.java | 38 -
...ation_PartitionedRegionOffHeapDUnitTest.java | 39 -
...allelGatewaySenderOperation_2_DUnitTest.java | 48 -
...arallelGatewaySenderOperationsDUnitTest.java | 692 ----
...llelGatewaySenderQueueOverflowDUnitTest.java | 500 ---
.../ParallelWANConflationDUnitTest.java | 497 ---
...ersistenceEnabledGatewaySenderDUnitTest.java | 1593 --------
...llelWANPropagationClientServerDUnitTest.java | 97 -
...lelWANPropagationConcurrentOpsDUnitTest.java | 285 --
.../ParallelWANPropagationDUnitTest.java | 1234 ------
...ParallelWANPropagationLoopBackDUnitTest.java | 415 --
.../wan/parallel/ParallelWANStatsDUnitTest.java | 499 ---
...tewaySenderDistributedDeadlockDUnitTest.java | 405 --
...rialGatewaySenderEventListenerDUnitTest.java | 383 --
.../SerialGatewaySenderOperationsDUnitTest.java | 665 ----
.../SerialGatewaySenderQueueDUnitTest.java | 327 --
...ersistenceEnabledGatewaySenderDUnitTest.java | 547 ---
.../serial/SerialWANPropagationDUnitTest.java | 1336 -------
.../SerialWANPropagationLoopBackDUnitTest.java | 513 ---
...NPropagation_PartitionedRegionDUnitTest.java | 412 --
.../SerialWANPropagationsFeatureDUnitTest.java | 338 --
.../wan/serial/SerialWANStatsDUnitTest.java | 588 ---
.../wan/wancommand/WANCommandTestBase.java | 490 ---
...anCommandCreateGatewayReceiverDUnitTest.java | 630 ---
.../WanCommandCreateGatewaySenderDUnitTest.java | 706 ----
...WanCommandGatewayReceiverStartDUnitTest.java | 276 --
.../WanCommandGatewayReceiverStopDUnitTest.java | 281 --
.../WanCommandGatewaySenderStartDUnitTest.java | 400 --
.../WanCommandGatewaySenderStopDUnitTest.java | 352 --
.../wan/wancommand/WanCommandListDUnitTest.java | 381 --
.../WanCommandPauseResumeDUnitTest.java | 688 ----
.../wancommand/WanCommandStatusDUnitTest.java | 546 ---
.../management/WANManagementDUnitTest.java | 513 ---
.../ClusterConfigurationDUnitTest.java | 1013 -----
.../pulse/TestRemoteClusterDUnitTest.java | 272 --
.../geode/cache/CacheXml70GatewayDUnitTest.java | 255 ++
.../geode/cache/CacheXml80GatewayDUnitTest.java | 150 +
.../AnalyzeWANSerializablesJUnitTest.java | 90 +
.../internal/cache/UpdateVersionDUnitTest.java | 962 +++++
.../cache/wan/CacheClientNotifierDUnitTest.java | 276 ++
.../cache/wan/Simple2CacheServerDUnitTest.java | 185 +
.../geode/internal/cache/wan/WANTestBase.java | 3765 ++++++++++++++++++
...oncurrentParallelGatewaySenderDUnitTest.java | 737 ++++
...ntParallelGatewaySenderOffHeapDUnitTest.java | 42 +
...allelGatewaySenderOperation_1_DUnitTest.java | 796 ++++
...allelGatewaySenderOperation_2_DUnitTest.java | 625 +++
...tSerialGatewaySenderOperationsDUnitTest.java | 120 +
...GatewaySenderOperationsOffHeapDUnitTest.java | 42 +
.../ConcurrentWANPropagation_1_DUnitTest.java | 568 +++
.../ConcurrentWANPropagation_2_DUnitTest.java | 448 +++
.../cache/wan/disttx/DistTXWANDUnitTest.java | 182 +
.../CommonParallelGatewaySenderDUnitTest.java | 460 +++
...onParallelGatewaySenderOffHeapDUnitTest.java | 42 +
...wWANConcurrencyCheckForDestroyDUnitTest.java | 532 +++
.../wan/misc/NewWanAuthenticationDUnitTest.java | 469 +++
.../cache/wan/misc/PDXNewWanDUnitTest.java | 767 ++++
...dRegion_ParallelWANPersistenceDUnitTest.java | 670 ++++
...dRegion_ParallelWANPropagationDUnitTest.java | 1063 +++++
.../SenderWithTransportFilterDUnitTest.java | 228 ++
...downAllPersistentGatewaySenderDUnitTest.java | 206 +
.../wan/misc/WANConfigurationJUnitTest.java | 601 +++
.../wan/misc/WANLocatorServerDUnitTest.java | 192 +
.../cache/wan/misc/WANSSLDUnitTest.java | 160 +
.../wan/misc/WanAutoDiscoveryDUnitTest.java | 561 +++
.../cache/wan/misc/WanValidationsDUnitTest.java | 1507 +++++++
...tewaySenderOperation_2_OffHeapDUnitTest.java | 42 +
...tewaySenderOperation_2_OffHeapDUnitTest.java | 42 +
...GatewaySenderOperationsOffHeapDUnitTest.java | 44 +
...ewaySenderQueueOverflowOffHeapDUnitTest.java | 44 +
.../ParallelWANConflationOffHeapDUnitTest.java | 44 +
...nceEnabledGatewaySenderOffHeapDUnitTest.java | 44 +
...ropagationConcurrentOpsOffHeapDUnitTest.java | 44 +
.../ParallelWANPropagationOffHeapDUnitTest.java | 43 +
...erialGatewaySenderQueueOffHeapDUnitTest.java | 44 +
...nceEnabledGatewaySenderOffHeapDUnitTest.java | 44 +
.../SerialWANPropagationOffHeapDUnitTest.java | 38 +
...ation_PartitionedRegionOffHeapDUnitTest.java | 39 +
...allelGatewaySenderOperation_2_DUnitTest.java | 48 +
...arallelGatewaySenderOperationsDUnitTest.java | 692 ++++
...llelGatewaySenderQueueOverflowDUnitTest.java | 500 +++
.../ParallelWANConflationDUnitTest.java | 497 +++
...ersistenceEnabledGatewaySenderDUnitTest.java | 1593 ++++++++
...llelWANPropagationClientServerDUnitTest.java | 97 +
...lelWANPropagationConcurrentOpsDUnitTest.java | 285 ++
.../ParallelWANPropagationDUnitTest.java | 1234 ++++++
...ParallelWANPropagationLoopBackDUnitTest.java | 415 ++
.../wan/parallel/ParallelWANStatsDUnitTest.java | 499 +++
...tewaySenderDistributedDeadlockDUnitTest.java | 405 ++
...rialGatewaySenderEventListenerDUnitTest.java | 383 ++
.../SerialGatewaySenderOperationsDUnitTest.java | 665 ++++
.../SerialGatewaySenderQueueDUnitTest.java | 327 ++
...ersistenceEnabledGatewaySenderDUnitTest.java | 547 +++
.../serial/SerialWANPropagationDUnitTest.java | 1336 +++++++
.../SerialWANPropagationLoopBackDUnitTest.java | 513 +++
...NPropagation_PartitionedRegionDUnitTest.java | 412 ++
.../SerialWANPropagationsFeatureDUnitTest.java | 338 ++
.../wan/serial/SerialWANStatsDUnitTest.java | 588 +++
.../wan/wancommand/WANCommandTestBase.java | 490 +++
...anCommandCreateGatewayReceiverDUnitTest.java | 630 +++
.../WanCommandCreateGatewaySenderDUnitTest.java | 706 ++++
...WanCommandGatewayReceiverStartDUnitTest.java | 276 ++
.../WanCommandGatewayReceiverStopDUnitTest.java | 281 ++
.../WanCommandGatewaySenderStartDUnitTest.java | 400 ++
.../WanCommandGatewaySenderStopDUnitTest.java | 352 ++
.../wan/wancommand/WanCommandListDUnitTest.java | 381 ++
.../WanCommandPauseResumeDUnitTest.java | 688 ++++
.../wancommand/WanCommandStatusDUnitTest.java | 546 +++
.../management/WANManagementDUnitTest.java | 513 +++
.../ClusterConfigurationDUnitTest.java | 1013 +++++
.../pulse/TestRemoteClusterDUnitTest.java | 272 ++
150 files changed, 35135 insertions(+), 35135 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/cache/CacheXml70GatewayDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/cache/CacheXml70GatewayDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/cache/CacheXml70GatewayDUnitTest.java
deleted file mode 100644
index 3014b1b..0000000
--- a/geode-wan/src/test/java/com/gemstone/gemfire/cache/CacheXml70GatewayDUnitTest.java
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache;
-
-import org.junit.experimental.categories.Category;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
-import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
-import com.gemstone.gemfire.test.junit.categories.DistributedTest;
-
-import java.util.Properties;
-import java.util.Set;
-
-import org.junit.experimental.categories.Category;
-
-import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
-import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue;
-import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueueFactory;
-import com.gemstone.gemfire.cache.wan.GatewayEventFilter;
-import com.gemstone.gemfire.cache.wan.GatewayQueueEvent;
-import com.gemstone.gemfire.cache.wan.GatewayReceiver;
-import com.gemstone.gemfire.cache.wan.GatewayReceiverFactory;
-import com.gemstone.gemfire.cache.wan.GatewaySender;
-import com.gemstone.gemfire.cache.wan.GatewaySenderFactory;
-import com.gemstone.gemfire.cache.wan.GatewayTransportFilter;
-import com.gemstone.gemfire.cache30.CacheXml70DUnitTest;
-import com.gemstone.gemfire.cache30.CacheXmlTestCase;
-import com.gemstone.gemfire.cache30.MyGatewayEventFilter1;
-import com.gemstone.gemfire.cache30.MyGatewayTransportFilter1;
-import com.gemstone.gemfire.cache30.MyGatewayTransportFilter2;
-import com.gemstone.gemfire.internal.cache.xmlcache.CacheCreation;
-import com.gemstone.gemfire.internal.cache.xmlcache.CacheXml;
-import com.gemstone.gemfire.internal.cache.xmlcache.ParallelGatewaySenderCreation;
-import com.gemstone.gemfire.internal.cache.xmlcache.RegionAttributesCreation;
-import com.gemstone.gemfire.internal.cache.xmlcache.SerialGatewaySenderCreation;
-
-@Category(DistributedTest.class)
-public class CacheXml70GatewayDUnitTest extends CacheXmlTestCase {
-
- public CacheXml70GatewayDUnitTest() {
- super();
- }
-
- protected String getGemFireVersion() {
- return CacheXml.VERSION_7_0;
- }
-
- /**
- * Added to test the scenario of defect #50600.
- */
- @Test
- public void testAsyncEventQueueWithGatewayEventFilter() {
- getSystem();
- CacheCreation cache = new CacheCreation();
-
- String id = "WBCLChannel";
- AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
- factory.setBatchSize(100);
- factory.setBatchTimeInterval(500);
- factory.setBatchConflationEnabled(true);
- factory.setMaximumQueueMemory(200);
- factory.setDiskSynchronous(true);
- factory.setParallel(false);
- factory.setDispatcherThreads(33);
- factory.addGatewayEventFilter(new MyGatewayEventFilter());
-
- AsyncEventListener eventListener = new CacheXml70DUnitTest.MyAsyncEventListener();
- AsyncEventQueue asyncEventQueue = factory.create(id, eventListener);
-
- RegionAttributesCreation attrs = new RegionAttributesCreation();
- attrs.addAsyncEventQueueId(asyncEventQueue.getId());
- cache.createRegion("UserRegion", attrs);
-
- testXml(cache);
- Cache c = getCache();
- assertNotNull(c);
-
- Set<AsyncEventQueue> asyncEventQueuesOnCache = c.getAsyncEventQueues();
- assertTrue("Size of asyncEventQueues should be greater than 0", asyncEventQueuesOnCache.size() > 0);
-
- for (AsyncEventQueue asyncEventQueueOnCache : asyncEventQueuesOnCache) {
- CacheXml70DUnitTest.validateAsyncEventQueue(asyncEventQueue, asyncEventQueueOnCache);
- }
- }
-
- @Test
- public void testGatewayReceiver() throws Exception{
- getSystem();
- CacheCreation cache = new CacheCreation();
-
- GatewayReceiverFactory gatewayReceiverFactory = cache.createGatewayReceiverFactory();
- gatewayReceiverFactory.setBindAddress("");
- gatewayReceiverFactory.setStartPort(20000);
- gatewayReceiverFactory.setEndPort(29999);
- gatewayReceiverFactory.setMaximumTimeBetweenPings(2000);
- gatewayReceiverFactory.setSocketBufferSize(1500);
- GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1();
- gatewayReceiverFactory.addGatewayTransportFilter(myStreamFilter1);
- GatewayTransportFilter myStreamFilter2 = new MyGatewayTransportFilter2();
- gatewayReceiverFactory.addGatewayTransportFilter(myStreamFilter2);
- GatewayReceiver receiver1 = gatewayReceiverFactory.create();
-
- receiver1.start();
-
- testXml(cache);
- Cache c = getCache();
- assertNotNull(c);
- Set<GatewayReceiver> receivers = c.getGatewayReceivers();
- for(GatewayReceiver receiver : receivers){
- validateGatewayReceiver(receiver1, receiver);
- }
- }
-
- @Test
- public void testParallelGatewaySender() throws CacheException{
- getSystem();
- CacheCreation cache = new CacheCreation();
-
- GatewaySenderFactory gatewaySenderFactory = cache.createGatewaySenderFactory();
- gatewaySenderFactory.setParallel(true);
- gatewaySenderFactory.setDispatcherThreads(13);
- gatewaySenderFactory.setManualStart(true);
- gatewaySenderFactory.setSocketBufferSize(1234);
- gatewaySenderFactory.setSocketReadTimeout(1050);
- gatewaySenderFactory.setBatchConflationEnabled(false);
- gatewaySenderFactory.setBatchSize(88);
- gatewaySenderFactory.setBatchTimeInterval(9);
- gatewaySenderFactory.setPersistenceEnabled(true);
- gatewaySenderFactory.setDiskStoreName("LNSender");
- gatewaySenderFactory.setDiskSynchronous(true);
- gatewaySenderFactory.setMaximumQueueMemory(211);
- gatewaySenderFactory.setAlertThreshold(35);
-
- GatewayEventFilter myEventFilter1 = new MyGatewayEventFilter1();
- gatewaySenderFactory.addGatewayEventFilter(myEventFilter1);
- GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1();
- gatewaySenderFactory.addGatewayTransportFilter(myStreamFilter1);
- GatewayTransportFilter myStreamFilter2 = new MyGatewayTransportFilter2();
- gatewaySenderFactory.addGatewayTransportFilter(myStreamFilter2);
- GatewaySender parallelGatewaySender = gatewaySenderFactory.create("LN", 2);
-
- testXml(cache);
- Cache c = getCache();
- assertNotNull(c);
- Set<GatewaySender> sendersOnCache = c.getGatewaySenders();
- for(GatewaySender sender : sendersOnCache){
- assertEquals(true, sender.isParallel());
- validateGatewaySender(parallelGatewaySender, sender);
- }
- }
-
- @Test
- public void testSerialGatewaySender() throws CacheException{
- getSystem();
- CacheCreation cache = new CacheCreation();
- GatewaySenderFactory gatewaySenderFactory = cache.createGatewaySenderFactory();
- gatewaySenderFactory.setParallel(false);
- gatewaySenderFactory.setManualStart(true);
- gatewaySenderFactory.setSocketBufferSize(124);
- gatewaySenderFactory.setSocketReadTimeout(1000);
- gatewaySenderFactory.setBatchConflationEnabled(false);
- gatewaySenderFactory.setBatchSize(100);
- gatewaySenderFactory.setBatchTimeInterval(10);
- gatewaySenderFactory.setPersistenceEnabled(true);
- gatewaySenderFactory.setDiskStoreName("LNSender");
- gatewaySenderFactory.setDiskSynchronous(true);
- gatewaySenderFactory.setMaximumQueueMemory(200);
- gatewaySenderFactory.setAlertThreshold(30);
-
- GatewayEventFilter myEventFilter1 = new MyGatewayEventFilter1();
- gatewaySenderFactory.addGatewayEventFilter(myEventFilter1);
- GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1();
- gatewaySenderFactory.addGatewayTransportFilter(myStreamFilter1);
- GatewayTransportFilter myStreamFilter2 = new MyGatewayTransportFilter2();
- gatewaySenderFactory.addGatewayTransportFilter(myStreamFilter2);
- GatewaySender serialGatewaySender = gatewaySenderFactory.create("LN", 2);
-
- RegionAttributesCreation attrs = new RegionAttributesCreation();
- attrs.addGatewaySenderId(serialGatewaySender.getId());
- cache.createRegion("UserRegion", attrs);
-
- testXml(cache);
- Cache c = getCache();
- assertNotNull(c);
- Set<GatewaySender> sendersOnCache = c.getGatewaySenders();
- for(GatewaySender sender : sendersOnCache){
- assertEquals(false, sender.isParallel());
- validateGatewaySender(serialGatewaySender, sender);
- }
- }
-
- public static class MyGatewayEventFilter implements GatewayEventFilter, Declarable {
- public void afterAcknowledgement(GatewayQueueEvent event) {
- }
- public boolean beforeEnqueue(GatewayQueueEvent event) {
- return true;
- }
- public boolean beforeTransmit(GatewayQueueEvent event) {
- return true;
- }
- public void close() {
- }
- public void init(Properties properties) {
- }
- }
-
- static void validateGatewayReceiver(GatewayReceiver receiver1, GatewayReceiver gatewayReceiver) {
- assertEquals(receiver1.getHost(), gatewayReceiver.getHost());
- assertEquals(receiver1.getStartPort(), gatewayReceiver.getStartPort());
- assertEquals(receiver1.getEndPort(), gatewayReceiver.getEndPort());
- assertEquals(receiver1.getMaximumTimeBetweenPings(), gatewayReceiver.getMaximumTimeBetweenPings());
- assertEquals(receiver1.getSocketBufferSize(), gatewayReceiver.getSocketBufferSize());
- assertEquals(receiver1.getGatewayTransportFilters().size(), gatewayReceiver.getGatewayTransportFilters().size());
- }
-
- static void validateGatewaySender(GatewaySender sender1, GatewaySender gatewaySender) {
- assertEquals(sender1.getId(), gatewaySender.getId());
- assertEquals(sender1.getRemoteDSId(), gatewaySender.getRemoteDSId());
- assertEquals(sender1.isParallel(), gatewaySender.isParallel());
- assertEquals(sender1.isBatchConflationEnabled(), gatewaySender.isBatchConflationEnabled());
- assertEquals(sender1.getBatchSize(), gatewaySender.getBatchSize());
- assertEquals(sender1.getBatchTimeInterval(), gatewaySender.getBatchTimeInterval());
- assertEquals(sender1.isPersistenceEnabled(), gatewaySender.isPersistenceEnabled());
- assertEquals(sender1.getDiskStoreName(),gatewaySender.getDiskStoreName());
- assertEquals(sender1.isDiskSynchronous(),gatewaySender.isDiskSynchronous());
- assertEquals(sender1.getMaximumQueueMemory(), gatewaySender.getMaximumQueueMemory());
- assertEquals(sender1.getAlertThreshold(), gatewaySender.getAlertThreshold());
- assertEquals(sender1.getGatewayEventFilters().size(), gatewaySender.getGatewayEventFilters().size());
- assertEquals(sender1.getGatewayTransportFilters().size(), gatewaySender.getGatewayTransportFilters().size());
-
- boolean isParallel = sender1.isParallel();
- if (isParallel) {
- assertTrue("sender should be instanceof Creation", sender1 instanceof ParallelGatewaySenderCreation);
- } else {
- assertTrue("sender should be instanceof Creation", sender1 instanceof SerialGatewaySenderCreation);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/cache/CacheXml80GatewayDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/cache/CacheXml80GatewayDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/cache/CacheXml80GatewayDUnitTest.java
deleted file mode 100644
index c140ebc..0000000
--- a/geode-wan/src/test/java/com/gemstone/gemfire/cache/CacheXml80GatewayDUnitTest.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache;
-
-import org.junit.experimental.categories.Category;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
-import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
-import com.gemstone.gemfire.test.junit.categories.DistributedTest;
-
-import java.io.IOException;
-import java.util.Properties;
-import java.util.Set;
-
-import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue;
-import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueueFactory;
-import com.gemstone.gemfire.cache.wan.*;
-import com.gemstone.gemfire.cache30.*;
-import com.gemstone.gemfire.internal.cache.xmlcache.CacheCreation;
-import com.gemstone.gemfire.internal.cache.xmlcache.CacheXml;
-
-@Category(DistributedTest.class)
-public class CacheXml80GatewayDUnitTest extends CacheXmlTestCase {
-
- public CacheXml80GatewayDUnitTest() {
- super();
- }
-
- protected String getGemFireVersion() {
- return CacheXml.VERSION_8_0;
- }
-
- @Test
- public void testGatewayReceiverWithManualStartTRUE() throws CacheException{
- //getSystem();
- CacheCreation cache = new CacheCreation();
-
- GatewayReceiverFactory gatewayReceiverFactory = cache.createGatewayReceiverFactory();
- gatewayReceiverFactory.setBindAddress("");
- gatewayReceiverFactory.setStartPort(20000);
- gatewayReceiverFactory.setEndPort(29999);
- gatewayReceiverFactory.setMaximumTimeBetweenPings(2000);
- gatewayReceiverFactory.setSocketBufferSize(1500);
- gatewayReceiverFactory.setManualStart(true);
- GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1();
- gatewayReceiverFactory.addGatewayTransportFilter(myStreamFilter1);
- GatewayTransportFilter myStreamFilter2 = new MyGatewayTransportFilter2();
- gatewayReceiverFactory.addGatewayTransportFilter(myStreamFilter2);
- GatewayReceiver receiver1 = gatewayReceiverFactory.create();
- try {
- receiver1.start();
- }
- catch (IOException e) {
- fail("Could not start GatewayReceiver");
- }
- testXml(cache);
- Cache c = getCache();
- assertNotNull(c);
- Set<GatewayReceiver> receivers = c.getGatewayReceivers();
- for(GatewayReceiver receiver : receivers){
- validateGatewayReceiver(receiver1, receiver);
- }
- }
-
- @Test
- public void testAsyncEventQueueWithSubstitutionFilter() {
- getSystem();
- CacheCreation cache = new CacheCreation();
-
- // Create an AsyncEventQueue with GatewayEventSubstitutionFilter.
- String id = getName();
- AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
- factory.setGatewayEventSubstitutionListener(new MyGatewayEventSubstitutionFilter());
- AsyncEventQueue queue = factory.create(id, new CacheXml70DUnitTest.MyAsyncEventListener());
-
- // Verify the GatewayEventSubstitutionFilter is set on the AsyncEventQueue.
- assertNotNull(queue.getGatewayEventSubstitutionFilter());
-
- testXml(cache);
- Cache c = getCache();
- assertNotNull(c);
-
- // Get the AsyncEventQueue. Verify the GatewayEventSubstitutionFilter is not null.
- AsyncEventQueue queueOnCache = c.getAsyncEventQueue(id);
- assertNotNull(queueOnCache);
- assertNotNull(queueOnCache.getGatewayEventSubstitutionFilter());
- }
-
- @Test
- public void testGatewaySenderWithSubstitutionFilter() {
- getSystem();
- CacheCreation cache = new CacheCreation();
-
- // Create a GatewaySender with GatewayEventSubstitutionFilter.
- // Don't start the sender to avoid 'Locators must be configured before starting gateway-sender' exception.
- String id = getName();
- GatewaySenderFactory factory = cache.createGatewaySenderFactory();
- factory.setManualStart(true);
- factory.setGatewayEventSubstitutionFilter(new MyGatewayEventSubstitutionFilter());
- GatewaySender sender = factory.create(id, 2);
-
- // Verify the GatewayEventSubstitutionFilter is set on the GatewaySender.
- assertNotNull(sender.getGatewayEventSubstitutionFilter());
-
- testXml(cache);
- Cache c = getCache();
- assertNotNull(c);
-
- // Get the GatewaySender. Verify the GatewayEventSubstitutionFilter is not null.
- GatewaySender senderOnCache = c.getGatewaySender(id);
- assertNotNull(senderOnCache);
- assertNotNull(senderOnCache.getGatewayEventSubstitutionFilter());
- }
-
- protected void validateGatewayReceiver(GatewayReceiver receiver1,
- GatewayReceiver gatewayReceiver){
- CacheXml70GatewayDUnitTest.validateGatewayReceiver(receiver1, gatewayReceiver);
- assertEquals(receiver1.isManualStart(), gatewayReceiver.isManualStart());
- }
-
- public static class MyGatewayEventSubstitutionFilter implements GatewayEventSubstitutionFilter, Declarable {
-
- public Object getSubstituteValue(EntryEvent event) {
- return event.getKey();
- }
-
- public void close() {
- }
-
- public void init(Properties properties) {
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/codeAnalysis/AnalyzeWANSerializablesJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/codeAnalysis/AnalyzeWANSerializablesJUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/codeAnalysis/AnalyzeWANSerializablesJUnitTest.java
deleted file mode 100755
index d94920b..0000000
--- a/geode-wan/src/test/java/com/gemstone/gemfire/codeAnalysis/AnalyzeWANSerializablesJUnitTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.codeAnalysis;
-
-import static org.junit.Assert.fail;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.junit.AfterClass;
-import org.junit.Assume;
-import org.junit.Before;
-import org.junit.experimental.categories.Category;
-
-import com.gemstone.gemfire.codeAnalysis.decode.CompiledClass;
-import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
-import com.gemstone.gemfire.util.test.TestUtil;
-
-/**
- *
- */
-@Category(IntegrationTest.class)
-public class AnalyzeWANSerializablesJUnitTest extends AnalyzeSerializablesJUnitTest {
-
- @Before
- public void loadClasses() throws Exception {
- if (classes.size() > 0) {
- return;
- }
- System.out.println("loadClasses starting");
- List<String> excludedClasses = loadExcludedClasses(new File(TestUtil.getResourcePath(AnalyzeWANSerializablesJUnitTest.class, "excludedClasses.txt")));
- List<String> openBugs = loadOpenBugs(new File(TestUtil.getResourcePath(AnalyzeWANSerializablesJUnitTest.class, "openBugs.txt")));
- excludedClasses.addAll(openBugs);
-
- String cp = System.getProperty("java.class.path");
- System.out.println("java classpath is " + cp);
- System.out.flush();
- String[] entries = cp.split(File.pathSeparator);
- String buildDirName =
- "geode-wan"+File.separatorChar
- +"build"+File.separatorChar
- +"classes"+File.separatorChar
- +"main";
- String buildDir = null;
-
- for (int i=0; i<entries.length && buildDir==null; i++) {
- System.out.println("examining '" + entries[i] + "'");
- System.out.flush();
- if (entries[i].endsWith(buildDirName)) {
- buildDir = entries[i];
- }
- }
- if (buildDir != null) {
- System.out.println("loading class files from " + buildDir);
- System.out.flush();
- long start = System.currentTimeMillis();
- loadClassesFromBuild(new File(buildDir), excludedClasses);
- long finish = System.currentTimeMillis();
- System.out.println("done loading " + classes.size() + " classes. elapsed time = "
- + (finish-start)/1000 + " seconds");
- }
- else {
- fail("unable to find WAN classes");
- }
- }
-
- @AfterClass
- public static void cleanup() {
- if (classes != null) {
- classes.clear();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/UpdateVersionDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/UpdateVersionDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/UpdateVersionDUnitTest.java
deleted file mode 100644
index a1aec80..0000000
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/UpdateVersionDUnitTest.java
+++ /dev/null
@@ -1,962 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache;
-
-import static com.gemstone.gemfire.distributed.ConfigurationProperties.*;
-import static org.junit.Assert.*;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-import java.util.StringTokenizer;
-
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import com.gemstone.gemfire.cache.AttributesFactory;
-import com.gemstone.gemfire.cache.Cache;
-import com.gemstone.gemfire.cache.CacheException;
-import com.gemstone.gemfire.cache.CacheFactory;
-import com.gemstone.gemfire.cache.DataPolicy;
-import com.gemstone.gemfire.cache.DiskStore;
-import com.gemstone.gemfire.cache.DiskStoreFactory;
-import com.gemstone.gemfire.cache.EntryNotFoundException;
-import com.gemstone.gemfire.cache.Operation;
-import com.gemstone.gemfire.cache.PartitionAttributesFactory;
-import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.cache.Region.Entry;
-import com.gemstone.gemfire.cache.Scope;
-import com.gemstone.gemfire.cache.client.internal.LocatorDiscoveryCallbackAdapter;
-import com.gemstone.gemfire.cache.wan.GatewayEventFilter;
-import com.gemstone.gemfire.cache.wan.GatewayReceiver;
-import com.gemstone.gemfire.cache.wan.GatewayReceiverFactory;
-import com.gemstone.gemfire.cache.wan.GatewaySender;
-import com.gemstone.gemfire.cache.wan.GatewaySenderFactory;
-import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
-import com.gemstone.gemfire.internal.AvailablePortHelper;
-import com.gemstone.gemfire.internal.cache.LocalRegion.NonTXEntry;
-import com.gemstone.gemfire.internal.cache.partitioned.PRLocallyDestroyedException;
-import com.gemstone.gemfire.internal.cache.versions.VersionSource;
-import com.gemstone.gemfire.internal.cache.versions.VersionStamp;
-import com.gemstone.gemfire.internal.cache.versions.VersionTag;
-import com.gemstone.gemfire.internal.cache.wan.InternalGatewaySenderFactory;
-import com.gemstone.gemfire.test.dunit.Host;
-import com.gemstone.gemfire.test.dunit.IgnoredException;
-import com.gemstone.gemfire.test.dunit.Invoke;
-import com.gemstone.gemfire.test.dunit.LogWriterUtils;
-import com.gemstone.gemfire.test.dunit.SerializableCallable;
-import com.gemstone.gemfire.test.dunit.SerializableRunnable;
-import com.gemstone.gemfire.test.dunit.VM;
-import com.gemstone.gemfire.test.dunit.Wait;
-import com.gemstone.gemfire.test.dunit.WaitCriterion;
-import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
-import com.gemstone.gemfire.test.junit.categories.DistributedTest;
-
-/**
- * @since GemFire 7.0.1
- */
-@Category(DistributedTest.class)
-public class UpdateVersionDUnitTest extends JUnit4DistributedTestCase {
-
- protected static final String regionName = "testRegion";
- protected static Cache cache;
- private static Set<IgnoredException>expectedExceptions = new HashSet<IgnoredException>();
-
- @Override
- public final void preTearDown() throws Exception {
- closeCache();
- Invoke.invokeInEveryVM(new SerializableRunnable() { public void run() {
- closeCache();
- } });
- }
-
- @Test
- public void testUpdateVersionAfterCreateWithSerialSender() {
- Host host = Host.getHost(0);
- VM vm0 = host.getVM(0); // server1 site1
- VM vm1 = host.getVM(1); // server2 site1
-
- VM vm2 = host.getVM(2); // server1 site2
- VM vm3 = host.getVM(3); // server2 site2
-
- final String key = "key-1";
-
- // Site 1
- Integer lnPort = (Integer)vm0.invoke(() -> UpdateVersionDUnitTest.createFirstLocatorWithDSId( 1 ));
-
- vm0.invoke(() -> UpdateVersionDUnitTest.createCache( lnPort));
- vm0.invoke(() -> UpdateVersionDUnitTest.createSender( "ln1", 2, false, 10, 1, false, false, null, true ));
-
- vm0.invoke(() -> UpdateVersionDUnitTest.createPartitionedRegion(regionName, "ln1", 1, 1));
- vm0.invoke(() -> UpdateVersionDUnitTest.startSender( "ln1" ));
- vm0.invoke(() -> UpdateVersionDUnitTest.waitForSenderRunningState( "ln1" ));
-
- //Site 2
- Integer nyPort = (Integer)vm2.invoke(() -> UpdateVersionDUnitTest.createFirstRemoteLocator( 2, lnPort ));
- Integer nyRecPort = (Integer) vm2.invoke(() -> UpdateVersionDUnitTest.createReceiver( nyPort ));
-
- vm2.invoke(() -> UpdateVersionDUnitTest.createPartitionedRegion(regionName, "", 1, 1));
- vm3.invoke(() -> UpdateVersionDUnitTest.createCache( nyPort));
- vm3.invoke(() -> UpdateVersionDUnitTest.createPartitionedRegion(regionName, "", 1, 1));
-
- final VersionTag tag = (VersionTag) vm0.invoke(new SerializableCallable("Update a single entry and get its version") {
-
- @Override
- public Object call() throws CacheException {
- Cache cache = CacheFactory.getAnyInstance();
- Region region = cache.getRegion(regionName);
- assertTrue(region instanceof PartitionedRegion);
-
- region.put(key, "value-1");
- Entry entry = region.getEntry(key);
- assertTrue(entry instanceof EntrySnapshot);
- RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();
-
- VersionStamp stamp = regionEntry.getVersionStamp();
-
- // Create a duplicate entry version tag from stamp with newer
- // time-stamp.
- VersionSource memberId = (VersionSource) cache.getDistributedSystem().getDistributedMember();
- VersionTag tag = VersionTag.create(memberId);
-
- int entryVersion = stamp.getEntryVersion()-1;
- int dsid = stamp.getDistributedSystemId();
- long time = System.currentTimeMillis();
-
- tag.setEntryVersion(entryVersion);
- tag.setDistributedSystemId(dsid);
- tag.setVersionTimeStamp(time);
- tag.setIsRemoteForTesting();
-
- EntryEventImpl event = createNewEvent((PartitionedRegion) region, tag,
- entry.getKey(), "value-2");
-
- ((LocalRegion) region).basicUpdate(event, false, true, 0L, false);
-
- // Verify the new stamp
- entry = region.getEntry(key);
- assertTrue(entry instanceof EntrySnapshot);
- regionEntry = ((EntrySnapshot) entry).getRegionEntry();
-
- stamp = regionEntry.getVersionStamp();
- assertEquals(
- "Time stamp did NOT get updated by UPDATE_VERSION operation on LocalRegion",
- time, stamp.getVersionTimeStamp());
- assertEquals(++entryVersion, stamp.getEntryVersion());
- assertEquals(dsid, stamp.getDistributedSystemId());
-
- return stamp.asVersionTag();
- }
- });
-
- VersionTag remoteTag = (VersionTag) vm3.invoke(new SerializableCallable("Get timestamp from remote site") {
-
- @Override
- public Object call() throws Exception {
-
- Cache cache = CacheFactory.getAnyInstance();
- final PartitionedRegion region = (PartitionedRegion)cache.getRegion(regionName);
-
- // wait for entry to be received
- WaitCriterion wc = new WaitCriterion() {
- public boolean done() {
- Entry<?,?> entry = null;
- try {
- entry = region.getDataStore().getEntryLocally(0, key, false, false);
- } catch (EntryNotFoundException e) {
- // expected
- } catch (ForceReattemptException e) {
- // expected
- } catch (PRLocallyDestroyedException e) {
- throw new RuntimeException("unexpected exception", e);
- }
- if (entry != null) {
- LogWriterUtils.getLogWriter().info("found entry " + entry);
- }
- return (entry != null);
- }
-
- public String description() {
- return "Expected "+key+" to be received on remote WAN site";
- }
- };
- Wait.waitForCriterion(wc, 30000, 500, true);
-
- wc = new WaitCriterion() {
- public boolean done() {
- Entry entry = region.getEntry(key);
- assertTrue(entry instanceof EntrySnapshot);
- RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();
- return regionEntry.getVersionStamp().getVersionTimeStamp() == tag.getVersionTimeStamp();
- }
- public String description() {
- return "waiting for timestamp to be updated";
- }
- };
- Wait.waitForCriterion(wc, 30000, 500, true);
-
- Entry entry = region.getEntry(key);
- assertTrue("entry class is wrong: " + entry, entry instanceof EntrySnapshot);
- RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();
-
- VersionStamp stamp = regionEntry.getVersionStamp();
-
- return stamp.asVersionTag();
- }
- });
-
- assertEquals("Local and remote site have different timestamps", tag.getVersionTimeStamp(), remoteTag.getVersionTimeStamp());
- }
-
- @Test
- public void testUpdateVersionAfterCreateWithSerialSenderOnDR() {
- Host host = Host.getHost(0);
- VM vm0 = host.getVM(0); // server1 site1
- VM vm1 = host.getVM(1); // server2 site1
-
- VM vm2 = host.getVM(2); // server1 site2
- VM vm3 = host.getVM(3); // server2 site2
-
- final String key = "key-1";
-
- // Site 1
- Integer lnPort = (Integer)vm0.invoke(() -> UpdateVersionDUnitTest.createFirstLocatorWithDSId( 1 ));
-
- vm0.invoke(() -> UpdateVersionDUnitTest.createCache( lnPort));
- vm0.invoke(() -> UpdateVersionDUnitTest.createSender( "ln1", 2, false, 10, 1, false, false, null, true ));
-
- vm0.invoke(() -> UpdateVersionDUnitTest.createReplicatedRegion(regionName, "ln1"));
- vm0.invoke(() -> UpdateVersionDUnitTest.startSender( "ln1" ));
- vm0.invoke(() -> UpdateVersionDUnitTest.waitForSenderRunningState( "ln1" ));
-
- //Site 2
- Integer nyPort = (Integer)vm2.invoke(() -> UpdateVersionDUnitTest.createFirstRemoteLocator( 2, lnPort ));
- Integer nyRecPort = (Integer) vm2.invoke(() -> UpdateVersionDUnitTest.createReceiver( nyPort ));
-
- vm2.invoke(() -> UpdateVersionDUnitTest.createReplicatedRegion(regionName, ""));
- vm3.invoke(() -> UpdateVersionDUnitTest.createCache( nyPort ));
- vm3.invoke(() -> UpdateVersionDUnitTest.createReplicatedRegion(regionName, ""));
-
- final VersionTag tag = (VersionTag) vm0.invoke(new SerializableCallable("Update a single entry and get its version") {
-
- @Override
- public Object call() throws CacheException {
- Cache cache = CacheFactory.getAnyInstance();
- Region region = cache.getRegion(regionName);
- assertTrue(region instanceof DistributedRegion);
-
- region.put(key, "value-1");
- Entry entry = region.getEntry(key);
- assertTrue(entry instanceof NonTXEntry);
- RegionEntry regionEntry = ((NonTXEntry) entry).getRegionEntry();
-
- VersionStamp stamp = regionEntry.getVersionStamp();
-
- // Create a duplicate entry version tag from stamp with newer
- // time-stamp.
- VersionSource memberId = (VersionSource) cache.getDistributedSystem().getDistributedMember();
- VersionTag tag = VersionTag.create(memberId);
-
- int entryVersion = stamp.getEntryVersion()-1;
- int dsid = stamp.getDistributedSystemId();
- long time = System.currentTimeMillis();
-
- tag.setEntryVersion(entryVersion);
- tag.setDistributedSystemId(dsid);
- tag.setVersionTimeStamp(time);
- tag.setIsRemoteForTesting();
-
- EntryEventImpl event = createNewEvent((DistributedRegion) region, tag,
- entry.getKey(), "value-2");
-
- ((LocalRegion) region).basicUpdate(event, false, true, 0L, false);
-
- // Verify the new stamp
- entry = region.getEntry(key);
- assertTrue(entry instanceof NonTXEntry);
- regionEntry = ((NonTXEntry) entry).getRegionEntry();
-
- stamp = regionEntry.getVersionStamp();
- assertEquals(
- "Time stamp did NOT get updated by UPDATE_VERSION operation on LocalRegion",
- time, stamp.getVersionTimeStamp());
- assertEquals(entryVersion+1, stamp.getEntryVersion());
- assertEquals(dsid, stamp.getDistributedSystemId());
-
- return stamp.asVersionTag();
- }
- });
-
- VersionTag remoteTag = (VersionTag) vm3.invoke(new SerializableCallable("Get timestamp from remote site") {
-
- @Override
- public Object call() throws Exception {
-
- Cache cache = CacheFactory.getAnyInstance();
- final Region region = cache.getRegion(regionName);
-
- // wait for entry to be received
- WaitCriterion wc = new WaitCriterion() {
- public boolean done() {
- return (region.getEntry(key) != null);
- }
-
- public String description() {
- return "Expected key-1 to be received on remote WAN site";
- }
- };
- Wait.waitForCriterion(wc, 30000, 500, true);
-
- wc = new WaitCriterion() {
- public boolean done() {
- Entry entry = region.getEntry(key);
- assertTrue(entry instanceof NonTXEntry);
- RegionEntry regionEntry = ((NonTXEntry) entry).getRegionEntry();
- return regionEntry.getVersionStamp().getVersionTimeStamp() == tag.getVersionTimeStamp();
- }
- public String description() {
- return "waiting for timestamp to be updated";
- }
- };
- Wait.waitForCriterion(wc, 30000, 500, true);
-
- Entry entry = region.getEntry(key);
- assertTrue(entry instanceof NonTXEntry);
- RegionEntry regionEntry = ((NonTXEntry) entry).getRegionEntry();
-
- VersionStamp stamp = regionEntry.getVersionStamp();
-
- return stamp.asVersionTag();
- }
- });
-
- assertEquals("Local and remote site have different timestamps", tag.getVersionTimeStamp(), remoteTag.getVersionTimeStamp());
- }
-
- @Test
- public void testUpdateVersionAfterCreateWithParallelSender() {
- Host host = Host.getHost(0);
- VM vm0 = host.getVM(0); // server1 site1
- VM vm1 = host.getVM(1); // server2 site1
-
- VM vm2 = host.getVM(2); // server1 site2
- VM vm3 = host.getVM(3); // server2 site2
-
- // Site 1
- Integer lnPort = (Integer)vm0.invoke(() -> UpdateVersionDUnitTest.createFirstLocatorWithDSId( 1 ));
-
- final String key = "key-1";
-
- vm0.invoke(() -> UpdateVersionDUnitTest.createCache( lnPort));
- vm0.invoke(() -> UpdateVersionDUnitTest.createSender( "ln1", 2, true, 10, 1, false, false, null, true ));
-
- vm0.invoke(() -> UpdateVersionDUnitTest.createPartitionedRegion(regionName, "ln1", 1, 1));
- vm0.invoke(() -> UpdateVersionDUnitTest.startSender( "ln1" ));
- vm0.invoke(() -> UpdateVersionDUnitTest.waitForSenderRunningState( "ln1" ));
-
- //Site 2
- Integer nyPort = (Integer)vm2.invoke(() -> UpdateVersionDUnitTest.createFirstRemoteLocator( 2, lnPort ));
- Integer nyRecPort = (Integer) vm2.invoke(() -> UpdateVersionDUnitTest.createReceiver( nyPort ));
-
- vm2.invoke(() -> UpdateVersionDUnitTest.createPartitionedRegion(regionName, "", 1, 1));
-
- vm3.invoke(() -> UpdateVersionDUnitTest.createCache( nyPort));
- vm3.invoke(() -> UpdateVersionDUnitTest.createPartitionedRegion(regionName, "", 1, 1));
-
- final VersionTag tag = (VersionTag) vm0.invoke(new SerializableCallable("Put a single entry and get its version") {
-
- @Override
- public Object call() throws CacheException {
- Cache cache = CacheFactory.getAnyInstance();
- Region region = cache.getRegion(regionName);
- assertTrue(region instanceof PartitionedRegion);
-
- region.put(key, "value-1");
- Entry entry = region.getEntry(key);
- assertTrue(entry instanceof EntrySnapshot);
- RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();
-
- VersionStamp stamp = regionEntry.getVersionStamp();
-
- // Create a duplicate entry version tag from stamp with newer
- // time-stamp.
- VersionSource memberId = (VersionSource) cache.getDistributedSystem().getDistributedMember();
- VersionTag tag = VersionTag.create(memberId);
-
- int entryVersion = stamp.getEntryVersion()-1;
- int dsid = stamp.getDistributedSystemId();
- long time = System.currentTimeMillis();
-
- tag.setEntryVersion(entryVersion);
- tag.setDistributedSystemId(dsid);
- tag.setVersionTimeStamp(time);
- tag.setIsRemoteForTesting();
-
- EntryEventImpl event = createNewEvent((PartitionedRegion) region, tag,
- entry.getKey(), "value-2");
-
- ((LocalRegion) region).basicUpdate(event, false, true, 0L, false);
-
- // Verify the new stamp
- entry = region.getEntry(key);
- assertTrue(entry instanceof EntrySnapshot);
- regionEntry = ((EntrySnapshot) entry).getRegionEntry();
-
- stamp = regionEntry.getVersionStamp();
- assertEquals(
- "Time stamp did NOT get updated by UPDATE_VERSION operation on LocalRegion",
- time, stamp.getVersionTimeStamp());
- assertEquals(++entryVersion, stamp.getEntryVersion());
- assertEquals(dsid, stamp.getDistributedSystemId());
-
- return stamp.asVersionTag();
- }
- });
-
- VersionTag remoteTag = (VersionTag) vm3.invoke(new SerializableCallable("Get timestamp from remote site") {
-
- @Override
- public Object call() throws Exception {
-
- Cache cache = CacheFactory.getAnyInstance();
- final PartitionedRegion region = (PartitionedRegion)cache.getRegion(regionName);
-
- // wait for entry to be received
- WaitCriterion wc = new WaitCriterion() {
- public boolean done() {
- Entry<?,?> entry = null;
- try {
- entry = region.getDataStore().getEntryLocally(0, key, false, false);
- } catch (EntryNotFoundException e) {
- // expected
- } catch (ForceReattemptException e) {
- // expected
- } catch (PRLocallyDestroyedException e) {
- throw new RuntimeException("unexpected exception", e);
- }
- if (entry != null) {
- LogWriterUtils.getLogWriter().info("found entry " + entry);
- }
- return (entry != null);
- }
-
- public String description() {
- return "Expected key-1 to be received on remote WAN site";
- }
- };
- Wait.waitForCriterion(wc, 30000, 500, true);
-
- wc = new WaitCriterion() {
- public boolean done() {
- Entry entry = region.getEntry(key);
- assertTrue(entry instanceof EntrySnapshot);
- RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();
- return regionEntry.getVersionStamp().getVersionTimeStamp() == tag.getVersionTimeStamp();
- }
- public String description() {
- return "waiting for timestamp to be updated";
- }
- };
- Wait.waitForCriterion(wc, 30000, 500, true);
-
- Entry entry = region.getEntry(key);
- assertTrue(entry instanceof EntrySnapshot);
- RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();
-
- VersionStamp stamp = regionEntry.getVersionStamp();
-
- return stamp.asVersionTag();
- }
- });
-
- assertEquals("Local and remote site have different timestamps", tag.getVersionTimeStamp(), remoteTag.getVersionTimeStamp());
- }
-
- @Test
- public void testUpdateVersionAfterCreateWithConcurrentSerialSender() {
- Host host = Host.getHost(0);
- VM vm0 = host.getVM(0); // server1 site1
- VM vm1 = host.getVM(1); // server2 site1
-
- VM vm2 = host.getVM(2); // server1 site2
- VM vm3 = host.getVM(3); // server2 site2
-
- // Site 1
- Integer lnPort = (Integer)vm0.invoke(() -> UpdateVersionDUnitTest.createFirstLocatorWithDSId( 1 ));
-
- final String key = "key-1";
-
- vm0.invoke(() -> UpdateVersionDUnitTest.createCache( lnPort ));
- vm0.invoke(() -> UpdateVersionDUnitTest.createConcurrentSender( "ln1", 2, false, 10, 2, false, false, null, true, 2 ));
-
- vm0.invoke(() -> UpdateVersionDUnitTest.createPartitionedRegion(regionName, "ln1", 1, 1));
- vm0.invoke(() -> UpdateVersionDUnitTest.startSender( "ln1" ));
- vm0.invoke(() -> UpdateVersionDUnitTest.waitForSenderRunningState( "ln1" ));
-
- //Site 2
- Integer nyPort = (Integer)vm2.invoke(() -> UpdateVersionDUnitTest.createFirstRemoteLocator( 2, lnPort ));
- Integer nyRecPort = (Integer) vm2.invoke(() -> UpdateVersionDUnitTest.createReceiver( nyPort ));
-
- vm2.invoke(() -> UpdateVersionDUnitTest.createPartitionedRegion(regionName, "", 1, 1));
-
- vm3.invoke(() -> UpdateVersionDUnitTest.createCache( nyPort ));
- vm3.invoke(() -> UpdateVersionDUnitTest.createPartitionedRegion(regionName, "", 1, 1));
-
- final VersionTag tag = (VersionTag) vm0.invoke(new SerializableCallable("Put a single entry and get its version") {
-
- @Override
- public Object call() throws CacheException {
- Cache cache = CacheFactory.getAnyInstance();
- Region region = cache.getRegion(regionName);
- assertTrue(region instanceof PartitionedRegion);
-
- region.put(key, "value-1");
- Entry entry = region.getEntry(key);
- assertTrue(entry instanceof EntrySnapshot);
- RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();
-
- VersionStamp stamp = regionEntry.getVersionStamp();
-
- // Create a duplicate entry version tag from stamp with newer
- // time-stamp.
- VersionSource memberId = (VersionSource) cache.getDistributedSystem().getDistributedMember();
- VersionTag tag = VersionTag.create(memberId);
-
- int entryVersion = stamp.getEntryVersion()-1;
- int dsid = stamp.getDistributedSystemId();
- long time = System.currentTimeMillis();
-
- tag.setEntryVersion(entryVersion);
- tag.setDistributedSystemId(dsid);
- tag.setVersionTimeStamp(time);
- tag.setIsRemoteForTesting();
-
- EntryEventImpl event = createNewEvent((PartitionedRegion) region, tag,
- entry.getKey(), "value-2");
-
- ((LocalRegion) region).basicUpdate(event, false, true, 0L, false);
-
- // Verify the new stamp
- entry = region.getEntry(key);
- assertTrue(entry instanceof EntrySnapshot);
- regionEntry = ((EntrySnapshot) entry).getRegionEntry();
-
- stamp = regionEntry.getVersionStamp();
- assertEquals(
- "Time stamp did NOT get updated by UPDATE_VERSION operation on LocalRegion",
- time, stamp.getVersionTimeStamp());
- assertEquals(++entryVersion, stamp.getEntryVersion());
- assertEquals(dsid, stamp.getDistributedSystemId());
-
- return stamp.asVersionTag();
- }
- });
-
- VersionTag remoteTag = (VersionTag) vm3.invoke(new SerializableCallable("Get timestamp from remote site") {
-
- @Override
- public Object call() throws Exception {
-
- Cache cache = CacheFactory.getAnyInstance();
- final PartitionedRegion region = (PartitionedRegion)cache.getRegion(regionName);
-
- // wait for entry to be received
- WaitCriterion wc = new WaitCriterion() {
- public boolean done() {
- Entry<?,?> entry = null;
- try {
- entry = region.getDataStore().getEntryLocally(0, key, false, false);
- } catch (EntryNotFoundException e) {
- // expected
- } catch (ForceReattemptException e) {
- // expected
- } catch (PRLocallyDestroyedException e) {
- throw new RuntimeException("unexpected exception", e);
- }
- if (entry != null) {
- LogWriterUtils.getLogWriter().info("found entry " + entry);
- }
- return (entry != null);
- }
-
- public String description() {
- return "Expected key-1 to be received on remote WAN site";
- }
- };
- Wait.waitForCriterion(wc, 30000, 500, true);
-
- wc = new WaitCriterion() {
- public boolean done() {
- Entry entry = region.getEntry(key);
- assertTrue(entry instanceof EntrySnapshot);
- RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();
- return regionEntry.getVersionStamp().getVersionTimeStamp() == tag.getVersionTimeStamp();
- }
- public String description() {
- return "waiting for timestamp to be updated";
- }
- };
- Wait.waitForCriterion(wc, 30000, 500, true);
-
- Entry entry = region.getEntry(key);
- assertTrue(entry instanceof EntrySnapshot);
- RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();
-
- VersionStamp stamp = regionEntry.getVersionStamp();
-
- return stamp.asVersionTag();
- }
- });
-
- assertEquals("Local and remote site have different timestamps", tag.getVersionTimeStamp(), remoteTag.getVersionTimeStamp());
- }
-
- private VersionTagHolder createNewEvent(LocalRegion region, VersionTag tag, Object key, Object value) {
- VersionTagHolder updateEvent = new VersionTagHolder(tag);
- updateEvent.setOperation(Operation.UPDATE);
- updateEvent.setRegion(region);
- if (region instanceof PartitionedRegion) {
- updateEvent.setKeyInfo(((PartitionedRegion)region).getKeyInfo(key));
- } else {
- updateEvent.setKeyInfo(new KeyInfo(key, value, null));
- }
- updateEvent.setNewValue(value);
- updateEvent.setGenerateCallbacks(true);
- updateEvent.distributedMember = region.getSystem().getDistributedMember();
- updateEvent.setNewEventId(region.getSystem());
- return updateEvent;
- }
-
- /*
- * Helper Methods
- */
-
- private static void createCache(Integer locPort) {
- UpdateVersionDUnitTest test = new UpdateVersionDUnitTest();
- Properties props = test.getDistributedSystemProperties();
- props.setProperty(MCAST_PORT, "0");
- props.setProperty(LOCATORS, "localhost[" + locPort + "]");
- props.setProperty(LOG_LEVEL, LogWriterUtils.getDUnitLogLevel());
- props.setProperty(ENABLE_CLUSTER_CONFIGURATION, "false");
- props.setProperty(USE_CLUSTER_CONFIGURATION, "false");
- InternalDistributedSystem ds = test.getSystem(props);
- cache = CacheFactory.create(ds);
- IgnoredException ex = new IgnoredException("could not get remote locator information for remote site");
- cache.getLogger().info(ex.getAddMessage());
- expectedExceptions.add(ex);
- ex = new IgnoredException("Pool ln1 is not available");
- cache.getLogger().info(ex.getAddMessage());
- expectedExceptions.add(ex);
- }
-
- private static void closeCache() {
- if (cache != null && !cache.isClosed()) {
- for (IgnoredException expectedException: expectedExceptions) {
- cache.getLogger().info(expectedException.getRemoveMessage());
- }
- expectedExceptions.clear();
- cache.getDistributedSystem().disconnect();
- cache.close();
- }
- cache = null;
- }
-
- public static void createSender(String dsName, int remoteDsId,
- boolean isParallel, Integer maxMemory, Integer batchSize,
- boolean isConflation, boolean isPersistent, GatewayEventFilter filter,
- boolean isManualStart) {
- File persistentDirectory = new File(dsName + "_disk_"
- + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
- persistentDirectory.mkdir();
- DiskStoreFactory dsf = cache.createDiskStoreFactory();
- File[] dirs1 = new File[] { persistentDirectory };
- if (isParallel) {
- GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
- gateway.setParallel(true);
- gateway.setMaximumQueueMemory(maxMemory);
- gateway.setBatchSize(batchSize);
- gateway.setManualStart(isManualStart);
- ((InternalGatewaySenderFactory) gateway)
- .setLocatorDiscoveryCallback(new MyLocatorCallback());
- if (filter != null) {
- gateway.addGatewayEventFilter(filter);
- }
- if (isPersistent) {
- gateway.setPersistenceEnabled(true);
- gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName)
- .getName());
- } else {
- DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);
- gateway.setDiskStoreName(store.getName());
- }
- gateway.setBatchConflationEnabled(isConflation);
- gateway.create(dsName, remoteDsId);
-
- } else {
- GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
- gateway.setMaximumQueueMemory(maxMemory);
- gateway.setBatchSize(batchSize);
- gateway.setManualStart(isManualStart);
- ((InternalGatewaySenderFactory) gateway)
- .setLocatorDiscoveryCallback(new MyLocatorCallback());
- if (filter != null) {
- gateway.addGatewayEventFilter(filter);
- }
- gateway.setBatchConflationEnabled(isConflation);
- if (isPersistent) {
- gateway.setPersistenceEnabled(true);
- gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName)
- .getName());
- } else {
- DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);
- gateway.setDiskStoreName(store.getName());
- }
- gateway.create(dsName, remoteDsId);
- }
- }
-
- public static void createPartitionedRegion(String regionName, String senderIds, Integer redundantCopies, Integer totalNumBuckets){
- AttributesFactory fact = new AttributesFactory();
- if(senderIds!= null){
- StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
- while (tokenizer.hasMoreTokens()){
- String senderId = tokenizer.nextToken();
- fact.addGatewaySenderId(senderId);
- }
- }
- PartitionAttributesFactory pFact = new PartitionAttributesFactory();
- pFact.setTotalNumBuckets(totalNumBuckets);
- pFact.setRedundantCopies(redundantCopies);
- pFact.setRecoveryDelay(0);
- fact.setPartitionAttributes(pFact.create());
- Region r = cache.createRegionFactory(fact.create()).create(regionName);
- assertNotNull(r);
- }
-
- public static void createReplicatedRegion(String regionName, String senderIds){
- AttributesFactory fact = new AttributesFactory();
- if(senderIds!= null){
- StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
- while (tokenizer.hasMoreTokens()){
- String senderId = tokenizer.nextToken();
- fact.addGatewaySenderId(senderId);
- }
- }
- fact.setDataPolicy(DataPolicy.REPLICATE);
- fact.setScope(Scope.DISTRIBUTED_ACK);
- Region r = cache.createRegionFactory(fact.create()).create(regionName);
- assertNotNull(r);
- }
-
- public static void waitForSenderRunningState(String senderId){
- Set<GatewaySender> senders = cache.getGatewaySenders();
- final GatewaySender sender = getGatewaySenderById(senders, senderId);
-
- WaitCriterion wc = new WaitCriterion() {
- public boolean done() {
- if (sender != null && sender.isRunning()) {
- return true;
- }
- return false;
- }
-
- public String description() {
- return "Expected sender isRunning state to be true but is false";
- }
- };
- Wait.waitForCriterion(wc, 300000, 500, true);
- }
-
- public static Integer createFirstRemoteLocator(int dsId, int remoteLocPort) {
- UpdateVersionDUnitTest test = new UpdateVersionDUnitTest();
- int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
- Properties props = test.getDistributedSystemProperties();
- props.setProperty(MCAST_PORT, "0");
- props.setProperty(DISTRIBUTED_SYSTEM_ID, ""+dsId);
- props.setProperty(LOCATORS, "localhost[" + port + "]");
- props.setProperty(START_LOCATOR, "localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost");
- props.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocPort + "]");
- props.setProperty(USE_CLUSTER_CONFIGURATION, "false");
- props.setProperty(ENABLE_CLUSTER_CONFIGURATION, "false");
- test.getSystem(props);
- return port;
- }
-
- public static void createConcurrentSender(String dsName, int remoteDsId,
- boolean isParallel, Integer maxMemory,
- Integer batchSize, boolean isConflation, boolean isPersistent,
- GatewayEventFilter filter, boolean isManualStart, int concurrencyLevel) {
- File persistentDirectory = new File(dsName +"_disk_"+System.currentTimeMillis()+"_" + VM.getCurrentVMNum());
- persistentDirectory.mkdir();
- DiskStoreFactory dsf = cache.createDiskStoreFactory();
- File [] dirs1 = new File[] {persistentDirectory};
-
- if(isParallel) {
- GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
- gateway.setParallel(true);
- gateway.setMaximumQueueMemory(maxMemory);
- gateway.setBatchSize(batchSize);
- gateway.setManualStart(isManualStart);
- ((InternalGatewaySenderFactory)gateway).setLocatorDiscoveryCallback(new MyLocatorCallback());
- if (filter != null) {
- gateway.addGatewayEventFilter(filter);
- }
- if(isPersistent) {
- gateway.setPersistenceEnabled(true);
- gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName).getName());
- }
- else {
- DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);
- gateway.setDiskStoreName(store.getName());
- }
- gateway.setBatchConflationEnabled(isConflation);
- gateway.create(dsName, remoteDsId);
-
- }else {
- GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
- gateway.setMaximumQueueMemory(maxMemory);
- gateway.setBatchSize(batchSize);
- gateway.setManualStart(isManualStart);
- ((InternalGatewaySenderFactory)gateway).setLocatorDiscoveryCallback(new MyLocatorCallback());
- if (filter != null) {
- gateway.addGatewayEventFilter(filter);
- }
- gateway.setBatchConflationEnabled(isConflation);
- if(isPersistent) {
- gateway.setPersistenceEnabled(true);
- gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName).getName());
- }
- else {
- DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);
- gateway.setDiskStoreName(store.getName());
- }
- gateway.setDispatcherThreads(concurrencyLevel);
- gateway.create(dsName, remoteDsId);
- }
- }
-
- public static int createReceiver(int locPort) {
- UpdateVersionDUnitTest test = new UpdateVersionDUnitTest();
- Properties props = test.getDistributedSystemProperties();
- props.setProperty(MCAST_PORT, "0");
- props.setProperty(LOCATORS, "localhost[" + locPort
- + "]");
-
- InternalDistributedSystem ds = test.getSystem(props);
- cache = CacheFactory.create(ds);
- GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
- int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
- fact.setStartPort(port);
- fact.setEndPort(port);
- GatewayReceiver receiver = fact.create();
- try {
- receiver.start();
- } catch (IOException e) {
- e.printStackTrace();
- fail("Test " + test.getName() + " failed to start GatewayReceiver on port " + port);
- }
- return port;
- }
-
- public static void startSender(String senderId){
- Set<GatewaySender> senders = cache.getGatewaySenders();
- GatewaySender sender = null;
- for(GatewaySender s : senders){
- if(s.getId().equals(senderId)){
- sender = s;
- break;
- }
- }
- sender.start();
- }
-
- protected static class MyLocatorCallback extends LocatorDiscoveryCallbackAdapter {
-
- private final Set discoveredLocators = new HashSet();
-
- private final Set removedLocators = new HashSet();
-
- @Override
- public synchronized void locatorsDiscovered(List locators) {
- discoveredLocators.addAll(locators);
- notifyAll();
- }
-
- @Override
- public synchronized void locatorsRemoved(List locators) {
- removedLocators.addAll(locators);
- notifyAll();
- }
-
- public boolean waitForDiscovery(InetSocketAddress locator, long time)
- throws InterruptedException {
- return waitFor(discoveredLocators, locator, time);
- }
-
- public boolean waitForRemove(InetSocketAddress locator, long time)
- throws InterruptedException {
- return waitFor(removedLocators, locator, time);
- }
-
- private synchronized boolean waitFor(Set set, InetSocketAddress locator,
- long time) throws InterruptedException {
- long remaining = time;
- long endTime = System.currentTimeMillis() + time;
- while (!set.contains(locator) && remaining >= 0) {
- wait(remaining);
- remaining = endTime - System.currentTimeMillis();
- }
- return set.contains(locator);
- }
-
- public synchronized Set getDiscovered() {
- return new HashSet(discoveredLocators);
- }
-
- public synchronized Set getRemoved() {
- return new HashSet(removedLocators);
- }
- }
-
- private static GatewaySender getGatewaySenderById(Set<GatewaySender> senders, String senderId) {
- for(GatewaySender s : senders){
- if(s.getId().equals(senderId)){
- return s;
- }
- }
- //if none of the senders matches with the supplied senderId, return null
- return null;
- }
-
- public static Integer createFirstLocatorWithDSId(int dsId) {
- UpdateVersionDUnitTest test = new UpdateVersionDUnitTest();
- int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
- Properties props = test.getDistributedSystemProperties();
- props.setProperty(MCAST_PORT, "0");
- props.setProperty(DISTRIBUTED_SYSTEM_ID, ""+dsId);
- props.setProperty(LOCATORS, "localhost[" + port + "]");
- props.setProperty(ENABLE_CLUSTER_CONFIGURATION, "false");
- props.setProperty(USE_CLUSTER_CONFIGURATION, "false");
- props.setProperty(START_LOCATOR, "localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost");
- test.getSystem(props);
- return port;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/CacheClientNotifierDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/CacheClientNotifierDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/CacheClientNotifierDUnitTest.java
deleted file mode 100755
index 96d441c..0000000
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/CacheClientNotifierDUnitTest.java
+++ /dev/null
@@ -1,276 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.wan;
-
-import static com.gemstone.gemfire.distributed.ConfigurationProperties.*;
-import static org.junit.Assert.*;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Properties;
-
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import com.gemstone.gemfire.cache.AttributesFactory;
-import com.gemstone.gemfire.cache.CacheFactory;
-import com.gemstone.gemfire.cache.DataPolicy;
-import com.gemstone.gemfire.cache.DiskStore;
-import com.gemstone.gemfire.cache.EvictionAction;
-import com.gemstone.gemfire.cache.EvictionAttributes;
-import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.cache.RegionAttributes;
-import com.gemstone.gemfire.cache.client.Pool;
-import com.gemstone.gemfire.cache.client.PoolManager;
-import com.gemstone.gemfire.cache.server.CacheServer;
-import com.gemstone.gemfire.cache.server.ClientSubscriptionConfig;
-import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
-import com.gemstone.gemfire.internal.AvailablePort;
-import com.gemstone.gemfire.internal.cache.CacheServerImpl;
-import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
-import com.gemstone.gemfire.internal.cache.ha.HAContainerRegion;
-import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientNotifier;
-import com.gemstone.gemfire.internal.cache.tier.sockets.CacheServerTestUtil;
-import com.gemstone.gemfire.internal.logging.LogService;
-import com.gemstone.gemfire.test.dunit.LogWriterUtils;
-import com.gemstone.gemfire.test.dunit.SerializableRunnable;
-import com.gemstone.gemfire.test.dunit.VM;
-import com.gemstone.gemfire.test.dunit.Wait;
-import com.gemstone.gemfire.test.dunit.WaitCriterion;
-import com.gemstone.gemfire.test.junit.categories.DistributedTest;
-import com.gemstone.gemfire.test.junit.categories.FlakyTest;
-
-@Category(DistributedTest.class)
-public class CacheClientNotifierDUnitTest extends WANTestBase {
-
- private static final int NUM_KEYS = 10;
-
- /**
-  * Adds a cache server to the cache running in {@code vm} and starts it. The port is picked
-  * in the calling VM before the remote invocation.
-  *
-  * @param vm the VM in which the server is created
-  * @param withCSC when {@code true}, the server's client subscription config is set from
-  *        {@code capacity}, {@code policy} and {@code diskStoreName}, and the hostname for
-  *        clients is set to "localhost"
-  * @param capacity capacity applied to the client subscription config
-  * @param policy eviction policy applied to the client subscription config
-  * @param diskStoreName disk store name applied to the client subscription config; when
-  *        non-null and no such disk store exists yet, one is created first
-  * @return the port assigned to the new cache server
-  */
- private int createCacheServerWithCSC(VM vm, final boolean withCSC, final int capacity,
-     final String policy, final String diskStoreName) {
-   final int serverPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
-
-   vm.invoke(new SerializableRunnable() {
-     @Override
-     public void run() throws Exception {
-       final CacheServerImpl cacheServer = (CacheServerImpl) cache.addCacheServer();
-       cacheServer.setPort(serverPort);
-
-       if (withCSC) {
-         // Create the named disk store if it is missing, before the config refers to it.
-         if (diskStoreName != null && cache.findDiskStore(diskStoreName) == null) {
-           cache.createDiskStoreFactory().create(diskStoreName);
-         }
-         final ClientSubscriptionConfig subscriptionConfig = cacheServer.getClientSubscriptionConfig();
-         subscriptionConfig.setCapacity(capacity);
-         subscriptionConfig.setEvictionPolicy(policy);
-         subscriptionConfig.setDiskStoreName(diskStoreName);
-         cacheServer.setHostnameForClients("localhost");
-         //server.setGroups(new String[]{"serv"});
-       }
-
-       try {
-         cacheServer.start();
-       } catch (IOException e) {
-         com.gemstone.gemfire.test.dunit.Assert.fail("Failed to start server ", e);
-       }
-     }
-   });
-
-   return serverPort;
- }
-
- /**
-  * Verifies, inside {@code vm}, the HA container of the server listening on
-  * {@code serverPort}.
-  *
-  * For a gateway receiver the HA container must be {@code null}. Otherwise the eviction
-  * attributes of the container's backing region are checked: with {@code withCSC} they must
-  * exist, have {@code capacity} as maximum and overflow to disk; without it they must be
-  * {@code null}.
-  *
-  * @param vm the VM hosting the server
-  * @param serverPort port identifying the server among the cache servers and gateway receiver
-  * @param withCSC whether eviction attributes are expected
-  * @param capacity expected eviction maximum when {@code withCSC} is {@code true}
-  */
- private void checkCacheServer(VM vm, final int serverPort, final boolean withCSC, final int capacity) {
-   vm.invoke(new SerializableRunnable() {
-
-     @Override
-     public void run() throws Exception {
-       final List<CacheServer> candidates = ((GemFireCacheImpl) cache).getCacheServersAndGatewayReceiver();
-       final CacheServerImpl match = (CacheServerImpl) candidates.stream()
-           .filter(candidate -> candidate.getPort() == serverPort)
-           .findFirst()
-           .orElse(null);
-       assertNotNull(match);
-
-       final CacheClientNotifier notifier = match.getAcceptor().getCacheClientNotifier();
-       final HAContainerRegion haContainer = (HAContainerRegion) notifier.getHaContainer();
-       if (match.getAcceptor().isGatewayReceiver()) {
-         assertNull(haContainer);
-         return;
-       }
-
-       final Region backingRegion = haContainer.getMapForTest();
-       final RegionAttributes regionAttributes = backingRegion.getAttributes();
-       final EvictionAttributes eviction = regionAttributes.getEvictionAttributes();
-       if (!withCSC) {
-         assertNull(eviction);
-         return;
-       }
-       assertNotNull(eviction);
-       assertEquals(capacity, eviction.getMaximum());
-       assertEquals(EvictionAction.OVERFLOW_TO_DISK, eviction.getAction());
-     }
-   });
- }
-
- /**
-  * Stops the cache server of the current cache that listens on {@code serverPort}.
-  * Fails the assertion when no cache server uses that port.
-  *
-  * @param serverPort port of the cache server to stop
-  */
- public static void closeACacheServer(final int serverPort) {
-   final List<CacheServer> servers = cache.getCacheServers();
-   final CacheServerImpl target = (CacheServerImpl) servers.stream()
-       .filter(candidate -> candidate.getPort() == serverPort)
-       .findFirst()
-       .orElse(null);
-   assertNotNull(target);
-   target.stop();
- }
-
- /**
-  * Waits, inside {@code vm}, for the region named {@code getTestMethodName() + "_PR"} to
-  * reach the expected size and then asserts that size.
-  *
-  * The wait polls every 100 ms for up to 60 seconds and is asked not to throw on timeout;
-  * the final {@code assertEquals} is what fails the test when the size is still wrong.
-  *
-  * @param vm the VM whose region is checked
-  * @param expect the expected number of entries in the region
-  */
- private void verifyRegionSize(VM vm, final int expect) {
-   vm.invoke(new SerializableRunnable() {
-     @Override
-     public void run() throws Exception {
-       final String regionName = getTestMethodName() + "_PR";
-       final Region region = cache.getRegion(regionName);
-       // Fail with a clear message instead of a NullPointerException inside the wait loop.
-       assertNotNull("Region " + regionName + " was not found in the cache", region);
-
-       Wait.waitForCriterion(new WaitCriterion() {
-         @Override
-         public boolean done() {
-           return region.size() == expect;
-         }
-
-         @Override
-         public String description() {
-           // A real description instead of null, for whatever the wait utility reports.
-           return "waiting for region " + regionName + " to reach size " + expect
-               + " (current size " + region.size() + ")";
-         }
-       }, 60000, 100, false);
-
-       assertEquals("Unexpected size of region " + regionName, expect, region.size());
-     }
-   });
- }
-
- /**
- * Runs {@code doMultipleCacheServer} with non-durable clients ({@code durable == false}).
- * The scenario starts a gateway receiver and two cache servers, checks the
- * CacheClientNotifier HA container settings of each, then stops one cache server
- * and verifies that the region contents still reach every VM.
- */
- @Test
- public void testNormalClient2MultipleCacheServer() throws Exception {
- doMultipleCacheServer(false);
- }
-
- /**
- * Drives the multiple-cache-server scenario across vm0..vm3.
- *
- * vm0 is the sending site: it gets a locator, a cache, the gateway sender "ln" and a
- * persistent partitioned region attached to that sender. vm1 is the receiving site: it gets
- * a remote locator, a cache, a gateway receiver, the same persistent partitioned region and
- * two cache servers. vm2 and vm3 each run a client created by createClientWithLocator.
- * After puts on vm0 the region size is verified on all four VMs; then the second cache
- * server on vm1 is stopped and the puts and size checks are repeated.
- *
- * @param durable passed to createClientWithLocator as the isDurable flag of both clients
- * @throws Exception if any step of the scenario fails
- */
- public void doMultipleCacheServer(boolean durable) throws Exception {
- /* test scenario: */
- /* create 1 GatewaySender on vm0 */
- /* create 1 GatewayReceiver on vm1 */
- /* create 2 cache servers on vm1, the first one with overflow */
- /* verify that cache server 2 also reports the overflow attributes */
- /* create cache client1 on vm2, connecting through the ny locator */
- /* create cache client2 on vm3, connecting through the ny locator */
- /* do some puts on vm0, the GatewaySender side */
-
- // start the first locator (distributed system id 1) on vm0; the sender site "ln" uses it
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
-
- // the receiver and the cache servers live at ny (distributed system id 2) on vm1
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
- vm1.invoke(() -> WANTestBase.createCache( nyPort ));
- int receiverPort = vm1.invoke(() -> WANTestBase.createReceiver());
- // checkCacheServer expects a gateway receiver to have no HA container
- checkCacheServer(vm1, receiverPort, false, 0);
-
- // create PR for receiver
- vm1.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
-
- // create cache server 1 with overflow: capacity 3, "entry" policy, disk store "DEFAULT"
- int serverPort = createCacheServerWithCSC(vm1, true, 3, "entry", "DEFAULT");
- checkCacheServer(vm1, serverPort, true, 3);
-
- // create cache server 2 without a client subscription config
- final int serverPort2 = createCacheServerWithCSC(vm1, false, 0, null, null);
- // Currently, only the first cache server's overflow attributes will take effect,
- // so server 2 is expected to report server 1's settings (capacity 3).
- // It will be enhanced in GEODE-1102
- checkCacheServer(vm1, serverPort2, true, 3);
- LogService.getLogger().info("receiverPort="+receiverPort+",serverPort="+serverPort+",serverPort2="+serverPort2);
-
- // clients on vm2 and vm3 with client ids "123" and "124"
- vm2.invoke(() -> createClientWithLocator(nyPort, "localhost", getTestMethodName() + "_PR", "123", durable));
- vm3.invoke(() -> createClientWithLocator(nyPort, "localhost", getTestMethodName() + "_PR", "124", durable));
-
- // set up the sender site on vm0 and put the first batch of keys
- vm0.invoke(() -> WANTestBase.createCache( lnPort ));
- vm0.invoke(() -> WANTestBase.createSender( "ln", 2, false, 100, 400, false, false, null, true ));
- vm0.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
- vm0.invoke(() -> WANTestBase.startSender( "ln" ));
- vm0.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", NUM_KEYS ));
-
- /* verify: sender, receiver and both clients hold NUM_KEYS entries */
- verifyRegionSize(vm0, NUM_KEYS);
- verifyRegionSize(vm1, NUM_KEYS);
- verifyRegionSize(vm3, NUM_KEYS);
- verifyRegionSize(vm2, NUM_KEYS);
-
- // close cache server 2, then re-test
- vm1.invoke(() -> closeACacheServer(serverPort2));
-
- vm0.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", NUM_KEYS*2 ));
-
- /* verify: every VM now holds NUM_KEYS*2 entries */
- verifyRegionSize(vm0, NUM_KEYS*2);
- verifyRegionSize(vm1, NUM_KEYS*2);
- verifyRegionSize(vm3, NUM_KEYS*2);
- verifyRegionSize(vm2, NUM_KEYS*2);
-
- disconnectAllFromDS();
- }
-
- /**
-  * Creates a client cache in the current VM, a pool that reaches servers through the given
-  * locator, and a {@code DataPolicy.NORMAL} region on that pool that registers interest in
-  * "ALL_KEYS". The cache and region are stored in the static {@code cache} and
-  * {@code region} fields.
-  *
-  * @param port0 locator port added to the pool
-  * @param host locator host added to the pool
-  * @param regionName name of the region to create; also used as the pool name
-  * @param clientId durable client id, applied only when {@code isDurable} is {@code true}
-  * @param isDurable when {@code true}, the durable client id and a timeout of 200 are set
-  *        and {@code readyForEvents()} is called once interest has been registered
-  */
- public static void createClientWithLocator(int port0,String host,
-     String regionName, String clientId, boolean isDurable) {
-   WANTestBase test = new WANTestBase();
-   Properties props = test.getDistributedSystemProperties();
-   props.setProperty(MCAST_PORT, "0");
-   props.setProperty(LOCATORS, "");
-   if (isDurable) {
-     props.setProperty(DURABLE_CLIENT_ID, clientId);
-     props.setProperty(DURABLE_CLIENT_TIMEOUT, "200");
-   }
-
-   InternalDistributedSystem ds = test.getSystem(props);
-   cache = CacheFactory.create(ds);
-   assertNotNull(cache);
-
-   // Endpoint shuffling is disabled only while the pool is created and is always re-enabled.
-   CacheServerTestUtil.disableShufflingOfEndpoints();
-   Pool p;
-   try {
-     p = PoolManager.createFactory().addLocator(host, port0)
-         .setPingInterval(250).setSubscriptionEnabled(true)
-         .setSubscriptionRedundancy(-1).setReadTimeout(2000)
-         .setSocketBufferSize(1000).setMinConnections(6).setMaxConnections(10)
-         .setRetryAttempts(3).create(regionName);
-   } finally {
-     CacheServerTestUtil.enableShufflingOfEndpoints();
-   }
-
-   AttributesFactory factory = new AttributesFactory();
-   factory.setPoolName(p.getName());
-   factory.setDataPolicy(DataPolicy.NORMAL);
-   RegionAttributes attrs = factory.create();
-   region = cache.createRegion(regionName, attrs);
-   // Assert before the first use: checking after registerInterest could never fail,
-   // because a null region would already have thrown a NullPointerException.
-   assertNotNull(region);
-   region.registerInterest("ALL_KEYS");
-   if (isDurable) {
-     cache.readyForEvents();
-   }
-   LogWriterUtils.getLogWriter().info(
-       "Distributed Region " + regionName + " created Successfully :"
-           + region.toString() + " in a "+(isDurable?"durable":"")+" client");
- }
-}