You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by al...@apache.org on 2021/11/12 06:57:38 UTC
[geode] branch develop updated: GEODE-9369: Command to copy region
entries from a WAN site to another (#7006)
This is an automated email from the ASF dual-hosted git repository.
alberto pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 3eaeed8 GEODE-9369: Command to copy region entries from a WAN site to another (#7006)
3eaeed8 is described below
commit 3eaeed886f3e6ec9ec6f341954fc837eed7b03f8
Author: Alberto Gomez <al...@est.tech>
AuthorDate: Fri Nov 12 07:56:10 2021 +0100
GEODE-9369: Command to copy region entries from a WAN site to another (#7006)
* GEODE-9369: Add command to copy region entries from one site to another
* GEODE-9369: Create servers without offheap to avoid test failures in windows due to out of memory
* GEODE-9369: Fix flaky test
* GEODE-9369: Fix some test case failues on windows
* GEODE-9369: Fix windows test failures in CI by not running the tests on windows
* GEODE-9369: Fix test cases after rebasing locator serialization filtering by default on Java 8
* GEODE-9369: Revert changes required by GEODE-9758
---
.../source/subnavs/geode-subnav.erb | 1 +
.../cache30/DistributedAckRegionCCEDUnitTest.java | 2 +-
.../geode/internal/cache/EnumListenerEvent.java | 24 +-
.../apache/geode/internal/cache/LocalRegion.java | 7 +-
.../cache/tier/sockets/CacheServerStats.java | 4 +-
.../sockets/command/GatewayReceiverCommand.java | 11 +-
.../internal/cache/tier/sockets/command/Put70.java | 2 +-
.../wan/GatewaySenderEventCallbackDispatcher.java | 9 +
.../cache/wan/GatewaySenderEventDispatcher.java | 9 +
.../internal/cache/wan/GatewaySenderEventImpl.java | 13 +-
.../internal/cache/EnumListenerEventJUnitTest.java | 3 +-
.../cache/tier/sockets/command/Put70Test.java | 2 +-
.../cache/wan/GatewaySenderEventImplTest.java | 36 +-
.../gfsh/command-pages/wan_copy_region.html.md.erb | 192 +++
.../gfsh/gfsh_command_index.html.md.erb | 5 +-
.../gfsh/quick_ref_commands_by_area.html.md.erb | 1 +
.../GfshParserAutoCompletionIntegrationTest.java | 6 +-
geode-wan/build.gradle | 1 +
.../commands/WanCopyRegionCommandDUnitTest.java | 1399 ++++++++++++++++++++
.../geode/internal/cache/wan/WANTestBase.java | 278 ++--
...arallelWANPropagationClientServerDUnitTest.java | 4 +-
.../WanCommandAutoCompletionIntegrationTest.java | 50 +
.../GatewaySenderEventRemoteDispatcher.java | 20 +
.../wan/internal/WanCopyRegionFunctionService.java | 123 ++
...gionFunctionServiceAlreadyRunningException.java | 24 +-
.../cli/commands/WanCopyRegionCommand.java | 138 ++
.../client/locator/GatewaySenderBatchOp.java | 6 +-
.../cli/functions/WanCopyRegionFunction.java | 209 +++
.../functions/WanCopyRegionFunctionDelegate.java | 386 ++++++
.../org.apache.geode.internal.cache.CacheService | 1 +
.../org.springframework.shell.core.CommandMarker | 1 +
.../sanctioned-geode-wan-serializables.txt | 6 +
.../internal/WanCopyRegionFunctionServiceTest.java | 186 +++
.../cli/commands/WanCopyRegionCommandTest.java | 49 +
.../WanCopyRegionFunctionDelegateTest.java | 575 ++++++++
.../cli/functions/WanCopyRegionFunctionTest.java | 142 ++
geode-wan/src/test/resources/expected-pom.xml | 5 +
37 files changed, 3739 insertions(+), 191 deletions(-)
diff --git a/geode-book/master_middleman/source/subnavs/geode-subnav.erb b/geode-book/master_middleman/source/subnavs/geode-subnav.erb
index 2b27ac8..14b54c8 100644
--- a/geode-book/master_middleman/source/subnavs/geode-subnav.erb
+++ b/geode-book/master_middleman/source/subnavs/geode-subnav.erb
@@ -2084,6 +2084,7 @@ limitations under the License.
</li>
<li>
<a href="/docs/guide/<%=vars.product_version_nodot%>/tools_modules/gfsh/command-pages/version.html">version</a>
+ <a href="/docs/guide/<%=vars.product_version_nodot%>/tools_modules/gfsh/command-pages/wan_copy_region.html">wan-copy region</a>
</li>
</ul>
</li>
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java
index a0be478..8e276bf 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java
@@ -420,7 +420,7 @@ public class DistributedAckRegionCCEDUnitTest extends DistributedAckRegionDUnitT
VersionTagHolder holder = new VersionTagHolder(tag);
ClientProxyMembershipID id = ClientProxyMembershipID
.getNewProxyMembership(CCRegion.getDistributionManager().getSystem());
- CCRegion.basicBridgePut("cckey0", "newvalue", null, true, null, id, true, holder);
+ CCRegion.basicBridgePut("cckey0", "newvalue", null, true, null, id, holder, true);
vm0.invoke("check conflation count", () -> {
// after changed the 3rd try of AUO.doPutOrCreate to be ifOld=false ifNew=false
// ARM.updateEntry will be called one more time, so there will be 2 conflicted events
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/EnumListenerEvent.java b/geode-core/src/main/java/org/apache/geode/internal/cache/EnumListenerEvent.java
index 8f95d97..c2bd339 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/EnumListenerEvent.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/EnumListenerEvent.java
@@ -106,12 +106,17 @@ public abstract class EnumListenerEvent {
public static final EnumListenerEvent TIMESTAMP_UPDATE = new TIMESTAMP_UPDATE(); // 18
@Immutable
+ public static final EnumListenerEvent AFTER_UPDATE_WITH_GENERATE_CALLBACKS =
+ new AFTER_UPDATE_WITH_GENERATE_CALLBACKS(); // 19
+
+ @Immutable
private static final EnumListenerEvent[] instances =
new EnumListenerEvent[] {AFTER_CREATE, AFTER_UPDATE, AFTER_INVALIDATE, AFTER_DESTROY,
AFTER_REGION_CREATE, AFTER_REGION_INVALIDATE, AFTER_REGION_CLEAR, AFTER_REGION_DESTROY,
AFTER_REMOTE_REGION_CREATE, AFTER_REMOTE_REGION_DEPARTURE, AFTER_REMOTE_REGION_CRASH,
AFTER_ROLE_GAIN, AFTER_ROLE_LOSS, AFTER_REGION_LIVE, AFTER_REGISTER_INSTANTIATOR,
- AFTER_REGISTER_DATASERIALIZER, AFTER_TOMBSTONE_EXPIRATION, TIMESTAMP_UPDATE};
+ AFTER_REGISTER_DATASERIALIZER, AFTER_TOMBSTONE_EXPIRATION, TIMESTAMP_UPDATE,
+ AFTER_UPDATE_WITH_GENERATE_CALLBACKS};
static {
for (int i = 0; i < instances.length; i++) {
@@ -412,6 +417,21 @@ public abstract class EnumListenerEvent {
}
}
+ private static class AFTER_UPDATE_WITH_GENERATE_CALLBACKS extends EnumListenerEvent {
+ protected AFTER_UPDATE_WITH_GENERATE_CALLBACKS() {
+ super("AFTER_UPDATE_WITH_GENERATE_CALLBACKS");
+ }
+
+ @Override
+ public void dispatchEvent(CacheEvent event, CacheListener listener) {}
+
+ @Override
+ public byte getEventCode() {
+ return 19;
+ }
+ }
+
+
/**
*
* This method returns the EnumListenerEvent object corresponding to the cCode given.
@@ -435,6 +455,8 @@ public abstract class EnumListenerEvent {
* <li>15 - AFTER_REGISTER_INSTANTIATOR
* <li>16 - AFTER_REGISTER_DATASERIALIZER
* <li>17 - AFTER_TOMBSTONE_EXPIRATION
+ * <li>18 - TIMESTAMP_UPDATE
+ * <li>19 - AFTER_UPDATE_WITH_GENERATE_CALLBACKS
* </ul>
*
* @param eventCode the eventCode corresponding to the EnumListenerEvent object desired
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index af28668..88becfd 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -5179,8 +5179,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
}
public boolean basicBridgePut(Object key, Object value, byte[] deltaBytes, boolean isObject,
- Object callbackArg, ClientProxyMembershipID memberId, boolean fromClient,
- EntryEventImpl clientEvent) throws TimeoutException, CacheWriterException {
+ Object callbackArg, ClientProxyMembershipID memberId,
+ EntryEventImpl clientEvent, boolean generateCallbacks)
+ throws TimeoutException, CacheWriterException {
EventID eventID = clientEvent.getEventId();
Object theCallbackArg = callbackArg;
@@ -5189,7 +5190,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
@Released
final EntryEventImpl event = entryEventFactory.create(this, Operation.UPDATE, key,
null, theCallbackArg, false,
- memberId.getDistributedMember(), true, eventID);
+ memberId.getDistributedMember(), generateCallbacks, eventID);
try {
event.setContext(memberId);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheServerStats.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheServerStats.java
index 23f0e8e..f63df94 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheServerStats.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheServerStats.java
@@ -992,8 +992,8 @@ public class CacheServerStats implements MessageStats {
return this.stats.getDouble(loadPerConnectionId);
}
- public int getProcessBatchRequests() {
- return this.stats.getInt(processBatchRequestsId);
+ public long getProcessBatchRequests() {
+ return this.stats.getLong(processBatchRequestsId);
}
public void close() {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
index 1a15cd3..848c3cb 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
@@ -45,6 +45,7 @@ import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
import org.apache.geode.internal.cache.versions.VersionTag;
import org.apache.geode.internal.cache.wan.BatchException70;
import org.apache.geode.internal.cache.wan.GatewayReceiverStats;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
import org.apache.geode.internal.security.AuthorizeRequest;
import org.apache.geode.internal.security.SecurityService;
import org.apache.geode.internal.util.BlobHelper;
@@ -308,7 +309,7 @@ public class GatewayReceiverCommand extends BaseCommand {
// attempt to update the entry
if (!result) {
result = region.basicBridgePut(key, value, null, isObject, callbackArg,
- serverConnection.getProxyID(), false, clientEvent);
+ serverConnection.getProxyID(), clientEvent, true);
}
}
@@ -333,6 +334,7 @@ public class GatewayReceiverCommand extends BaseCommand {
break;
case 1: // Update
+ case GatewaySenderEventImpl.UPDATE_ACTION_NO_GENERATE_CALLBACKS:
try {
// Retrieve the value from the message parts (do not deserialize it)
valuePart = clientMessage.getPart(partNumber + 5);
@@ -405,8 +407,10 @@ public class GatewayReceiverCommand extends BaseCommand {
if (isPdxEvent) {
result = addPdxType(crHelper, key, value);
} else {
+ boolean generateCallbacks =
+ actionType != GatewaySenderEventImpl.UPDATE_ACTION_NO_GENERATE_CALLBACKS;
result = region.basicBridgePut(key, value, null, isObject, callbackArg,
- serverConnection.getProxyID(), false, clientEvent);
+ serverConnection.getProxyID(), clientEvent, generateCallbacks);
}
if (result || clientEvent.isConcurrencyConflict()) {
serverConnection.setModificationInfo(true, regionName, key);
@@ -637,7 +641,8 @@ public class GatewayReceiverCommand extends BaseCommand {
exceptions.add(be);
} finally {
// Increment the partNumber
- if (actionType == 0 /* create */ || actionType == 1 /* update */) {
+ if (actionType == 0 /* create */ || actionType == 1 /* update */
+ || actionType == GatewaySenderEventImpl.UPDATE_ACTION_NO_GENERATE_CALLBACKS) {
if (callbackArgExists) {
partNumber += 9;
} else {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Put70.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Put70.java
index 8ad1de7..d11fbf2 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Put70.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Put70.java
@@ -379,7 +379,7 @@ public class Put70 extends BaseCommand {
serverConnection.getProxyID(), true, clientEvent, true);
} else {
result = region.basicBridgePut(key, value, delta, isObject, callbackArg,
- serverConnection.getProxyID(), true, clientEvent);
+ serverConnection.getProxyID(), clientEvent, true);
}
if (clientMessage.isRetry() && clientEvent.isConcurrencyConflict()
&& clientEvent.getVersionTag() != null) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventCallbackDispatcher.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventCallbackDispatcher.java
index e5683e0..58531bd 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventCallbackDispatcher.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventCallbackDispatcher.java
@@ -23,6 +23,9 @@ import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelException;
import org.apache.geode.cache.asyncqueue.AsyncEventListener;
+import org.apache.geode.cache.client.internal.Connection;
+import org.apache.geode.cache.client.internal.ExecutablePool;
+import org.apache.geode.cache.wan.GatewayQueueEvent;
import org.apache.geode.logging.internal.log4j.api.LogService;
/**
@@ -184,4 +187,10 @@ public class GatewaySenderEventCallbackDispatcher implements GatewaySenderEventD
public void shutDownAckReaderConnection() {
// no op
}
+
+ @Override
+ public void sendBatch(List<GatewayQueueEvent<?, ?>> events, Connection connection,
+ ExecutablePool senderPool, int batchId, boolean removeFromQueueOnException) {
+ throw new UnsupportedOperationException();
+ }
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventDispatcher.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventDispatcher.java
index 86828ae..42fbc02 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventDispatcher.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventDispatcher.java
@@ -16,6 +16,10 @@ package org.apache.geode.internal.cache.wan;
import java.util.List;
+import org.apache.geode.cache.client.internal.Connection;
+import org.apache.geode.cache.client.internal.ExecutablePool;
+import org.apache.geode.cache.wan.GatewayQueueEvent;
+
/**
* @since GemFire 7.0
*
@@ -31,4 +35,9 @@ public interface GatewaySenderEventDispatcher {
void stop();
void shutDownAckReaderConnection();
+
+ void sendBatch(List<GatewayQueueEvent<?, ?>> events, Connection connection,
+ ExecutablePool senderPool,
+ int batchId, boolean removeFromQueueOnException)
+ throws BatchException70;
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
index 7c3537e..9b80278 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
@@ -200,6 +200,8 @@ public class GatewaySenderEventImpl
private static final int VERSION_ACTION = 3;
+ public static final int UPDATE_ACTION_NO_GENERATE_CALLBACKS = 4;
+
private static final int INVALIDATE_ACTION = 5;
/**
* Static constants for Operation detail of EntryEvent.
@@ -309,7 +311,7 @@ public class GatewaySenderEventImpl
// Initialize the action and number of parts (called after _callbackArgument
// is set above)
- initializeAction(this.operation);
+ initializeAction(this.operation, event);
// initialize the operation detail
initializeOperationDetail(event.getOperation());
@@ -1022,7 +1024,7 @@ public class GatewaySenderEventImpl
*
* @param operation The operation from which to initialize this event's action and number of parts
*/
- protected void initializeAction(EnumListenerEvent operation) {
+ protected void initializeAction(EnumListenerEvent operation, EntryEventImpl event) {
if (operation == EnumListenerEvent.AFTER_CREATE) {
// Initialize after create action
action = CREATE_ACTION;
@@ -1065,6 +1067,13 @@ public class GatewaySenderEventImpl
// Initialize number of parts
// Since there is no value, there is one less part
numberOfParts = (callbackArgument == null) ? 7 : 8;
+ } else if (operation == EnumListenerEvent.AFTER_UPDATE_WITH_GENERATE_CALLBACKS) {
+ if (event.isGenerateCallbacks()) {
+ action = UPDATE_ACTION;
+ } else {
+ action = UPDATE_ACTION_NO_GENERATE_CALLBACKS;
+ }
+ numberOfParts = (this.callbackArgument == null) ? 8 : 9;
}
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/EnumListenerEventJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/EnumListenerEventJUnitTest.java
index edb6f72..3e6a7f6 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/EnumListenerEventJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/EnumListenerEventJUnitTest.java
@@ -47,7 +47,8 @@ public class EnumListenerEventJUnitTest {
// extra non-existent code checks as a markers so that this test will
// fail if further events are added (0th or +1 codes) without updating this test
checkAndAssert(18, EnumListenerEvent.TIMESTAMP_UPDATE);
- checkAndAssert(19, null);
+ checkAndAssert(19, EnumListenerEvent.AFTER_UPDATE_WITH_GENERATE_CALLBACKS);
+ checkAndAssert(20, null);
}
// check that the code and object both match
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/Put70Test.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/Put70Test.java
index 7ee3e26..2938a9c 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/Put70Test.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/Put70Test.java
@@ -138,7 +138,7 @@ public class Put70Test {
when(keyPart.getStringOrObject()).thenReturn(KEY);
when(localRegion.basicBridgePut(eq(KEY), eq(VALUE), eq(null), eq(true), eq(CALLBACK_ARG),
- any(), eq(true), any())).thenReturn(true);
+ any(), any(), eq(true))).thenReturn(true);
when(message.getNumberOfParts()).thenReturn(8);
when(message.getPart(eq(0))).thenReturn(regionNamePart);
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImplTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImplTest.java
index 1755991..6e05864 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImplTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImplTest.java
@@ -49,6 +49,7 @@ import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.EnumListenerEvent;
import org.apache.geode.internal.cache.EventID;
import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalRegion;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.TXId;
import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderHelper;
@@ -113,13 +114,12 @@ public class GatewaySenderEventImplTest {
@Parameters(method = "getVersionsAndExpectedInvocations")
public void testSerializingDataFromCurrentVersionToOldVersion(VersionAndExpectedInvocations vaei)
throws IOException {
- InternalDataSerializer internalDataSerializer = spy(InternalDataSerializer.class);
GatewaySenderEventImpl gatewaySenderEvent = spy(GatewaySenderEventImpl.class);
OutputStream outputStream = mock(OutputStream.class);
VersionedDataOutputStream versionedDataOutputStream =
new VersionedDataOutputStream(outputStream, vaei.getVersion());
- internalDataSerializer.invokeToData(gatewaySenderEvent, versionedDataOutputStream);
+ InternalDataSerializer.invokeToData(gatewaySenderEvent, versionedDataOutputStream);
verify(gatewaySenderEvent, times(0)).toData(any(), any());
verify(gatewaySenderEvent, times(vaei.getPre115Invocations())).toDataPre_GEODE_1_15_0_0(any(),
any());
@@ -134,7 +134,6 @@ public class GatewaySenderEventImplTest {
public void testDeserializingDataFromOldVersionToCurrentVersion(
VersionAndExpectedInvocations vaei)
throws IOException, ClassNotFoundException {
- InternalDataSerializer internalDataSerializer = spy(InternalDataSerializer.class);
GatewaySenderEventImpl gatewaySenderEvent = spy(GatewaySenderEventImpl.class);
InputStream inputStream = mock(InputStream.class);
when(inputStream.read()).thenReturn(69); // NULL_STRING
@@ -142,7 +141,7 @@ public class GatewaySenderEventImplTest {
VersionedDataInputStream versionedDataInputStream =
new VersionedDataInputStream(inputStream, vaei.getVersion());
- internalDataSerializer.invokeFromData(gatewaySenderEvent, versionedDataInputStream);
+ InternalDataSerializer.invokeFromData(gatewaySenderEvent, versionedDataInputStream);
verify(gatewaySenderEvent, times(0)).fromData(any(), any());
verify(gatewaySenderEvent, times(vaei.getPre115Invocations())).fromDataPre_GEODE_1_15_0_0(any(),
any());
@@ -349,6 +348,35 @@ public class GatewaySenderEventImplTest {
return cacheEvent;
}
+ @Parameters({"true, true", "true, false", "false, false"})
+ public void testCreation_WithAfterUpdateWithGenerateCallbacks(boolean isGenerateCallbacks,
+ boolean isCallbackArgumentNull)
+ throws IOException {
+ InternalRegion region = mock(InternalRegion.class);
+ when(region.getFullPath()).thenReturn(testName.getMethodName() + "_region");
+
+ Operation operation = mock(Operation.class);
+ when(operation.isLocalLoad()).thenReturn(true);
+
+ EntryEventImpl cacheEvent = mock(EntryEventImpl.class);
+ when(cacheEvent.getRegion()).thenReturn(region);
+ when(cacheEvent.getEventId()).thenReturn(mock(EventID.class));
+ when(cacheEvent.getOperation()).thenReturn(operation);
+ when(cacheEvent.isGenerateCallbacks()).thenReturn(isGenerateCallbacks);
+ when(cacheEvent.getRawCallbackArgument())
+ .thenReturn(isCallbackArgumentNull ? null : mock(GatewaySenderEventCallbackArgument.class));
+
+ GatewaySenderEventImpl event = new GatewaySenderEventImpl(
+ EnumListenerEvent.AFTER_UPDATE_WITH_GENERATE_CALLBACKS, cacheEvent,
+ null, false, INCLUDE_LAST_EVENT);
+
+ final int numberOfParts = isCallbackArgumentNull ? 8 : 9;
+ assertThat(event.getNumberOfParts()).isEqualTo(numberOfParts);
+
+ final int action = isGenerateCallbacks ? 1 : 4;
+ assertThat(event.getAction()).isEqualTo(action);
+ }
+
public static class VersionAndExpectedInvocations {
private final KnownVersion version;
diff --git a/geode-docs/tools_modules/gfsh/command-pages/wan_copy_region.html.md.erb b/geode-docs/tools_modules/gfsh/command-pages/wan_copy_region.html.md.erb
new file mode 100644
index 0000000..38589c6
--- /dev/null
+++ b/geode-docs/tools_modules/gfsh/command-pages/wan_copy_region.html.md.erb
@@ -0,0 +1,192 @@
+---
+title: wan-copy region
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+Copy the entries of a region in a WAN site onto the same region in another WAN site, using a gateway sender.
+
+This command copies region entries from a WAN site to another by putting them in batches of configurable
+size that are sent to the remote site by the selected gateway sender. Batch size is specified as number of entries per batch.
+
+The command allows you to specify a maximum copy rate in order not to stress excessively the sending or receiving WAN sites.
+This rate is configured in entries per second.
+
+Callbacks (cache listeners, replication to other WAN sites) will not be executed in the remote WAN site for the entries copied.
+
+The main uses of this command are:
+
+- Recovery of a WAN site after a disaster in which the failed site needs to be put into service again with
+ the data from another WAN site.
+
+- Adding a new WAN site to a <%=vars.product_name%> system in which the data in the new WAN site needs to be initially loaded from
+ an existing WAN site.
+
+The execution of a currently running instance of this command may be stopped by using
+this same command with the `--cancel` option.
+
+**Requirements:**
+
+The `wan-copy region` command requires that
+
+- a gateway sender is configured and running on the source WAN site
+- a gateway receiver is configured and running on the remote WAN site
+- the region onto which the data will be copied has already been created on the remote WAN site
+
+**Availability:** Online. You must be connected in `gfsh` to a JMX Manager member to use this command.
+
+**Syntax:**
+
+``` pre
+wan-copy region --region=value --sender-id=value [--max-rate=value] [--batch-size=value]
+ [--cancel]
+```
+
+<a id="wan_copy_region_command_params"></a>
+<style>
+table th:first-of-type {
+ width: 20%;
+}
+table th:nth-of-type(2) {
+ width: 60%;
+}
+table th:nth-of-type(3) {
+ width: 20%;
+}
+</style>
+
+| Name | Description | Default Value |
+|------|-------------|---------------|
+| ‑‑region| <em>Required</em>. The region for which the data is to be copied. | |
+| ‑‑sender-id| <em>Required</em>. The gateway sender to be used to copy the region entries. | |
+| ‑‑max-rate| The maximum copy rate in entries per second. If the sender is parallel, the maximum rate limit is applied to each server hosting buckets for the region to be copied. | 0 (unlimited) |
+| ‑‑batch-size| The size of the batches, in number of entries, to be used to copy the region entries. | 1000 |
+| ‑‑cancel| Cancel a running `wan-copy region` command for the specified sender and region. If the `sender-id` and `region` passed are both "*", then all running `wan-copy region` commands will be canceled. | |
+
+<span class="tablecap">Table 1. Copy Region Parameters</span>
+
+
+**Example Commands:**
+
+``` pre
+wan-copy region --region=myRegion--sender-id=mySender --max-rate=1000 --batch-size=100
+```
+
+``` pre
+wan-copy region --region=/overload --sender-id=sender1 --cancel
+```
+
+``` pre
+wan-copy region --region=* --sender-id=* --cancel
+```
+
+**Sample Output:**
+
+``` pre
+gfsh>wan-copy region --region=/overload --sender-id=myParallelSender --max-rate=100 --batch-size=100
+ Member | Status | Message
+ -------------- | ------ | -----------------------
+ server-sender | OK | Entries copied: 333
+ server-sender3 | OK | Entries copied: 334
+ server-sender2 | OK | Entries copied: 333
+
+```
+
+``` pre
+gfsh>wan-copy region --region=/overload --sender-id=mySerialSender --max-rate=100 --batch-size=100
+ Member | Status | Message
+ -------------- | ------ | ----------------------------------------------------------------------
+ server-sender2 | OK | Sender mySerialSender is serial and not primary. 0 entries copied.
+ server-sender3 | OK | Sender mySerialSender is serial and not primary. 0 entries copied.
+ server-sender | OK | Entries copied: 1000
+```
+
+
+``` pre
+gfsh>wan-copy region --region=/overload --sender-id=sender1 --cancel
+ Member | Status | Message
+ -------------- | ------ | ------------------
+ server-sender2 | OK | Execution canceled
+ server-sender | OK | Execution canceled
+ server-sender3 | OK | Execution canceled
+```
+
+``` pre
+gfsh>wan-copy region --region=myRegion --sender-id=myParallelSender --max-rate=100 --batch-size=10
+ Member | Status | Message
+ -------------- | ------ | ------------------------------------------------------
+ server-sender2 | OK | Operation canceled after having copied 10 entries
+ server-sender | OK | Operation canceled after having copied 10 entries
+ server-sender3 | OK | Operation canceled after having copied 10 entries
+```
+
+``` pre
+gfsh>wan-copy region --region=myRegion --sender-id=myParallelSender --max-rate=100 --batch-size=10
+ Member | Status | Message
+ -------------- | ------ | ----------------------------------------------------------------------
+ server-sender2 | OK | Sender mySerialSender is serial and not primary. 0 entries copied.
+ server-sender3 | OK | Sender mySerialSender is serial and not primary. 0 entries copied.
+ server-sender | OK | Operation canceled after having copied 4 entries
+```
+
+**Error Messages:**
+
+Example of `wan-copy region` with an invalid region:
+
+``` pre
+gfsh> wan-copy region --region=/regionX --sender-id=sender1
+ Member | Status | Message
+ -------------- | ------ | -------------------------
+ server-sender | ERROR | Region /regionX not found
+ server-sender2 | ERROR | Region /regionX not found
+ server-sender3 | ERROR | Region /regionX not found
+```
+
+Example of `wan-copy region` with a stopped gateway sender:
+
+``` pre
+gfsh> wan-copy region --region=/region1 --sender-id=sender1
+ Member | Status | Message
+ -------------- | ------ | -----------------------------
+ server-sender | ERROR | Sender sender1 is not running
+ server-sender2 | ERROR | Sender sender1 is not running
+ server-sender3 | ERROR | Sender sender1 is not running
+```
+
+Example of cancel of `wan-copy region` when no command is running:
+
+``` pre
+gfsh> wan-copy region --region=/region1 --sender-id=sender1 --cancel
+ Member | Status | Message
+ -------------- | ------ | ------------------------------------------------------------------------------------
+ server-sender2 | ERROR | No running command to be canceled for region /region1 and sender sender1
+ server-sender | ERROR | No running command to be canceled for region /region1 and sender sender1
+ server-sender3 | ERROR | No running command to be canceled for region /region1 and sender sender1
+```
+
+Example of cancel of all running `wan-copy region` commands:
+
+``` pre
+gfsh> wan-copy region --region=* --sender-id=* --cancel
+ Member | Status | Message
+ -------------- | ------ | ------------------------------------------------------------------------------------
+ server-sender2 | OK | Executions canceled: [(myRegion,mySender1), (myRegion,mySender)]
+ server-sender | OK | Executions canceled: [(myRegion,mySender1), (myRegion,mySender)]
+ server-sender3 | OK | Executions canceled: [(myRegion,mySender1), (myRegion,mySender)]
+```
+
diff --git a/geode-docs/tools_modules/gfsh/gfsh_command_index.html.md.erb b/geode-docs/tools_modules/gfsh/gfsh_command_index.html.md.erb
index f503849..120dbbb 100644
--- a/geode-docs/tools_modules/gfsh/gfsh_command_index.html.md.erb
+++ b/geode-docs/tools_modules/gfsh/gfsh_command_index.html.md.erb
@@ -161,7 +161,7 @@ This section provides help and usage information on all `gfsh` commands, listed
Remove an entry from a region.
-- **[restore redunadancy](../../tools_modules/gfsh/command-pages/restore.html)**
+- **[restore redundancy](../../tools_modules/gfsh/command-pages/restore.html)**
Restore redundancy to partitioned regions and optionally reassign which members host the primary copies.
@@ -221,4 +221,7 @@ This section provides help and usage information on all `gfsh` commands, listed
Display product version information.
+- **[wan-copy region](../../tools_modules/gfsh/command-pages/wan_copy_region.html)**
+
+ Copy the data of a region from a WAN site to the same region on another WAN site by using a gateway sender.
diff --git a/geode-docs/tools_modules/gfsh/quick_ref_commands_by_area.html.md.erb b/geode-docs/tools_modules/gfsh/quick_ref_commands_by_area.html.md.erb
index 46c99cf..a80812b 100644
--- a/geode-docs/tools_modules/gfsh/quick_ref_commands_by_area.html.md.erb
+++ b/geode-docs/tools_modules/gfsh/quick_ref_commands_by_area.html.md.erb
@@ -112,6 +112,7 @@ limitations under the License.
| [put](command-pages/put.html) | Add or update a region entry. | online |
| [query](command-pages/query.html) | Run queries against <%=vars.product_name%> regions. | online |
| [remove](command-pages/remove.html) | Remove an entry from a region. | online |
+| [wan-copy region ](command-pages/wan_copy_region.html) | Copy the entries of a region in a WAN site onto the same region in another WAN site, using a gateway sender. | online |
## <a id="topic_1B47A0E110124EB6BF08A467EB510412" class="no-quick-link"></a>Deployment Commands
diff --git a/geode-gfsh/src/integrationTest/java/org/apache/geode/management/internal/cli/GfshParserAutoCompletionIntegrationTest.java b/geode-gfsh/src/integrationTest/java/org/apache/geode/management/internal/cli/GfshParserAutoCompletionIntegrationTest.java
index b23f763..9988356 100644
--- a/geode-gfsh/src/integrationTest/java/org/apache/geode/management/internal/cli/GfshParserAutoCompletionIntegrationTest.java
+++ b/geode-gfsh/src/integrationTest/java/org/apache/geode/management/internal/cli/GfshParserAutoCompletionIntegrationTest.java
@@ -413,7 +413,7 @@ public class GfshParserAutoCompletionIntegrationTest {
String hintArgument = "data";
String hintsProvided = gfshParserRule.getCommandManager().obtainHint(hintArgument);
String[] hintsProvidedArray = hintsProvided.split(lineSeparator());
- assertThat(hintsProvidedArray.length).isEqualTo(17);
+ assertThat(hintsProvidedArray).hasSize(17);
assertThat(hintsProvidedArray[0])
.isEqualTo("User data as stored in regions of the Geode distributed system.");
}
@@ -423,7 +423,7 @@ public class GfshParserAutoCompletionIntegrationTest {
String hintArgument = "";
String hintsProvided = gfshParserRule.getCommandManager().obtainHint(hintArgument);
String[] hintsProvidedArray = hintsProvided.split(lineSeparator());
- assertThat(hintsProvidedArray.length).isEqualTo(21);
+ assertThat(hintsProvidedArray).hasSize(21);
assertThat(hintsProvidedArray[0]).isEqualTo(
"Hints are available for the following topics. Use \"hint <topic-name>\" for a specific hint.");
}
@@ -433,7 +433,7 @@ public class GfshParserAutoCompletionIntegrationTest {
String hintArgument = "fortytwo";
String hintsProvided = gfshParserRule.getCommandManager().obtainHint(hintArgument);
String[] hintsProvidedArray = hintsProvided.split(lineSeparator());
- assertThat(hintsProvidedArray.length).isEqualTo(1);
+ assertThat(hintsProvidedArray).hasSize(1);
assertThat(hintsProvidedArray[0]).isEqualTo(
"Unknown topic: " + hintArgument + ". Use hint to view the list of available topics.");
}
diff --git a/geode-wan/build.gradle b/geode-wan/build.gradle
index 2ff4cc0..3d00a2a 100644
--- a/geode-wan/build.gradle
+++ b/geode-wan/build.gradle
@@ -30,6 +30,7 @@ dependencies {
implementation(project(':geode-serialization'))
implementation(project(':geode-tcp-server'))
implementation(project(':geode-core'))
+ implementation(project(':geode-gfsh'))
implementation('org.apache.commons:commons-lang3')
compileOnly('org.apache.logging.log4j:log4j-api')
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/cache/wan/internal/cli/commands/WanCopyRegionCommandDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/cache/wan/internal/cli/commands/WanCopyRegionCommandDUnitTest.java
new file mode 100644
index 0000000..df628fc
--- /dev/null
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/cache/wan/internal/cli/commands/WanCopyRegionCommandDUnitTest.java
@@ -0,0 +1,1399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.wan.internal.cli.commands;
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__BATCHSIZE;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__CANCEL;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__MAXRATE;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__MSG__ALREADY__RUNNING__COMMAND;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__MSG__CANCELED__BEFORE__HAVING__COPIED;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__MSG__COPIED__ENTRIES;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__MSG__EXECUTION__CANCELED;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__MSG__NO__RUNNING__COMMAND;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__MSG__REGION__NOT__FOUND;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__MSG__SENDER__NOT__FOUND;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__MSG__SENDER__SERIAL__AND__NOT__PRIMARY;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__REGION;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__SENDERID;
+import static org.apache.geode.distributed.ConfigurationProperties.DISTRIBUTED_SYSTEM_ID;
+import static org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertNotNull;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
+import com.google.common.collect.ImmutableList;
+import junitparams.Parameters;
+import org.assertj.core.api.Condition;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.Scope;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.cache.wan.internal.WanCopyRegionFunctionService;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.wan.InternalGatewaySender;
+import org.apache.geode.internal.cache.wan.WANTestBase;
+import org.apache.geode.management.internal.cli.result.model.ResultModel;
+import org.apache.geode.management.internal.cli.util.CommandStringBuilder;
+import org.apache.geode.management.internal.i18n.CliStrings;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.DistributedErrorCollector;
+import org.apache.geode.test.dunit.rules.DistributedExecutorServiceRule;
+import org.apache.geode.test.junit.assertions.CommandResultAssert;
+import org.apache.geode.test.junit.categories.WanTest;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
+import org.apache.geode.test.junit.rules.LocatorLauncherStartupRule;
+import org.apache.geode.test.junit.runners.GeodeParamsRunner;
+
+@RunWith(GeodeParamsRunner.class)
+@Category({WanTest.class})
+public class WanCopyRegionCommandDUnitTest extends WANTestBase {
+
+ protected static VM vm8;
+
+ private static final long serialVersionUID = 1L;
+
+ private enum Gateway {
+ SENDER, RECEIVER
+ }
+
+ @Rule
+ public transient GfshCommandRule gfsh = new GfshCommandRule();
+
+ @Rule
+ public DistributedErrorCollector errorCollector = new DistributedErrorCollector();
+
+ @Rule
+ public DistributedExecutorServiceRule executorServiceRule = new DistributedExecutorServiceRule();
+
+ @BeforeClass
+ public static void beforeClassWanCopyRegionCommandDUnitTest() {
+ vm8 = VM.getVM(8);
+ }
+
+ @Test
+ public void testUnsuccessfulExecution_RegionNotFound() throws Exception {
+ List<VM> serversInA = Arrays.asList(vm5, vm6, vm7);
+ VM serverInB = vm3;
+ VM serverInC = vm4;
+ VM client = vm8;
+ String senderIdInA = "B";
+ String senderIdInB = "C";
+
+ int senderLocatorPort = create3WanSitesAndClient(true, vm0,
+ vm1, vm2, serversInA, serverInB, serverInC, client,
+ senderIdInA, senderIdInB);
+
+ int wanCopyRegionBatchSize = 20;
+ String regionName = "foo";
+
+ // Execute wan-copy region command
+ gfsh.connectAndVerify(senderLocatorPort, GfshCommandRule.PortType.locator);
+ String commandString = new CommandStringBuilder(WAN_COPY_REGION)
+ .addOption(WAN_COPY_REGION__REGION, regionName)
+ .addOption(WAN_COPY_REGION__SENDERID, senderIdInA)
+ .addOption(WAN_COPY_REGION__BATCHSIZE, String.valueOf(wanCopyRegionBatchSize))
+ .getCommandString();
+
+ // Check command status and output
+ CommandResultAssert command =
+ verifyStatusIsError(gfsh.executeAndAssertThat(commandString));
+ String message =
+ CliStrings.format(WAN_COPY_REGION__MSG__REGION__NOT__FOUND,
+ regionName);
+ command
+ .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+ .hasColumn("Message")
+ .containsExactly(message, message, message);
+ }
+
+ @Test
+ public void testUnsuccessfulExecution_SenderNotFound() throws Exception {
+ List<VM> serversInA = Arrays.asList(vm5, vm6, vm7);
+ VM serverInB = vm3;
+ VM serverInC = vm4;
+ VM client = vm8;
+ String senderIdInA = "B";
+ String senderIdInB = "C";
+
+ int senderLocatorPort = create3WanSitesAndClient(true, vm0,
+ vm1, vm2, serversInA, serverInB, serverInC, client,
+ senderIdInA, senderIdInB);
+
+ int wanCopyRegionBatchSize = 20;
+ String regionName = getRegionName(true);
+
+ // Execute wan-copy region command
+ gfsh.connectAndVerify(senderLocatorPort, GfshCommandRule.PortType.locator);
+ String commandString = new CommandStringBuilder(WAN_COPY_REGION)
+ .addOption(WAN_COPY_REGION__REGION, regionName)
+ .addOption(WAN_COPY_REGION__SENDERID, senderIdInA)
+ .addOption(WAN_COPY_REGION__BATCHSIZE, String.valueOf(wanCopyRegionBatchSize))
+ .getCommandString();
+
+ // Check command status and output
+ CommandResultAssert command =
+ verifyStatusIsError(gfsh.executeAndAssertThat(commandString));
+ String message =
+ CliStrings.format(WAN_COPY_REGION__MSG__SENDER__NOT__FOUND, senderIdInA);
+ command
+ .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+ .hasColumn("Message")
+ .containsExactly(message, message, message);
+ }
+
+ @Test
+ @Parameters({"true, true", "true, false", "false, false"})
+ public void testUnsuccessfulExecution_ExceptionAtReceiver(
+ boolean isPartitionedRegion, boolean isParallelGatewaySender) throws Exception {
+ List<VM> serversInA = Arrays.asList(vm5, vm6, vm7);
+ VM serverInB = vm3;
+ VM serverInC = vm4;
+ VM client = vm8;
+ String senderIdInA = "B";
+ String senderIdInB = "C";
+
+ int senderLocatorPort = create3WanSitesAndClient(isPartitionedRegion, vm0,
+ vm1, vm2, serversInA, serverInB, serverInC, client,
+ senderIdInA, senderIdInB);
+
+ String regionName = getRegionName(isPartitionedRegion);
+ int wanCopyRegionBatchSize = 20;
+
+ int entries = 20;
+ // Put entries
+ client.invoke(() -> doPutsFrom(regionName, 0, entries));
+
+ // Check that entries are put in the region
+ for (VM member : serversInA) {
+ member.invoke(() -> validateRegionSize(regionName, entries));
+ }
+
+ // destroy region to provoke the exception
+ serverInB.invoke(() -> destroyRegion(regionName));
+
+ // Create senders and receivers with replication as follows: "A" -> "B" -> "C"
+ createSenders(isParallelGatewaySender, serversInA, serverInB,
+ senderIdInA, senderIdInB);
+ createReceivers(serverInB, serverInC);
+
+ // Execute wan-copy region command
+ gfsh.connectAndVerify(senderLocatorPort, GfshCommandRule.PortType.locator);
+ String commandString = new CommandStringBuilder(WAN_COPY_REGION)
+ .addOption(WAN_COPY_REGION__REGION, regionName)
+ .addOption(WAN_COPY_REGION__SENDERID, senderIdInA)
+ .addOption(WAN_COPY_REGION__BATCHSIZE, String.valueOf(wanCopyRegionBatchSize))
+ .getCommandString();
+
+ // Check command status and output
+ if (isParallelGatewaySender) {
+ CommandResultAssert command =
+ verifyStatusIsError(gfsh.executeAndAssertThat(commandString));
+ Condition<String> exceptionError =
+ new Condition<>(s -> s.startsWith("Error ("), "Error");
+ command
+ .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+ .hasColumn("Message")
+ .asList()
+ .haveExactly(3, exceptionError);
+ } else {
+ CommandResultAssert command =
+ verifyStatusIsErrorInOneServer(gfsh.executeAndAssertThat(commandString));
+ Condition<String> exceptionError =
+ new Condition<>(s -> s.startsWith("Error ("), "Error");
+ Condition<String> senderNotPrimary = new Condition<>(
+ s -> s.equals(CliStrings
+ .format(WAN_COPY_REGION__MSG__SENDER__SERIAL__AND__NOT__PRIMARY,
+ senderIdInA)),
+ "sender not primary");
+ command
+ .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+ .hasColumn("Message")
+ .asList()
+ .haveExactly(1, exceptionError)
+ .haveExactly(2, senderNotPrimary);
+ }
+ }
+
+ private Object[] parametersToTestSenderOrReceiverGoesDownDuringExecution() {
+ return new Object[] {
+ new Object[] {true, true, Gateway.SENDER, false},
+ new Object[] {false, true, Gateway.SENDER, true},
+ new Object[] {false, true, Gateway.SENDER, false},
+ new Object[] {false, false, Gateway.SENDER, true},
+ new Object[] {false, false, Gateway.SENDER, false},
+ new Object[] {true, true, Gateway.RECEIVER, false},
+ new Object[] {false, true, Gateway.RECEIVER, true},
+ new Object[] {false, true, Gateway.RECEIVER, false},
+ new Object[] {false, false, Gateway.RECEIVER, true},
+ new Object[] {false, false, Gateway.RECEIVER, false}
+ };
+ }
+
+ /**
+ * This test creates two sites A & B, each one containing 3 servers.
+ * A region is created in both sites, and populated in site A.
+ * After that replication is configured from site A to site B.
+ * WanCopyRegionFunction is called, and while it is running, a sender in site A
+ * or a receiver in site B are killed.
+ */
+ @Test
+ @Parameters(method = "parametersToTestSenderOrReceiverGoesDownDuringExecution")
+ public void testSenderOrReceiverGoesDownDuringExecution(boolean useParallel,
+ boolean usePartitionedRegion, Gateway gwToBeStopped, boolean stopPrimarySender)
+ throws Exception {
+
+ if (gwToBeStopped == Gateway.SENDER && (useParallel || stopPrimarySender)) {
+ addIgnoredExceptionsForSenderInUseWentDown();
+ }
+ if (gwToBeStopped == Gateway.RECEIVER &&
+ (usePartitionedRegion || !useParallel)) {
+ addIgnoredExceptionsForReceiverConnectedToSenderInUseWentDown();
+ }
+ final int wanCopyRegionBatchSize = 10;
+ final int entries;
+ if (!useParallel && !usePartitionedRegion && stopPrimarySender) {
+ entries = 2500;
+ } else {
+ entries = 1000;
+ }
+
+ final String regionName = getRegionName(usePartitionedRegion);
+
+ // Site A
+ VM locatorInA = vm0;
+ VM server1InA = vm1;
+ VM server2InA = vm2;
+ VM server3InA = vm3;
+ List<VM> serversInA = Arrays.asList(server1InA, server2InA, server3InA);
+ final String senderIdInA = "B";
+
+ // Site B
+ VM locatorInB = vm4;
+ VM server1InB = vm5;
+ VM server2InB = vm6;
+ VM server3InB = vm7;
+ List<VM> serversInB = Arrays.asList(server1InB, server2InB, server3InB);
+ VM client = vm8;
+
+ int locatorAPort = create2WanSitesAndClient(locatorInA, serversInA, senderIdInA, locatorInB,
+ serversInB, client, usePartitionedRegion, regionName);
+
+ // Put entries
+ client.invoke(() -> doPutsFrom(regionName, 0, entries));
+ for (VM member : serversInA) {
+ member.invoke(() -> validateRegionSize(regionName, entries));
+ }
+
+ // Create senders and receivers with replication as follows: "A" -> "B"
+ if (useParallel) {
+ createReceiverInVMs(server1InB, server2InB, server3InB);
+ createSenders(useParallel, serversInA, null, senderIdInA, null);
+ } else {
+ // Senders will connect to receiver in server1InB
+ server1InB.invoke(WANTestBase::createReceiver);
+ createSenders(useParallel, serversInA, null, senderIdInA, null);
+ createReceiverInVMs(server2InB, server3InB);
+ }
+
+ Callable<CommandResultAssert> wanCopyCommandCallable = () -> {
+ String command = new CommandStringBuilder(WAN_COPY_REGION)
+ .addOption(WAN_COPY_REGION__REGION, regionName)
+ .addOption(WAN_COPY_REGION__SENDERID, senderIdInA)
+ .addOption(WAN_COPY_REGION__BATCHSIZE, String.valueOf(wanCopyRegionBatchSize))
+ .addOption(WAN_COPY_REGION__MAXRATE, "50")
+ .getCommandString();
+ try {
+ gfsh.connectAndVerify(locatorAPort, GfshCommandRule.PortType.locator);
+ } catch (Exception e) {
+ errorCollector.addError(e);
+ }
+ return gfsh.executeAndAssertThat(command);
+ };
+
+ Future<CommandResultAssert> wanCopyCommandFuture =
+ executorServiceRule.submit(wanCopyCommandCallable);
+
+ // Wait for the wan-copy command to start
+ waitForWanCopyRegionCommandToStart(useParallel, usePartitionedRegion, serversInA);
+
+ // Stop sender or receiver and verify result
+ if (gwToBeStopped == Gateway.SENDER) {
+ stopSenderAndVerifyResult(useParallel, stopPrimarySender, server2InA, serversInA, senderIdInA,
+ wanCopyCommandFuture);
+ } else if (gwToBeStopped == Gateway.RECEIVER) {
+ stopReceiverAndVerifyResult(useParallel, stopPrimarySender, entries, regionName, server1InB,
+ server2InB, server3InB, wanCopyCommandFuture);
+ }
+ }
+
+ @Test
+ @Parameters({"false, false", "false, true", "true, true"})
+ public void testRegionDestroyedDuringExecution(boolean isParallelGatewaySender,
+ boolean isPartitionedRegion)
+ throws Exception {
+ final int wanCopyRegionBatchSize = 10;
+ final int entries = 1000;
+
+ final String regionName = getRegionName(isPartitionedRegion);
+
+ // Site A
+ VM locatorInA = vm0;
+ VM server1InA = vm1;
+ List<VM> serversInA = ImmutableList.of(server1InA);
+ final String senderIdInA = "B";
+
+ // Site B
+ VM locatorInB = vm4;
+ VM server1InB = vm5;
+ List<VM> serversInB = ImmutableList.of(server1InB);
+ VM client = vm8;
+
+ int locatorAPort = create2WanSitesAndClient(locatorInA, serversInA, senderIdInA, locatorInB,
+ serversInB, client, isPartitionedRegion, regionName);
+
+ // Put entries
+ client.invoke(() -> doPutsFrom(regionName, 0, entries));
+ for (VM member : serversInA) {
+ member.invoke(() -> validateRegionSize(regionName, entries));
+ }
+
+ // Create senders and receivers with replication as follows: "A" -> "B"
+ createReceiverInVMs(server1InB);
+ createSenders(isParallelGatewaySender, serversInA, null, senderIdInA, null);
+
+ Callable<CommandResultAssert> wanCopyCommandCallable = () -> {
+ String command = new CommandStringBuilder(WAN_COPY_REGION)
+ .addOption(WAN_COPY_REGION__REGION, regionName)
+ .addOption(WAN_COPY_REGION__SENDERID, senderIdInA)
+ .addOption(WAN_COPY_REGION__BATCHSIZE, String.valueOf(wanCopyRegionBatchSize))
+ .addOption(WAN_COPY_REGION__MAXRATE, "5")
+ .getCommandString();
+ try {
+ gfsh.connectAndVerify(locatorAPort, GfshCommandRule.PortType.locator);
+ } catch (Exception e) {
+ errorCollector.addError(e);
+ }
+ return gfsh.executeAndAssertThat(command);
+ };
+
+ Future<CommandResultAssert> wanCopyCommandFuture =
+ executorServiceRule.submit(wanCopyCommandCallable);
+
+ // Wait for the wan-copy command to start
+ waitForWanCopyRegionCommandToStart(isParallelGatewaySender, isPartitionedRegion, serversInA);
+
+ // Destroy region in a server in A
+ server1InA.invoke(() -> cache.getRegion(regionName).destroyRegion());
+
+ CommandResultAssert result = wanCopyCommandFuture.get();
+ result.statusIsError();
+ result
+ .hasTableSection()
+ .hasColumns()
+ .hasSize(3);
+ result
+ .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+ .hasColumn("Member")
+ .hasSize(1);
+ result
+ .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+ .hasColumn("Message")
+ .hasSize(1);
+ result
+ .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+ .hasColumn("Status")
+ .containsExactly("ERROR");
+
+ Condition<String> regionDestroyedError;
+ regionDestroyedError = new Condition<>(
+ s -> (s.startsWith(
+ "Execution failed. Error: org.apache.geode.cache.RegionDestroyedException: ")
+ || s.startsWith("Error (Region destroyed) in operation after having copied")),
+ "execution error");
+ result
+ .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+ .hasColumn("Message")
+ .asList()
+ .haveExactly(1, regionDestroyedError);
+ }
+
+ @Test
+ @Parameters({"false, false", "false, true", "true, true"})
+ public void testDetectOngoingExecution(boolean useParallel,
+ boolean usePartitionedRegion)
+ throws Exception {
+ final int wanCopyRegionBatchSize = 10;
+ final int entries = 1000;
+
+ final String regionName = getRegionName(usePartitionedRegion);
+
+ // Site A
+ VM locatorInA = vm0;
+ VM server1InA = vm1;
+ VM server2InA = vm2;
+ VM server3InA = vm3;
+ List<VM> serversInA = Arrays.asList(server1InA, server2InA, server3InA);
+ final String senderIdInA = "B";
+
+ // Site B
+ VM locatorInB = vm4;
+ VM server1InB = vm5;
+ VM server2InB = vm6;
+ VM server3InB = vm7;
+ List<VM> serversInB = Arrays.asList(server1InB, server2InB, server3InB);
+ VM client = vm8;
+
+ int locatorAPort = create2WanSitesAndClient(locatorInA, serversInA, senderIdInA, locatorInB,
+ serversInB, client, usePartitionedRegion, regionName);
+
+ // Put entries
+ client.invoke(() -> doPutsFrom(regionName, 0, entries));
+ for (VM member : serversInA) {
+ member.invoke(() -> validateRegionSize(regionName, entries));
+ }
+
+ // Create senders and receivers with replication as follows: "A" -> "B"
+ if (useParallel) {
+ createReceiverInVMs(server1InB, server2InB, server3InB);
+ createSenders(useParallel, serversInA, null, senderIdInA, null);
+ } else {
+ // Senders will connect to receiver in server1InB
+ server1InB.invoke(WANTestBase::createReceiver);
+ createSenders(useParallel, serversInA, null, senderIdInA, null);
+ createReceiverInVMs(server2InB, server3InB);
+ }
+
+ Callable<CommandResultAssert> wanCopyCommandCallable = () -> {
+ String command = new CommandStringBuilder(WAN_COPY_REGION)
+ .addOption(WAN_COPY_REGION__REGION, regionName)
+ .addOption(WAN_COPY_REGION__SENDERID, senderIdInA)
+ .addOption(WAN_COPY_REGION__BATCHSIZE, String.valueOf(wanCopyRegionBatchSize))
+ .addOption(WAN_COPY_REGION__MAXRATE, "5")
+ .getCommandString();
+ try {
+ gfsh.connectAndVerify(locatorAPort, GfshCommandRule.PortType.locator);
+ } catch (Exception e) {
+ errorCollector.addError(e);
+ }
+ return gfsh.executeAndAssertThat(command);
+ };
+
+ Future<CommandResultAssert> wanCopyCommandFuture =
+ executorServiceRule.submit(wanCopyCommandCallable);
+
+ // Wait for the wan-copy command to start
+ waitForWanCopyRegionCommandToStart(useParallel, usePartitionedRegion, serversInA);
+
+ // Execute again the same wan-copy region command
+ String commandString = new CommandStringBuilder(WAN_COPY_REGION)
+ .addOption(WAN_COPY_REGION__REGION, regionName)
+ .addOption(WAN_COPY_REGION__SENDERID, senderIdInA)
+ .addOption(WAN_COPY_REGION__BATCHSIZE, String.valueOf(wanCopyRegionBatchSize))
+ .getCommandString();
+
+ // Check command status and output
+ Condition<String> exceptionError = new Condition<>(
+ s -> s.equals(CliStrings.format(WAN_COPY_REGION__MSG__ALREADY__RUNNING__COMMAND,
+ regionName, senderIdInA)),
+ "already running");
+ if (useParallel) {
+ CommandResultAssert command =
+ verifyStatusIsError(gfsh.executeAndAssertThat(commandString));
+ command
+ .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+ .hasColumn("Message")
+ .asList()
+ .haveExactly(3, exceptionError);
+ } else {
+ CommandResultAssert command =
+ verifyStatusIsErrorInOneServer(gfsh.executeAndAssertThat(commandString));
+ Condition<String> senderNotPrimary = new Condition<>(
+ s -> s.equals(CliStrings
+ .format(WAN_COPY_REGION__MSG__SENDER__SERIAL__AND__NOT__PRIMARY,
+ senderIdInA)),
+ "sender not primary");
+ command
+ .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+ .hasColumn("Message")
+ .asList()
+ .haveExactly(1, exceptionError)
+ .haveExactly(2, senderNotPrimary);
+ }
+
+ // cancel command
+ String commandString1 = new CommandStringBuilder(WAN_COPY_REGION)
+ .addOption(WAN_COPY_REGION__REGION, regionName)
+ .addOption(WAN_COPY_REGION__SENDERID, senderIdInA)
+ .addOption(WAN_COPY_REGION__BATCHSIZE, String.valueOf(wanCopyRegionBatchSize))
+ .addOption(WAN_COPY_REGION__CANCEL)
+ .getCommandString();
+ gfsh.executeAndAssertThat(commandString1);
+ wanCopyCommandFuture.get();
+ addIgnoredExceptionsForClosingAfterCancelCommand();
+ }
+
+ /**
+ * Scenario with 3 WAN sites: "A", "B" and "C".
+ * Initially, no replication is configured between sites.
+ * Several entries are put in WAN site "A".
+ *
+ * The following gateway senders are created and started:
+ * - In "A" site: to replicate region entries to "B" site. Sender called "B".
+ * - In "B" site: to replicate region entries to "C" site. Sender called "C".
+ * (Replication is as follows: A -> B -> C)
+ *
+ * The "wan-copy region" command is run from "A" site passing sender "B".
+ *
+ * It must be verified that the entries are copied to site "B".
+ * It must also be verified that the entries are not transitively
+ * copied to "C" even though replication is configured from "B" to "C"
+ * because with this command generateCallbacks is set to false in the
+ * events generated.
+ *
+ */
+ @Test
+ @Parameters({"true, true", "true, false", "false, false"})
+ public void testSuccessfulExecution(boolean isPartitionedRegion,
+ boolean isParallelGatewaySender) throws Exception {
+ List<VM> serversInA = Arrays.asList(vm5, vm6, vm7);
+ VM serverInB = vm3;
+ VM serverInC = vm4;
+ VM client = vm8;
+ String senderIdInA = "B";
+ String senderIdInB = "C";
+
+ int senderLocatorPort = create3WanSitesAndClient(isPartitionedRegion, vm0,
+ vm1, vm2, serversInA, serverInB, serverInC, client,
+ senderIdInA, senderIdInB);
+
+ int wanCopyRegionBatchSize = 20;
+ int entries = 100;
+ String regionName = getRegionName(isPartitionedRegion);
+ // Put entries
+ client.invoke(() -> doPutsFrom(regionName, 0, entries + 1));
+ // remove an entry to make sure that replication works well even when removes.
+ client.invoke(() -> removeEntry(regionName, entries));
+
+ // Check that entries are put in the region
+ for (VM member : serversInA) {
+ member.invoke(() -> validateRegionSize(regionName, entries));
+ }
+
+ // Check that entries are not copied to "B" nor "C"
+ serverInB.invoke(() -> validateRegionSize(regionName, 0));
+ serverInC.invoke(() -> validateRegionSize(regionName, 0));
+
+ // Create senders and receivers with replication as follows: "A" -> "B" -> "C"
+ createSenders(isParallelGatewaySender, serversInA, serverInB,
+ senderIdInA, senderIdInB);
+ createReceivers(serverInB, serverInC);
+
+ // Check that entries are not copied to "B" nor "C"
+ serverInB.invoke(() -> validateRegionSize(regionName, 0));
+ serverInC.invoke(() -> validateRegionSize(regionName, 0));
+
+ // Execute wan-copy region command
+ gfsh.connectAndVerify(senderLocatorPort, GfshCommandRule.PortType.locator);
+ String commandString = new CommandStringBuilder(WAN_COPY_REGION)
+ .addOption(WAN_COPY_REGION__REGION, regionName)
+ .addOption(WAN_COPY_REGION__SENDERID, senderIdInA)
+ .addOption(WAN_COPY_REGION__BATCHSIZE, String.valueOf(wanCopyRegionBatchSize))
+ .getCommandString();
+
+ // Check command status and output
+ CommandResultAssert command =
+ verifyStatusIsOk(gfsh.executeAndAssertThat(commandString));
+ if (isPartitionedRegion && isParallelGatewaySender) {
+ String msg1 = CliStrings.format(WAN_COPY_REGION__MSG__COPIED__ENTRIES, 33);
+ String msg2 = CliStrings.format(WAN_COPY_REGION__MSG__COPIED__ENTRIES, 34);
+ command
+ .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+ .hasColumn("Message")
+ .containsExactlyInAnyOrder(msg1, msg1, msg2);
+ } else {
+ String msg1 = CliStrings.format(WAN_COPY_REGION__MSG__COPIED__ENTRIES, 100);
+ String msg2 = CliStrings
+ .format(WAN_COPY_REGION__MSG__SENDER__SERIAL__AND__NOT__PRIMARY, senderIdInA);
+ command
+ .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+ .hasColumn("Message")
+ .containsExactlyInAnyOrder(msg1, msg2, msg2);
+ }
+
+ // Check that entries are copied in "B"
+ serverInB.invoke(() -> validateRegionSize(regionName, entries));
+
+ // Check that the region's data is the same in sites "A" and "B"
+ checkEqualRegionData(regionName, serversInA.get(0), serverInB);
+
+ // Check that wanCopyRegionBatchSize is correctly used by the command
+ long receivedBatches = serverInB.invoke(() -> getReceiverStats().get(2));
+ if (isPartitionedRegion && isParallelGatewaySender) {
+ assertThat(receivedBatches).isEqualTo(6);
+ } else {
+ assertThat(receivedBatches).isEqualTo(5);
+ }
+
+ // Check that entries are not copied in "C" (generateCallbacks is false)
+ serverInC.invoke(() -> validateRegionSize(regionName, 0));
+ }
+
+ /**
+ * Scenario with 2 WAN sites: "A" and "B".
+ * Initially, no replication is configured between sites.
+ * Several entries are put in WAN site "A".
+ *
+ * The following gateway senders are created and started:
+ * - In "A" site: to replicate region entries to "B" site. Sender called "B".
+ * (Replication is as follows: A -> B)
+ *
+ * The "wan-copy region" command is run from "A" site passing sender "B".
+ * Simultaneously, random operations for entries with the same
+ * keys as the one previously put are run.
+ *
+ * When the command finishes and the puts finish it
+ * must be verified that the entries in the region in site "A"
+ * are the same as the ones in region in site "B".
+ */
+ @Test
+ @Parameters({"true, true", "true, false", "false, false"})
+ public void testSuccessfulExecutionWhileRunningOpsOnRegion(
+ boolean isPartitionedRegion,
+ boolean isParallelGatewaySender) throws Exception {
+ List<VM> serversInA = Arrays.asList(vm5, vm6, vm7);
+ VM serverInB = vm3;
+ VM serverInC = vm4;
+ VM client = vm8;
+ String senderIdInA = "B";
+ String senderIdInB = "C";
+
+ int senderLocatorPort = create3WanSitesAndClient(isPartitionedRegion, vm0,
+ vm1, vm2, serversInA, serverInB, serverInC, client,
+ senderIdInA, senderIdInB);
+
+ int wanCopyRegionBatchSize = 20;
+ int entries = 10000;
+ Set<Long> keySet = LongStream.range(0L, entries).boxed().collect(Collectors.toSet());
+ String regionName = getRegionName(isPartitionedRegion);
+ // Put entries
+ client.invoke(() -> doPutsFrom(regionName, 0, entries));
+
+ // Check that entries are put in the region
+ for (VM member : serversInA) {
+ member.invoke(() -> validateRegionSize(regionName, entries));
+ }
+
+ // Create senders and receivers with replication as follows: "A" -> "B" -> "C"
+ createSenders(isParallelGatewaySender, serversInA, serverInB,
+ senderIdInA, senderIdInB);
+ createReceivers(serverInB, serverInC);
+
+ // Let maxRate be a 5th of the number of entries in order to have the
+ // command running for about 5 seconds so that there is time
+ // for the random operations to be done at the same time
+ // as the command is running.
+ // The rate is divided by the number of servers in case the sender is parallel
+ int maxRate = (entries / 5) / (isParallelGatewaySender ? serversInA.size() : 1);
+
+ // Execute wan-copy region command
+ Callable<CommandResultAssert> wanCopyCommandCallable = () -> {
+ String command = new CommandStringBuilder(WAN_COPY_REGION)
+ .addOption(WAN_COPY_REGION__REGION, regionName)
+ .addOption(WAN_COPY_REGION__SENDERID, senderIdInA)
+ .addOption(WAN_COPY_REGION__BATCHSIZE, String.valueOf(wanCopyRegionBatchSize))
+ .addOption(WAN_COPY_REGION__MAXRATE, String.valueOf(maxRate))
+ .getCommandString();
+ try {
+ gfsh.connectAndVerify(senderLocatorPort, GfshCommandRule.PortType.locator);
+ } catch (Exception e) {
+ errorCollector.addError(e);
+ }
+ return gfsh.executeAndAssertThat(command);
+ };
+
+ Future<CommandResultAssert> wanCopyCommandFuture =
+ executorServiceRule.submit(wanCopyCommandCallable);
+
+ // Wait for the wan-copy command to start
+ waitForWanCopyRegionCommandToStart(isParallelGatewaySender, isPartitionedRegion, serversInA);
+
+ // While the command is running, send some random operations over the same keys
+ AsyncInvocation<Void> asyncOps1 =
+ client.invokeAsync(() -> sendRandomOpsFromClient(regionName, keySet, 3));
+ AsyncInvocation<Void> asyncOps2 =
+ client.invokeAsync(() -> sendRandomOpsFromClient(regionName, keySet, 3));
+ AsyncInvocation<Void> asyncOps3 =
+ client.invokeAsync(() -> sendRandomOpsFromClient(regionName, keySet, 3));
+
+ // Check command status and output
+ CommandResultAssert command = wanCopyCommandFuture.get();
+ verifyStatusIsOk(command);
+
+ // Wait for random operations to finish
+ asyncOps1.await();
+ asyncOps2.await();
+ asyncOps3.await();
+
+ // Wait for entries to be replicated (replication queues empty)
+ for (VM server : serversInA) {
+ server.invoke(() -> getSenderStats(senderIdInA, 0));
+ }
+
+ // Check that the region's data is the same in sites "A" and "B"
+ checkEqualRegionData(regionName, serversInA.get(0), serverInB);
+ }
+
+ /**
+ * Cancel is executed when no wan-copy region command is running.
+ */
+ @Test
+ @Parameters({"true, true", "true, false", "false, false"})
+ public void testUnsuccessfulCancelExecution(boolean isPartitionedRegion,
+ boolean isParallelGatewaySender) throws Exception {
+ List<VM> serversInA = Arrays.asList(vm5, vm6, vm7);
+ VM serverInB = vm3;
+ VM serverInC = vm4;
+ VM client = vm8;
+ String senderIdInA = "B";
+ String senderIdInB = "C";
+
+ int senderLocatorPort = create3WanSitesAndClient(isPartitionedRegion, vm0,
+ vm1, vm2, serversInA, serverInB, serverInC, client,
+ senderIdInA, senderIdInB);
+
+ String regionName = getRegionName(isPartitionedRegion);
+
+ // Create senders and receivers with replication as follows: "A" -> "B" -> "C"
+ createSenders(isParallelGatewaySender, serversInA, serverInB,
+ senderIdInA, senderIdInB);
+ createReceivers(serverInB, serverInC);
+
+ // Execute cancel wan-copy region command
+ gfsh.connectAndVerify(senderLocatorPort, GfshCommandRule.PortType.locator);
+ String cancelCommand = new CommandStringBuilder(WAN_COPY_REGION)
+ .addOption(WAN_COPY_REGION__REGION, regionName)
+ .addOption(WAN_COPY_REGION__SENDERID, senderIdInA)
+ .addOption(WAN_COPY_REGION__CANCEL)
+ .getCommandString();
+
+ CommandResultAssert cancelCommandResult =
+ verifyStatusIsError(gfsh.executeAndAssertThat(cancelCommand));
+ String msg1 = CliStrings.format(WAN_COPY_REGION__MSG__NO__RUNNING__COMMAND,
+ regionName, senderIdInA);
+ cancelCommandResult
+ .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+ .hasColumn("Message")
+ .containsExactlyInAnyOrder(msg1, msg1, msg1);
+ }
+
+ /**
+ * Scenario with 3 WAN sites: "A", "B" and "C".
+ * Initially, no replication is configured between sites.
+ * Several entries are put in WAN site "A".
+ *
+ * The following gateway senders are created and started:
+ * - In "A" site: to replicate region entries to "B" site. Sender called "B".
+ * - In "B" site: to replicate region entries to "C" site. Sender called "C".
+ * (Replication is as follows: A -> B -> C)
+ *
+ * The "wan-copy region" command is run from "A" site passing sender "B"
+ * in a different thread. The maxRate is set to a very low value so that there is
+ * time to cancel it before it finishes.
+ *
+ * The "wan-copy region" command with the cancel option
+ * is run from "A" site passing sender "B".
+ *
+ * It must be verified that the command is canceled.
+ * Also, the output of the command must show
+ * the number of entries copied before the command was canceled.
+ */
+ @Test
+ @Parameters({"true, true", "true, false", "false, false"})
+ public void testSuccessfulCancelExecution(boolean isPartitionedRegion,
+ boolean isParallelGatewaySender) throws Exception {
+ List<VM> serversInA = Arrays.asList(vm5, vm6, vm7);
+ VM serverInB = vm3;
+ VM serverInC = vm4;
+ VM client = vm8;
+ String senderIdInA = "B";
+ String senderIdInB = "C";
+
+ int senderLocatorPort = create3WanSitesAndClient(isPartitionedRegion, vm0,
+ vm1, vm2, serversInA, serverInB, serverInC, client,
+ senderIdInA, senderIdInB);
+
+ int wanCopyRegionBatchSize = 20;
+ int entries = 100;
+ String regionName = getRegionName(isPartitionedRegion);
+ // Put entries
+ client.invoke(() -> doPutsFrom(regionName, 0, entries));
+
+ // Create senders and receivers with replication as follows: "A" -> "B" -> "C"
+ createSenders(isParallelGatewaySender, serversInA, serverInB,
+ senderIdInA, senderIdInB);
+ createReceivers(serverInB, serverInC);
+
+ Callable<CommandResultAssert> wanCopyCommandCallable = () -> {
+ String command = new CommandStringBuilder(WAN_COPY_REGION)
+ .addOption(WAN_COPY_REGION__REGION, regionName)
+ .addOption(WAN_COPY_REGION__SENDERID, senderIdInA)
+ .addOption(WAN_COPY_REGION__BATCHSIZE, String.valueOf(wanCopyRegionBatchSize))
+ .addOption(WAN_COPY_REGION__MAXRATE, String.valueOf(1))
+ .getCommandString();
+ try {
+ gfsh.connectAndVerify(senderLocatorPort, GfshCommandRule.PortType.locator);
+ } catch (Exception e) {
+ errorCollector.addError(e);
+ }
+ return gfsh.executeAndAssertThat(command).statusIsError();
+ };
+
+ Future<CommandResultAssert> wanCopyCommandFuture =
+ executorServiceRule.submit(wanCopyCommandCallable);
+
+ // Wait for the wan-copy command to start
+ waitForWanCopyRegionCommandToStart(isParallelGatewaySender, isPartitionedRegion, serversInA);
+
+ // Cancel wan-copy region command
+ GfshCommandRule gfshCancelCommand = new GfshCommandRule();
+ gfshCancelCommand.connectAndVerify(senderLocatorPort, GfshCommandRule.PortType.locator);
+ String cancelCommand = new CommandStringBuilder(WAN_COPY_REGION)
+ .addOption(WAN_COPY_REGION__REGION, regionName)
+ .addOption(WAN_COPY_REGION__SENDERID, senderIdInA)
+ .addOption(WAN_COPY_REGION__CANCEL)
+ .getCommandString();
+ CommandResultAssert cancelCommandResult =
+ gfshCancelCommand.executeAndAssertThat(cancelCommand);
+
+ if (isPartitionedRegion && isParallelGatewaySender) {
+ verifyStatusIsOk(cancelCommandResult);
+ cancelCommandResult
+ .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+ .hasColumn("Message")
+ .containsExactlyInAnyOrder(WAN_COPY_REGION__MSG__EXECUTION__CANCELED,
+ WAN_COPY_REGION__MSG__EXECUTION__CANCELED,
+ WAN_COPY_REGION__MSG__EXECUTION__CANCELED);
+ } else {
+ verifyStatusIsOkInOneServer(cancelCommandResult);
+ String msg1 = CliStrings.format(WAN_COPY_REGION__MSG__NO__RUNNING__COMMAND,
+ regionName, senderIdInA);
+ cancelCommandResult
+ .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+ .hasColumn("Message")
+ .containsExactlyInAnyOrder(msg1, msg1, WAN_COPY_REGION__MSG__EXECUTION__CANCELED);
+ }
+
+ // Check wan-copy region command output
+ CommandResultAssert wanCopyCommandResult = wanCopyCommandFuture.get();
+ if (isPartitionedRegion && isParallelGatewaySender) {
+ verifyStatusIsError(wanCopyCommandResult);
+ String msg = WAN_COPY_REGION__MSG__CANCELED__BEFORE__HAVING__COPIED;
+ wanCopyCommandResult
+ .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+ .hasColumn("Message")
+ .containsExactly(msg, msg, msg);
+ } else {
+ verifyStatusIsErrorInOneServer(wanCopyCommandResult);
+ String msg1 = CliStrings
+ .format(WAN_COPY_REGION__MSG__SENDER__SERIAL__AND__NOT__PRIMARY, senderIdInA);
+ wanCopyCommandResult
+ .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+ .hasColumn("Message")
+ .containsExactlyInAnyOrder(WAN_COPY_REGION__MSG__CANCELED__BEFORE__HAVING__COPIED, msg1,
+ msg1);
+ }
+ addIgnoredExceptionsForClosingAfterCancelCommand();
+ }
+
+ private int create3WanSitesAndClient(boolean isPartitionedRegion, VM locatorSender,
+ VM locatorSenderReceiver, VM locatorReceiver, List<VM> serversInA, VM serverInB,
+ VM serverInC, VM client, String senderIdInA, String senderIdInB) {
+ // Create locators
+ int receiverLocatorPort =
+ locatorReceiver.invoke(() -> createFirstLocatorWithDSId(3));
+ int senderReceiverLocatorPort = locatorSenderReceiver
+ .invoke(() -> createFirstRemoteLocator(2, receiverLocatorPort));
+ int senderLocatorPort = locatorSender.invoke(() -> {
+ Properties props = getDistributedSystemProperties();
+ props.setProperty(DISTRIBUTED_SYSTEM_ID, "" + 1);
+ props.setProperty(REMOTE_LOCATORS, "localhost[" + senderReceiverLocatorPort + "]");
+ LocatorLauncherStartupRule launcherStartupRule =
+ new LocatorLauncherStartupRule().withProperties(props);
+ launcherStartupRule.start();
+ return launcherStartupRule.getLauncher().getPort();
+ });
+
+ // Create servers
+ Properties properties = new Properties();
+ serverInB.invoke(() -> createServer(senderReceiverLocatorPort, -1, properties));
+ serverInC.invoke(() -> createServer(receiverLocatorPort, -1, properties));
+ for (VM server : serversInA) {
+ server.invoke(() -> createServer(senderLocatorPort, -1, properties));
+ }
+
+ // Create region in servers
+ final String regionName = getRegionName(isPartitionedRegion);
+ if (isPartitionedRegion) {
+ for (VM server : serversInA) {
+ server
+ .invoke(() -> createPartitionedRegion(regionName, senderIdInA, 1, 100,
+ isOffHeap(), RegionShortcut.PARTITION, true));
+ }
+ serverInB.invoke(
+ () -> createPartitionedRegion(regionName, senderIdInB, 0, 100,
+ isOffHeap(), RegionShortcut.PARTITION, true));
+ serverInC.invoke(() -> createPartitionedRegion(regionName, null, 0, 100,
+ isOffHeap(), RegionShortcut.PARTITION, true));
+ } else {
+ for (VM server : serversInA) {
+ server.invoke(() -> createReplicatedRegion(regionName, senderIdInA,
+ Scope.GLOBAL, DataPolicy.REPLICATE,
+ isOffHeap(), true));
+ }
+ serverInB
+ .invoke(() -> createReplicatedRegion(regionName, senderIdInB,
+ Scope.GLOBAL, DataPolicy.REPLICATE,
+ isOffHeap(), true));
+ serverInC.invoke(() -> createReplicatedRegion(regionName, null,
+ Scope.GLOBAL, DataPolicy.REPLICATE, isOffHeap(), true));
+ }
+
+ // Create client
+ client.invoke(() -> createClientWithLocatorAndRegion(senderLocatorPort, "localhost",
+ regionName, ClientRegionShortcut.PROXY));
+
+ return senderLocatorPort;
+ }
+
+ private int create2WanSitesAndClient(VM locatorInA, List<VM> serversInA, String senderIdInA,
+ VM locatorInB, List<VM> serversInB, VM client, boolean usePartitionedRegion,
+ String regionName) {
+ // Create locators
+ int locatorBPort = locatorInB.invoke(() -> createFirstLocatorWithDSId(2));
+ int locatorAPort = locatorInA.invoke(() -> {
+ Properties props = getDistributedSystemProperties();
+ props.setProperty(DISTRIBUTED_SYSTEM_ID, "" + 1);
+ props.setProperty(REMOTE_LOCATORS, "localhost[" + locatorBPort + "]");
+ LocatorLauncherStartupRule launcherStartupRule =
+ new LocatorLauncherStartupRule().withProperties(props);
+ launcherStartupRule.start();
+ return launcherStartupRule.getLauncher().getPort();
+ });
+
+ // Create servers and regions
+ createServersAndRegions(locatorBPort, serversInB, usePartitionedRegion, regionName, null);
+ createServersAndRegions(locatorAPort, serversInA, usePartitionedRegion, regionName,
+ senderIdInA);
+
+ // Create client
+ client.invoke(() -> createClientWithLocatorAndRegion(locatorAPort, "localhost",
+ regionName, ClientRegionShortcut.PROXY));
+
+ return locatorAPort;
+ }
+
+ private void createSenders(boolean isParallelGatewaySender, List<VM> serversInA,
+ VM serverInB, String senderIdInA, String senderIdInB) {
+ if (serverInB != null && senderIdInB != null) {
+ serverInB.invoke(() -> createSender(senderIdInB, 3,
+ isParallelGatewaySender, 100, 10, false,
+ false, null, false));
+ }
+ for (VM server : serversInA) {
+ server.invoke(() -> createSender(senderIdInA, 2, isParallelGatewaySender,
+ 100, 10, false,
+ false, null, true));
+ }
+ startSenderInVMsAsync(senderIdInA, serversInA.toArray(new VM[0]));
+ }
+
+ private void createReceivers(VM serverInB, VM serverInC) {
+ createReceiverInVMs(serverInB);
+ createReceiverInVMs(serverInC);
+ }
+
+ private void stopReceiverAndVerifyResult(boolean useParallel, boolean stopPrimarySender,
+ int entries, String regionName, VM server1InB, VM server2InB, VM server3InB,
+ Future<CommandResultAssert> commandFuture)
+ throws InterruptedException, java.util.concurrent.ExecutionException {
+ // if parallel sender: stop any receiver
+ // if serial sender: stop receiver connected to primary or secondary
+ if (useParallel) {
+ server2InB.invoke(() -> cache.close());
+ } else {
+ // Region type has no influence on which server should be stopped
+ if (stopPrimarySender) {
+ // Stop the first server which had an available receiver
+ server1InB.invoke(() -> cache.close());
+ } else {
+ server3InB.invoke(() -> cache.close());
+ }
+ }
+
+ CommandResultAssert result = commandFuture.get();
+ // Verify result
+ if (useParallel) {
+ verifyResultOfStoppingReceiverWhenUsingParallelSender(result);
+ } else {
+ verifyResultOfStoppingReceiverWhenUsingSerialSender(result);
+ server2InB.invoke(() -> validateRegionSize(regionName, entries));
+ }
+ }
+
+ private void stopSenderAndVerifyResult(boolean useParallel, boolean stopPrimarySender,
+ VM server2InA, List<VM> serversInA, String senderIdInA,
+ Future<CommandResultAssert> wanCopyCommandFuture)
+ throws InterruptedException, java.util.concurrent.ExecutionException {
+ // If parallel: stop any server
+ // If serial: stop primary or secondary
+ if (useParallel) {
+ server2InA.invoke(() -> killSender(senderIdInA));
+ } else {
+ for (VM server : serversInA) {
+ boolean senderWasStopped = server.invoke(() -> {
+ GatewaySender sender = cache.getGatewaySender(senderIdInA);
+ if (((InternalGatewaySender) sender).isPrimary() == stopPrimarySender) {
+ killSender();
+ return true;
+ }
+ return false;
+ });
+ if (senderWasStopped) {
+ break;
+ }
+ }
+ }
+
+ CommandResultAssert result = wanCopyCommandFuture.get();
+ // Verify result
+ if (useParallel) {
+ verifyResultOfStoppingParallelSender(result);
+ } else {
+ if (stopPrimarySender) {
+ verifyResultOfStoppingPrimarySerialSender(result);
+ } else {
+ verifyResultStoppingSecondarySerialSender(result);
+ }
+ }
+ }
+
+ public void verifyResultOfStoppingReceiverWhenUsingSerialSender(
+ CommandResultAssert command) {
+ verifyStatusIsOk(command);
+ Condition<String> haveEntriesCopied =
+ new Condition<>(s -> s.startsWith("Entries copied: "), "Entries copied");
+ Condition<String> senderNotPrimary = new Condition<>(
+ s -> s.equals("Sender B is serial and not primary. 0 entries copied."),
+ "sender not primary");
+
+ command
+ .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+ .hasColumn("Message")
+ .asList()
+ .haveExactly(1, haveEntriesCopied)
+ .haveExactly(2, senderNotPrimary);
+ }
+
+ public void verifyResultOfStoppingReceiverWhenUsingParallelSender(
+ CommandResultAssert command) {
+ verifyStatusIsOk(command);
+ Condition<String> haveEntriesCopied =
+ new Condition<>(s -> s.startsWith("Entries copied: "), "Entries copied");
+
+ command
+ .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+ .hasColumn("Message")
+ .asList()
+ .haveExactly(3, haveEntriesCopied);
+ }
+
+ public void verifyResultOfStoppingParallelSender(CommandResultAssert command) {
+ verifyStatusIsErrorInOneServer(command);
+ Condition<String> startsWithError = new Condition<>(
+ s -> (s.startsWith("Execution failed. Error:")
+ || s.startsWith("Error (Unknown error sending batch)")
+ || s.startsWith("Error (Region destroyed)")
+ || s.startsWith("MemberResponse got memberDeparted event for")
+ || s.equals(WAN_COPY_REGION__MSG__CANCELED__BEFORE__HAVING__COPIED)),
+ "execution error");
+ Condition<String> haveEntriesCopied =
+ new Condition<>(s -> s.startsWith("Entries copied:"), "Entries copied");
+
+ command
+ .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+ .hasColumn("Message")
+ .asList()
+ .haveExactly(1, startsWithError)
+ .haveExactly(2, haveEntriesCopied);
+ }
+
+ public void verifyResultOfStoppingPrimarySerialSender(
+ CommandResultAssert command) {
+ verifyStatusIsErrorInOneServer(command);
+
+ Condition<String> startsWithError = new Condition<>(
+ s -> (s.startsWith("Execution failed. Error:")
+ || s.startsWith("Error (Unknown error sending batch)")
+ || s.startsWith("No connection available towards receiver after having copied")
+ || s.startsWith("Error (Region destroyed)")
+ || s.startsWith("MemberResponse got memberDeparted event for")
+ || s.equals(WAN_COPY_REGION__MSG__CANCELED__BEFORE__HAVING__COPIED)),
+ "execution error");
+
+ Condition<String> senderNotPrimary = new Condition<>(
+ s -> s.equals("Sender B is serial and not primary. 0 entries copied."),
+ "sender not primary");
+
+ command
+ .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+ .hasColumn("Message")
+ .asList()
+ .haveExactly(1, startsWithError)
+ .haveExactly(2, senderNotPrimary);
+ }
+
+ public void verifyResultStoppingSecondarySerialSender(
+ CommandResultAssert command) {
+ command.statusIsSuccess();
+ command
+ .hasTableSection()
+ .hasColumns()
+ .hasSize(3);
+ command
+ .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+ .hasColumn("Member")
+ .hasSize(3);
+ command
+ .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+ .hasColumn("Message")
+ .hasSize(3);
+ command
+ .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+ .hasColumn("Status")
+ .containsExactlyInAnyOrder("OK", "OK", "OK");
+
+ Condition<String> haveEntriesCopied =
+ new Condition<>(s -> s.startsWith("Entries copied:"), "Entries copied");
+ Condition<String> senderNotPrimary = new Condition<>(
+ s -> s.equals("Sender B is serial and not primary. 0 entries copied."),
+ "sender not primary");
+
+ command
+ .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+ .hasColumn("Message")
+ .asList()
+ .haveExactly(1, haveEntriesCopied)
+ .haveExactly(2, senderNotPrimary);
+ }
+
+ private String getRegionName(boolean isPartitionedRegion) {
+ return getTestMethodName() + (isPartitionedRegion ? "_PR" : "RR");
+ }
+
+ public static void removeEntry(String regionName, long key) {
+ Region<?, ?> region = cache.getRegion(SEPARATOR + regionName);
+ assertNotNull(region);
+ region.remove(key);
+ }
+
+ public void sendRandomOpsFromClient(String regionName, Set<Long> keySet, int iterations) {
+ Region<Long, Integer> region = cache.getRegion(SEPARATOR + regionName);
+ assertNotNull(region);
+ int min = 0;
+ int max = 1000;
+ for (int i = 0; i < iterations; i++) {
+ for (Long key : keySet) {
+ long longKey = key;
+ int value = (int) (Math.random() * (max - min + 1) + min);
+ if (value < 50) {
+ region.remove(longKey);
+ } else {
+ region.put(longKey, value);
+ }
+ }
+ }
+ }
+
+ public CommandResultAssert verifyStatusIsOk(CommandResultAssert command) {
+ command.statusIsSuccess();
+ command
+ .hasTableSection()
+ .hasColumns()
+ .hasSize(3);
+ command
+ .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+ .hasColumn("Member")
+ .hasSize(3);
+ command
+ .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+ .hasColumn("Status")
+ .containsExactly("OK", "OK", "OK");
+ command
+ .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+ .hasColumn("Message")
+ .hasSize(3);
+ return command;
+ }
+
+ public CommandResultAssert verifyStatusIsError(CommandResultAssert command) {
+ command.statusIsError();
+ command
+ .hasTableSection()
+ .hasColumns()
+ .hasSize(3);
+ command
+ .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+ .hasColumn("Member")
+ .hasSize(3);
+ command
+ .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+ .hasColumn("Message")
+ .hasSize(3);
+ command
+ .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+ .hasColumn("Status")
+ .containsExactly("ERROR", "ERROR", "ERROR");
+ return command;
+ }
+
+ public CommandResultAssert verifyStatusIsErrorInOneServer(
+ CommandResultAssert command) {
+ command.statusIsError();
+ command
+ .hasTableSection()
+ .hasColumns()
+ .hasSize(3);
+ command
+ .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+ .hasColumn("Member")
+ .hasSize(3);
+ command
+ .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+ .hasColumn("Message")
+ .hasSize(3);
+ command
+ .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+ .hasColumn("Status")
+ .containsExactlyInAnyOrder("OK", "OK", "ERROR");
+ return command;
+ }
+
+ public void verifyStatusIsOkInOneServer(
+ CommandResultAssert command) {
+ command.statusIsError();
+ command
+ .hasTableSection()
+ .hasColumns()
+ .hasSize(3);
+ command
+ .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+ .hasColumn("Member")
+ .hasSize(3);
+ command
+ .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+ .hasColumn("Message")
+ .hasSize(3);
+ command
+ .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+ .hasColumn("Status")
+ .containsExactlyInAnyOrder("OK", "ERROR", "ERROR");
+ }
+
+ public void createServersAndRegions(int locatorPort, List<VM> servers,
+ boolean usePartitionedRegion, String regionName, String senderId) {
+
+ Properties properties = new Properties();
+ for (VM server : servers) {
+ server.invoke(() -> createServer(locatorPort, -1, properties));
+ if (usePartitionedRegion) {
+ server
+ .invoke(() -> createPartitionedRegion(regionName, senderId, 1, 100,
+ isOffHeap(), RegionShortcut.PARTITION, true));
+ } else {
+ server.invoke(() -> createReplicatedRegion(regionName, senderId,
+ Scope.GLOBAL, DataPolicy.REPLICATE,
+ isOffHeap(), true));
+ }
+ }
+ }
+
+ private void waitForWanCopyRegionCommandToStart(boolean useParallel, boolean usePartitionedRegion,
+ List<VM> servers) {
+ final int executionsExpected = useParallel && usePartitionedRegion ? servers.size() : 1;
+ await().untilAsserted(
+ () -> assertThat(getNumberOfCurrentExecutionsInServers(servers))
+ .isEqualTo(executionsExpected));
+ }
+
+ private void addIgnoredExceptionsForClosingAfterCancelCommand() {
+ addIgnoredException(
+ "Error closing the connection used to wan-copy region entries");
+ addIgnoredException(
+ "Exception org.apache.geode.cache.client.internal.pooling.ConnectionDestroyedException in sendBatch. Retrying");
+ addIgnoredException(
+ "Exception org.apache.geode.cache.client.ServerConnectivityException in sendBatch. Retrying");
+ }
+
+ private void addIgnoredExceptionsForSenderInUseWentDown() {
+ addIgnoredException(
+ "Exception org.apache.geode.cache.client.internal.pooling.ConnectionDestroyedException in sendBatch. Retrying");
+ addIgnoredException(
+ "Exception org.apache.geode.cache.client.ServerConnectivityException in sendBatch. Retrying");
+ addIgnoredException("DistributedSystemDisconnectedException");
+ addIgnoredException("org.apache.geode.distributed.PoolCancelledException");
+ addIgnoredException(
+ "Exception when running wan-copy region command: ");
+ addIgnoredException(
+ "Exception when running wan-copy region command: java.util.concurrent.ExecutionException: org.apache.geode.cache.EntryDestroyedException");
+ addIgnoredException(
+ "Error closing the connection used to wan-copy region entries");
+ }
+
+ private void addIgnoredExceptionsForReceiverConnectedToSenderInUseWentDown() {
+ addIgnoredException(
+ "Exception org.apache.geode.cache.client.internal.pooling.ConnectionDestroyedException in sendBatch. Retrying");
+ addIgnoredException(
+ "Exception org.apache.geode.cache.client.ServerConnectivityException in sendBatch. Retrying");
+ addIgnoredException("DistributedSystemDisconnectedException");
+ }
+
+ private int getNumberOfCurrentExecutionsInServers(List<VM> vmList) {
+ return vmList.stream()
+ .map((vm) -> vm.invoke(() -> ((InternalCache) cache)
+ .getService(WanCopyRegionFunctionService.class).getNumberOfCurrentExecutions()))
+ .reduce(0, Integer::sum);
+ }
+}
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index 9c4c6a6..7c9bc37 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -107,6 +107,9 @@ import org.apache.geode.cache.asyncqueue.AsyncEventListener;
import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
import org.apache.geode.cache.asyncqueue.AsyncEventQueueFactory;
import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
import org.apache.geode.cache.client.Pool;
import org.apache.geode.cache.client.PoolManager;
import org.apache.geode.cache.client.internal.ConnectionStats;
@@ -429,8 +432,7 @@ public class WANTestBase extends DistributedTestCase {
}
fact.setOffHeap(offHeap);
- Region r = fact.create(regionName);
- assertNotNull(r);
+ fact.create(regionName);
} finally {
exp.remove();
exp1.remove();
@@ -438,7 +440,6 @@ public class WANTestBase extends DistributedTestCase {
}
}
-
public static void createReplicatedProxyRegion(String regionName, String senderIds,
Boolean offHeap) {
IgnoredException exp =
@@ -458,8 +459,7 @@ public class WANTestBase extends DistributedTestCase {
}
fact.setOffHeap(offHeap);
- Region r = fact.create(regionName);
- assertNotNull(r);
+ fact.create(regionName);
} finally {
exp.remove();
exp1.remove();
@@ -477,8 +477,7 @@ public class WANTestBase extends DistributedTestCase {
}
}
- Region r = fact.create(regionName);
- assertNotNull(r);
+ fact.create(regionName);
}
public static void createPersistentReplicatedRegion(String regionName, String senderIds,
@@ -493,8 +492,7 @@ public class WANTestBase extends DistributedTestCase {
}
fact.setOffHeap(offHeap);
- Region r = fact.create(regionName);
- assertNotNull(r);
+ fact.create(regionName);
}
public static void createReplicatedRegionWithAsyncEventQueue(String regionName,
@@ -512,8 +510,7 @@ public class WANTestBase extends DistributedTestCase {
}
fact.setOffHeap(offHeap);
- Region r = fact.create(regionName);
- assertNotNull(r);
+ fact.create(regionName);
} finally {
exp1.remove();
}
@@ -535,8 +532,7 @@ public class WANTestBase extends DistributedTestCase {
fact.setOffHeap(offHeap);
fact.addAsyncEventQueueId(asyncChannelId);
- Region r = fact.create(regionName);
- assertNotNull(r);
+ fact.create(regionName);
} finally {
exp.remove();
}
@@ -544,6 +540,11 @@ public class WANTestBase extends DistributedTestCase {
public static void createReplicatedRegion(String regionName, String senderIds, Scope scope,
DataPolicy policy, Boolean offHeap) {
+ createReplicatedRegion(regionName, senderIds, scope, policy, offHeap, false);
+ }
+
+ public static void createReplicatedRegion(String regionName, String senderIds, Scope scope,
+ DataPolicy policy, Boolean offHeap, boolean statisticsEnabled) {
RegionFactory fact = cache.createRegionFactory();
if (senderIds != null) {
StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
@@ -556,8 +557,8 @@ public class WANTestBase extends DistributedTestCase {
fact.setDataPolicy(policy);
fact.setScope(scope);
fact.setOffHeap(offHeap);
- Region r = fact.create(regionName);
- assertNotNull(r);
+ fact.setStatisticsEnabled(statisticsEnabled);
+ fact.create(regionName);
}
public static void createAsyncEventQueue(String asyncChannelId, boolean isParallel,
@@ -597,6 +598,13 @@ public class WANTestBase extends DistributedTestCase {
public static void createPartitionedRegion(String regionName, String senderIds,
Integer redundantCopies, Integer totalNumBuckets, Boolean offHeap, RegionShortcut shortcut) {
+ createPartitionedRegion(regionName, senderIds, redundantCopies, totalNumBuckets, offHeap,
+ shortcut, false);
+ }
+
+ public static void createPartitionedRegion(String regionName, String senderIds,
+ Integer redundantCopies, Integer totalNumBuckets, Boolean offHeap, RegionShortcut shortcut,
+ boolean statisticsEnabled) {
IgnoredException exp =
IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
IgnoredException exp1 =
@@ -616,8 +624,8 @@ public class WANTestBase extends DistributedTestCase {
pfact.setRecoveryDelay(0);
fact.setPartitionAttributes(pfact.create());
fact.setOffHeap(offHeap);
- Region r = fact.create(regionName);
- assertNotNull(r);
+ fact.setStatisticsEnabled(statisticsEnabled);
+ fact.create(regionName);
} finally {
exp.remove();
exp1.remove();
@@ -645,8 +653,7 @@ public class WANTestBase extends DistributedTestCase {
pfact.setRedundantCopies(redundantCopies);
pfact.setRecoveryDelay(0);
fact.setPartitionAttributes(pfact.create());
- Region r = fact.create(regionName);
- assertNotNull(r);
+ fact.create(regionName);
} finally {
exp.remove();
exp1.remove();
@@ -675,8 +682,7 @@ public class WANTestBase extends DistributedTestCase {
pfact.setRecoveryDelay(0);
pfact.setColocatedWith(colocatedWith);
fact.setPartitionAttributes(pfact.create());
- Region r = fact.create(regionName);
- assertNotNull(r);
+ fact.create(regionName);
} finally {
exp.remove();
exp1.remove();
@@ -684,15 +690,13 @@ public class WANTestBase extends DistributedTestCase {
}
public static void addSenderThroughAttributesMutator(String regionName, String senderIds) {
- final Region r = cache.getRegion(SEPARATOR + regionName);
- assertNotNull(r);
+ final Region<?, ?> r = cache.getRegion(SEPARATOR + regionName);
AttributesMutator mutator = r.getAttributesMutator();
mutator.addGatewaySenderId(senderIds);
}
public static void addAsyncEventQueueThroughAttributesMutator(String regionName, String queueId) {
- final Region r = cache.getRegion(SEPARATOR + regionName);
- assertNotNull(r);
+ final Region<?, ?> r = cache.getRegion(SEPARATOR + regionName);
AttributesMutator mutator = r.getAttributesMutator();
mutator.addAsyncEventQueueId(queueId);
}
@@ -711,8 +715,7 @@ public class WANTestBase extends DistributedTestCase {
pfact.setTotalNumBuckets(totalNumBuckets);
pfact.setRedundantCopies(redundantCopies);
fact.setPartitionAttributes(pfact.create());
- Region r = fact.create(regionName);
- assertNotNull(r);
+ fact.create(regionName);
}
public static void createPartitionedRegionWithSerialParallelSenderIds(String regionName,
@@ -736,8 +739,7 @@ public class WANTestBase extends DistributedTestCase {
pfact.setColocatedWith(colocatedWith);
fact.setPartitionAttributes(pfact.create());
fact.setOffHeap(offHeap);
- Region r = fact.create(regionName);
- assertNotNull(r);
+ fact.create(regionName);
}
public static void createPersistentPartitionedRegion(String regionName, String senderIds,
@@ -762,8 +764,7 @@ public class WANTestBase extends DistributedTestCase {
pfact.setRedundantCopies(redundantCopies);
fact.setPartitionAttributes(pfact.create());
fact.setOffHeap(offHeap);
- Region r = fact.create(regionName);
- assertNotNull(r);
+ fact.create(regionName);
} finally {
exp.remove();
exp1.remove();
@@ -791,7 +792,6 @@ public class WANTestBase extends DistributedTestCase {
fact.setOffHeap(offHeap);
customerRegion =
(PartitionedRegion) fact.create(customerRegionName);
- assertNotNull(customerRegion);
logger.info("Partitioned Region CUSTOMER created Successfully :" + customerRegion.toString());
paf = new PartitionAttributesFactory();
@@ -810,7 +810,6 @@ public class WANTestBase extends DistributedTestCase {
fact.setOffHeap(offHeap);
orderRegion =
(PartitionedRegion) fact.create(orderRegionName);
- assertNotNull(orderRegion);
logger.info("Partitioned Region ORDER created Successfully :" + orderRegion.toString());
paf = new PartitionAttributesFactory();
@@ -829,7 +828,6 @@ public class WANTestBase extends DistributedTestCase {
fact.setOffHeap(offHeap);
shipmentRegion =
(PartitionedRegion) fact.create(shipmentRegionName);
- assertNotNull(shipmentRegion);
logger.info("Partitioned Region SHIPMENT created Successfully :" + shipmentRegion.toString());
} finally {
exp.remove();
@@ -851,17 +849,14 @@ public class WANTestBase extends DistributedTestCase {
pfact.setRedundantCopies(redundantCopies);
fact.setPartitionAttributes(pfact.create());
fact.setOffHeap(offHeap);
- Region r = fact.create(regionName);
- assertNotNull(r);
+ Region<?, ?> r = fact.create(regionName);
pfact.setColocatedWith(r.getName());
fact.setPartitionAttributes(pfact.create());
fact.setOffHeap(offHeap);
- Region r1 = fact.create(regionName + "_child1");
- assertNotNull(r1);
+ fact.create(regionName + "_child1");
- Region r2 = fact.create(regionName + "_child2");
- assertNotNull(r2);
+ fact.create(regionName + "_child2");
}
public static void createColocatedPartitionedRegions2(String regionName, String senderIds,
@@ -879,18 +874,15 @@ public class WANTestBase extends DistributedTestCase {
pfact.setRedundantCopies(redundantCopies);
fact.setPartitionAttributes(pfact.create());
fact.setOffHeap(offHeap);
- Region r = fact.create(regionName);
- assertNotNull(r);
+ Region<?, ?> r = fact.create(regionName);
fact = cache.createRegionFactory(RegionShortcut.PARTITION);
pfact.setColocatedWith(r.getName());
fact.setPartitionAttributes(pfact.create());
fact.setOffHeap(offHeap);
- Region r1 = fact.create(regionName + "_child1");
- assertNotNull(r1);
+ fact.create(regionName + "_child1");
- Region r2 = fact.create(regionName + "_child2");
- assertNotNull(r2);
+ fact.create(regionName + "_child2");
}
public static void createCacheInVMs(Integer locatorPort, VM... vms) {
@@ -1033,7 +1025,6 @@ public class WANTestBase extends DistributedTestCase {
*/
public static Integer createCacheServer() {
CacheServer server1 = cache.addCacheServer();
- assertNotNull(server1);
server1.setPort(0);
try {
server1.start();
@@ -1192,7 +1183,6 @@ public class WANTestBase extends DistributedTestCase {
BecomePrimaryBucketResponse response = BecomePrimaryBucketMessage
.send((InternalDistributedMember) destination, region, bucketId, true);
- assertNotNull(response);
assertTrue(response.waitForResponse());
}
@@ -1214,7 +1204,6 @@ public class WANTestBase extends DistributedTestCase {
public static void checkConnectionStats(String senderId) {
AbstractGatewaySender sender =
(AbstractGatewaySender) CacheFactory.getAnyInstance().getGatewaySender(senderId);
- assertNotNull(sender);
Collection statsCollection = sender.getProxy().getEndpointManager().getAllStats().values();
assertTrue(statsCollection.iterator().next() instanceof ConnectionStats);
@@ -1258,7 +1247,6 @@ public class WANTestBase extends DistributedTestCase {
public static int getGatewaySenderPoolDisconnects(String senderId) {
AbstractGatewaySender sender =
(AbstractGatewaySender) CacheFactory.getAnyInstance().getGatewaySender(senderId);
- assertNotNull(sender);
PoolStats poolStats = sender.getProxy().getStats();
@@ -1320,6 +1308,23 @@ public class WANTestBase extends DistributedTestCase {
assertEquals(creates, gatewayReceiverStats.getCreateRequest());
}
+ public static List<Long> getReceiverStats() {
+ Set<GatewayReceiver> gatewayReceivers = cache.getGatewayReceivers();
+ GatewayReceiver receiver = gatewayReceivers.iterator().next();
+ CacheServerStats stats = ((CacheServerImpl) receiver.getServer()).getAcceptor().getStats();
+ assertTrue(stats instanceof GatewayReceiverStats);
+ GatewayReceiverStats gatewayReceiverStats = (GatewayReceiverStats) stats;
+ List<Long> statsList = new ArrayList<>();
+ statsList.add(gatewayReceiverStats.getEventsReceived());
+ statsList.add(gatewayReceiverStats.getEventsRetried());
+ statsList.add(gatewayReceiverStats.getProcessBatchRequests());
+ statsList.add(gatewayReceiverStats.getDuplicateBatchesReceived());
+ statsList.add(gatewayReceiverStats.getOutoforderBatchesReceived());
+ statsList.add(gatewayReceiverStats.getEarlyAcks());
+ statsList.add(gatewayReceiverStats.getExceptionsOccurred());
+ return statsList;
+ }
+
public static void checkMinimumGatewayReceiverStats(int processBatches, int eventsReceived) {
Set<GatewayReceiver> gatewayReceivers = cache.getGatewayReceivers();
GatewayReceiver receiver = gatewayReceivers.iterator().next();
@@ -1584,7 +1589,7 @@ public class WANTestBase extends DistributedTestCase {
}
private void addCacheListenerOnRegion(String regionName) {
- Region region = cache.getRegion(regionName);
+ Region<?, ?> region = cache.getRegion(regionName);
AttributesMutator mutator = region.getAttributesMutator();
listener1 = new QueueListener();
mutator.addCacheListener(listener1);
@@ -2163,7 +2168,12 @@ public class WANTestBase extends DistributedTestCase {
public static int createServer(int locPort, int maximumTimeBetweenPings) {
WANTestBase test = new WANTestBase();
- Properties props = test.getDistributedSystemProperties();
+ Properties properties = test.getDistributedSystemProperties();
+ return createServer(locPort, maximumTimeBetweenPings, properties);
+ }
+
+ public static int createServer(int locPort, int maximumTimeBetweenPings, Properties props) {
+ WANTestBase test = new WANTestBase();
props.setProperty(MCAST_PORT, "0");
props.setProperty(LOCATORS, "localhost[" + locPort + "]");
InternalDistributedSystem ds = test.getSystem(props);
@@ -2184,19 +2194,26 @@ public class WANTestBase extends DistributedTestCase {
return port;
}
- public static void createClientWithLocator(int port0, String host, String regionName) {
- createClientWithLocator(port0, host);
+ public static void createClientWithLocatorAndRegion(int port0, String host, String regionName,
+ ClientRegionShortcut regionType) {
+ cache = (Cache) new ClientCacheFactory().addPoolLocator(host, port0).create();
+
+ ((ClientCache) cache).createClientRegionFactory(regionType)
+ .create(regionName);
+ }
+
+ public static void createClientWithLocatorAndRegion(int port0, String host, String regionName) {
+ createClientWithLocatorAndRegion(port0, host);
RegionFactory factory = cache.createRegionFactory(RegionShortcut.LOCAL);
factory.setPoolName("pool");
region = factory.create(regionName);
region.registerInterest("ALL_KEYS");
- assertNotNull(region);
logger.info("Distributed Region " + regionName + " created Successfully :" + region.toString());
}
- public static void createClientWithLocator(final int port0, final String host) {
+ public static void createClientWithLocatorAndRegion(final int port0, final String host) {
WANTestBase test = new WANTestBase();
Properties props = test.getDistributedSystemProperties();
props.setProperty(MCAST_PORT, "0");
@@ -2258,8 +2275,7 @@ public class WANTestBase extends DistributedTestCase {
IgnoredException exp2 =
IgnoredException.addIgnoredException(GatewaySenderException.class.getName());
try {
- Region r = cache.getRegion(SEPARATOR + regionName);
- assertNotNull(r);
+ Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName);
for (long i = 1; i <= numPuts; i++) {
txMgr.begin();
r.put(i, i);
@@ -2278,8 +2294,7 @@ public class WANTestBase extends DistributedTestCase {
IgnoredException exp2 =
IgnoredException.addIgnoredException(GatewaySenderException.class.getName());
try {
- Region r = cache.getRegion(SEPARATOR + regionName);
- assertNotNull(r);
+ Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName);
for (long i = 0; i < numPuts; i++) {
r.put(i, value);
}
@@ -2295,8 +2310,7 @@ public class WANTestBase extends DistributedTestCase {
IgnoredException exp2 =
IgnoredException.addIgnoredException(GatewaySenderException.class.getName());
try {
- Region r = cache.getRegion(SEPARATOR + regionName);
- assertNotNull(r);
+ Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName);
for (long i = 0; i < numPuts; i++) {
r.put(i, "Value_" + i);
}
@@ -2312,7 +2326,6 @@ public class WANTestBase extends DistributedTestCase {
IgnoredException ignored1 =
IgnoredException.addIgnoredException(GatewaySenderException.class)) {
Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName);
- assertNotNull(r);
for (long i = 0; i < numPuts; i++) {
cache.getCacheTransactionManager().begin();
r.put(i, "Value_" + i);
@@ -2327,8 +2340,7 @@ public class WANTestBase extends DistributedTestCase {
IgnoredException exp2 =
IgnoredException.addIgnoredException(GatewaySenderException.class.getName());
try {
- Region r = cache.getRegion(SEPARATOR + regionName);
- assertNotNull(r);
+ Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName);
for (long i = 0; i < numPuts; i++) {
r.put(key, "Value_" + i);
}
@@ -2340,32 +2352,28 @@ public class WANTestBase extends DistributedTestCase {
public static void doPutsAfter300(String regionName, int numPuts) {
- Region r = cache.getRegion(SEPARATOR + regionName);
- assertNotNull(r);
+ Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName);
for (long i = 300; i < numPuts; i++) {
r.put(i, "Value_" + i);
}
}
public static void doPutsFrom(String regionName, int from, int numPuts) {
- Region r = cache.getRegion(SEPARATOR + regionName);
- assertNotNull(r);
+ Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName);
for (long i = from; i < numPuts; i++) {
r.put(i, "Value_" + i);
}
}
public static void doDestroys(String regionName, int keyNum) {
- Region r = cache.getRegion(SEPARATOR + regionName);
- assertNotNull(r);
+ Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName);
for (long i = 0; i < keyNum; i++) {
r.destroy(i);
}
}
public static void doPutAll(String regionName, int numPuts, int size) {
- Region r = cache.getRegion(SEPARATOR + regionName);
- assertNotNull(r);
+ Region<?, ?> r = cache.getRegion(SEPARATOR + regionName);
for (long i = 0; i < numPuts; i++) {
Map putAllMap = new HashMap();
for (long j = 0; j < size; j++) {
@@ -2378,16 +2386,14 @@ public class WANTestBase extends DistributedTestCase {
public static void doPutsWithKeyAsString(String regionName, int numPuts) {
- Region r = cache.getRegion(SEPARATOR + regionName);
- assertNotNull(r);
+ Region<String, String> r = cache.getRegion(SEPARATOR + regionName);
for (long i = 0; i < numPuts; i++) {
r.put("Object_" + i, "Value_" + i);
}
}
public static void putGivenKeyValue(String regionName, Map keyValues) {
- Region r = cache.getRegion(SEPARATOR + regionName);
- assertNotNull(r);
+ Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName);
for (Object key : keyValues.keySet()) {
r.put(key, keyValues.get(key));
}
@@ -2397,8 +2403,6 @@ public class WANTestBase extends DistributedTestCase {
int eventsPerTransaction) {
Region orderRegion = cache.getRegion(orderRegionName);
Region shipmentRegion = cache.getRegion(shipmentRegionName);
- assertNotNull(orderRegion);
- assertNotNull(shipmentRegion);
int eventInTransaction = 0;
CacheTransactionManager cacheTransactionManager = cache.getCacheTransactionManager();
for (Object key : keyValues.keySet()) {
@@ -2424,8 +2428,7 @@ public class WANTestBase extends DistributedTestCase {
public static void doPutsInsideTransactions(String regionName, Map keyValues,
int eventsPerTransaction) {
- Region r = cache.getRegion(Region.SEPARATOR + regionName);
- assertNotNull(r);
+ Region<Object, Object> r = cache.getRegion(Region.SEPARATOR + regionName);
int eventInTransaction = 0;
CacheTransactionManager cacheTransactionManager = cache.getCacheTransactionManager();
for (Object key : keyValues.keySet()) {
@@ -2448,8 +2451,7 @@ public class WANTestBase extends DistributedTestCase {
}
public static void destroyRegion(String regionName, final int min) {
- final Region r = cache.getRegion(SEPARATOR + regionName);
- assertNotNull(r);
+ final Region<?, ?> r = cache.getRegion(SEPARATOR + regionName);
await().until(() -> r.size() > min);
r.destroyRegion();
}
@@ -2458,8 +2460,7 @@ public class WANTestBase extends DistributedTestCase {
IgnoredException exp =
IgnoredException.addIgnoredException(PRLocallyDestroyedException.class.getName());
try {
- Region r = cache.getRegion(SEPARATOR + regionName);
- assertNotNull(r);
+ Region<?, ?> r = cache.getRegion(SEPARATOR + regionName);
r.localDestroyRegion();
} finally {
exp.remove();
@@ -2478,8 +2479,6 @@ public class WANTestBase extends DistributedTestCase {
}
protected static Map putCustomerPartitionedRegion(int numPuts, String valueSuffix) {
- assertNotNull(cache);
- assertNotNull(customerRegion);
Map custKeyValues = new HashMap();
for (int i = 1; i <= numPuts; i++) {
CustId custid = new CustId(i);
@@ -2500,8 +2499,6 @@ public class WANTestBase extends DistributedTestCase {
}
public static Map putOrderPartitionedRegion(int numPuts) {
- assertNotNull(cache);
- assertNotNull(orderRegion);
Map orderKeyValues = new HashMap();
for (int i = 1; i <= numPuts; i++) {
CustId custid = new CustId(i);
@@ -2525,8 +2522,6 @@ public class WANTestBase extends DistributedTestCase {
}
public static Map putOrderPartitionedRegionUsingCustId(int numPuts) {
- assertNotNull(cache);
- assertNotNull(orderRegion);
Map orderKeyValues = new HashMap();
for (int i = 1; i <= numPuts; i++) {
CustId custid = new CustId(i);
@@ -2548,8 +2543,6 @@ public class WANTestBase extends DistributedTestCase {
}
public static Map updateOrderPartitionedRegion(int numPuts) {
- assertNotNull(cache);
- assertNotNull(orderRegion);
Map orderKeyValues = new HashMap();
for (int i = 1; i <= numPuts; i++) {
CustId custid = new CustId(i);
@@ -2573,8 +2566,6 @@ public class WANTestBase extends DistributedTestCase {
}
public static Map updateOrderPartitionedRegionUsingCustId(int numPuts) {
- assertNotNull(cache);
- assertNotNull(orderRegion);
Map orderKeyValues = new HashMap();
for (int i = 1; i <= numPuts; i++) {
CustId custid = new CustId(i);
@@ -2595,8 +2586,6 @@ public class WANTestBase extends DistributedTestCase {
}
public static Map putShipmentPartitionedRegion(int numPuts) {
- assertNotNull(cache);
- assertNotNull(shipmentRegion);
Map shipmentKeyValue = new HashMap();
for (int i = 1; i <= numPuts; i++) {
CustId custid = new CustId(i);
@@ -2621,10 +2610,6 @@ public class WANTestBase extends DistributedTestCase {
}
public static void putcolocatedPartitionedRegion(int numPuts) {
- assertNotNull(cache);
- assertNotNull(customerRegion);
- assertNotNull(orderRegion);
- assertNotNull(shipmentRegion);
for (int i = 1; i <= numPuts; i++) {
CustId custid = new CustId(i);
Customer customer = new Customer("Customer" + custid, "Address" + custid);
@@ -2641,8 +2626,6 @@ public class WANTestBase extends DistributedTestCase {
}
public static Map putShipmentPartitionedRegionUsingCustId(int numPuts) {
- assertNotNull(cache);
- assertNotNull(shipmentRegion);
Map shipmentKeyValue = new HashMap();
for (int i = 1; i <= numPuts; i++) {
CustId custid = new CustId(i);
@@ -2663,8 +2646,6 @@ public class WANTestBase extends DistributedTestCase {
}
public static Map updateShipmentPartitionedRegion(int numPuts) {
- assertNotNull(cache);
- assertNotNull(shipmentRegion);
Map shipmentKeyValue = new HashMap();
for (int i = 1; i <= numPuts; i++) {
CustId custid = new CustId(i);
@@ -2689,8 +2670,6 @@ public class WANTestBase extends DistributedTestCase {
}
public static Map updateShipmentPartitionedRegionUsingCustId(int numPuts) {
- assertNotNull(cache);
- assertNotNull(shipmentRegion);
Map shipmentKeyValue = new HashMap();
for (int i = 1; i <= numPuts; i++) {
CustId custid = new CustId(i);
@@ -2711,16 +2690,14 @@ public class WANTestBase extends DistributedTestCase {
}
public static void doPutsPDXSerializable(String regionName, int numPuts) {
- Region r = cache.getRegion(SEPARATOR + regionName);
- assertNotNull(r);
+ Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName);
for (int i = 0; i < numPuts; i++) {
r.put("Key_" + i, new SimpleClass(i, (byte) i));
}
}
public static void doPutsPDXSerializable2(String regionName, int numPuts) {
- Region r = cache.getRegion(SEPARATOR + regionName);
- assertNotNull(r);
+ Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName);
for (int i = 0; i < numPuts; i++) {
r.put("Key_" + i, new SimpleClass1(false, (short) i, "" + i, i, "" + i, "" + i, i, i));
}
@@ -2728,8 +2705,7 @@ public class WANTestBase extends DistributedTestCase {
public static void doTxPuts(String regionName) {
- Region r = cache.getRegion(SEPARATOR + regionName);
- assertNotNull(r);
+ Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName);
CacheTransactionManager mgr = cache.getCacheTransactionManager();
mgr.begin();
@@ -2741,8 +2717,7 @@ public class WANTestBase extends DistributedTestCase {
public static void doTxPutsWithRetryIfError(String regionName, final long putsPerTransaction,
final long transactions, long offset) {
- Region r = cache.getRegion(Region.SEPARATOR + regionName);
- assertNotNull(r);
+ Region<Object, Object> r = cache.getRegion(Region.SEPARATOR + regionName);
long keyOffset = offset * ((putsPerTransaction + (10 * transactions)) * 100);
long j = 0;
@@ -2786,8 +2761,7 @@ public class WANTestBase extends DistributedTestCase {
IgnoredException exp =
IgnoredException.addIgnoredException(CacheClosedException.class.getName());
try {
- Region r = cache.getRegion(SEPARATOR + regionName);
- assertNotNull(r);
+ Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName);
for (long i = start; i < numPuts; i++) {
r.put(i, i);
}
@@ -2889,8 +2863,7 @@ public class WANTestBase extends DistributedTestCase {
}
});
- final Region r = cache.getRegion(SEPARATOR + regionName);
- assertNotNull(r);
+ final Region<?, ?> r = cache.getRegion(SEPARATOR + regionName);
List<Callable<Object>> tasks = new ArrayList<Callable<Object>>();
for (long i = 0; i < 5; i++) {
@@ -2916,8 +2889,7 @@ public class WANTestBase extends DistributedTestCase {
IgnoredException exp1 =
IgnoredException.addIgnoredException(CacheClosedException.class.getName());
try {
- final Region r = cache.getRegion(SEPARATOR + regionName);
- assertNotNull(r);
+ final Region<?, ?> r = cache.getRegion(SEPARATOR + regionName);
if (regionSize != r.keySet().size()) {
await()
.untilAsserted(() -> assertEquals(
@@ -2931,6 +2903,35 @@ public class WANTestBase extends DistributedTestCase {
}
}
+ public List<Object> getKeys(String regionName) {
+ final Region<?, ?> r = cache.getRegion(SEPARATOR + regionName);
+ return new ArrayList<>(r.keySet());
+ }
+
+ public void checkEqualRegionData(String regionName, VM vm1, VM vm2) {
+ assertThat(vm1.invoke(() -> getRegionSize(regionName)))
+ .isEqualTo(vm2.invoke(() -> getRegionSize(regionName)));
+ for (Object key : vm1.invoke(() -> getKeys(regionName))) {
+ assertThat(vm1.invoke(() -> getValueForEntry((long) key, regionName)))
+ .isEqualTo(vm2.invoke(() -> getValueForEntry((long) key, regionName)));
+ assertThat(vm1.invoke(() -> getTimestampForEntry((long) key, regionName)))
+ .isEqualTo(vm2.invoke(() -> getTimestampForEntry((long) key, regionName)));
+ }
+ }
+
+ public static Object getValueForEntry(long key, String regionName) {
+ final Region<?, ?> r = cache.getRegion(SEPARATOR + regionName);
+ return r.get(key);
+ }
+
+ public static long getTimestampForEntry(long key, String regionName) {
+ final Region<?, ?> r = cache.getRegion(SEPARATOR + regionName);
+ if (r.getEntry(key) == null) {
+ return 0;
+ }
+ return r.getEntry(key).getStatistics().getLastModifiedTime();
+ }
+
public static void validateAsyncEventListener(String asyncQueueId, final int expectedSize) {
AsyncEventListener theListener = null;
@@ -2942,7 +2943,6 @@ public class WANTestBase extends DistributedTestCase {
}
final Map eventsMap = ((MyAsyncEventListener) theListener).getEventsMap();
- assertNotNull(eventsMap);
await()
.untilAsserted(() -> assertEquals(
"Expected map entries: " + expectedSize + " but actual entries: " + eventsMap.size(),
@@ -2993,14 +2993,12 @@ public class WANTestBase extends DistributedTestCase {
}
final Map eventsMap = ((MyAsyncEventListener) theListener).getEventsMap();
- assertNotNull(eventsMap);
logger.info("The events map size is " + eventsMap.size());
return eventsMap.size();
}
public static void validateRegionSize_PDX(String regionName, final int regionSize) {
- final Region r = cache.getRegion(SEPARATOR + regionName);
- assertNotNull(r);
+ final Region<?, ?> r = cache.getRegion(SEPARATOR + regionName);
await().untilAsserted(() -> {
assertEquals("Expected region entries: " + regionSize + " but actual entries: "
+ r.keySet().size() + " present region keyset " + r.keySet(), true,
@@ -3018,8 +3016,7 @@ public class WANTestBase extends DistributedTestCase {
}
public static void validateRegionSizeOnly_PDX(String regionName, final int regionSize) {
- final Region r = cache.getRegion(SEPARATOR + regionName);
- assertNotNull(r);
+ final Region<?, ?> r = cache.getRegion(SEPARATOR + regionName);
await()
.untilAsserted(
() -> assertEquals(
@@ -3053,8 +3050,7 @@ public class WANTestBase extends DistributedTestCase {
*
*/
public static void validateRegionSizeRemainsSame(String regionName, final int regionSizeLimit) {
- final Region r = cache.getRegion(SEPARATOR + regionName);
- assertNotNull(r);
+ final Region<?, ?> r = cache.getRegion(SEPARATOR + regionName);
WaitCriterion wc = new WaitCriterion() {
final int MIN_VERIFICATION_RUNS = 20;
int sameRegionSizeCounter = 0;
@@ -3091,20 +3087,17 @@ public class WANTestBase extends DistributedTestCase {
}
public static String getRegionFullPath(String regionName) {
- final Region r = cache.getRegion(SEPARATOR + regionName);
- assertNotNull(r);
+ final Region<?, ?> r = cache.getRegion(SEPARATOR + regionName);
return r.getFullPath();
}
public static Integer getRegionSize(String regionName) {
- final Region r = cache.getRegion(SEPARATOR + regionName);
- assertNotNull(r);
+ final Region<?, ?> r = cache.getRegion(SEPARATOR + regionName);
return r.keySet().size();
}
public static void validateRegionContents(String regionName, final Map keyValues) {
- final Region r = cache.getRegion(SEPARATOR + regionName);
- assertNotNull(r);
+ final Region<?, ?> r = cache.getRegion(SEPARATOR + regionName);
await().untilAsserted(() -> {
boolean matchFlag = true;
for (Object key : keyValues.keySet()) {
@@ -3121,8 +3114,7 @@ public class WANTestBase extends DistributedTestCase {
public static void doHeavyPuts(String regionName, int numPuts) {
- Region r = cache.getRegion(SEPARATOR + regionName);
- assertNotNull(r);
+ Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName);
// GatewaySender.DEFAULT_BATCH_SIZE * OBJECT_SIZE should be more than MAXIMUM_QUEUE_MEMORY
// to guarantee overflow
for (long i = 0; i < numPuts; i++) {
@@ -3131,8 +3123,7 @@ public class WANTestBase extends DistributedTestCase {
}
public static void addCacheListenerAndDestroyRegion(String regionName) {
- final Region region = cache.getRegion(SEPARATOR + regionName);
- assertNotNull(region);
+ final Region<?, ?> region = cache.getRegion(SEPARATOR + regionName);
CacheListenerAdapter cl = new CacheListenerAdapter() {
@Override
public void afterCreate(EntryEvent event) {
@@ -3619,7 +3610,6 @@ public class WANTestBase extends DistributedTestCase {
AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(senderId);
PoolImpl pool = sender.getProxy();
if (poolShouldExist) {
- assertNotNull(pool);
assertEquals(expectedPoolLocatorsSize, pool.getLocators().size());
} else {
assertNull(pool);
@@ -3627,8 +3617,7 @@ public class WANTestBase extends DistributedTestCase {
}
public static void removeSenderFromTheRegion(String senderId, String regionName) {
- Region region = cache.getRegion(regionName);
- assertNotNull(region);
+ Region<?, ?> region = cache.getRegion(regionName);
region.getAttributesMutator().removeGatewaySenderId(senderId);
}
@@ -3658,7 +3647,6 @@ public class WANTestBase extends DistributedTestCase {
public static void destroyAsyncEventQueue(String id) {
AsyncEventQueueImpl aeq = (AsyncEventQueueImpl) cache.getAsyncEventQueue(id);
- assertNotNull(aeq);
aeq.destroy();
}
@@ -4174,7 +4162,6 @@ public class WANTestBase extends DistributedTestCase {
public void run() {
ManagementService service = ManagementService.getManagementService(cache);
GatewaySenderMXBean bean = service.getLocalGatewaySenderMXBean("pn");
- assertNotNull(bean);
bean.start();
assertTrue(bean.isRunning());
}
@@ -4194,7 +4181,6 @@ public class WANTestBase extends DistributedTestCase {
public void run() {
ManagementService service = ManagementService.getManagementService(cache);
GatewaySenderMXBean bean = service.getLocalGatewaySenderMXBean("pn");
- assertNotNull(bean);
bean.stop();
assertFalse(bean.isRunning());
}
@@ -4270,10 +4256,8 @@ public class WANTestBase extends DistributedTestCase {
});
ManagementService service = ManagementService.getManagementService(cache);
final DistributedSystemMXBean dsBean = service.getDistributedSystemMXBean();
- assertNotNull(dsBean);
Map<String, Boolean> dsMap = dsBean.viewRemoteClusterStatus();
logger.info("Ds Map is: " + dsMap.size());
- assertNotNull(dsMap);
assertEquals(true, dsMap.size() > 0);
}
};
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationClientServerDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationClientServerDUnitTest.java
index 33bd871..8259a12 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationClientServerDUnitTest.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationClientServerDUnitTest.java
@@ -38,7 +38,7 @@ public class ParallelWANPropagationClientServerDUnitTest extends WANTestBase {
vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", null, 1, 100,
isOffHeap()));
- vm4.invoke(() -> WANTestBase.createClientWithLocator(nyPort, "localhost",
+ vm4.invoke(() -> WANTestBase.createClientWithLocatorAndRegion(nyPort, "localhost",
getTestMethodName() + "_PR"));
vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 100));
@@ -51,7 +51,7 @@ public class ParallelWANPropagationClientServerDUnitTest extends WANTestBase {
vm6.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100,
isOffHeap()));
- vm7.invoke(() -> WANTestBase.createClientWithLocator(lnPort, "localhost",
+ vm7.invoke(() -> WANTestBase.createClientWithLocatorAndRegion(lnPort, "localhost",
getTestMethodName() + "_PR"));
startSenderInVMsAsync("ln", vm5, vm6);
diff --git a/geode-wan/src/integrationTest/java/org/apache/geode/cache/wan/internal/cli/commands/WanCommandAutoCompletionIntegrationTest.java b/geode-wan/src/integrationTest/java/org/apache/geode/cache/wan/internal/cli/commands/WanCommandAutoCompletionIntegrationTest.java
new file mode 100644
index 0000000..80c00a2
--- /dev/null
+++ b/geode-wan/src/integrationTest/java/org/apache/geode/cache/wan/internal/cli/commands/WanCommandAutoCompletionIntegrationTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.wan.internal.cli.commands;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.test.junit.categories.GfshTest;
+import org.apache.geode.test.junit.rules.GfshParserRule;
+import org.apache.geode.test.junit.rules.GfshParserRule.CommandCandidate;
+
+@Category(GfshTest.class)
+public class WanCommandAutoCompletionIntegrationTest {
+
+ @Rule
+ public GfshParserRule gfshParserRule = new GfshParserRule();
+
+ @Test
+ public void testCompletionOffersMandatoryOptionsInAlphabeticalOrderForWanCopyRegionWithSpace() {
+ String buffer = "wan-copy region ";
+ CommandCandidate candidate = gfshParserRule.complete(buffer);
+ assertThat(candidate.getCandidates()).hasSize(2);
+ assertThat(candidate.getFirstCandidate()).isEqualTo(buffer + "--region");
+ assertThat(candidate.getCandidate(1)).isEqualTo(buffer + "--sender-id");
+ }
+
+ @Test
+ public void testCompletionOffersTheFirstMandatoryOptionInAlphabeticalOrderForWanCopyRegionWithDash() {
+ String buffer = "wan-copy region --";
+ CommandCandidate candidate = gfshParserRule.complete(buffer);
+ assertThat(candidate.getCandidates()).hasSize(1);
+ assertThat(candidate.getFirstCandidate()).isEqualTo(buffer + "region");
+ }
+
+}
diff --git a/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/GatewaySenderEventRemoteDispatcher.java b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/GatewaySenderEventRemoteDispatcher.java
index 4e5cf4b..bbeadda 100644
--- a/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/GatewaySenderEventRemoteDispatcher.java
+++ b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/GatewaySenderEventRemoteDispatcher.java
@@ -32,8 +32,11 @@ import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.client.ServerConnectivityException;
import org.apache.geode.cache.client.ServerOperationException;
import org.apache.geode.cache.client.internal.Connection;
+import org.apache.geode.cache.client.internal.ExecutablePool;
import org.apache.geode.cache.client.internal.pooling.ConnectionDestroyedException;
+import org.apache.geode.cache.wan.GatewayQueueEvent;
import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.cache.wan.internal.client.locator.GatewaySenderBatchOp;
import org.apache.geode.cache.wan.internal.client.locator.SenderProxy;
import org.apache.geode.distributed.internal.ServerLocation;
import org.apache.geode.distributed.internal.ServerLocationAndMemberId;
@@ -978,4 +981,21 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
|| t instanceof GemFireSecurityException;
}
}
+
+ @Override
+ public void sendBatch(List<GatewayQueueEvent<?, ?>> events, Connection connection,
+ ExecutablePool senderPool, int batchId, boolean removeFromQueueOnException)
+ throws BatchException70 {
+ GatewaySenderBatchOp.executeOn(connection, senderPool, events, batchId,
+ removeFromQueueOnException, false);
+ GatewaySenderEventRemoteDispatcher.GatewayAck ack =
+ (GatewaySenderEventRemoteDispatcher.GatewayAck) GatewaySenderBatchOp.executeOn(connection,
+ senderPool);
+ if (ack == null) {
+ throw new BatchException70("Unknown error sending batch", null, 0, batchId);
+ }
+ if (ack.getBatchException() != null) {
+ throw ack.getBatchException();
+ }
+ }
}
diff --git a/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/WanCopyRegionFunctionService.java b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/WanCopyRegionFunctionService.java
new file mode 100644
index 0000000..79000b4
--- /dev/null
+++ b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/WanCopyRegionFunctionService.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.cache.wan.internal;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.internal.cache.CacheService;
+import org.apache.geode.logging.internal.executors.LoggingExecutors;
+import org.apache.geode.management.internal.beans.CacheServiceMBeanBase;
+import org.apache.geode.management.internal.functions.CliFunctionResult;
+
+public class WanCopyRegionFunctionService implements CacheService {
+
+ private volatile ExecutorService wanCopyRegionFunctionExecutionPool;
+
+ /**
+ * Contains the ongoing executions of WanCopyRegionFunction
+ */
+ private final Map<String, Future<CliFunctionResult>> executions =
+ new ConcurrentHashMap<>();
+
+ @Override
+ public boolean init(Cache cache) {
+ String WAN_COPY_REGION_FUNCTION_EXECUTION_PROCESSOR_THREAD_PREFIX =
+ "WAN Copy Region Function Execution Processor";
+ int WAN_COPY_REGION_FUNCTION_MAX_CONCURRENT_THREADS = 10;
+ wanCopyRegionFunctionExecutionPool = LoggingExecutors
+ .newFixedThreadPool(WAN_COPY_REGION_FUNCTION_MAX_CONCURRENT_THREADS,
+ WAN_COPY_REGION_FUNCTION_EXECUTION_PROCESSOR_THREAD_PREFIX, true);
+ return true;
+ }
+
+ @Override
+ public Class<? extends CacheService> getInterface() {
+ return WanCopyRegionFunctionService.class;
+ }
+
+ @Override
+ public CacheServiceMBeanBase getMBean() {
+ return null;
+ }
+
+ @Override
+ public void close() {
+ wanCopyRegionFunctionExecutionPool.shutdownNow();
+ try {
+ if (!wanCopyRegionFunctionExecutionPool.awaitTermination(5, TimeUnit.SECONDS)) {
+ wanCopyRegionFunctionExecutionPool.shutdownNow();
+ }
+ } catch (InterruptedException ie) {
+ wanCopyRegionFunctionExecutionPool.shutdownNow();
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ public CliFunctionResult execute(Callable<CliFunctionResult> callable,
+ String regionName, String senderId) throws InterruptedException, ExecutionException,
+ WanCopyRegionFunctionServiceAlreadyRunningException {
+ String executionName = getExecutionName(regionName, senderId);
+ Future<CliFunctionResult> future = null;
+ try {
+ synchronized (executions) {
+ if (executions.containsKey(executionName)) {
+ throw new WanCopyRegionFunctionServiceAlreadyRunningException(
+ "There is already an execution running for " + regionName + " and " + senderId);
+ }
+ future = wanCopyRegionFunctionExecutionPool.submit(callable);
+ executions.put(executionName, future);
+ }
+ return future.get();
+ } finally {
+ if (future != null) {
+ executions.remove(executionName);
+ }
+ }
+ }
+
+ public boolean cancel(String regionName, String senderId) {
+ Future<CliFunctionResult> execution = executions.remove(getExecutionName(regionName, senderId));
+ if (execution == null) {
+ return false;
+ }
+ execution.cancel(true);
+ return true;
+ }
+
+ public String cancelAll() {
+ String executionsString = executions.keySet().toString();
+ for (Future<CliFunctionResult> execution : executions.values()) {
+ execution.cancel(true);
+ }
+ executions.clear();
+ return executionsString;
+ }
+
+ public int getNumberOfCurrentExecutions() {
+ return executions.size();
+ }
+
+ private String getExecutionName(String regionName, String senderId) {
+ return "(" + regionName + "," + senderId + ")";
+ }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventDispatcher.java b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/WanCopyRegionFunctionServiceAlreadyRunningException.java
similarity index 68%
copy from geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventDispatcher.java
copy to geode-wan/src/main/java/org/apache/geode/cache/wan/internal/WanCopyRegionFunctionServiceAlreadyRunningException.java
index 86828ae..3796ab4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventDispatcher.java
+++ b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/WanCopyRegionFunctionServiceAlreadyRunningException.java
@@ -12,23 +12,13 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-package org.apache.geode.internal.cache.wan;
+package org.apache.geode.cache.wan.internal;
-import java.util.List;
+import java.io.Serializable;
-/**
- * @since GemFire 7.0
- *
- */
-public interface GatewaySenderEventDispatcher {
-
- boolean dispatchBatch(List events, boolean removeFromQueueOnException, boolean isRetry);
-
- boolean isRemoteDispatcher();
-
- boolean isConnectedToRemote();
-
- void stop();
-
- void shutDownAckReaderConnection();
+public class WanCopyRegionFunctionServiceAlreadyRunningException extends Exception
+ implements Serializable {
+ public WanCopyRegionFunctionServiceAlreadyRunningException(String message) {
+ super(message);
+ }
}
diff --git a/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/cli/commands/WanCopyRegionCommand.java b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/cli/commands/WanCopyRegionCommand.java
new file mode 100644
index 0000000..bcc2bd1
--- /dev/null
+++ b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/cli/commands/WanCopyRegionCommand.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.cache.wan.internal.cli.commands;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.springframework.shell.core.annotation.CliAvailabilityIndicator;
+import org.springframework.shell.core.annotation.CliCommand;
+import org.springframework.shell.core.annotation.CliOption;
+
+import org.apache.geode.cache.execute.FunctionInvocationTargetException;
+import org.apache.geode.cache.execute.ResultCollector;
+import org.apache.geode.management.cli.CliMetaData;
+import org.apache.geode.management.cli.ConverterHint;
+import org.apache.geode.management.cli.GfshCommand;
+import org.apache.geode.management.internal.cli.functions.WanCopyRegionFunction;
+import org.apache.geode.management.internal.cli.result.model.ResultModel;
+import org.apache.geode.management.internal.functions.CliFunctionResult;
+import org.apache.geode.management.internal.i18n.CliStrings;
+import org.apache.geode.security.ResourcePermission.Operation;
+import org.apache.geode.security.ResourcePermission.Resource;
+
+public class WanCopyRegionCommand extends GfshCommand {
+ private final WanCopyRegionFunction wanCopyRegionFunction = new WanCopyRegionFunction();
+
+ /* 'wan-copy region' command */
+ public static final String WAN_COPY_REGION = "wan-copy region";
+ public static final String WAN_COPY_REGION__HELP =
+ "Copy a region with a senderId via WAN replication";
+ public static final String WAN_COPY_REGION__REGION = "region";
+ public static final String WAN_COPY_REGION__REGION__HELP =
+ "Region from which data will be exported.";
+ public static final String WAN_COPY_REGION__SENDERID = "sender-id";
+ public static final String WAN_COPY_REGION__SENDERID__HELP =
+ "Sender Id to use to copy the region.";
+ public static final String WAN_COPY_REGION__MAXRATE = "max-rate";
+ public static final String WAN_COPY_REGION__MAXRATE__HELP =
+ "Maximum rate for copying in entries per second.";
+ public static final String WAN_COPY_REGION__BATCHSIZE = "batch-size";
+ public static final String WAN_COPY_REGION__BATCHSIZE__HELP =
+ "Number of entries to be copied in each batch.";
+ public static final String WAN_COPY_REGION__CANCEL = "cancel";
+ public static final String WAN_COPY_REGION__CANCEL__HELP =
+ "Cancel an ongoing wan-copy region command";
+ public static final String WAN_COPY_REGION__MSG__REGION__NOT__FOUND = "Region {0} not found";
+ public static final String WAN_COPY_REGION__MSG__REGION__NOT__USING_SENDER =
+ "Region {0} is not configured to use sender {1}";
+ public static final String WAN_COPY_REGION__MSG__SENDER__NOT__FOUND = "Sender {0} not found";
+ public static final String WAN_COPY_REGION__MSG__SENDER__SERIAL__AND__NOT__PRIMARY =
+ "Sender {0} is serial and not primary. 0 entries copied.";
+ public static final String WAN_COPY_REGION__MSG__SENDER__NOT__RUNNING =
+ "Sender {0} is not running";
+ public static final String WAN_COPY_REGION__MSG__EXECUTION__CANCELED = "Execution canceled";
+ public static final String WAN_COPY_REGION__MSG__EXECUTIONS__CANCELED =
+ "Executions canceled: {0}";
+ public static final String WAN_COPY_REGION__MSG__EXECUTION__FAILED =
+ "Execution failed. Error: {0}";
+ public static final String WAN_COPY_REGION__MSG__NO__CONNECTION__POOL =
+ "No connection pool available to receiver";
+ public static final String WAN_COPY_REGION__MSG__COMMAND__NOT__SUPPORTED__AT__REMOTE__SITE =
+ "Command not supported at remote site.";
+ public static final String WAN_COPY_REGION__MSG__NO__CONNECTION =
+ "No connection available to receiver after having copied {0} entries";
+ public static final String WAN_COPY_REGION__MSG__ERROR__AFTER__HAVING__COPIED =
+ "Error ({0}) in operation after having copied {1} entries";
+ public static final String WAN_COPY_REGION__MSG__CANCELED__BEFORE__HAVING__COPIED =
+ "Operation canceled before having copied all entries";
+ public static final String WAN_COPY_REGION__MSG__COPIED__ENTRIES = "Entries copied: {0}";
+ public static final String WAN_COPY_REGION__MSG__NO__RUNNING__COMMAND =
+ "No running command to be canceled for region {0} and sender {1}";
+ public static final String WAN_COPY_REGION__MSG__ALREADY__RUNNING__COMMAND =
+ "There is already a command running for region {0} and sender {1}";
+
+ @CliAvailabilityIndicator({WAN_COPY_REGION})
+ public boolean commandAvailable() {
+ return isOnlineCommandAvailable();
+ }
+
+ @CliCommand(value = WAN_COPY_REGION, help = WAN_COPY_REGION__HELP)
+ @CliMetaData(relatedTopic = {CliStrings.TOPIC_GEODE_DATA, CliStrings.TOPIC_GEODE_REGION})
+ public ResultModel wanCopyRegion(
+ @CliOption(key = WAN_COPY_REGION__REGION, mandatory = true,
+ optionContext = ConverterHint.REGION_PATH,
+ help = WAN_COPY_REGION__REGION__HELP) String regionName,
+ @CliOption(key = WAN_COPY_REGION__SENDERID, mandatory = true,
+ optionContext = ConverterHint.GATEWAY_SENDER_ID,
+ help = WAN_COPY_REGION__SENDERID__HELP) String senderId,
+ @CliOption(key = WAN_COPY_REGION__MAXRATE,
+ unspecifiedDefaultValue = "0",
+ help = WAN_COPY_REGION__MAXRATE__HELP) long maxRate,
+ @CliOption(key = WAN_COPY_REGION__BATCHSIZE,
+ unspecifiedDefaultValue = "1000",
+ help = WAN_COPY_REGION__BATCHSIZE__HELP) int batchSize,
+ @CliOption(key = WAN_COPY_REGION__CANCEL,
+ unspecifiedDefaultValue = "false",
+ specifiedDefaultValue = "true",
+ help = WAN_COPY_REGION__CANCEL__HELP) boolean isCancel) {
+
+ authorize(Resource.DATA, Operation.WRITE, regionName);
+ final Object[] args = {regionName, senderId, isCancel, maxRate, batchSize};
+ ResultCollector<?, ?> resultCollector =
+ executeFunction(wanCopyRegionFunction, args, getAllNormalMembers());
+ final List<CliFunctionResult> cliFunctionResults =
+ getCliFunctionResults((List<CliFunctionResult>) resultCollector.getResult());
+ return ResultModel.createMemberStatusResult(cliFunctionResults, false, false);
+ }
+
+ private List<CliFunctionResult> getCliFunctionResults(List<CliFunctionResult> resultsObjects) {
+ final List<CliFunctionResult> cliFunctionResults = new ArrayList<>();
+ for (Object result : resultsObjects) {
+ if (result instanceof FunctionInvocationTargetException) {
+ CliFunctionResult errorResult =
+ new CliFunctionResult(
+ ((FunctionInvocationTargetException) result).getMemberId().getName(),
+ CliFunctionResult.StatusState.ERROR,
+ ((FunctionInvocationTargetException) result).getMessage());
+ cliFunctionResults.add(errorResult);
+ } else {
+ cliFunctionResults.add((CliFunctionResult) result);
+ }
+ }
+ return cliFunctionResults;
+ }
+}
diff --git a/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/client/locator/GatewaySenderBatchOp.java b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/client/locator/GatewaySenderBatchOp.java
index 57e9468..e0cdb23 100755
--- a/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/client/locator/GatewaySenderBatchOp.java
+++ b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/client/locator/GatewaySenderBatchOp.java
@@ -95,10 +95,12 @@ public class GatewaySenderBatchOp {
byte posDupByte = (byte) (event.getPossibleDuplicate() ? 0x01 : 0x00);
getMessage().addBytesPart(new byte[] {posDupByte});
}
- if (action >= 0 && action <= 3) {
+ if (action >= 0 && action <= GatewaySenderEventImpl.UPDATE_ACTION_NO_GENERATE_CALLBACKS) {
// 0 = create
// 1 = update
// 2 = destroy
+ // 3 = update timestamp
+ // 4 = update passing generatecallbacks
String regionName = event.getRegionPath();
EventID eventId = event.getEventId();
Object key = event.getKey();
@@ -110,7 +112,7 @@ public class GatewaySenderBatchOp {
getMessage().addObjPart(eventId);
// Add key
getMessage().addStringOrObjPart(key);
- if (action < 2 /* it is 0 or 1 */) {
+ if (action < 2 || action == GatewaySenderEventImpl.UPDATE_ACTION_NO_GENERATE_CALLBACKS) {
byte[] value = event.getSerializedValue();
byte valueIsObject = event.getValueIsObject();;
// Add value (which is already a serialized byte[])
diff --git a/geode-wan/src/main/java/org/apache/geode/management/internal/cli/functions/WanCopyRegionFunction.java b/geode-wan/src/main/java/org/apache/geode/management/internal/cli/functions/WanCopyRegionFunction.java
new file mode 100644
index 0000000..d236ac9
--- /dev/null
+++ b/geode-wan/src/main/java/org/apache/geode/management/internal/cli/functions/WanCopyRegionFunction.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.cli.functions;
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__MSG__ALREADY__RUNNING__COMMAND;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__MSG__CANCELED__BEFORE__HAVING__COPIED;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__MSG__EXECUTIONS__CANCELED;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__MSG__EXECUTION__CANCELED;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__MSG__EXECUTION__FAILED;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__MSG__NO__RUNNING__COMMAND;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__MSG__REGION__NOT__FOUND;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__MSG__REGION__NOT__USING_SENDER;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__MSG__SENDER__NOT__FOUND;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__MSG__SENDER__NOT__RUNNING;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__MSG__SENDER__SERIAL__AND__NOT__PRIMARY;
+
+import java.io.Serializable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.geode.annotations.VisibleForTesting;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.Declarable;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.cache.wan.internal.WanCopyRegionFunctionService;
+import org.apache.geode.cache.wan.internal.WanCopyRegionFunctionServiceAlreadyRunningException;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.wan.InternalGatewaySender;
+import org.apache.geode.management.cli.CliFunction;
+import org.apache.geode.management.internal.functions.CliFunctionResult;
+import org.apache.geode.management.internal.i18n.CliStrings;
+
+/**
+ * Class for copying via WAN the contents of a region
+ * It must be executed in all members of the Geode cluster that host the region
+ * to be copied. (called with onServers() or withMembers() passing the list
+ * of all members hosting the region).
+ * It also offers the possibility to cancel an ongoing execution of this function.
+ * The copying itself is executed in a new thread with a known name
+ * (parameterized with the regionName and senderId) in order to allow
+ * to cancel ongoing invocations by interrupting that thread.
+ *
+ * It accepts the following arguments in an array of objects
+ * 0: regionName (String)
+ * 1: senderId (String)
+ * 2: isCancel (Boolean): If true, it indicates that an ongoing execution of this
+ * function for the given region and senderId must be stopped. Otherwise,
+ * it indicates that the region must be copied.
+ * 3: maxRate (Long) maximum copy rate in entries per second. In the case of
+ * parallel gateway senders, the maxRate is per server hosting the region.
+ * 4: batchSize (Integer): the size of the batches. Region entries are copied in batches of the
+ * passed size. After each batch is sent, the function checks if the command
+ * must be canceled and also sleeps for some time if necessary to adjust the
+ * copy rate to the one passed as argument.
+ */
+public class WanCopyRegionFunction extends CliFunction<Object[]> implements Declarable {
+ private static final long serialVersionUID = 1L;
+
+ public static final String ID = WanCopyRegionFunction.class.getName();
+
+ private final WanCopyRegionFunctionServiceProvider serviceProvider;
+
+ public WanCopyRegionFunction() {
+ this(new WanCopyRegionFunctionServiceProviderImpl());
+ }
+
+ @VisibleForTesting
+ WanCopyRegionFunction(WanCopyRegionFunctionServiceProvider serviceProvider) {
+ this.serviceProvider = serviceProvider;
+ }
+
+ @Override
+ public String getId() {
+ return ID;
+ }
+
+ @Override
+ public boolean hasResult() {
+ return true;
+ }
+
+ @Override
+ public boolean isHA() {
+ return false;
+ }
+
+ @Override
+ public CliFunctionResult executeFunction(FunctionContext<Object[]> context) {
+ final Object[] args = context.getArguments();
+ if (args.length < 5) {
+ throw new IllegalStateException(
+ "Arguments length does not match required length.");
+ }
+ String regionName = (String) args[0];
+ final String senderId = (String) args[1];
+ final boolean isCancel = (Boolean) args[2];
+ long maxRate = (Long) args[3];
+ int batchSize = (Integer) args[4];
+
+ if (regionName.startsWith(SEPARATOR)) {
+ regionName = regionName.substring(1);
+ }
+ if (regionName.equals("*") && senderId.equals("*") && isCancel) {
+ return cancelAllWanCopyRegion(context);
+ }
+
+ if (isCancel) {
+ return cancelWanCopyRegion(context, regionName, senderId);
+ }
+ final Cache cache = context.getCache();
+
+ final Region<?, ?> region = cache.getRegion(regionName);
+ if (region == null) {
+ return new CliFunctionResult(context.getMemberName(), CliFunctionResult.StatusState.ERROR,
+ CliStrings.format(WAN_COPY_REGION__MSG__REGION__NOT__FOUND, regionName));
+ }
+ GatewaySender sender = cache.getGatewaySender(senderId);
+ if (sender == null) {
+ return new CliFunctionResult(context.getMemberName(), CliFunctionResult.StatusState.ERROR,
+ CliStrings.format(WAN_COPY_REGION__MSG__SENDER__NOT__FOUND, senderId));
+ }
+ if (!region.getAttributes().getGatewaySenderIds().contains(sender.getId())) {
+ return new CliFunctionResult(context.getMemberName(), CliFunctionResult.StatusState.ERROR,
+ CliStrings.format(WAN_COPY_REGION__MSG__REGION__NOT__USING_SENDER, regionName,
+ senderId));
+ }
+ if (!sender.isRunning()) {
+ return new CliFunctionResult(context.getMemberName(), CliFunctionResult.StatusState.ERROR,
+ CliStrings.format(WAN_COPY_REGION__MSG__SENDER__NOT__RUNNING, senderId));
+ }
+ if (!sender.isParallel() && !((InternalGatewaySender) sender).isPrimary()) {
+ return new CliFunctionResult(context.getMemberName(), CliFunctionResult.StatusState.OK,
+ CliStrings.format(WAN_COPY_REGION__MSG__SENDER__SERIAL__AND__NOT__PRIMARY,
+ senderId));
+ }
+
+ return executeFunctionInService(context, region, sender, maxRate, batchSize);
+ }
+
+ private CliFunctionResult executeFunctionInService(FunctionContext<Object[]> context,
+ Region<?, ?> region, GatewaySender sender, long maxRate, int batchSize) {
+ try {
+ return serviceProvider.get((InternalCache) context.getCache()).execute(
+ () -> new WanCopyRegionFunctionDelegate().wanCopyRegion(
+ (InternalCache) context.getCache(), context.getMemberName(), region, sender, maxRate,
+ batchSize),
+ region.getName(),
+ sender.getId());
+ } catch (InterruptedException | CancellationException e) {
+ return new CliFunctionResult(context.getMemberName(), CliFunctionResult.StatusState.ERROR,
+ WAN_COPY_REGION__MSG__CANCELED__BEFORE__HAVING__COPIED);
+ } catch (ExecutionException e) {
+ return new CliFunctionResult(context.getMemberName(), CliFunctionResult.StatusState.ERROR,
+ CliStrings.format(WAN_COPY_REGION__MSG__EXECUTION__FAILED, e.getMessage()));
+ } catch (WanCopyRegionFunctionServiceAlreadyRunningException e) {
+ return new CliFunctionResult(context.getMemberName(), CliFunctionResult.StatusState.ERROR,
+ CliStrings.format(WAN_COPY_REGION__MSG__ALREADY__RUNNING__COMMAND, region.getName(),
+ sender.getId()));
+ }
+ }
+
+ private CliFunctionResult cancelWanCopyRegion(FunctionContext<Object[]> context,
+ String regionName, String senderId) {
+ boolean canceled =
+ serviceProvider.get((InternalCache) context.getCache()).cancel(regionName, senderId);
+ if (!canceled) {
+ return new CliFunctionResult(context.getMemberName(), CliFunctionResult.StatusState.ERROR,
+ CliStrings.format(WAN_COPY_REGION__MSG__NO__RUNNING__COMMAND,
+ regionName, senderId));
+ }
+ return new CliFunctionResult(context.getMemberName(), CliFunctionResult.StatusState.OK,
+ WAN_COPY_REGION__MSG__EXECUTION__CANCELED);
+ }
+
+ private CliFunctionResult cancelAllWanCopyRegion(FunctionContext<Object[]> context) {
+ String executionsString = serviceProvider.get((InternalCache) context.getCache())
+ .cancelAll();
+ return new CliFunctionResult(context.getMemberName(), CliFunctionResult.StatusState.OK,
+ CliStrings.format(WAN_COPY_REGION__MSG__EXECUTIONS__CANCELED, executionsString));
+ }
+
+ @FunctionalInterface
+ interface WanCopyRegionFunctionServiceProvider extends Serializable {
+ WanCopyRegionFunctionService get(InternalCache cache);
+ }
+
+ static class WanCopyRegionFunctionServiceProviderImpl
+ implements WanCopyRegionFunctionServiceProvider {
+ @Override
+ public WanCopyRegionFunctionService get(InternalCache cache) {
+ return cache.getService(WanCopyRegionFunctionService.class);
+ }
+ }
+}
diff --git a/geode-wan/src/main/java/org/apache/geode/management/internal/cli/functions/WanCopyRegionFunctionDelegate.java b/geode-wan/src/main/java/org/apache/geode/management/internal/cli/functions/WanCopyRegionFunctionDelegate.java
new file mode 100644
index 0000000..50c3838
--- /dev/null
+++ b/geode-wan/src/main/java/org/apache/geode/management/internal/cli/functions/WanCopyRegionFunctionDelegate.java
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.cli.functions;
+
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__MSG__COMMAND__NOT__SUPPORTED__AT__REMOTE__SITE;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__MSG__ERROR__AFTER__HAVING__COPIED;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__MSG__NO__CONNECTION;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__MSG__NO__CONNECTION__POOL;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.annotations.VisibleForTesting;
+import org.apache.geode.cache.EntryDestroyedException;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.client.AllConnectionsInUseException;
+import org.apache.geode.cache.client.NoAvailableServersException;
+import org.apache.geode.cache.client.ServerConnectivityException;
+import org.apache.geode.cache.client.internal.Connection;
+import org.apache.geode.cache.client.internal.PoolImpl;
+import org.apache.geode.cache.client.internal.pooling.ConnectionDestroyedException;
+import org.apache.geode.cache.wan.GatewayQueueEvent;
+import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand;
+import org.apache.geode.internal.cache.BucketRegion;
+import org.apache.geode.internal.cache.DefaultEntryEventFactory;
+import org.apache.geode.internal.cache.EntryEventImpl;
+import org.apache.geode.internal.cache.EntrySnapshot;
+import org.apache.geode.internal.cache.EnumListenerEvent;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.InternalRegion;
+import org.apache.geode.internal.cache.NonTXEntry;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.cache.wan.BatchException70;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventDispatcher;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl.TransactionMetadataDisposition;
+import org.apache.geode.internal.serialization.KnownVersion;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.management.internal.functions.CliFunctionResult;
+import org.apache.geode.management.internal.i18n.CliStrings;
+
+public class WanCopyRegionFunctionDelegate implements Serializable {
+ private static final int MAX_BATCH_SEND_RETRIES = 1;
+
+ private int batchId = 0;
+ private final Clock clock;
+ private final ThreadSleeper threadSleeper;
+ private final EventCreator eventCreator;
+
+ private static final Logger logger = LogService.getLogger();
+
+ public WanCopyRegionFunctionDelegate() {
+ this(Clock.systemDefaultZone(), new ThreadSleeperImpl(), new EventCreatorImpl());
+ }
+
+ public WanCopyRegionFunctionDelegate(Clock clock, ThreadSleeper threadSleeper,
+ EventCreator eventCreator) {
+ this.clock = clock;
+ this.threadSleeper = threadSleeper;
+ this.eventCreator = eventCreator;
+ }
+
+ public CliFunctionResult wanCopyRegion(InternalCache cache, String memberName,
+ Region<?, ?> region,
+ GatewaySender sender, long maxRate, int batchSize) throws InterruptedException {
+ ConnectionState connectionState = new ConnectionState();
+ int copiedEntries = 0;
+ Iterator<?> entriesIter = getEntries(region, sender).iterator();
+ final long startTime = clock.millis();
+
+ try {
+ while (entriesIter.hasNext()) {
+ List<GatewayQueueEvent<?, ?>> batch =
+ createBatch((InternalRegion) region, sender, batchSize, cache, entriesIter);
+ if (batch.size() == 0) {
+ continue;
+ }
+ Optional<CliFunctionResult> connectionError =
+ connectionState.connectIfNeeded(memberName, sender);
+ if (connectionError.isPresent()) {
+ return connectionError.get();
+ }
+ Optional<CliFunctionResult> error =
+ sendBatch(memberName, sender, batch, connectionState, copiedEntries);
+ if (error.isPresent()) {
+ return error.get();
+ }
+ copiedEntries += batch.size();
+ doPostSendBatchActions(startTime, copiedEntries, maxRate);
+ }
+ } finally {
+ connectionState.close();
+ }
+
+ if (region.isDestroyed()) {
+ return new CliFunctionResult(memberName, CliFunctionResult.StatusState.ERROR,
+ CliStrings.format(
+ WanCopyRegionCommand.WAN_COPY_REGION__MSG__ERROR__AFTER__HAVING__COPIED,
+ "Region destroyed",
+ copiedEntries));
+ }
+
+ return new CliFunctionResult(memberName, CliFunctionResult.StatusState.OK,
+ CliStrings.format(WanCopyRegionCommand.WAN_COPY_REGION__MSG__COPIED__ENTRIES,
+ copiedEntries));
+ }
+
+ private Optional<CliFunctionResult> sendBatch(String memberName,
+ GatewaySender sender, List<GatewayQueueEvent<?, ?>> batch,
+ ConnectionState connectionState, int copiedEntries) {
+ GatewaySenderEventDispatcher dispatcher =
+ ((AbstractGatewaySender) sender).getEventProcessor().getDispatcher();
+ int retries = 0;
+
+ while (true) {
+ try {
+ dispatcher.sendBatch(batch, connectionState.getConnection(),
+ connectionState.getSenderPool(), getAndIncrementBatchId(), true);
+ return Optional.empty();
+ } catch (BatchException70 e) {
+ return Optional.of(new CliFunctionResult(memberName,
+ CliFunctionResult.StatusState.ERROR,
+ CliStrings.format(
+ WAN_COPY_REGION__MSG__ERROR__AFTER__HAVING__COPIED,
+ e.getExceptions().get(0).getCause(), copiedEntries)));
+ } catch (ConnectionDestroyedException | ServerConnectivityException e) {
+ Optional<CliFunctionResult> error =
+ connectionState.reconnect(memberName, retries++, copiedEntries, e);
+ if (error.isPresent()) {
+ return error;
+ }
+ }
+ }
+ }
+
+ private List<GatewayQueueEvent<?, ?>> createBatch(InternalRegion region, GatewaySender sender,
+ int batchSize, InternalCache cache, Iterator<?> iter) {
+ int batchIndex = 0;
+ List<GatewayQueueEvent<?, ?>> batch = new ArrayList<>();
+
+ while (iter.hasNext() && batchIndex < batchSize) {
+ GatewayQueueEvent<?, ?> event =
+ eventCreator.createGatewaySenderEvent(cache, region, sender,
+ (Region.Entry<?, ?>) iter.next());
+ if (event != null) {
+ batch.add(event);
+ batchIndex++;
+ }
+ }
+ return batch;
+ }
+
+ private Set<?> getEntries(Region<?, ?> region, GatewaySender sender) {
+ if (region instanceof PartitionedRegion && sender.isParallel()) {
+ return ((PartitionedRegion) region).getDataStore().getAllLocalBucketRegions()
+ .stream()
+ .flatMap(br -> ((Set<?>) br.entrySet()).stream()).collect(Collectors.toSet());
+ }
+ return region.entrySet();
+ }
+
+ /**
+ * It runs the actions to be done after a batch has been
+ * sent: throw an interrupted exception if the operation was canceled and
+ * adjust the rate of copying by sleeping if necessary.
+ *
+ * @param startTime time at which the entries started to be copied
+ * @param copiedEntries number of entries copied so far
+ * @param maxRate maximum copying rate
+ */
+ @VisibleForTesting
+ void doPostSendBatchActions(long startTime, int copiedEntries, long maxRate)
+ throws InterruptedException {
+ long sleepMs = getTimeToSleep(startTime, copiedEntries, maxRate);
+ if (sleepMs > 0) {
+ logger.info("{}: Sleeping for {} ms to accommodate to requested maxRate",
+ this.getClass().getSimpleName(), sleepMs);
+ threadSleeper.sleep(sleepMs);
+ } else {
+ if (Thread.currentThread().isInterrupted()) {
+ throw new InterruptedException();
+ }
+ }
+ }
+
+ private int getAndIncrementBatchId() {
+ if (batchId + 1 == Integer.MAX_VALUE) {
+ batchId = 0;
+ }
+ return batchId++;
+ }
+
+ @VisibleForTesting
+ long getTimeToSleep(long startTime, int copiedEntries, long maxRate) {
+ if (maxRate == 0) {
+ return 0;
+ }
+ final long elapsedMs = clock.millis() - startTime;
+ if (elapsedMs != 0 && (copiedEntries * 1000.0) / (double) elapsedMs <= maxRate) {
+ return 0;
+ }
+ final long targetElapsedMs = (copiedEntries * 1000L) / maxRate;
+ return targetElapsedMs - elapsedMs;
+ }
+
+
+ static class ConnectionState {
+ private volatile Connection connection = null;
+ private volatile PoolImpl senderPool = null;
+
+ public Connection getConnection() {
+ return connection;
+ }
+
+ public PoolImpl getSenderPool() {
+ return senderPool;
+ }
+
+ public Optional<CliFunctionResult> connectIfNeeded(String memberName,
+ GatewaySender sender) {
+ if (senderPool == null) {
+ senderPool = ((AbstractGatewaySender) sender).getProxy();
+ if (senderPool == null) {
+ return Optional.of(new CliFunctionResult(memberName,
+ CliFunctionResult.StatusState.ERROR,
+ WAN_COPY_REGION__MSG__NO__CONNECTION__POOL));
+ }
+ connection = senderPool.acquireConnection();
+ if (connection.getWanSiteVersion() < KnownVersion.GEODE_1_15_0.ordinal()) {
+ return Optional.of(new CliFunctionResult(memberName,
+ CliFunctionResult.StatusState.ERROR,
+ WAN_COPY_REGION__MSG__COMMAND__NOT__SUPPORTED__AT__REMOTE__SITE));
+ }
+ }
+ return Optional.empty();
+ }
+
+ public Optional<CliFunctionResult> reconnect(String memberName, int retries,
+ int copiedEntries, Exception e) {
+ close();
+ if (retries >= MAX_BATCH_SEND_RETRIES) {
+ return Optional.of(new CliFunctionResult(memberName,
+ CliFunctionResult.StatusState.ERROR,
+ CliStrings.format(
+ WAN_COPY_REGION__MSG__ERROR__AFTER__HAVING__COPIED,
+ "Connection error", copiedEntries)));
+ }
+ logger.error("Exception {} in sendBatch. Retrying", e.getClass().getName());
+ try {
+ connection = senderPool.acquireConnection();
+ } catch (NoAvailableServersException | AllConnectionsInUseException e1) {
+ return Optional.of(new CliFunctionResult(memberName,
+ CliFunctionResult.StatusState.ERROR,
+ CliStrings.format(
+ WAN_COPY_REGION__MSG__NO__CONNECTION,
+ copiedEntries)));
+ }
+ return Optional.empty();
+ }
+
+ public void close() {
+ if (senderPool != null && connection != null) {
+ try {
+ connection.close(false);
+ } catch (Exception e) {
+ logger.error("Error closing the connection used to wan-copy region entries");
+ }
+ senderPool.returnConnection(connection);
+ }
+ connection = null;
+ }
+ }
+
+
+ @FunctionalInterface
+ interface ThreadSleeper extends Serializable {
+ void sleep(long millis) throws InterruptedException;
+ }
+
+ static class ThreadSleeperImpl implements ThreadSleeper {
+ @Override
+ public void sleep(long millis) throws InterruptedException {
+ Thread.sleep(millis);
+ }
+ }
+
+
+ @FunctionalInterface
+ interface EventCreator extends Serializable {
+ GatewayQueueEvent<?, ?> createGatewaySenderEvent(InternalCache cache,
+ InternalRegion region, GatewaySender sender, Region.Entry<?, ?> entry);
+ }
+
+ static class EventCreatorImpl implements EventCreator {
+ @VisibleForTesting
+ public GatewayQueueEvent<?, ?> createGatewaySenderEvent(InternalCache cache,
+ InternalRegion region, GatewaySender sender, Region.Entry<?, ?> entry) {
+ final EntryEventImpl event;
+ if (region instanceof PartitionedRegion) {
+ event = createEventForPartitionedRegion(sender, cache, region, entry);
+ } else {
+ event = createEventForReplicatedRegion(cache, region, entry);
+ }
+ if (event == null) {
+ return null;
+ }
+ try {
+ return new GatewaySenderEventImpl(EnumListenerEvent.AFTER_UPDATE_WITH_GENERATE_CALLBACKS,
+ event, null, TransactionMetadataDisposition.EXCLUDE);
+ } catch (IOException e) {
+ logger.error("Error when creating event in wan-copy: {}", e.getMessage());
+ return null;
+ }
+ }
+
+ private EntryEventImpl createEventForReplicatedRegion(InternalCache cache,
+ InternalRegion region,
+ Region.Entry<?, ?> entry) {
+ return createEvent(cache, region, entry);
+ }
+
+ private EntryEventImpl createEventForPartitionedRegion(GatewaySender sender,
+ InternalCache cache,
+ InternalRegion region,
+ Region.Entry<?, ?> entry) {
+ EntryEventImpl event = createEvent(cache, region, entry);
+ if (event == null) {
+ return null;
+ }
+ BucketRegion bucketRegion = ((PartitionedRegion) event.getRegion()).getDataStore()
+ .getLocalBucketById(event.getKeyInfo().getBucketId());
+ if (bucketRegion != null && !bucketRegion.getBucketAdvisor().isPrimary()
+ && sender.isParallel()) {
+ return null;
+ }
+ if (bucketRegion != null) {
+ bucketRegion.handleWANEvent(event);
+ }
+ return event;
+ }
+
+ private EntryEventImpl createEvent(InternalCache cache, InternalRegion region,
+ Region.Entry<?, ?> entry) {
+ EntryEventImpl event;
+ try {
+ event = new DefaultEntryEventFactory().create(region, Operation.UPDATE,
+ entry.getKey(),
+ entry.getValue(), null, false,
+ cache.getInternalDistributedSystem().getDistributedMember(), false);
+ } catch (EntryDestroyedException e) {
+ return null;
+ }
+ if (entry instanceof NonTXEntry) {
+ event.setVersionTag(((NonTXEntry) entry).getRegionEntry().getVersionStamp().asVersionTag());
+ } else {
+ event.setVersionTag(((EntrySnapshot) entry).getVersionTag());
+ }
+ event.setNewEventId(cache.getInternalDistributedSystem());
+ return event;
+ }
+ }
+}
diff --git a/geode-wan/src/main/resources/META-INF/services/org.apache.geode.internal.cache.CacheService b/geode-wan/src/main/resources/META-INF/services/org.apache.geode.internal.cache.CacheService
new file mode 100644
index 0000000..30999e3
--- /dev/null
+++ b/geode-wan/src/main/resources/META-INF/services/org.apache.geode.internal.cache.CacheService
@@ -0,0 +1 @@
+org.apache.geode.cache.wan.internal.WanCopyRegionFunctionService
diff --git a/geode-wan/src/main/resources/META-INF/services/org.springframework.shell.core.CommandMarker b/geode-wan/src/main/resources/META-INF/services/org.springframework.shell.core.CommandMarker
new file mode 100644
index 0000000..182564b
--- /dev/null
+++ b/geode-wan/src/main/resources/META-INF/services/org.springframework.shell.core.CommandMarker
@@ -0,0 +1 @@
+org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand
diff --git a/geode-wan/src/main/resources/org/apache/geode/cache/wan/internal/sanctioned-geode-wan-serializables.txt b/geode-wan/src/main/resources/org/apache/geode/cache/wan/internal/sanctioned-geode-wan-serializables.txt
index e69de29..a93f5db 100755
--- a/geode-wan/src/main/resources/org/apache/geode/cache/wan/internal/sanctioned-geode-wan-serializables.txt
+++ b/geode-wan/src/main/resources/org/apache/geode/cache/wan/internal/sanctioned-geode-wan-serializables.txt
@@ -0,0 +1,6 @@
+org/apache/geode/cache/wan/internal/WanCopyRegionFunctionServiceAlreadyRunningException,false
+org/apache/geode/management/internal/cli/functions/WanCopyRegionFunction,true,1,serviceProvider:org/apache/geode/management/internal/cli/functions/WanCopyRegionFunction$WanCopyRegionFunctionServiceProvider
+org/apache/geode/management/internal/cli/functions/WanCopyRegionFunction$WanCopyRegionFunctionServiceProviderImpl,false
+org/apache/geode/management/internal/cli/functions/WanCopyRegionFunctionDelegate,false,batchId:int,clock:java/time/Clock,eventCreator:org/apache/geode/management/internal/cli/functions/WanCopyRegionFunctionDelegate$EventCreator,threadSleeper:org/apache/geode/management/internal/cli/functions/WanCopyRegionFunctionDelegate$ThreadSleeper
+org/apache/geode/management/internal/cli/functions/WanCopyRegionFunctionDelegate$EventCreatorImpl,false
+org/apache/geode/management/internal/cli/functions/WanCopyRegionFunctionDelegate$ThreadSleeperImpl,false
diff --git a/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/WanCopyRegionFunctionServiceTest.java b/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/WanCopyRegionFunctionServiceTest.java
new file mode 100644
index 0000000..864aa63
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/WanCopyRegionFunctionServiceTest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.wan.internal;
+
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+import static org.mockito.Mockito.mock;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.management.internal.functions.CliFunctionResult;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+
+public class WanCopyRegionFunctionServiceTest {
+
+ private WanCopyRegionFunctionService service;
+ private final InternalCache cache = mock(InternalCache.class);
+
+ @Before
+ public void setUp() throws Exception {
+ service = new WanCopyRegionFunctionService();
+ service.init(cache);
+ }
+
+ @Test
+ public void severalExecuteWithSameRegionAndSenderNotAllowed() {
+ CountDownLatch latch = new CountDownLatch(1);
+ Callable<CliFunctionResult> firstExecution = () -> {
+ latch.await(GeodeAwaitility.getTimeout().getSeconds(), TimeUnit.SECONDS);
+ return null;
+ };
+
+ String regionName = "myRegion";
+ String senderId = "mySender";
+ CompletableFuture
+ .supplyAsync(() -> {
+ try {
+ return service.execute(firstExecution, regionName, senderId);
+ } catch (Exception e) {
+ return null;
+ }
+ });
+
+ // Wait for the execute function to start
+ await().untilAsserted(() -> assertThat(service.getNumberOfCurrentExecutions()).isEqualTo(1));
+
+ // Execute another function instance for the same region and sender-id
+ Callable<CliFunctionResult> secondExecution = () -> null;
+
+ assertThatThrownBy(() -> service.execute(secondExecution, regionName, senderId))
+ .isInstanceOf(WanCopyRegionFunctionServiceAlreadyRunningException.class);
+
+ // Let first execution finish
+ latch.countDown();
+ }
+
+ @Test
+ public void cancelRunningExecutionReturnsSuccess() {
+ String regionName = "myRegion";
+ String senderId = "mySender";
+ CountDownLatch latch = new CountDownLatch(1);
+ Callable<CliFunctionResult> firstExecution = () -> {
+ latch.await(GeodeAwaitility.getTimeout().getSeconds(), TimeUnit.SECONDS);
+ return null;
+ };
+ CompletableFuture
+ .supplyAsync(() -> {
+ try {
+ return service.execute(firstExecution, regionName, senderId);
+ } catch (Exception e) {
+ return null;
+ }
+ });
+
+ // Wait for the function to start execution
+ await().untilAsserted(() -> assertThat(service.getNumberOfCurrentExecutions()).isEqualTo(1));
+
+ // Cancel the function execution
+ boolean result = service.cancel(regionName, senderId);
+
+ assertThat(result).isEqualTo(true);
+ await().untilAsserted(() -> assertThat(service.getNumberOfCurrentExecutions()).isEqualTo(0));
+ }
+
+ @Test
+ public void cancelNotRunningExecutionReturnsError() {
+ final String regionName = "myRegion";
+ final String senderId = "mySender";
+
+ boolean result = service.cancel(regionName, senderId);
+
+ assertThat(result).isEqualTo(false);
+ }
+
+ @Test
+ public void cancelAllExecutionsWithRunningExecutionsReturnsCanceledExecutions() {
+ CountDownLatch latch = new CountDownLatch(2);
+ Callable<CliFunctionResult> firstExecution = () -> {
+ latch.await(GeodeAwaitility.getTimeout().getSeconds(), TimeUnit.SECONDS);
+ return null;
+ };
+
+ CompletableFuture
+ .supplyAsync(() -> {
+ try {
+ return service.execute(firstExecution, "myRegion", "mySender1");
+ } catch (Exception e) {
+ return null;
+ }
+ });
+
+ Callable<CliFunctionResult> secondExecution = () -> {
+ latch.await(GeodeAwaitility.getTimeout().getSeconds(), TimeUnit.SECONDS);
+ return null;
+ };
+
+ CompletableFuture
+ .supplyAsync(() -> {
+ try {
+ return service.execute(secondExecution, "myRegion", "mySender");
+ } catch (Exception e) {
+ return null;
+ }
+ });
+
+ // Wait for the functions to start execution
+ await().untilAsserted(() -> assertThat(service.getNumberOfCurrentExecutions()).isEqualTo(2));
+
+ // Cancel the function execution
+ String executionsString = service.cancelAll();
+
+ assertThat(executionsString).isEqualTo("[(myRegion,mySender1), (myRegion,mySender)]");
+ await().untilAsserted(() -> assertThat(service.getNumberOfCurrentExecutions()).isEqualTo(0));
+ }
+
+ @Test
+ public void severalExecuteWithDifferentRegionOrSenderAreAllowed() {
+ int executions = 5;
+ CountDownLatch latch = new CountDownLatch(executions);
+ for (int i = 0; i < executions; i++) {
+ Callable<CliFunctionResult> firstExecution = () -> {
+ latch.await(GeodeAwaitility.getTimeout().getSeconds(), TimeUnit.SECONDS);
+ return null;
+ };
+
+ final String regionName = String.valueOf(i);
+ CompletableFuture
+ .supplyAsync(() -> {
+ try {
+ return service.execute(firstExecution, regionName, "mySender1");
+ } catch (Exception e) {
+ return null;
+ }
+ });
+ }
+
+ // Wait for the functions to start execution
+ await().untilAsserted(
+ () -> assertThat(service.getNumberOfCurrentExecutions()).isEqualTo(executions));
+
+ // End executions
+ for (int i = 0; i < executions; i++) {
+ latch.countDown();
+ }
+ }
+}
diff --git a/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/cli/commands/WanCopyRegionCommandTest.java b/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/cli/commands/WanCopyRegionCommandTest.java
new file mode 100644
index 0000000..9028a47
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/cli/commands/WanCopyRegionCommandTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.wan.internal.cli.commands;
+
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__BATCHSIZE;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__CANCEL;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__MAXRATE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import org.apache.geode.management.internal.cli.GfshParseResult;
+import org.apache.geode.test.junit.rules.GfshParserRule;
+
+public class WanCopyRegionCommandTest {
+ @ClassRule
+ public static GfshParserRule gfsh = new GfshParserRule();
+
+ @Test
+ public void gfshParserReturnsNullIfMandatoryOptionsNotSpecified() {
+ assertThat(gfsh.parse("wan-copy region")).isNull();
+ assertThat(gfsh.parse("wan-copy region --region=myregion")).isNull();
+ assertThat(gfsh.parse("wan-copy region --sender-id=ln")).isNull();
+ assertThat(gfsh.parse("wan-copy region --batch-size=10")).isNull();
+ }
+
+ @Test
+ public void verifyDefaultValues() {
+ GfshParseResult result = gfsh.parse("wan-copy region --region=myregion --sender-id=ln");
+ assertThat(result.getParamValueAsString(WAN_COPY_REGION__MAXRATE)).isEqualTo("0");
+ assertThat(result.getParamValueAsString(WAN_COPY_REGION__BATCHSIZE))
+ .isEqualTo("1000");
+ assertThat(result.getParamValueAsString(WAN_COPY_REGION__CANCEL))
+ .isEqualTo("false");
+ }
+}
diff --git a/geode-wan/src/test/java/org/apache/geode/management/internal/cli/functions/WanCopyRegionFunctionDelegateTest.java b/geode-wan/src/test/java/org/apache/geode/management/internal/cli/functions/WanCopyRegionFunctionDelegateTest.java
new file mode 100644
index 0000000..34cebd4
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/management/internal/cli/functions/WanCopyRegionFunctionDelegateTest.java
@@ -0,0 +1,575 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.cli.functions;
+
+import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.catchThrowable;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.time.Clock;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionAttributes;
+import org.apache.geode.cache.client.NoAvailableServersException;
+import org.apache.geode.cache.client.ServerConnectivityException;
+import org.apache.geode.cache.client.internal.Connection;
+import org.apache.geode.cache.client.internal.PoolImpl;
+import org.apache.geode.cache.client.internal.pooling.ConnectionDestroyedException;
+import org.apache.geode.cache.client.internal.pooling.PooledConnection;
+import org.apache.geode.cache.wan.GatewayQueueEvent;
+import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.InternalRegion;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor;
+import org.apache.geode.internal.cache.wan.BatchException70;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventDispatcher;
+import org.apache.geode.internal.cache.wan.InternalGatewaySender;
+import org.apache.geode.internal.serialization.KnownVersion;
+import org.apache.geode.management.internal.cli.functions.WanCopyRegionFunctionDelegate.EventCreator;
+import org.apache.geode.management.internal.functions.CliFunctionResult;
+
+public class WanCopyRegionFunctionDelegateTest {
+
+ private WanCopyRegionFunctionDelegate function;
+ private long startTime;
+ private final int entries = 25;
+ private Clock clockMock;
+ private WanCopyRegionFunctionDelegate.ThreadSleeper threadSleeperMock;
+ private InternalCache internalCacheMock;
+ private GatewaySender gatewaySenderMock;
+ private PoolImpl poolMock;
+ private Connection connectionMock;
+ private GatewaySenderEventDispatcher dispatcherMock;
+
+ private final Region<?, ?> regionMock =
+ uncheckedCast(mock(InternalRegion.class));
+
+ private EventCreator eventCreatorMock;
+
+ @Before
+ public void setUp() throws InterruptedException {
+ startTime = System.currentTimeMillis();
+ clockMock = mock(Clock.class);
+ threadSleeperMock = mock(WanCopyRegionFunctionDelegate.ThreadSleeper.class);
+ gatewaySenderMock = mock(AbstractGatewaySender.class);
+ poolMock = mock(PoolImpl.class);
+ connectionMock = mock(PooledConnection.class);
+ dispatcherMock = mock(GatewaySenderEventDispatcher.class);
+ eventCreatorMock = mock(EventCreator.class);
+ AbstractGatewaySenderEventProcessor eventProcessorMock =
+ mock(AbstractGatewaySenderEventProcessor.class);
+ RegionAttributes<?, ?> attributesMock = mock(RegionAttributes.class);
+ internalCacheMock = mock(InternalCache.class);
+
+ function = new WanCopyRegionFunctionDelegate(clockMock, threadSleeperMock, eventCreatorMock);
+
+ doNothing().when(threadSleeperMock).sleep(anyLong());
+ when(gatewaySenderMock.getId()).thenReturn("mySender");
+ when(connectionMock.getWanSiteVersion()).thenReturn(KnownVersion.GEODE_1_15_0.ordinal());
+ when(eventCreatorMock.createGatewaySenderEvent(any(), any(), any(), any()))
+ .thenReturn(uncheckedCast(mock(GatewayQueueEvent.class)));
+ when(eventProcessorMock.getDispatcher()).thenReturn(dispatcherMock);
+ when(((InternalGatewaySender) gatewaySenderMock).getEventProcessor())
+ .thenReturn(eventProcessorMock);
+ when(attributesMock.getGatewaySenderIds()).thenReturn(uncheckedCast(Collections.emptySet()));
+ when(regionMock.getAttributes()).thenReturn(uncheckedCast(attributesMock));
+ when(internalCacheMock.getRegion(any())).thenReturn(uncheckedCast(regionMock));
+ }
+
+ @Test
+ public void doPostSendBatchActions_DoNotSleepIfMaxRateIsZero()
+ throws InterruptedException {
+ // arrange
+ when(internalCacheMock.getRegion(any())).thenReturn(null);
+
+ // act
+ function.wanCopyRegion(internalCacheMock, "member1", regionMock, gatewaySenderMock, 0L, 10);
+
+ // assert
+ verify(threadSleeperMock, never()).sleep(anyLong());
+ }
+
+ @Test
+ public void doPostSendBatchActions_SleepIfMaxRateIsNotZero()
+ throws InterruptedException, BatchException70 {
+ // arrange
+ Object[] options = new Object[] {"myRegion", "mySender", false, 1L, 2};
+ WanCopyRegionFunctionDelegate.ThreadSleeper sleeperMock =
+ mock(WanCopyRegionFunctionDelegate.ThreadSleeper.class);
+ doNothing().when(sleeperMock).sleep(anyLong());
+
+ // act
+ executeWanCopyRegionFunction(options, sleeperMock);
+
+ // assert
+ ArgumentCaptor<Long> captor = ArgumentCaptor.forClass(Long.class);
+ verify(sleeperMock, times(1)).sleep(captor.capture());
+ assertThat(captor.getValue()).isGreaterThan(0).isLessThanOrEqualTo(2000);
+ }
+
+ @Test
+ public void doPostSendBatchActions_ThrowInterruptedIfInterruptedTimeToSleepNotZero()
+ throws InterruptedException {
+ // arrange
+ long maxRate = 1;
+ long elapsedTime = 20L;
+ InterruptedException thrownByThreadSleeper = new InterruptedException("thrownByThreadSleeper");
+ when(clockMock.millis()).thenReturn(startTime + elapsedTime);
+ doThrow(thrownByThreadSleeper).when(threadSleeperMock).sleep(anyLong());
+
+ // act
+ Throwable thrown =
+ catchThrowable(() -> function.doPostSendBatchActions(startTime, entries, maxRate));
+
+ // assert
+ assertThat(thrown).isInstanceOf(InterruptedException.class);
+ }
+
+ @Test
+ public void doPostSendBatchActions_ThrowInterruptedIfInterruptedTimeToSleepIsZero() {
+ long maxRate = 0;
+
+ // act
+ Thread.currentThread().interrupt();
+ Throwable thrown =
+ catchThrowable(() -> function.doPostSendBatchActions(startTime, entries, maxRate));
+
+ // assert
+ assertThat(thrown).isInstanceOf(InterruptedException.class);
+ }
+
+ @Test
+ public void wanCopyRegion_verifyErrorWhenRemoteSiteDoesNotSupportCommand()
+ throws InterruptedException {
+ // arrange
+ when(((AbstractGatewaySender) gatewaySenderMock).getProxy()).thenReturn(poolMock);
+ PooledConnection oldWanSiteConn = mock(PooledConnection.class);
+ when(oldWanSiteConn.getWanSiteVersion()).thenReturn(KnownVersion.GEODE_1_14_0.ordinal());
+ when(poolMock.acquireConnection()).thenReturn(oldWanSiteConn);
+ Set<Region.Entry<Object, Object>> entries = new HashSet<>();
+ entries.add(uncheckedCast(mock(Region.Entry.class)));
+ when(regionMock.entrySet()).thenReturn(uncheckedCast(entries));
+
+ // act
+ CliFunctionResult result =
+ function.wanCopyRegion(internalCacheMock, "member1", regionMock, gatewaySenderMock, 1, 10);
+
+ // assert
+ assertThat(result.getStatus()).isEqualTo(CliFunctionResult.StatusState.ERROR.toString());
+ assertThat(result.getStatusMessage())
+ .isEqualTo("Command not supported at remote site.");
+ }
+
+ @Test
+ public void wanCopyRegion_verifyErrorWhenNoPoolAvailableAndEntriesInRegion()
+ throws InterruptedException {
+ // arrange
+ when(((AbstractGatewaySender) gatewaySenderMock).getProxy()).thenReturn(null);
+ when(poolMock.acquireConnection()).thenThrow(NoAvailableServersException.class)
+ .thenThrow(NoAvailableServersException.class);
+ Set<Region.Entry<Object, Object>> entries = new HashSet<>();
+ entries.add(uncheckedCast(mock(Region.Entry.class)));
+ when(regionMock.entrySet()).thenReturn(uncheckedCast(entries));
+
+ // act
+ CliFunctionResult result =
+ function.wanCopyRegion(internalCacheMock, "member1", regionMock, gatewaySenderMock, 1, 10);
+
+ // assert
+ assertThat(result.getStatus()).isEqualTo(CliFunctionResult.StatusState.ERROR.toString());
+ assertThat(result.getStatusMessage())
+ .isEqualTo("No connection pool available to receiver");
+ }
+
+ @Test
+ public void wanCopyRegion_verifySuccessWhenNoPoolAvailableAndNoEntriesInRegion()
+ throws InterruptedException {
+ // arrange
+ when(((AbstractGatewaySender) gatewaySenderMock).getProxy()).thenReturn(null);
+ when(poolMock.acquireConnection()).thenThrow(NoAvailableServersException.class)
+ .thenThrow(NoAvailableServersException.class);
+ when(regionMock.entrySet()).thenReturn(new HashSet<>());
+
+ // act
+ CliFunctionResult result =
+ function.wanCopyRegion(internalCacheMock, "member1", regionMock, gatewaySenderMock, 1, 10);
+
+ // assert
+ assertThat(result.getStatus()).isEqualTo(CliFunctionResult.StatusState.OK.toString());
+ assertThat(result.getStatusMessage())
+ .isEqualTo("Entries copied: 0");
+ }
+
+ @Test
+ public void wanCopyRegion_verifyErrorWhenNoConnectionAvailableAtStartAndEntriesInRegion() {
+ // arrange
+ when(((AbstractGatewaySender) gatewaySenderMock).getProxy()).thenReturn(poolMock);
+ when(poolMock.acquireConnection()).thenThrow(NoAvailableServersException.class)
+ .thenThrow(NoAvailableServersException.class);
+ Set<Region.Entry<Object, Object>> entries = new HashSet<>();
+ entries.add(uncheckedCast(mock(Region.Entry.class)));
+ when(regionMock.entrySet()).thenReturn(uncheckedCast(entries));
+
+ // act
+ Throwable thrown = catchThrowable(() -> function.wanCopyRegion(internalCacheMock, "member1",
+ regionMock, gatewaySenderMock, 1, 10));
+
+ // assert
+ assertThat(thrown).isInstanceOf(NoAvailableServersException.class);
+ }
+
+ @Test
+ public void wanCopyRegion_verifySuccessWhenNoConnectionAvailableAtStartAndNoEntriesInRegion()
+ throws InterruptedException {
+ // arrange
+ when(((AbstractGatewaySender) gatewaySenderMock).getProxy()).thenReturn(poolMock);
+ when(poolMock.acquireConnection()).thenThrow(NoAvailableServersException.class)
+ .thenThrow(NoAvailableServersException.class);
+ when(regionMock.entrySet()).thenReturn(new HashSet<>());
+
+ // act
+ CliFunctionResult result =
+ function.wanCopyRegion(internalCacheMock, "member1", regionMock, gatewaySenderMock, 1, 10);
+
+ // assert
+ assertThat(result.getStatus()).isEqualTo(CliFunctionResult.StatusState.OK.toString());
+ assertThat(result.getStatusMessage())
+ .isEqualTo("Entries copied: 0");
+ }
+
+ @Test
+ public void wanCopyRegion_verifyErrorWhenNoConnectionAvailableAfterCopyingSomeEntries()
+ throws BatchException70, InterruptedException {
+ // arrange
+ when(((AbstractGatewaySender) gatewaySenderMock).getProxy()).thenReturn(poolMock);
+ when(poolMock.acquireConnection()).thenReturn(connectionMock)
+ .thenThrow(NoAvailableServersException.class);
+ Set<Region.Entry<Object, Object>> entries = new HashSet<>();
+ entries.add(uncheckedCast(mock(Region.Entry.class)));
+ entries.add(uncheckedCast(mock(Region.Entry.class)));
+ when(regionMock.entrySet()).thenReturn(uncheckedCast(entries));
+ doNothing().doThrow(ConnectionDestroyedException.class).doNothing().when(dispatcherMock)
+ .sendBatch(anyList(), any(), any(), anyInt(), anyBoolean());
+
+ // act
+ CliFunctionResult result =
+ function.wanCopyRegion(internalCacheMock, "member1", regionMock, gatewaySenderMock, 1, 1);
+
+ // assert
+ assertThat(result.getStatus()).isEqualTo(CliFunctionResult.StatusState.ERROR.toString());
+ assertThat(result.getStatusMessage())
+ .isEqualTo("No connection available to receiver after having copied 1 entries");
+ }
+
+ @Test
+ public void wanCopyRegion_verifySuccess() throws BatchException70, InterruptedException {
+ // arrange
+ when(((AbstractGatewaySender) gatewaySenderMock).getProxy()).thenReturn(poolMock);
+ when(poolMock.acquireConnection()).thenReturn(connectionMock).thenReturn(connectionMock);
+ Set<Region.Entry<Object, Object>> entries = new HashSet<>();
+ entries.add(uncheckedCast(mock(Region.Entry.class)));
+ when(regionMock.entrySet()).thenReturn(uncheckedCast(entries));
+ doNothing().when(dispatcherMock).sendBatch(anyList(), any(), any(), anyInt(), anyBoolean());
+
+ // act
+ CliFunctionResult result =
+ function.wanCopyRegion(internalCacheMock, "member1", regionMock, gatewaySenderMock, 1, 10);
+
+ // assert
+ assertThat(result.getStatus()).isEqualTo(CliFunctionResult.StatusState.OK.toString());
+ assertThat(result.getStatusMessage())
+ .isEqualTo("Entries copied: 1");
+ }
+
+ @Test
+ public void wanCopyRegion_verifySuccessWithRetryWhenConnectionDestroyed()
+ throws BatchException70, InterruptedException {
+ // arrange
+ ConnectionDestroyedException exceptionWhenSendingBatch =
+ new ConnectionDestroyedException("My connection exception", new Exception());
+ when(((AbstractGatewaySender) gatewaySenderMock).getProxy()).thenReturn(poolMock);
+ when(poolMock.acquireConnection()).thenReturn(connectionMock).thenReturn(connectionMock);
+ Set<Region.Entry<Object, Object>> entries = new HashSet<>();
+ entries.add(uncheckedCast(mock(Region.Entry.class)));
+ when(regionMock.entrySet()).thenReturn(uncheckedCast(entries));
+ doThrow(exceptionWhenSendingBatch).doNothing().when(dispatcherMock).sendBatch(anyList(), any(),
+ any(), anyInt(), anyBoolean());
+
+ // act
+ CliFunctionResult result =
+ function.wanCopyRegion(internalCacheMock, "member1", regionMock, gatewaySenderMock, 1, 10);
+
+ // assert
+ assertThat(result.getStatus()).isEqualTo(CliFunctionResult.StatusState.OK.toString());
+ assertThat(result.getStatusMessage())
+ .isEqualTo("Entries copied: 1");
+ }
+
+ @Test
+ public void wanCopyRegion_verifyErrorWhenConnectionDestroyedTwice()
+ throws BatchException70, InterruptedException {
+ // arrange
+ ConnectionDestroyedException exceptionWhenSendingBatch =
+ new ConnectionDestroyedException("My connection exception", new Exception());
+ when(((AbstractGatewaySender) gatewaySenderMock).getProxy()).thenReturn(poolMock);
+ when(poolMock.acquireConnection()).thenReturn(connectionMock).thenReturn(connectionMock);
+ Set<Region.Entry<Object, Object>> entries = new HashSet<>();
+ entries.add(uncheckedCast(mock(Region.Entry.class)));
+ when(regionMock.entrySet()).thenReturn(uncheckedCast(entries));
+ doThrow(exceptionWhenSendingBatch).when(dispatcherMock).sendBatch(anyList(), any(), any(),
+ anyInt(), anyBoolean());
+
+ // act
+ CliFunctionResult result =
+ function.wanCopyRegion(internalCacheMock, "member1", regionMock, gatewaySenderMock, 1, 10);
+
+ // assert
+ assertThat(result.getStatus()).isEqualTo(CliFunctionResult.StatusState.ERROR.toString());
+ assertThat(result.getStatusMessage())
+ .isEqualTo("Error (Connection error) in operation after having copied 0 entries");
+ }
+
+ @Test
+ public void wanCopyRegion_verifyErrorWhenRegionDestroyed()
+ throws BatchException70, InterruptedException {
+ // arrange
+ when(((AbstractGatewaySender) gatewaySenderMock).getProxy()).thenReturn(poolMock);
+ when(poolMock.acquireConnection()).thenReturn(connectionMock).thenReturn(connectionMock);
+ Set<Region.Entry<Object, Object>> entries = new HashSet<>();
+ entries.add(uncheckedCast(mock(Region.Entry.class)));
+ when(regionMock.entrySet()).thenReturn(uncheckedCast(entries));
+ doNothing().when(dispatcherMock).sendBatch(anyList(), any(), any(), anyInt(), anyBoolean());
+ doReturn(true).when(regionMock).isDestroyed();
+
+ // act
+ CliFunctionResult result =
+ function.wanCopyRegion(internalCacheMock, "member1", regionMock, gatewaySenderMock, 1, 10);
+
+ // assert
+ assertThat(result.getStatus()).isEqualTo(CliFunctionResult.StatusState.ERROR.toString());
+ assertThat(result.getStatusMessage())
+ .isEqualTo("Error (Region destroyed) in operation after having copied 1 entries");
+ }
+
+ @Test
+ public void wanCopyRegion_verifySuccessWithRetryWhenServerConnectivityException()
+ throws BatchException70, InterruptedException {
+ // arrange
+ ServerConnectivityException exceptionWhenSendingBatch =
+ new ServerConnectivityException("My connection exception", new Exception());
+ when(((AbstractGatewaySender) gatewaySenderMock).getProxy()).thenReturn(poolMock);
+ when(poolMock.acquireConnection()).thenReturn(connectionMock).thenReturn(connectionMock);
+ Set<Region.Entry<Object, Object>> entries = new HashSet<>();
+ entries.add(uncheckedCast(mock(Region.Entry.class)));
+ when(regionMock.entrySet()).thenReturn(uncheckedCast(entries));
+ doThrow(exceptionWhenSendingBatch).doNothing().when(dispatcherMock).sendBatch(anyList(), any(),
+ any(), anyInt(), anyBoolean());
+
+ // act
+ CliFunctionResult result =
+ function.wanCopyRegion(internalCacheMock, "member1", regionMock, gatewaySenderMock, 1, 10);
+
+ // assert
+ assertThat(result.getStatus()).isEqualTo(CliFunctionResult.StatusState.OK.toString());
+ assertThat(result.getStatusMessage())
+ .isEqualTo("Entries copied: 1");
+ }
+
+ @Test
+ public void wanCopyRegion_verifyErrorWhenServerConnectivityExceptionTwice()
+ throws BatchException70, InterruptedException {
+ // arrange
+ ServerConnectivityException exceptionWhenSendingBatch =
+ new ServerConnectivityException("My connection exception", new Exception());
+ when(((AbstractGatewaySender) gatewaySenderMock).getProxy()).thenReturn(poolMock);
+ when(poolMock.acquireConnection()).thenReturn(connectionMock).thenReturn(connectionMock);
+ Set<Region.Entry<Object, Object>> entries = new HashSet<>();
+ entries.add(uncheckedCast(mock(Region.Entry.class)));
+ when(regionMock.entrySet()).thenReturn(uncheckedCast(entries));
+ doThrow(exceptionWhenSendingBatch).when(dispatcherMock).sendBatch(anyList(), any(), any(),
+ anyInt(), anyBoolean());
+
+ // act
+ CliFunctionResult result =
+ function.wanCopyRegion(internalCacheMock, "member1", regionMock, gatewaySenderMock, 1, 10);
+
+ // assert
+ assertThat(result.getStatus()).isEqualTo(CliFunctionResult.StatusState.ERROR.toString());
+ assertThat(result.getStatusMessage())
+ .isEqualTo("Error (Connection error) in operation after having copied 0 entries");
+ }
+
+ @Test
+ public void wanCopyRegion_verifyErrorWhenBatchExceptionWhileSendingBatch()
+ throws BatchException70, InterruptedException {
+ // arrange
+ BatchException70 exceptionWhenSendingBatch =
+ new BatchException70("My batch exception", new Exception("test exception"), 0, 0);
+ BatchException70 topLevelException =
+ new BatchException70(Collections.singletonList(exceptionWhenSendingBatch));
+ when(((AbstractGatewaySender) gatewaySenderMock).getProxy()).thenReturn(poolMock);
+ when(poolMock.acquireConnection()).thenReturn(connectionMock);
+ Set<Region.Entry<Object, Object>> entries = new HashSet<>();
+ entries.add(uncheckedCast(mock(Region.Entry.class)));
+ when(regionMock.entrySet()).thenReturn(uncheckedCast(entries));
+ doThrow(topLevelException).when(dispatcherMock).sendBatch(anyList(), any(), any(),
+ anyInt(), anyBoolean());
+
+ // act
+ CliFunctionResult result =
+ function.wanCopyRegion(internalCacheMock, "member1", regionMock, gatewaySenderMock, 1, 10);
+
+ // assert
+ assertThat(result.getStatus()).isEqualTo(CliFunctionResult.StatusState.ERROR.toString());
+ assertThat(result.getStatusMessage())
+ .isEqualTo(
+ "Error (java.lang.Exception: test exception) in operation after having copied 0 entries");
+ }
+
+ @Test
+ public void wanCopyRegion_verifyExceptionThrownWhenExceptionWhileSendingBatch()
+ throws BatchException70 {
+ // arrange
+ RuntimeException exceptionWhenSendingBatch =
+ new RuntimeException("Exception when sending batch");
+ when(((AbstractGatewaySender) gatewaySenderMock).getProxy()).thenReturn(poolMock);
+ when(poolMock.acquireConnection()).thenReturn(connectionMock);
+ Set<Region.Entry<Object, Object>> entries = new HashSet<>();
+ entries.add(uncheckedCast(mock(Region.Entry.class)));
+ when(regionMock.entrySet()).thenReturn(uncheckedCast(entries));
+ doThrow(exceptionWhenSendingBatch).when(dispatcherMock).sendBatch(anyList(), any(), any(),
+ anyInt(), anyBoolean());
+
+ // act
+ Throwable thrown = catchThrowable(
+ () -> function.wanCopyRegion(internalCacheMock, "member1", regionMock, gatewaySenderMock, 1,
+ 10));
+
+ // assert
+ assertThat(thrown).isInstanceOf(RuntimeException.class);
+ }
+
+ @Test
+ public void getTimeToSleep_ReturnZeroWhenMaxRateIsZero() {
+ assertThat(function.getTimeToSleep(startTime, 1, 0)).isEqualTo(0);
+ }
+
+ @Test
+ public void getTimeToSleep_ReturnZeroWhenCopiedEntriesIsZero() {
+ assertThat(function.getTimeToSleep(startTime, 0, 1)).isEqualTo(0);
+ }
+
+ @Test
+ public void getTimeToSleep_ReturnZeroWhenBelowMaxRate() {
+ long elapsedTime = 2000L;
+ when(clockMock.millis()).thenReturn(startTime + elapsedTime);
+ assertThat(function.getTimeToSleep(startTime, 1, 1)).isEqualTo(0);
+ }
+
+ @Test
+ public void getTimeToSleep_ReturnZeroWhenOnMaxRate() {
+ long elapsedTime = 1000L;
+ when(clockMock.millis()).thenReturn(startTime + elapsedTime);
+ assertThat(function.getTimeToSleep(startTime, 1, 1)).isEqualTo(0);
+ }
+
+ @Test
+ public void getTimeToSleep_ReturnZeroWhenAboveMaxRate_value1() {
+ long elapsedTime = 1000L;
+ when(clockMock.millis()).thenReturn(startTime + elapsedTime);
+ assertThat(function.getTimeToSleep(startTime, 2, 1)).isEqualTo(1000);
+ }
+
+ @Test
+ public void getTimeToSleep_ReturnZeroWhenAboveMaxRate_value2() {
+ long elapsedTime = 1000L;
+ when(clockMock.millis()).thenReturn(startTime + elapsedTime);
+ assertThat(function.getTimeToSleep(startTime, 4, 1)).isEqualTo(3000);
+ }
+
+ @Test
+ public void getTimeToSleep_ReturnZeroWhenAboveMaxRate_value3() {
+ long elapsedTime = 2000L;
+ when(clockMock.millis()).thenReturn(startTime + elapsedTime);
+ assertThat(function.getTimeToSleep(startTime, 4, 1)).isEqualTo(2000);
+ }
+
+ private CliFunctionResult executeWanCopyRegionFunction(Object[] options,
+ WanCopyRegionFunctionDelegate.ThreadSleeper sleeper)
+ throws BatchException70, InterruptedException {
+ Region<?, ?> regionMock =
+ uncheckedCast(mock(InternalRegion.class));
+
+ RegionAttributes<?, ?> attributesMock = mock(RegionAttributes.class);
+ Set<?> idsMock = mock(Set.class);
+ when(idsMock.contains(anyString())).thenReturn(true);
+ when(attributesMock.getGatewaySenderIds()).thenReturn(uncheckedCast(idsMock));
+ when(regionMock.getAttributes()).thenReturn(uncheckedCast(attributesMock));
+
+ Set<Region.Entry<Object, Object>> entries = new HashSet<>();
+ entries.add(uncheckedCast(mock(Region.Entry.class)));
+ entries.add(uncheckedCast(mock(Region.Entry.class)));
+ when(regionMock.entrySet()).thenReturn(uncheckedCast(entries));
+
+ doNothing().when(dispatcherMock).sendBatch(anyList(), any(), any(), anyInt(), anyBoolean());
+
+ AbstractGatewaySender gatewaySenderMock = mock(AbstractGatewaySender.class);
+ when(gatewaySenderMock.getId()).thenReturn((String) options[1]);
+ when(gatewaySenderMock.isRunning()).thenReturn(true);
+ when(gatewaySenderMock.isParallel()).thenReturn(true);
+ when(gatewaySenderMock.isPrimary()).thenReturn(false);
+
+ when(gatewaySenderMock.getProxy()).thenReturn(poolMock);
+ when(poolMock.acquireConnection()).thenReturn(connectionMock).thenReturn(connectionMock);
+
+ AbstractGatewaySenderEventProcessor eventProcessorMock =
+ mock(AbstractGatewaySenderEventProcessor.class);
+ when(eventProcessorMock.getDispatcher()).thenReturn(dispatcherMock);
+ when(gatewaySenderMock.getEventProcessor()).thenReturn(eventProcessorMock);
+
+ InternalCache internalCacheMock = mock(InternalCache.class);
+ when(internalCacheMock.getRegion(any())).thenReturn(uncheckedCast(regionMock));
+ when(internalCacheMock.getGatewaySender(any())).thenReturn(gatewaySenderMock);
+
+ WanCopyRegionFunctionDelegate function =
+ new WanCopyRegionFunctionDelegate(Clock.systemDefaultZone(),
+ sleeper, eventCreatorMock);
+
+ return function.wanCopyRegion(internalCacheMock, "member1", regionMock, gatewaySenderMock, 10,
+ 10);
+ }
+}
diff --git a/geode-wan/src/test/java/org/apache/geode/management/internal/cli/functions/WanCopyRegionFunctionTest.java b/geode-wan/src/test/java/org/apache/geode/management/internal/cli/functions/WanCopyRegionFunctionTest.java
new file mode 100644
index 0000000..1e278d2
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/management/internal/cli/functions/WanCopyRegionFunctionTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.cli.functions;
+
+import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionAttributes;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.InternalRegion;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.cache.wan.InternalGatewaySender;
+import org.apache.geode.management.internal.functions.CliFunctionResult;
+
+public class WanCopyRegionFunctionTest {
+
+ private WanCopyRegionFunction function;
+ private InternalCache internalCacheMock;
+ private GatewaySender gatewaySenderMock;
+
+ private final FunctionContext<Object[]> contextMock = uncheckedCast(mock(FunctionContext.class));
+
+ private final Region<?, ?> regionMock =
+ uncheckedCast(mock(InternalRegion.class));
+
+
+ @Before
+ public void setUp() throws InterruptedException {
+ gatewaySenderMock = mock(AbstractGatewaySender.class);
+ when(gatewaySenderMock.getId()).thenReturn("mySender");
+
+ function = new WanCopyRegionFunction();
+
+ RegionAttributes<?, ?> attributesMock = mock(RegionAttributes.class);
+ Set<?> idsMock = mock(Set.class);
+ when(idsMock.contains(anyString())).thenReturn(true);
+ when(attributesMock.getGatewaySenderIds()).thenReturn(uncheckedCast(idsMock));
+ when(regionMock.getAttributes()).thenReturn(uncheckedCast(attributesMock));
+
+ internalCacheMock = mock(InternalCache.class);
+ when(internalCacheMock.getRegion(any())).thenReturn(uncheckedCast(regionMock));
+ }
+
+ @Test
+ public void executeFunction_verifyErrorWhenRegionNotFound() {
+ Object[] options = new Object[] {"myRegion", "mySender", false, 1L, 10};
+ when(internalCacheMock.getRegion(any())).thenReturn(null);
+ when(contextMock.getArguments()).thenReturn(options);
+ when(contextMock.getCache()).thenReturn(internalCacheMock);
+
+ CliFunctionResult result = function.executeFunction(contextMock);
+
+ assertThat(result.getStatus()).isEqualTo(CliFunctionResult.StatusState.ERROR.toString());
+ assertThat(result.getStatusMessage()).isEqualTo("Region myRegion not found");
+ }
+
+ @Test
+ public void executeFunction_verifyErrorWhenSenderNotFound() {
+ Object[] options = new Object[] {"myRegion", "mySender", false, 1L, 10};
+ when(internalCacheMock.getGatewaySender(any())).thenReturn(null);
+ when(contextMock.getArguments()).thenReturn(options);
+ when(contextMock.getCache()).thenReturn(internalCacheMock);
+
+ CliFunctionResult result = function.executeFunction(contextMock);
+
+ assertThat(result.getStatus()).isEqualTo(CliFunctionResult.StatusState.ERROR.toString());
+ assertThat(result.getStatusMessage()).isEqualTo("Sender mySender not found");
+ }
+
+ @Test
+ public void executeFunction_verifyErrorWhenSenderIsNotRunning() {
+ Object[] options = new Object[] {"myRegion", "mySender", false, 1L, 10};
+ when(gatewaySenderMock.isRunning()).thenReturn(false);
+ when(internalCacheMock.getGatewaySender(any())).thenReturn(gatewaySenderMock);
+ when(contextMock.getArguments()).thenReturn(options);
+ when(contextMock.getCache()).thenReturn(internalCacheMock);
+
+ CliFunctionResult result = function.executeFunction(contextMock);
+
+ assertThat(result.getStatus()).isEqualTo(CliFunctionResult.StatusState.ERROR.toString());
+ assertThat(result.getStatusMessage()).isEqualTo("Sender mySender is not running");
+ }
+
+ @Test
+ public void executeFunction_verifySuccessWhenSenderIsSerialAndSenderIsNotPrimary() {
+ Object[] options = new Object[] {"myRegion", "mySender", false, 1L, 10};
+ when(gatewaySenderMock.isRunning()).thenReturn(true);
+ when(gatewaySenderMock.isParallel()).thenReturn(false);
+ when(((InternalGatewaySender) gatewaySenderMock).isPrimary()).thenReturn(false);
+ when(internalCacheMock.getGatewaySender(any())).thenReturn(gatewaySenderMock);
+ when(contextMock.getArguments()).thenReturn(options);
+ when(contextMock.getCache()).thenReturn(internalCacheMock);
+
+ CliFunctionResult result = function.executeFunction(contextMock);
+
+ assertThat(result.getStatus()).isEqualTo(CliFunctionResult.StatusState.OK.toString());
+ assertThat(result.getStatusMessage())
+ .isEqualTo("Sender mySender is serial and not primary. 0 entries copied.");
+ }
+
+ @Test
+ public void executeFunction_verifyErrorWhenSenderNotConfiguredWithForRegion() {
+ Object[] options = new Object[] {"myRegion", "mySender", false, 1L, 10};
+ Set<String> senders = new HashSet<>();
+ senders.add("notMySender");
+ when(gatewaySenderMock.isParallel()).thenReturn(true);
+ when(regionMock.getAttributes().getGatewaySenderIds()).thenReturn(senders);
+ when(internalCacheMock.getGatewaySender(any())).thenReturn(gatewaySenderMock);
+ when(contextMock.getArguments()).thenReturn(options);
+ when(contextMock.getCache()).thenReturn(internalCacheMock);
+
+ CliFunctionResult result = function.executeFunction(contextMock);
+ assertThat(result.getStatus()).isEqualTo(CliFunctionResult.StatusState.ERROR.toString());
+ assertThat(result.getStatusMessage())
+ .isEqualTo("Region myRegion is not configured to use sender mySender");
+ }
+}
diff --git a/geode-wan/src/test/resources/expected-pom.xml b/geode-wan/src/test/resources/expected-pom.xml
index 50f45c8..52547b2 100644
--- a/geode-wan/src/test/resources/expected-pom.xml
+++ b/geode-wan/src/test/resources/expected-pom.xml
@@ -72,6 +72,11 @@
<scope>runtime</scope>
</dependency>
<dependency>
+ <groupId>org.apache.geode</groupId>
+ <artifactId>geode-gfsh</artifactId>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<scope>runtime</scope>