You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by al...@apache.org on 2021/11/12 06:57:38 UTC

[geode] branch develop updated: GEODE-9369: Command to copy region entries from a WAN site to another (#7006)

This is an automated email from the ASF dual-hosted git repository.

alberto pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 3eaeed8  GEODE-9369: Command to copy region entries from a WAN site to another (#7006)
3eaeed8 is described below

commit 3eaeed886f3e6ec9ec6f341954fc837eed7b03f8
Author: Alberto Gomez <al...@est.tech>
AuthorDate: Fri Nov 12 07:56:10 2021 +0100

    GEODE-9369: Command to copy region entries from a WAN site to another (#7006)
    
    * GEODE-9369: Add command to copy region entries from one site to another
    
    * GEODE-9369: Create servers without offheap to avoid test failures in windows due to out of memory
    
    * GEODE-9369: Fix flaky test
    
    * GEODE-9369: Fix some test case failues on windows
    
    * GEODE-9369: Fix windows test failures in CI by not running the tests on windows
    
    * GEODE-9369: Fix test cases after rebasing locator serialization filtering by default on Java 8
    
    * GEODE-9369: Revert changes required by GEODE-9758
---
 .../source/subnavs/geode-subnav.erb                |    1 +
 .../cache30/DistributedAckRegionCCEDUnitTest.java  |    2 +-
 .../geode/internal/cache/EnumListenerEvent.java    |   24 +-
 .../apache/geode/internal/cache/LocalRegion.java   |    7 +-
 .../cache/tier/sockets/CacheServerStats.java       |    4 +-
 .../sockets/command/GatewayReceiverCommand.java    |   11 +-
 .../internal/cache/tier/sockets/command/Put70.java |    2 +-
 .../wan/GatewaySenderEventCallbackDispatcher.java  |    9 +
 .../cache/wan/GatewaySenderEventDispatcher.java    |    9 +
 .../internal/cache/wan/GatewaySenderEventImpl.java |   13 +-
 .../internal/cache/EnumListenerEventJUnitTest.java |    3 +-
 .../cache/tier/sockets/command/Put70Test.java      |    2 +-
 .../cache/wan/GatewaySenderEventImplTest.java      |   36 +-
 .../gfsh/command-pages/wan_copy_region.html.md.erb |  192 +++
 .../gfsh/gfsh_command_index.html.md.erb            |    5 +-
 .../gfsh/quick_ref_commands_by_area.html.md.erb    |    1 +
 .../GfshParserAutoCompletionIntegrationTest.java   |    6 +-
 geode-wan/build.gradle                             |    1 +
 .../commands/WanCopyRegionCommandDUnitTest.java    | 1399 ++++++++++++++++++++
 .../geode/internal/cache/wan/WANTestBase.java      |  278 ++--
 ...arallelWANPropagationClientServerDUnitTest.java |    4 +-
 .../WanCommandAutoCompletionIntegrationTest.java   |   50 +
 .../GatewaySenderEventRemoteDispatcher.java        |   20 +
 .../wan/internal/WanCopyRegionFunctionService.java |  123 ++
 ...gionFunctionServiceAlreadyRunningException.java |   24 +-
 .../cli/commands/WanCopyRegionCommand.java         |  138 ++
 .../client/locator/GatewaySenderBatchOp.java       |    6 +-
 .../cli/functions/WanCopyRegionFunction.java       |  209 +++
 .../functions/WanCopyRegionFunctionDelegate.java   |  386 ++++++
 .../org.apache.geode.internal.cache.CacheService   |    1 +
 .../org.springframework.shell.core.CommandMarker   |    1 +
 .../sanctioned-geode-wan-serializables.txt         |    6 +
 .../internal/WanCopyRegionFunctionServiceTest.java |  186 +++
 .../cli/commands/WanCopyRegionCommandTest.java     |   49 +
 .../WanCopyRegionFunctionDelegateTest.java         |  575 ++++++++
 .../cli/functions/WanCopyRegionFunctionTest.java   |  142 ++
 geode-wan/src/test/resources/expected-pom.xml      |    5 +
 37 files changed, 3739 insertions(+), 191 deletions(-)

diff --git a/geode-book/master_middleman/source/subnavs/geode-subnav.erb b/geode-book/master_middleman/source/subnavs/geode-subnav.erb
index 2b27ac8..14b54c8 100644
--- a/geode-book/master_middleman/source/subnavs/geode-subnav.erb
+++ b/geode-book/master_middleman/source/subnavs/geode-subnav.erb
@@ -2084,6 +2084,7 @@ limitations under the License.
                                     </li>
                                     <li>
                                         <a href="/docs/guide/<%=vars.product_version_nodot%>/tools_modules/gfsh/command-pages/version.html">version</a>
+                                        <a href="/docs/guide/<%=vars.product_version_nodot%>/tools_modules/gfsh/command-pages/wan_copy_region.html">wan-copy region</a>
                                     </li>
                                 </ul>
                             </li>
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java
index a0be478..8e276bf 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java
@@ -420,7 +420,7 @@ public class DistributedAckRegionCCEDUnitTest extends DistributedAckRegionDUnitT
       VersionTagHolder holder = new VersionTagHolder(tag);
       ClientProxyMembershipID id = ClientProxyMembershipID
           .getNewProxyMembership(CCRegion.getDistributionManager().getSystem());
-      CCRegion.basicBridgePut("cckey0", "newvalue", null, true, null, id, true, holder);
+      CCRegion.basicBridgePut("cckey0", "newvalue", null, true, null, id, holder, true);
       vm0.invoke("check conflation count", () -> {
         // after changed the 3rd try of AUO.doPutOrCreate to be ifOld=false ifNew=false
         // ARM.updateEntry will be called one more time, so there will be 2 conflicted events
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/EnumListenerEvent.java b/geode-core/src/main/java/org/apache/geode/internal/cache/EnumListenerEvent.java
index 8f95d97..c2bd339 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/EnumListenerEvent.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/EnumListenerEvent.java
@@ -106,12 +106,17 @@ public abstract class EnumListenerEvent {
   public static final EnumListenerEvent TIMESTAMP_UPDATE = new TIMESTAMP_UPDATE(); // 18
 
   @Immutable
+  public static final EnumListenerEvent AFTER_UPDATE_WITH_GENERATE_CALLBACKS =
+      new AFTER_UPDATE_WITH_GENERATE_CALLBACKS(); // 19
+
+  @Immutable
   private static final EnumListenerEvent[] instances =
       new EnumListenerEvent[] {AFTER_CREATE, AFTER_UPDATE, AFTER_INVALIDATE, AFTER_DESTROY,
           AFTER_REGION_CREATE, AFTER_REGION_INVALIDATE, AFTER_REGION_CLEAR, AFTER_REGION_DESTROY,
           AFTER_REMOTE_REGION_CREATE, AFTER_REMOTE_REGION_DEPARTURE, AFTER_REMOTE_REGION_CRASH,
           AFTER_ROLE_GAIN, AFTER_ROLE_LOSS, AFTER_REGION_LIVE, AFTER_REGISTER_INSTANTIATOR,
-          AFTER_REGISTER_DATASERIALIZER, AFTER_TOMBSTONE_EXPIRATION, TIMESTAMP_UPDATE};
+          AFTER_REGISTER_DATASERIALIZER, AFTER_TOMBSTONE_EXPIRATION, TIMESTAMP_UPDATE,
+          AFTER_UPDATE_WITH_GENERATE_CALLBACKS};
 
   static {
     for (int i = 0; i < instances.length; i++) {
@@ -412,6 +417,21 @@ public abstract class EnumListenerEvent {
     }
   }
 
+  private static class AFTER_UPDATE_WITH_GENERATE_CALLBACKS extends EnumListenerEvent {
+    protected AFTER_UPDATE_WITH_GENERATE_CALLBACKS() {
+      super("AFTER_UPDATE_WITH_GENERATE_CALLBACKS");
+    }
+
+    @Override
+    public void dispatchEvent(CacheEvent event, CacheListener listener) {}
+
+    @Override
+    public byte getEventCode() {
+      return 19;
+    }
+  }
+
+
   /**
    *
    * This method returns the EnumListenerEvent object corresponding to the cCode given.
@@ -435,6 +455,8 @@ public abstract class EnumListenerEvent {
    * <li>15 - AFTER_REGISTER_INSTANTIATOR
    * <li>16 - AFTER_REGISTER_DATASERIALIZER
    * <li>17 - AFTER_TOMBSTONE_EXPIRATION
+   * <li>18 - TIMESTAMP_UPDATE
+   * <li>19 - AFTER_UPDATE_WITH_GENERATE_CALLBACKS
    * </ul>
    *
    * @param eventCode the eventCode corresponding to the EnumListenerEvent object desired
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index af28668..88becfd 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -5179,8 +5179,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
   }
 
   public boolean basicBridgePut(Object key, Object value, byte[] deltaBytes, boolean isObject,
-      Object callbackArg, ClientProxyMembershipID memberId, boolean fromClient,
-      EntryEventImpl clientEvent) throws TimeoutException, CacheWriterException {
+      Object callbackArg, ClientProxyMembershipID memberId,
+      EntryEventImpl clientEvent, boolean generateCallbacks)
+      throws TimeoutException, CacheWriterException {
 
     EventID eventID = clientEvent.getEventId();
     Object theCallbackArg = callbackArg;
@@ -5189,7 +5190,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
     @Released
     final EntryEventImpl event = entryEventFactory.create(this, Operation.UPDATE, key,
         null, theCallbackArg, false,
-        memberId.getDistributedMember(), true, eventID);
+        memberId.getDistributedMember(), generateCallbacks, eventID);
 
     try {
       event.setContext(memberId);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheServerStats.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheServerStats.java
index 23f0e8e..f63df94 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheServerStats.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheServerStats.java
@@ -992,8 +992,8 @@ public class CacheServerStats implements MessageStats {
     return this.stats.getDouble(loadPerConnectionId);
   }
 
-  public int getProcessBatchRequests() {
-    return this.stats.getInt(processBatchRequestsId);
+  public long getProcessBatchRequests() {
+    return this.stats.getLong(processBatchRequestsId);
   }
 
   public void close() {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
index 1a15cd3..848c3cb 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
@@ -45,6 +45,7 @@ import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
 import org.apache.geode.internal.cache.versions.VersionTag;
 import org.apache.geode.internal.cache.wan.BatchException70;
 import org.apache.geode.internal.cache.wan.GatewayReceiverStats;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
 import org.apache.geode.internal.security.AuthorizeRequest;
 import org.apache.geode.internal.security.SecurityService;
 import org.apache.geode.internal.util.BlobHelper;
@@ -308,7 +309,7 @@ public class GatewayReceiverCommand extends BaseCommand {
                     // attempt to update the entry
                     if (!result) {
                       result = region.basicBridgePut(key, value, null, isObject, callbackArg,
-                          serverConnection.getProxyID(), false, clientEvent);
+                          serverConnection.getProxyID(), clientEvent, true);
                     }
                   }
 
@@ -333,6 +334,7 @@ public class GatewayReceiverCommand extends BaseCommand {
               break;
 
             case 1: // Update
+            case GatewaySenderEventImpl.UPDATE_ACTION_NO_GENERATE_CALLBACKS:
               try {
                 // Retrieve the value from the message parts (do not deserialize it)
                 valuePart = clientMessage.getPart(partNumber + 5);
@@ -405,8 +407,10 @@ public class GatewayReceiverCommand extends BaseCommand {
                   if (isPdxEvent) {
                     result = addPdxType(crHelper, key, value);
                   } else {
+                    boolean generateCallbacks =
+                        actionType != GatewaySenderEventImpl.UPDATE_ACTION_NO_GENERATE_CALLBACKS;
                     result = region.basicBridgePut(key, value, null, isObject, callbackArg,
-                        serverConnection.getProxyID(), false, clientEvent);
+                        serverConnection.getProxyID(), clientEvent, generateCallbacks);
                   }
                   if (result || clientEvent.isConcurrencyConflict()) {
                     serverConnection.setModificationInfo(true, regionName, key);
@@ -637,7 +641,8 @@ public class GatewayReceiverCommand extends BaseCommand {
         exceptions.add(be);
       } finally {
         // Increment the partNumber
-        if (actionType == 0 /* create */ || actionType == 1 /* update */) {
+        if (actionType == 0 /* create */ || actionType == 1 /* update */
+            || actionType == GatewaySenderEventImpl.UPDATE_ACTION_NO_GENERATE_CALLBACKS) {
           if (callbackArgExists) {
             partNumber += 9;
           } else {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Put70.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Put70.java
index 8ad1de7..d11fbf2 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Put70.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Put70.java
@@ -379,7 +379,7 @@ public class Put70 extends BaseCommand {
               serverConnection.getProxyID(), true, clientEvent, true);
         } else {
           result = region.basicBridgePut(key, value, delta, isObject, callbackArg,
-              serverConnection.getProxyID(), true, clientEvent);
+              serverConnection.getProxyID(), clientEvent, true);
         }
         if (clientMessage.isRetry() && clientEvent.isConcurrencyConflict()
             && clientEvent.getVersionTag() != null) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventCallbackDispatcher.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventCallbackDispatcher.java
index e5683e0..58531bd 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventCallbackDispatcher.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventCallbackDispatcher.java
@@ -23,6 +23,9 @@ import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.CancelException;
 import org.apache.geode.cache.asyncqueue.AsyncEventListener;
+import org.apache.geode.cache.client.internal.Connection;
+import org.apache.geode.cache.client.internal.ExecutablePool;
+import org.apache.geode.cache.wan.GatewayQueueEvent;
 import org.apache.geode.logging.internal.log4j.api.LogService;
 
 /**
@@ -184,4 +187,10 @@ public class GatewaySenderEventCallbackDispatcher implements GatewaySenderEventD
   public void shutDownAckReaderConnection() {
     // no op
   }
+
+  @Override
+  public void sendBatch(List<GatewayQueueEvent<?, ?>> events, Connection connection,
+      ExecutablePool senderPool, int batchId, boolean removeFromQueueOnException) {
+    throw new UnsupportedOperationException();
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventDispatcher.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventDispatcher.java
index 86828ae..42fbc02 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventDispatcher.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventDispatcher.java
@@ -16,6 +16,10 @@ package org.apache.geode.internal.cache.wan;
 
 import java.util.List;
 
+import org.apache.geode.cache.client.internal.Connection;
+import org.apache.geode.cache.client.internal.ExecutablePool;
+import org.apache.geode.cache.wan.GatewayQueueEvent;
+
 /**
  * @since GemFire 7.0
  *
@@ -31,4 +35,9 @@ public interface GatewaySenderEventDispatcher {
   void stop();
 
   void shutDownAckReaderConnection();
+
+  void sendBatch(List<GatewayQueueEvent<?, ?>> events, Connection connection,
+      ExecutablePool senderPool,
+      int batchId, boolean removeFromQueueOnException)
+      throws BatchException70;
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
index 7c3537e..9b80278 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
@@ -200,6 +200,8 @@ public class GatewaySenderEventImpl
 
   private static final int VERSION_ACTION = 3;
 
+  public static final int UPDATE_ACTION_NO_GENERATE_CALLBACKS = 4;
+
   private static final int INVALIDATE_ACTION = 5;
   /**
    * Static constants for Operation detail of EntryEvent.
@@ -309,7 +311,7 @@ public class GatewaySenderEventImpl
 
     // Initialize the action and number of parts (called after _callbackArgument
     // is set above)
-    initializeAction(this.operation);
+    initializeAction(this.operation, event);
 
     // initialize the operation detail
     initializeOperationDetail(event.getOperation());
@@ -1022,7 +1024,7 @@ public class GatewaySenderEventImpl
    *
    * @param operation The operation from which to initialize this event's action and number of parts
    */
-  protected void initializeAction(EnumListenerEvent operation) {
+  protected void initializeAction(EnumListenerEvent operation, EntryEventImpl event) {
     if (operation == EnumListenerEvent.AFTER_CREATE) {
       // Initialize after create action
       action = CREATE_ACTION;
@@ -1065,6 +1067,13 @@ public class GatewaySenderEventImpl
       // Initialize number of parts
       // Since there is no value, there is one less part
       numberOfParts = (callbackArgument == null) ? 7 : 8;
+    } else if (operation == EnumListenerEvent.AFTER_UPDATE_WITH_GENERATE_CALLBACKS) {
+      if (event.isGenerateCallbacks()) {
+        action = UPDATE_ACTION;
+      } else {
+        action = UPDATE_ACTION_NO_GENERATE_CALLBACKS;
+      }
+      numberOfParts = (this.callbackArgument == null) ? 8 : 9;
     }
   }
 
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/EnumListenerEventJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/EnumListenerEventJUnitTest.java
index edb6f72..3e6a7f6 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/EnumListenerEventJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/EnumListenerEventJUnitTest.java
@@ -47,7 +47,8 @@ public class EnumListenerEventJUnitTest {
     // extra non-existent code checks as a markers so that this test will
     // fail if further events are added (0th or +1 codes) without updating this test
     checkAndAssert(18, EnumListenerEvent.TIMESTAMP_UPDATE);
-    checkAndAssert(19, null);
+    checkAndAssert(19, EnumListenerEvent.AFTER_UPDATE_WITH_GENERATE_CALLBACKS);
+    checkAndAssert(20, null);
   }
 
   // check that the code and object both match
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/Put70Test.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/Put70Test.java
index 7ee3e26..2938a9c 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/Put70Test.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/Put70Test.java
@@ -138,7 +138,7 @@ public class Put70Test {
     when(keyPart.getStringOrObject()).thenReturn(KEY);
 
     when(localRegion.basicBridgePut(eq(KEY), eq(VALUE), eq(null), eq(true), eq(CALLBACK_ARG),
-        any(), eq(true), any())).thenReturn(true);
+        any(), any(), eq(true))).thenReturn(true);
 
     when(message.getNumberOfParts()).thenReturn(8);
     when(message.getPart(eq(0))).thenReturn(regionNamePart);
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImplTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImplTest.java
index 1755991..6e05864 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImplTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImplTest.java
@@ -49,6 +49,7 @@ import org.apache.geode.internal.cache.EntryEventImpl;
 import org.apache.geode.internal.cache.EnumListenerEvent;
 import org.apache.geode.internal.cache.EventID;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalRegion;
 import org.apache.geode.internal.cache.LocalRegion;
 import org.apache.geode.internal.cache.TXId;
 import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderHelper;
@@ -113,13 +114,12 @@ public class GatewaySenderEventImplTest {
   @Parameters(method = "getVersionsAndExpectedInvocations")
   public void testSerializingDataFromCurrentVersionToOldVersion(VersionAndExpectedInvocations vaei)
       throws IOException {
-    InternalDataSerializer internalDataSerializer = spy(InternalDataSerializer.class);
     GatewaySenderEventImpl gatewaySenderEvent = spy(GatewaySenderEventImpl.class);
     OutputStream outputStream = mock(OutputStream.class);
     VersionedDataOutputStream versionedDataOutputStream =
         new VersionedDataOutputStream(outputStream, vaei.getVersion());
 
-    internalDataSerializer.invokeToData(gatewaySenderEvent, versionedDataOutputStream);
+    InternalDataSerializer.invokeToData(gatewaySenderEvent, versionedDataOutputStream);
     verify(gatewaySenderEvent, times(0)).toData(any(), any());
     verify(gatewaySenderEvent, times(vaei.getPre115Invocations())).toDataPre_GEODE_1_15_0_0(any(),
         any());
@@ -134,7 +134,6 @@ public class GatewaySenderEventImplTest {
   public void testDeserializingDataFromOldVersionToCurrentVersion(
       VersionAndExpectedInvocations vaei)
       throws IOException, ClassNotFoundException {
-    InternalDataSerializer internalDataSerializer = spy(InternalDataSerializer.class);
     GatewaySenderEventImpl gatewaySenderEvent = spy(GatewaySenderEventImpl.class);
     InputStream inputStream = mock(InputStream.class);
     when(inputStream.read()).thenReturn(69); // NULL_STRING
@@ -142,7 +141,7 @@ public class GatewaySenderEventImplTest {
     VersionedDataInputStream versionedDataInputStream =
         new VersionedDataInputStream(inputStream, vaei.getVersion());
 
-    internalDataSerializer.invokeFromData(gatewaySenderEvent, versionedDataInputStream);
+    InternalDataSerializer.invokeFromData(gatewaySenderEvent, versionedDataInputStream);
     verify(gatewaySenderEvent, times(0)).fromData(any(), any());
     verify(gatewaySenderEvent, times(vaei.getPre115Invocations())).fromDataPre_GEODE_1_15_0_0(any(),
         any());
@@ -349,6 +348,35 @@ public class GatewaySenderEventImplTest {
     return cacheEvent;
   }
 
+  @Parameters({"true, true", "true, false", "false, false"})
+  public void testCreation_WithAfterUpdateWithGenerateCallbacks(boolean isGenerateCallbacks,
+      boolean isCallbackArgumentNull)
+      throws IOException {
+    InternalRegion region = mock(InternalRegion.class);
+    when(region.getFullPath()).thenReturn(testName.getMethodName() + "_region");
+
+    Operation operation = mock(Operation.class);
+    when(operation.isLocalLoad()).thenReturn(true);
+
+    EntryEventImpl cacheEvent = mock(EntryEventImpl.class);
+    when(cacheEvent.getRegion()).thenReturn(region);
+    when(cacheEvent.getEventId()).thenReturn(mock(EventID.class));
+    when(cacheEvent.getOperation()).thenReturn(operation);
+    when(cacheEvent.isGenerateCallbacks()).thenReturn(isGenerateCallbacks);
+    when(cacheEvent.getRawCallbackArgument())
+        .thenReturn(isCallbackArgumentNull ? null : mock(GatewaySenderEventCallbackArgument.class));
+
+    GatewaySenderEventImpl event = new GatewaySenderEventImpl(
+        EnumListenerEvent.AFTER_UPDATE_WITH_GENERATE_CALLBACKS, cacheEvent,
+        null, false, INCLUDE_LAST_EVENT);
+
+    final int numberOfParts = isCallbackArgumentNull ? 8 : 9;
+    assertThat(event.getNumberOfParts()).isEqualTo(numberOfParts);
+
+    final int action = isGenerateCallbacks ? 1 : 4;
+    assertThat(event.getAction()).isEqualTo(action);
+  }
+
   public static class VersionAndExpectedInvocations {
 
     private final KnownVersion version;
diff --git a/geode-docs/tools_modules/gfsh/command-pages/wan_copy_region.html.md.erb b/geode-docs/tools_modules/gfsh/command-pages/wan_copy_region.html.md.erb
new file mode 100644
index 0000000..38589c6
--- /dev/null
+++ b/geode-docs/tools_modules/gfsh/command-pages/wan_copy_region.html.md.erb
@@ -0,0 +1,192 @@
+---
+title:  wan-copy region
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+Copy the entries of a region in a WAN site onto the same region in another WAN site, using a gateway sender.
+
+This command copies region entries from a WAN site to another by putting them in batches of configurable
+size that are sent to the remote site by the selected gateway sender. Batch size is specified as number of entries per batch.
+
+The command allows you to specify a maximum copy rate in order not to stress excessively the sending or receiving WAN sites.
+This rate is configured in entries per second.
+
+Callbacks (cache listeners, replication to other WAN sites) will not be executed in the remote WAN site for the entries copied.
+
+The main uses of this command are:
+
+-   Recovery of a WAN site after a disaster in which the failed site needs to be put into service again with
+    the data from another WAN site.
+
+-   Adding a new WAN site to a <%=vars.product_name%> system in which the data in the new WAN site needs to be initially loaded from
+    an existing WAN site.
+
+The execution of a currently running instance of this command may be stopped by using
+this same command with the `--cancel` option.
+
+**Requirements:**
+
+The `wan-copy region` command requires that
+
+- a gateway sender is configured and running on the source WAN site
+- a gateway receiver is configured and running on the remote WAN site
+- the region onto which the data will be copied has already been created on the remote WAN site
+
+**Availability:** Online. You must be connected in `gfsh` to a JMX Manager member to use this command.
+
+**Syntax:**
+
+``` pre
+wan-copy region --region=value --sender-id=value [--max-rate=value] [--batch-size=value]
+  [--cancel]
+```
+
+<a id="wan_copy_region_command_params"></a>
+<style>
+table th:first-of-type {
+    width: 20%;
+}
+table th:nth-of-type(2) {
+    width: 60%;
+}
+table th:nth-of-type(3) {
+    width: 20%;
+}
+</style>
+
+| Name | Description | Default Value |
+|------|-------------|---------------|
+| &#8209;&#8209;region| <em>Required</em>. The region for which the data is to be copied. | |
+| &#8209;&#8209;sender-id| <em>Required</em>. The gateway sender to be used to copy the region entries. | |
+| &#8209;&#8209;max-rate| The maximum copy rate in entries per second. If the sender is parallel, the maximum rate limit is applied to each server hosting buckets for the region to be copied. | 0 (unlimited) |
+| &#8209;&#8209;batch-size| The size of the batches, in number of entries, to be used to copy the region entries. | 1000 |
+| &#8209;&#8209;cancel| Cancel a running `wan-copy region` command for the specified sender and region. If the `sender-id` and `region` passed are both "*", then all running `wan-copy region` commands will be canceled. |  |
+
+<span class="tablecap">Table 1. Copy Region Parameters</span>
+
+
+**Example Commands:**
+
+``` pre
+wan-copy region --region=myRegion--sender-id=mySender --max-rate=1000 --batch-size=100
+```
+
+``` pre
+wan-copy region --region=/overload --sender-id=sender1 --cancel
+```
+
+``` pre
+wan-copy region --region=* --sender-id=* --cancel
+```
+
+**Sample Output:**
+
+``` pre
+gfsh>wan-copy region --region=/overload --sender-id=myParallelSender --max-rate=100 --batch-size=100
+        Member     | Status | Message
+    -------------- | ------ | -----------------------
+    server-sender  | OK     | Entries copied: 333
+    server-sender3 | OK     | Entries copied: 334
+    server-sender2 | OK     | Entries copied: 333
+
+```
+
+``` pre
+gfsh>wan-copy region --region=/overload --sender-id=mySerialSender --max-rate=100 --batch-size=100
+        Member     | Status | Message
+    -------------- | ------ | ----------------------------------------------------------------------
+    server-sender2 | OK     | Sender mySerialSender is serial and not primary. 0 entries copied.
+    server-sender3 | OK     | Sender mySerialSender is serial and not primary. 0 entries copied.
+    server-sender  | OK     | Entries copied: 1000
+```
+
+
+``` pre
+gfsh>wan-copy region --region=/overload --sender-id=sender1 --cancel
+        Member     | Status | Message
+    -------------- | ------ | ------------------
+    server-sender2 | OK     | Execution canceled
+    server-sender  | OK     | Execution canceled
+    server-sender3 | OK     | Execution canceled
+```
+
+``` pre
+gfsh>wan-copy region --region=myRegion --sender-id=myParallelSender --max-rate=100 --batch-size=10
+        Member     | Status | Message
+    -------------- | ------ | ------------------------------------------------------
+    server-sender2 | OK     | Operation canceled after having copied 10 entries
+    server-sender  | OK     | Operation canceled after having copied 10 entries
+    server-sender3 | OK     | Operation canceled after having copied 10 entries
+```
+
+``` pre
+gfsh>wan-copy region --region=myRegion --sender-id=myParallelSender --max-rate=100 --batch-size=10
+        Member     | Status | Message
+    -------------- | ------ | ----------------------------------------------------------------------
+    server-sender2 | OK     | Sender mySerialSender is serial and not primary. 0 entries copied.
+    server-sender3 | OK     | Sender mySerialSender is serial and not primary. 0 entries copied.
+    server-sender  | OK     | Operation canceled after having copied 4 entries
+```
+
+**Error Messages:**
+
+Example of `wan-copy region` with an invalid region:
+
+``` pre
+gfsh> wan-copy region --region=/regionX --sender-id=sender1
+        Member     | Status | Message
+    -------------- | ------ | -------------------------
+    server-sender  | ERROR  | Region /regionX not found
+    server-sender2 | ERROR  | Region /regionX not found
+    server-sender3 | ERROR  | Region /regionX not found
+```
+
+Example of `wan-copy region` with a stopped gateway sender:
+
+``` pre
+gfsh> wan-copy region --region=/region1 --sender-id=sender1
+        Member     | Status | Message
+    -------------- | ------ | -----------------------------
+    server-sender  | ERROR  | Sender sender1 is not running
+    server-sender2 | ERROR  | Sender sender1 is not running
+    server-sender3 | ERROR  | Sender sender1 is not running
+```
+
+Example of cancel of `wan-copy region` when no command is running:
+
+``` pre
+gfsh> wan-copy region --region=/region1 --sender-id=sender1 --cancel
+        Member     | Status | Message
+    -------------- | ------ | ------------------------------------------------------------------------------------
+    server-sender2 | ERROR  | No running command to be canceled for region /region1 and sender sender1
+    server-sender  | ERROR  | No running command to be canceled for region /region1 and sender sender1
+    server-sender3 | ERROR  | No running command to be canceled for region /region1 and sender sender1
+```
+
+Example of cancel of all running `wan-copy region` commands:
+
+``` pre
+gfsh> wan-copy region --region=* --sender-id=* --cancel
+        Member     | Status | Message
+    -------------- | ------ | ------------------------------------------------------------------------------------
+    server-sender2 | OK     | Executions canceled: [(myRegion,mySender1), (myRegion,mySender)]
+    server-sender  | OK     | Executions canceled: [(myRegion,mySender1), (myRegion,mySender)]
+    server-sender3 | OK     | Executions canceled: [(myRegion,mySender1), (myRegion,mySender)]
+```
+
diff --git a/geode-docs/tools_modules/gfsh/gfsh_command_index.html.md.erb b/geode-docs/tools_modules/gfsh/gfsh_command_index.html.md.erb
index f503849..120dbbb 100644
--- a/geode-docs/tools_modules/gfsh/gfsh_command_index.html.md.erb
+++ b/geode-docs/tools_modules/gfsh/gfsh_command_index.html.md.erb
@@ -161,7 +161,7 @@ This section provides help and usage information on all `gfsh` commands, listed
 
     Remove an entry from a region.
 
--   **[restore redunadancy](../../tools_modules/gfsh/command-pages/restore.html)**
+-   **[restore redundancy](../../tools_modules/gfsh/command-pages/restore.html)**
 
     Restore redundancy to partitioned regions and optionally reassign which members host the primary copies.
 
@@ -221,4 +221,7 @@ This section provides help and usage information on all `gfsh` commands, listed
 
     Display product version information.
 
+-   **[wan-copy region](../../tools_modules/gfsh/command-pages/wan_copy_region.html)**
+
+    Copy the data of a region from a WAN site to the same region on another WAN site by using a gateway sender.
 
diff --git a/geode-docs/tools_modules/gfsh/quick_ref_commands_by_area.html.md.erb b/geode-docs/tools_modules/gfsh/quick_ref_commands_by_area.html.md.erb
index 46c99cf..a80812b 100644
--- a/geode-docs/tools_modules/gfsh/quick_ref_commands_by_area.html.md.erb
+++ b/geode-docs/tools_modules/gfsh/quick_ref_commands_by_area.html.md.erb
@@ -112,6 +112,7 @@ limitations under the License.
 | [put](command-pages/put.html)                                                               | Add or update a region entry.                                   | online       |
 | [query](command-pages/query.html)                                                      | Run queries against <%=vars.product_name%> regions. | online       |
 | [remove](command-pages/remove.html)                                                        | Remove an entry from a region.                                  | online       |
+| [wan-copy region ](command-pages/wan_copy_region.html) | Copy the entries of a region in a WAN site onto the same region in another WAN site, using a gateway sender.  | online       |
 
 ## <a id="topic_1B47A0E110124EB6BF08A467EB510412" class="no-quick-link"></a>Deployment Commands
 
diff --git a/geode-gfsh/src/integrationTest/java/org/apache/geode/management/internal/cli/GfshParserAutoCompletionIntegrationTest.java b/geode-gfsh/src/integrationTest/java/org/apache/geode/management/internal/cli/GfshParserAutoCompletionIntegrationTest.java
index b23f763..9988356 100644
--- a/geode-gfsh/src/integrationTest/java/org/apache/geode/management/internal/cli/GfshParserAutoCompletionIntegrationTest.java
+++ b/geode-gfsh/src/integrationTest/java/org/apache/geode/management/internal/cli/GfshParserAutoCompletionIntegrationTest.java
@@ -413,7 +413,7 @@ public class GfshParserAutoCompletionIntegrationTest {
     String hintArgument = "data";
     String hintsProvided = gfshParserRule.getCommandManager().obtainHint(hintArgument);
     String[] hintsProvidedArray = hintsProvided.split(lineSeparator());
-    assertThat(hintsProvidedArray.length).isEqualTo(17);
+    assertThat(hintsProvidedArray).hasSize(17);
     assertThat(hintsProvidedArray[0])
         .isEqualTo("User data as stored in regions of the Geode distributed system.");
   }
@@ -423,7 +423,7 @@ public class GfshParserAutoCompletionIntegrationTest {
     String hintArgument = "";
     String hintsProvided = gfshParserRule.getCommandManager().obtainHint(hintArgument);
     String[] hintsProvidedArray = hintsProvided.split(lineSeparator());
-    assertThat(hintsProvidedArray.length).isEqualTo(21);
+    assertThat(hintsProvidedArray).hasSize(21);
     assertThat(hintsProvidedArray[0]).isEqualTo(
         "Hints are available for the following topics. Use \"hint <topic-name>\" for a specific hint.");
   }
@@ -433,7 +433,7 @@ public class GfshParserAutoCompletionIntegrationTest {
     String hintArgument = "fortytwo";
     String hintsProvided = gfshParserRule.getCommandManager().obtainHint(hintArgument);
     String[] hintsProvidedArray = hintsProvided.split(lineSeparator());
-    assertThat(hintsProvidedArray.length).isEqualTo(1);
+    assertThat(hintsProvidedArray).hasSize(1);
     assertThat(hintsProvidedArray[0]).isEqualTo(
         "Unknown topic: " + hintArgument + ". Use hint to view the list of available topics.");
   }
diff --git a/geode-wan/build.gradle b/geode-wan/build.gradle
index 2ff4cc0..3d00a2a 100644
--- a/geode-wan/build.gradle
+++ b/geode-wan/build.gradle
@@ -30,6 +30,7 @@ dependencies {
   implementation(project(':geode-serialization'))
   implementation(project(':geode-tcp-server'))
   implementation(project(':geode-core'))
+  implementation(project(':geode-gfsh'))
   implementation('org.apache.commons:commons-lang3')
 
   compileOnly('org.apache.logging.log4j:log4j-api')
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/cache/wan/internal/cli/commands/WanCopyRegionCommandDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/cache/wan/internal/cli/commands/WanCopyRegionCommandDUnitTest.java
new file mode 100644
index 0000000..df628fc
--- /dev/null
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/cache/wan/internal/cli/commands/WanCopyRegionCommandDUnitTest.java
@@ -0,0 +1,1399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.wan.internal.cli.commands;
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__BATCHSIZE;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__CANCEL;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__MAXRATE;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__MSG__ALREADY__RUNNING__COMMAND;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__MSG__CANCELED__BEFORE__HAVING__COPIED;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__MSG__COPIED__ENTRIES;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__MSG__EXECUTION__CANCELED;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__MSG__NO__RUNNING__COMMAND;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__MSG__REGION__NOT__FOUND;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__MSG__SENDER__NOT__FOUND;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__MSG__SENDER__SERIAL__AND__NOT__PRIMARY;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__REGION;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__SENDERID;
+import static org.apache.geode.distributed.ConfigurationProperties.DISTRIBUTED_SYSTEM_ID;
+import static org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertNotNull;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
+import com.google.common.collect.ImmutableList;
+import junitparams.Parameters;
+import org.assertj.core.api.Condition;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.Scope;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.cache.wan.internal.WanCopyRegionFunctionService;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.wan.InternalGatewaySender;
+import org.apache.geode.internal.cache.wan.WANTestBase;
+import org.apache.geode.management.internal.cli.result.model.ResultModel;
+import org.apache.geode.management.internal.cli.util.CommandStringBuilder;
+import org.apache.geode.management.internal.i18n.CliStrings;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.DistributedErrorCollector;
+import org.apache.geode.test.dunit.rules.DistributedExecutorServiceRule;
+import org.apache.geode.test.junit.assertions.CommandResultAssert;
+import org.apache.geode.test.junit.categories.WanTest;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
+import org.apache.geode.test.junit.rules.LocatorLauncherStartupRule;
+import org.apache.geode.test.junit.runners.GeodeParamsRunner;
+
+@RunWith(GeodeParamsRunner.class)
+@Category({WanTest.class})
+public class WanCopyRegionCommandDUnitTest extends WANTestBase {
+
+  protected static VM vm8;
+
+  private static final long serialVersionUID = 1L;
+
+  private enum Gateway {
+    SENDER, RECEIVER
+  }
+
+  @Rule
+  public transient GfshCommandRule gfsh = new GfshCommandRule();
+
+  @Rule
+  public DistributedErrorCollector errorCollector = new DistributedErrorCollector();
+
+  @Rule
+  public DistributedExecutorServiceRule executorServiceRule = new DistributedExecutorServiceRule();
+
+  @BeforeClass
+  public static void beforeClassWanCopyRegionCommandDUnitTest() {
+    vm8 = VM.getVM(8);
+  }
+
+  @Test
+  public void testUnsuccessfulExecution_RegionNotFound() throws Exception {
+    List<VM> serversInA = Arrays.asList(vm5, vm6, vm7);
+    VM serverInB = vm3;
+    VM serverInC = vm4;
+    VM client = vm8;
+    String senderIdInA = "B";
+    String senderIdInB = "C";
+
+    int senderLocatorPort = create3WanSitesAndClient(true, vm0,
+        vm1, vm2, serversInA, serverInB, serverInC, client,
+        senderIdInA, senderIdInB);
+
+    int wanCopyRegionBatchSize = 20;
+    String regionName = "foo";
+
+    // Execute wan-copy region command
+    gfsh.connectAndVerify(senderLocatorPort, GfshCommandRule.PortType.locator);
+    String commandString = new CommandStringBuilder(WAN_COPY_REGION)
+        .addOption(WAN_COPY_REGION__REGION, regionName)
+        .addOption(WAN_COPY_REGION__SENDERID, senderIdInA)
+        .addOption(WAN_COPY_REGION__BATCHSIZE, String.valueOf(wanCopyRegionBatchSize))
+        .getCommandString();
+
+    // Check command status and output
+    CommandResultAssert command =
+        verifyStatusIsError(gfsh.executeAndAssertThat(commandString));
+    String message =
+        CliStrings.format(WAN_COPY_REGION__MSG__REGION__NOT__FOUND,
+            regionName);
+    command
+        .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+        .hasColumn("Message")
+        .containsExactly(message, message, message);
+  }
+
+  @Test
+  public void testUnsuccessfulExecution_SenderNotFound() throws Exception {
+    List<VM> serversInA = Arrays.asList(vm5, vm6, vm7);
+    VM serverInB = vm3;
+    VM serverInC = vm4;
+    VM client = vm8;
+    String senderIdInA = "B";
+    String senderIdInB = "C";
+
+    int senderLocatorPort = create3WanSitesAndClient(true, vm0,
+        vm1, vm2, serversInA, serverInB, serverInC, client,
+        senderIdInA, senderIdInB);
+
+    int wanCopyRegionBatchSize = 20;
+    String regionName = getRegionName(true);
+
+    // Execute wan-copy region command
+    gfsh.connectAndVerify(senderLocatorPort, GfshCommandRule.PortType.locator);
+    String commandString = new CommandStringBuilder(WAN_COPY_REGION)
+        .addOption(WAN_COPY_REGION__REGION, regionName)
+        .addOption(WAN_COPY_REGION__SENDERID, senderIdInA)
+        .addOption(WAN_COPY_REGION__BATCHSIZE, String.valueOf(wanCopyRegionBatchSize))
+        .getCommandString();
+
+    // Check command status and output
+    CommandResultAssert command =
+        verifyStatusIsError(gfsh.executeAndAssertThat(commandString));
+    String message =
+        CliStrings.format(WAN_COPY_REGION__MSG__SENDER__NOT__FOUND, senderIdInA);
+    command
+        .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+        .hasColumn("Message")
+        .containsExactly(message, message, message);
+  }
+
+  @Test
+  @Parameters({"true, true", "true, false", "false, false"})
+  public void testUnsuccessfulExecution_ExceptionAtReceiver(
+      boolean isPartitionedRegion, boolean isParallelGatewaySender) throws Exception {
+    List<VM> serversInA = Arrays.asList(vm5, vm6, vm7);
+    VM serverInB = vm3;
+    VM serverInC = vm4;
+    VM client = vm8;
+    String senderIdInA = "B";
+    String senderIdInB = "C";
+
+    int senderLocatorPort = create3WanSitesAndClient(isPartitionedRegion, vm0,
+        vm1, vm2, serversInA, serverInB, serverInC, client,
+        senderIdInA, senderIdInB);
+
+    String regionName = getRegionName(isPartitionedRegion);
+    int wanCopyRegionBatchSize = 20;
+
+    int entries = 20;
+    // Put entries
+    client.invoke(() -> doPutsFrom(regionName, 0, entries));
+
+    // Check that entries are put in the region
+    for (VM member : serversInA) {
+      member.invoke(() -> validateRegionSize(regionName, entries));
+    }
+
+    // destroy region to provoke the exception
+    serverInB.invoke(() -> destroyRegion(regionName));
+
+    // Create senders and receivers with replication as follows: "A" -> "B" -> "C"
+    createSenders(isParallelGatewaySender, serversInA, serverInB,
+        senderIdInA, senderIdInB);
+    createReceivers(serverInB, serverInC);
+
+    // Execute wan-copy region command
+    gfsh.connectAndVerify(senderLocatorPort, GfshCommandRule.PortType.locator);
+    String commandString = new CommandStringBuilder(WAN_COPY_REGION)
+        .addOption(WAN_COPY_REGION__REGION, regionName)
+        .addOption(WAN_COPY_REGION__SENDERID, senderIdInA)
+        .addOption(WAN_COPY_REGION__BATCHSIZE, String.valueOf(wanCopyRegionBatchSize))
+        .getCommandString();
+
+    // Check command status and output
+    if (isParallelGatewaySender) {
+      CommandResultAssert command =
+          verifyStatusIsError(gfsh.executeAndAssertThat(commandString));
+      Condition<String> exceptionError =
+          new Condition<>(s -> s.startsWith("Error ("), "Error");
+      command
+          .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+          .hasColumn("Message")
+          .asList()
+          .haveExactly(3, exceptionError);
+    } else {
+      CommandResultAssert command =
+          verifyStatusIsErrorInOneServer(gfsh.executeAndAssertThat(commandString));
+      Condition<String> exceptionError =
+          new Condition<>(s -> s.startsWith("Error ("), "Error");
+      Condition<String> senderNotPrimary = new Condition<>(
+          s -> s.equals(CliStrings
+              .format(WAN_COPY_REGION__MSG__SENDER__SERIAL__AND__NOT__PRIMARY,
+                  senderIdInA)),
+          "sender not primary");
+      command
+          .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+          .hasColumn("Message")
+          .asList()
+          .haveExactly(1, exceptionError)
+          .haveExactly(2, senderNotPrimary);
+    }
+  }
+
+  private Object[] parametersToTestSenderOrReceiverGoesDownDuringExecution() {
+    return new Object[] {
+        new Object[] {true, true, Gateway.SENDER, false},
+        new Object[] {false, true, Gateway.SENDER, true},
+        new Object[] {false, true, Gateway.SENDER, false},
+        new Object[] {false, false, Gateway.SENDER, true},
+        new Object[] {false, false, Gateway.SENDER, false},
+        new Object[] {true, true, Gateway.RECEIVER, false},
+        new Object[] {false, true, Gateway.RECEIVER, true},
+        new Object[] {false, true, Gateway.RECEIVER, false},
+        new Object[] {false, false, Gateway.RECEIVER, true},
+        new Object[] {false, false, Gateway.RECEIVER, false}
+    };
+  }
+
+  /**
+   * This test creates two sites A & B, each one containing 3 servers.
+   * A region is created in both sites, and populated in site A.
+   * After that replication is configured from site A to site B.
+   * WanCopyRegionFunction is called, and while it is running, a sender in site A
+   * or a receiver in site B are killed.
+   */
+  @Test
+  @Parameters(method = "parametersToTestSenderOrReceiverGoesDownDuringExecution")
+  public void testSenderOrReceiverGoesDownDuringExecution(boolean useParallel,
+      boolean usePartitionedRegion, Gateway gwToBeStopped, boolean stopPrimarySender)
+      throws Exception {
+
+    if (gwToBeStopped == Gateway.SENDER && (useParallel || stopPrimarySender)) {
+      addIgnoredExceptionsForSenderInUseWentDown();
+    }
+    if (gwToBeStopped == Gateway.RECEIVER &&
+        (usePartitionedRegion || !useParallel)) {
+      addIgnoredExceptionsForReceiverConnectedToSenderInUseWentDown();
+    }
+    final int wanCopyRegionBatchSize = 10;
+    final int entries;
+    if (!useParallel && !usePartitionedRegion && stopPrimarySender) {
+      entries = 2500;
+    } else {
+      entries = 1000;
+    }
+
+    final String regionName = getRegionName(usePartitionedRegion);
+
+    // Site A
+    VM locatorInA = vm0;
+    VM server1InA = vm1;
+    VM server2InA = vm2;
+    VM server3InA = vm3;
+    List<VM> serversInA = Arrays.asList(server1InA, server2InA, server3InA);
+    final String senderIdInA = "B";
+
+    // Site B
+    VM locatorInB = vm4;
+    VM server1InB = vm5;
+    VM server2InB = vm6;
+    VM server3InB = vm7;
+    List<VM> serversInB = Arrays.asList(server1InB, server2InB, server3InB);
+    VM client = vm8;
+
+    int locatorAPort = create2WanSitesAndClient(locatorInA, serversInA, senderIdInA, locatorInB,
+        serversInB, client, usePartitionedRegion, regionName);
+
+    // Put entries
+    client.invoke(() -> doPutsFrom(regionName, 0, entries));
+    for (VM member : serversInA) {
+      member.invoke(() -> validateRegionSize(regionName, entries));
+    }
+
+    // Create senders and receivers with replication as follows: "A" -> "B"
+    if (useParallel) {
+      createReceiverInVMs(server1InB, server2InB, server3InB);
+      createSenders(useParallel, serversInA, null, senderIdInA, null);
+    } else {
+      // Senders will connect to receiver in server1InB
+      server1InB.invoke(WANTestBase::createReceiver);
+      createSenders(useParallel, serversInA, null, senderIdInA, null);
+      createReceiverInVMs(server2InB, server3InB);
+    }
+
+    Callable<CommandResultAssert> wanCopyCommandCallable = () -> {
+      String command = new CommandStringBuilder(WAN_COPY_REGION)
+          .addOption(WAN_COPY_REGION__REGION, regionName)
+          .addOption(WAN_COPY_REGION__SENDERID, senderIdInA)
+          .addOption(WAN_COPY_REGION__BATCHSIZE, String.valueOf(wanCopyRegionBatchSize))
+          .addOption(WAN_COPY_REGION__MAXRATE, "50")
+          .getCommandString();
+      try {
+        gfsh.connectAndVerify(locatorAPort, GfshCommandRule.PortType.locator);
+      } catch (Exception e) {
+        errorCollector.addError(e);
+      }
+      return gfsh.executeAndAssertThat(command);
+    };
+
+    Future<CommandResultAssert> wanCopyCommandFuture =
+        executorServiceRule.submit(wanCopyCommandCallable);
+
+    // Wait for the wan-copy command to start
+    waitForWanCopyRegionCommandToStart(useParallel, usePartitionedRegion, serversInA);
+
+    // Stop sender or receiver and verify result
+    if (gwToBeStopped == Gateway.SENDER) {
+      stopSenderAndVerifyResult(useParallel, stopPrimarySender, server2InA, serversInA, senderIdInA,
+          wanCopyCommandFuture);
+    } else if (gwToBeStopped == Gateway.RECEIVER) {
+      stopReceiverAndVerifyResult(useParallel, stopPrimarySender, entries, regionName, server1InB,
+          server2InB, server3InB, wanCopyCommandFuture);
+    }
+  }
+
+  @Test
+  @Parameters({"false, false", "false, true", "true, true"})
+  public void testRegionDestroyedDuringExecution(boolean isParallelGatewaySender,
+      boolean isPartitionedRegion)
+      throws Exception {
+    final int wanCopyRegionBatchSize = 10;
+    final int entries = 1000;
+
+    final String regionName = getRegionName(isPartitionedRegion);
+
+    // Site A
+    VM locatorInA = vm0;
+    VM server1InA = vm1;
+    List<VM> serversInA = ImmutableList.of(server1InA);
+    final String senderIdInA = "B";
+
+    // Site B
+    VM locatorInB = vm4;
+    VM server1InB = vm5;
+    List<VM> serversInB = ImmutableList.of(server1InB);
+    VM client = vm8;
+
+    int locatorAPort = create2WanSitesAndClient(locatorInA, serversInA, senderIdInA, locatorInB,
+        serversInB, client, isPartitionedRegion, regionName);
+
+    // Put entries
+    client.invoke(() -> doPutsFrom(regionName, 0, entries));
+    for (VM member : serversInA) {
+      member.invoke(() -> validateRegionSize(regionName, entries));
+    }
+
+    // Create senders and receivers with replication as follows: "A" -> "B"
+    createReceiverInVMs(server1InB);
+    createSenders(isParallelGatewaySender, serversInA, null, senderIdInA, null);
+
+    Callable<CommandResultAssert> wanCopyCommandCallable = () -> {
+      String command = new CommandStringBuilder(WAN_COPY_REGION)
+          .addOption(WAN_COPY_REGION__REGION, regionName)
+          .addOption(WAN_COPY_REGION__SENDERID, senderIdInA)
+          .addOption(WAN_COPY_REGION__BATCHSIZE, String.valueOf(wanCopyRegionBatchSize))
+          .addOption(WAN_COPY_REGION__MAXRATE, "5")
+          .getCommandString();
+      try {
+        gfsh.connectAndVerify(locatorAPort, GfshCommandRule.PortType.locator);
+      } catch (Exception e) {
+        errorCollector.addError(e);
+      }
+      return gfsh.executeAndAssertThat(command);
+    };
+
+    Future<CommandResultAssert> wanCopyCommandFuture =
+        executorServiceRule.submit(wanCopyCommandCallable);
+
+    // Wait for the wan-copy command to start
+    waitForWanCopyRegionCommandToStart(isParallelGatewaySender, isPartitionedRegion, serversInA);
+
+    // Destroy region in a server in A
+    server1InA.invoke(() -> cache.getRegion(regionName).destroyRegion());
+
+    CommandResultAssert result = wanCopyCommandFuture.get();
+    result.statusIsError();
+    result
+        .hasTableSection()
+        .hasColumns()
+        .hasSize(3);
+    result
+        .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+        .hasColumn("Member")
+        .hasSize(1);
+    result
+        .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+        .hasColumn("Message")
+        .hasSize(1);
+    result
+        .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+        .hasColumn("Status")
+        .containsExactly("ERROR");
+
+    Condition<String> regionDestroyedError;
+    regionDestroyedError = new Condition<>(
+        s -> (s.startsWith(
+            "Execution failed. Error: org.apache.geode.cache.RegionDestroyedException: ")
+            || s.startsWith("Error (Region destroyed) in operation after having copied")),
+        "execution error");
+    result
+        .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+        .hasColumn("Message")
+        .asList()
+        .haveExactly(1, regionDestroyedError);
+  }
+
+  @Test
+  @Parameters({"false, false", "false, true", "true, true"})
+  public void testDetectOngoingExecution(boolean useParallel,
+      boolean usePartitionedRegion)
+      throws Exception {
+    final int wanCopyRegionBatchSize = 10;
+    final int entries = 1000;
+
+    final String regionName = getRegionName(usePartitionedRegion);
+
+    // Site A
+    VM locatorInA = vm0;
+    VM server1InA = vm1;
+    VM server2InA = vm2;
+    VM server3InA = vm3;
+    List<VM> serversInA = Arrays.asList(server1InA, server2InA, server3InA);
+    final String senderIdInA = "B";
+
+    // Site B
+    VM locatorInB = vm4;
+    VM server1InB = vm5;
+    VM server2InB = vm6;
+    VM server3InB = vm7;
+    List<VM> serversInB = Arrays.asList(server1InB, server2InB, server3InB);
+    VM client = vm8;
+
+    int locatorAPort = create2WanSitesAndClient(locatorInA, serversInA, senderIdInA, locatorInB,
+        serversInB, client, usePartitionedRegion, regionName);
+
+    // Put entries
+    client.invoke(() -> doPutsFrom(regionName, 0, entries));
+    for (VM member : serversInA) {
+      member.invoke(() -> validateRegionSize(regionName, entries));
+    }
+
+    // Create senders and receivers with replication as follows: "A" -> "B"
+    if (useParallel) {
+      createReceiverInVMs(server1InB, server2InB, server3InB);
+      createSenders(useParallel, serversInA, null, senderIdInA, null);
+    } else {
+      // Senders will connect to receiver in server1InB
+      server1InB.invoke(WANTestBase::createReceiver);
+      createSenders(useParallel, serversInA, null, senderIdInA, null);
+      createReceiverInVMs(server2InB, server3InB);
+    }
+
+    Callable<CommandResultAssert> wanCopyCommandCallable = () -> {
+      String command = new CommandStringBuilder(WAN_COPY_REGION)
+          .addOption(WAN_COPY_REGION__REGION, regionName)
+          .addOption(WAN_COPY_REGION__SENDERID, senderIdInA)
+          .addOption(WAN_COPY_REGION__BATCHSIZE, String.valueOf(wanCopyRegionBatchSize))
+          .addOption(WAN_COPY_REGION__MAXRATE, "5")
+          .getCommandString();
+      try {
+        gfsh.connectAndVerify(locatorAPort, GfshCommandRule.PortType.locator);
+      } catch (Exception e) {
+        errorCollector.addError(e);
+      }
+      return gfsh.executeAndAssertThat(command);
+    };
+
+    Future<CommandResultAssert> wanCopyCommandFuture =
+        executorServiceRule.submit(wanCopyCommandCallable);
+
+    // Wait for the wan-copy command to start
+    waitForWanCopyRegionCommandToStart(useParallel, usePartitionedRegion, serversInA);
+
+    // Execute again the same wan-copy region command
+    String commandString = new CommandStringBuilder(WAN_COPY_REGION)
+        .addOption(WAN_COPY_REGION__REGION, regionName)
+        .addOption(WAN_COPY_REGION__SENDERID, senderIdInA)
+        .addOption(WAN_COPY_REGION__BATCHSIZE, String.valueOf(wanCopyRegionBatchSize))
+        .getCommandString();
+
+    // Check command status and output
+    Condition<String> exceptionError = new Condition<>(
+        s -> s.equals(CliStrings.format(WAN_COPY_REGION__MSG__ALREADY__RUNNING__COMMAND,
+            regionName, senderIdInA)),
+        "already running");
+    if (useParallel) {
+      CommandResultAssert command =
+          verifyStatusIsError(gfsh.executeAndAssertThat(commandString));
+      command
+          .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+          .hasColumn("Message")
+          .asList()
+          .haveExactly(3, exceptionError);
+    } else {
+      CommandResultAssert command =
+          verifyStatusIsErrorInOneServer(gfsh.executeAndAssertThat(commandString));
+      Condition<String> senderNotPrimary = new Condition<>(
+          s -> s.equals(CliStrings
+              .format(WAN_COPY_REGION__MSG__SENDER__SERIAL__AND__NOT__PRIMARY,
+                  senderIdInA)),
+          "sender not primary");
+      command
+          .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+          .hasColumn("Message")
+          .asList()
+          .haveExactly(1, exceptionError)
+          .haveExactly(2, senderNotPrimary);
+    }
+
+    // cancel command
+    String commandString1 = new CommandStringBuilder(WAN_COPY_REGION)
+        .addOption(WAN_COPY_REGION__REGION, regionName)
+        .addOption(WAN_COPY_REGION__SENDERID, senderIdInA)
+        .addOption(WAN_COPY_REGION__BATCHSIZE, String.valueOf(wanCopyRegionBatchSize))
+        .addOption(WAN_COPY_REGION__CANCEL)
+        .getCommandString();
+    gfsh.executeAndAssertThat(commandString1);
+    wanCopyCommandFuture.get();
+    addIgnoredExceptionsForClosingAfterCancelCommand();
+  }
+
+  /**
+   * Scenario with 3 WAN sites: "A", "B" and "C".
+   * Initially, no replication is configured between sites.
+   * Several entries are put in WAN site "A".
+   *
+   * The following gateway senders are created and started:
+   * - In "A" site: to replicate region entries to "B" site. Sender called "B".
+   * - In "B" site: to replicate region entries to "C" site. Sender called "C".
+   * (Replication is as follows: A -> B -> C)
+   *
+   * The "wan-copy region" command is run from "A" site passing sender "B".
+   *
+   * It must be verified that the entries are copied to site "B".
+   * It must also be verified that the entries are not transitively
+   * copied to "C" even though replication is configured from "B" to "C"
+   * because with this command generateCallbacks is set to false in the
+   * events generated.
+   *
+   */
+  @Test
+  @Parameters({"true, true", "true, false", "false, false"})
+  public void testSuccessfulExecution(boolean isPartitionedRegion,
+      boolean isParallelGatewaySender) throws Exception {
+    List<VM> serversInA = Arrays.asList(vm5, vm6, vm7);
+    VM serverInB = vm3;
+    VM serverInC = vm4;
+    VM client = vm8;
+    String senderIdInA = "B";
+    String senderIdInB = "C";
+
+    int senderLocatorPort = create3WanSitesAndClient(isPartitionedRegion, vm0,
+        vm1, vm2, serversInA, serverInB, serverInC, client,
+        senderIdInA, senderIdInB);
+
+    int wanCopyRegionBatchSize = 20;
+    int entries = 100;
+    String regionName = getRegionName(isPartitionedRegion);
+    // Put entries
+    client.invoke(() -> doPutsFrom(regionName, 0, entries + 1));
+    // remove an entry to make sure that replication works well even when removes.
+    client.invoke(() -> removeEntry(regionName, entries));
+
+    // Check that entries are put in the region
+    for (VM member : serversInA) {
+      member.invoke(() -> validateRegionSize(regionName, entries));
+    }
+
+    // Check that entries are not copied to "B" nor "C"
+    serverInB.invoke(() -> validateRegionSize(regionName, 0));
+    serverInC.invoke(() -> validateRegionSize(regionName, 0));
+
+    // Create senders and receivers with replication as follows: "A" -> "B" -> "C"
+    createSenders(isParallelGatewaySender, serversInA, serverInB,
+        senderIdInA, senderIdInB);
+    createReceivers(serverInB, serverInC);
+
+    // Check that entries are not copied to "B" nor "C"
+    serverInB.invoke(() -> validateRegionSize(regionName, 0));
+    serverInC.invoke(() -> validateRegionSize(regionName, 0));
+
+    // Execute wan-copy region command
+    gfsh.connectAndVerify(senderLocatorPort, GfshCommandRule.PortType.locator);
+    String commandString = new CommandStringBuilder(WAN_COPY_REGION)
+        .addOption(WAN_COPY_REGION__REGION, regionName)
+        .addOption(WAN_COPY_REGION__SENDERID, senderIdInA)
+        .addOption(WAN_COPY_REGION__BATCHSIZE, String.valueOf(wanCopyRegionBatchSize))
+        .getCommandString();
+
+    // Check command status and output
+    CommandResultAssert command =
+        verifyStatusIsOk(gfsh.executeAndAssertThat(commandString));
+    if (isPartitionedRegion && isParallelGatewaySender) {
+      String msg1 = CliStrings.format(WAN_COPY_REGION__MSG__COPIED__ENTRIES, 33);
+      String msg2 = CliStrings.format(WAN_COPY_REGION__MSG__COPIED__ENTRIES, 34);
+      command
+          .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+          .hasColumn("Message")
+          .containsExactlyInAnyOrder(msg1, msg1, msg2);
+    } else {
+      String msg1 = CliStrings.format(WAN_COPY_REGION__MSG__COPIED__ENTRIES, 100);
+      String msg2 = CliStrings
+          .format(WAN_COPY_REGION__MSG__SENDER__SERIAL__AND__NOT__PRIMARY, senderIdInA);
+      command
+          .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+          .hasColumn("Message")
+          .containsExactlyInAnyOrder(msg1, msg2, msg2);
+    }
+
+    // Check that entries are copied in "B"
+    serverInB.invoke(() -> validateRegionSize(regionName, entries));
+
+    // Check that the region's data is the same in sites "A" and "B"
+    checkEqualRegionData(regionName, serversInA.get(0), serverInB);
+
+    // Check that wanCopyRegionBatchSize is correctly used by the command
+    long receivedBatches = serverInB.invoke(() -> getReceiverStats().get(2));
+    if (isPartitionedRegion && isParallelGatewaySender) {
+      assertThat(receivedBatches).isEqualTo(6);
+    } else {
+      assertThat(receivedBatches).isEqualTo(5);
+    }
+
+    // Check that entries are not copied in "C" (generateCallbacks is false)
+    serverInC.invoke(() -> validateRegionSize(regionName, 0));
+  }
+
+  /**
+   * Scenario with 2 WAN sites: "A" and "B".
+   * Initially, no replication is configured between sites.
+   * Several entries are put in WAN site "A".
+   *
+   * The following gateway senders are created and started:
+   * - In "A" site: to replicate region entries to "B" site. Sender called "B".
+   * (Replication is as follows: A -> B)
+   *
+   * The "wan-copy region" command is run from "A" site passing sender "B".
+   * Simultaneously, random operations for entries with the same
+   * keys as the one previously put are run.
+   *
+   * When the command finishes and the puts finish it
+   * must be verified that the entries in the region in site "A"
+   * are the same as the ones in region in site "B".
+   */
+  @Test
+  @Parameters({"true, true", "true, false", "false, false"})
+  public void testSuccessfulExecutionWhileRunningOpsOnRegion(
+      boolean isPartitionedRegion,
+      boolean isParallelGatewaySender) throws Exception {
+    List<VM> serversInA = Arrays.asList(vm5, vm6, vm7);
+    VM serverInB = vm3;
+    VM serverInC = vm4;
+    VM client = vm8;
+    String senderIdInA = "B";
+    String senderIdInB = "C";
+
+    int senderLocatorPort = create3WanSitesAndClient(isPartitionedRegion, vm0,
+        vm1, vm2, serversInA, serverInB, serverInC, client,
+        senderIdInA, senderIdInB);
+
+    int wanCopyRegionBatchSize = 20;
+    int entries = 10000;
+    Set<Long> keySet = LongStream.range(0L, entries).boxed().collect(Collectors.toSet());
+    String regionName = getRegionName(isPartitionedRegion);
+    // Put entries
+    client.invoke(() -> doPutsFrom(regionName, 0, entries));
+
+    // Check that entries are put in the region
+    for (VM member : serversInA) {
+      member.invoke(() -> validateRegionSize(regionName, entries));
+    }
+
+    // Create senders and receivers with replication as follows: "A" -> "B" -> "C"
+    createSenders(isParallelGatewaySender, serversInA, serverInB,
+        senderIdInA, senderIdInB);
+    createReceivers(serverInB, serverInC);
+
+    // Let maxRate be a 5th of the number of entries in order to have the
+    // command running for about 5 seconds so that there is time
+    // for the random operations to be done at the same time
+    // as the command is running.
+    // The rate is divided by the number of servers in case the sender is parallel
+    int maxRate = (entries / 5) / (isParallelGatewaySender ? serversInA.size() : 1);
+
+    // Execute wan-copy region command
+    Callable<CommandResultAssert> wanCopyCommandCallable = () -> {
+      String command = new CommandStringBuilder(WAN_COPY_REGION)
+          .addOption(WAN_COPY_REGION__REGION, regionName)
+          .addOption(WAN_COPY_REGION__SENDERID, senderIdInA)
+          .addOption(WAN_COPY_REGION__BATCHSIZE, String.valueOf(wanCopyRegionBatchSize))
+          .addOption(WAN_COPY_REGION__MAXRATE, String.valueOf(maxRate))
+          .getCommandString();
+      try {
+        gfsh.connectAndVerify(senderLocatorPort, GfshCommandRule.PortType.locator);
+      } catch (Exception e) {
+        errorCollector.addError(e);
+      }
+      return gfsh.executeAndAssertThat(command);
+    };
+
+    Future<CommandResultAssert> wanCopyCommandFuture =
+        executorServiceRule.submit(wanCopyCommandCallable);
+
+    // Wait for the wan-copy command to start
+    waitForWanCopyRegionCommandToStart(isParallelGatewaySender, isPartitionedRegion, serversInA);
+
+    // While the command is running, send some random operations over the same keys
+    AsyncInvocation<Void> asyncOps1 =
+        client.invokeAsync(() -> sendRandomOpsFromClient(regionName, keySet, 3));
+    AsyncInvocation<Void> asyncOps2 =
+        client.invokeAsync(() -> sendRandomOpsFromClient(regionName, keySet, 3));
+    AsyncInvocation<Void> asyncOps3 =
+        client.invokeAsync(() -> sendRandomOpsFromClient(regionName, keySet, 3));
+
+    // Check command status and output
+    CommandResultAssert command = wanCopyCommandFuture.get();
+    verifyStatusIsOk(command);
+
+    // Wait for random operations to finish
+    asyncOps1.await();
+    asyncOps2.await();
+    asyncOps3.await();
+
+    // Wait for entries to be replicated (replication queues empty)
+    for (VM server : serversInA) {
+      server.invoke(() -> getSenderStats(senderIdInA, 0));
+    }
+
+    // Check that the region's data is the same in sites "A" and "B"
+    checkEqualRegionData(regionName, serversInA.get(0), serverInB);
+  }
+
+  /**
+   * Cancel is executed when no wan-copy region command is running.
+   */
+  @Test
+  @Parameters({"true, true", "true, false", "false, false"})
+  public void testUnsuccessfulCancelExecution(boolean isPartitionedRegion,
+      boolean isParallelGatewaySender) throws Exception {
+    List<VM> serversInA = Arrays.asList(vm5, vm6, vm7);
+    VM serverInB = vm3;
+    VM serverInC = vm4;
+    VM client = vm8;
+    String senderIdInA = "B";
+    String senderIdInB = "C";
+
+    int senderLocatorPort = create3WanSitesAndClient(isPartitionedRegion, vm0,
+        vm1, vm2, serversInA, serverInB, serverInC, client,
+        senderIdInA, senderIdInB);
+
+    String regionName = getRegionName(isPartitionedRegion);
+
+    // Create senders and receivers with replication as follows: "A" -> "B" -> "C"
+    createSenders(isParallelGatewaySender, serversInA, serverInB,
+        senderIdInA, senderIdInB);
+    createReceivers(serverInB, serverInC);
+
+    // Execute cancel wan-copy region command
+    gfsh.connectAndVerify(senderLocatorPort, GfshCommandRule.PortType.locator);
+    String cancelCommand = new CommandStringBuilder(WAN_COPY_REGION)
+        .addOption(WAN_COPY_REGION__REGION, regionName)
+        .addOption(WAN_COPY_REGION__SENDERID, senderIdInA)
+        .addOption(WAN_COPY_REGION__CANCEL)
+        .getCommandString();
+
+    CommandResultAssert cancelCommandResult =
+        verifyStatusIsError(gfsh.executeAndAssertThat(cancelCommand));
+    String msg1 = CliStrings.format(WAN_COPY_REGION__MSG__NO__RUNNING__COMMAND,
+        regionName, senderIdInA);
+    cancelCommandResult
+        .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+        .hasColumn("Message")
+        .containsExactlyInAnyOrder(msg1, msg1, msg1);
+  }
+
+  /**
+   * Scenario with 3 WAN sites: "A", "B" and "C".
+   * Initially, no replication is configured between sites.
+   * Several entries are put in WAN site "A".
+   *
+   * The following gateway senders are created and started:
+   * - In "A" site: to replicate region entries to "B" site. Sender called "B".
+   * - In "B" site: to replicate region entries to "C" site. Sender called "C".
+   * (Replication is as follows: A -> B -> C)
+   *
+   * The "wan-copy region" command is run from "A" site passing sender "B"
+   * in a different thread. The maxRate is set to a very low value so that there is
+   * time to cancel it before it finishes.
+   *
+   * The "wan-copy region" command with the cancel option
+   * is run from "A" site passing sender "B".
+   *
+   * It must be verified that the command is canceled.
+   * Also, the output of the command must show
+   * the number of entries copied before the command was canceled.
+   */
+  @Test
+  @Parameters({"true, true", "true, false", "false, false"})
+  public void testSuccessfulCancelExecution(boolean isPartitionedRegion,
+      boolean isParallelGatewaySender) throws Exception {
+    List<VM> serversInA = Arrays.asList(vm5, vm6, vm7);
+    VM serverInB = vm3;
+    VM serverInC = vm4;
+    VM client = vm8;
+    String senderIdInA = "B";
+    String senderIdInB = "C";
+
+    int senderLocatorPort = create3WanSitesAndClient(isPartitionedRegion, vm0,
+        vm1, vm2, serversInA, serverInB, serverInC, client,
+        senderIdInA, senderIdInB);
+
+    int wanCopyRegionBatchSize = 20;
+    int entries = 100;
+    String regionName = getRegionName(isPartitionedRegion);
+    // Put entries
+    client.invoke(() -> doPutsFrom(regionName, 0, entries));
+
+    // Create senders and receivers with replication as follows: "A" -> "B" -> "C"
+    createSenders(isParallelGatewaySender, serversInA, serverInB,
+        senderIdInA, senderIdInB);
+    createReceivers(serverInB, serverInC);
+
+    Callable<CommandResultAssert> wanCopyCommandCallable = () -> {
+      String command = new CommandStringBuilder(WAN_COPY_REGION)
+          .addOption(WAN_COPY_REGION__REGION, regionName)
+          .addOption(WAN_COPY_REGION__SENDERID, senderIdInA)
+          .addOption(WAN_COPY_REGION__BATCHSIZE, String.valueOf(wanCopyRegionBatchSize))
+          .addOption(WAN_COPY_REGION__MAXRATE, String.valueOf(1))
+          .getCommandString();
+      try {
+        gfsh.connectAndVerify(senderLocatorPort, GfshCommandRule.PortType.locator);
+      } catch (Exception e) {
+        errorCollector.addError(e);
+      }
+      return gfsh.executeAndAssertThat(command).statusIsError();
+    };
+
+    Future<CommandResultAssert> wanCopyCommandFuture =
+        executorServiceRule.submit(wanCopyCommandCallable);
+
+    // Wait for the wan-copy command to start
+    waitForWanCopyRegionCommandToStart(isParallelGatewaySender, isPartitionedRegion, serversInA);
+
+    // Cancel wan-copy region command
+    GfshCommandRule gfshCancelCommand = new GfshCommandRule();
+    gfshCancelCommand.connectAndVerify(senderLocatorPort, GfshCommandRule.PortType.locator);
+    String cancelCommand = new CommandStringBuilder(WAN_COPY_REGION)
+        .addOption(WAN_COPY_REGION__REGION, regionName)
+        .addOption(WAN_COPY_REGION__SENDERID, senderIdInA)
+        .addOption(WAN_COPY_REGION__CANCEL)
+        .getCommandString();
+    CommandResultAssert cancelCommandResult =
+        gfshCancelCommand.executeAndAssertThat(cancelCommand);
+
+    if (isPartitionedRegion && isParallelGatewaySender) {
+      verifyStatusIsOk(cancelCommandResult);
+      cancelCommandResult
+          .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+          .hasColumn("Message")
+          .containsExactlyInAnyOrder(WAN_COPY_REGION__MSG__EXECUTION__CANCELED,
+              WAN_COPY_REGION__MSG__EXECUTION__CANCELED,
+              WAN_COPY_REGION__MSG__EXECUTION__CANCELED);
+    } else {
+      verifyStatusIsOkInOneServer(cancelCommandResult);
+      String msg1 = CliStrings.format(WAN_COPY_REGION__MSG__NO__RUNNING__COMMAND,
+          regionName, senderIdInA);
+      cancelCommandResult
+          .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+          .hasColumn("Message")
+          .containsExactlyInAnyOrder(msg1, msg1, WAN_COPY_REGION__MSG__EXECUTION__CANCELED);
+    }
+
+    // Check wan-copy region command output
+    CommandResultAssert wanCopyCommandResult = wanCopyCommandFuture.get();
+    if (isPartitionedRegion && isParallelGatewaySender) {
+      verifyStatusIsError(wanCopyCommandResult);
+      String msg = WAN_COPY_REGION__MSG__CANCELED__BEFORE__HAVING__COPIED;
+      wanCopyCommandResult
+          .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+          .hasColumn("Message")
+          .containsExactly(msg, msg, msg);
+    } else {
+      verifyStatusIsErrorInOneServer(wanCopyCommandResult);
+      String msg1 = CliStrings
+          .format(WAN_COPY_REGION__MSG__SENDER__SERIAL__AND__NOT__PRIMARY, senderIdInA);
+      wanCopyCommandResult
+          .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+          .hasColumn("Message")
+          .containsExactlyInAnyOrder(WAN_COPY_REGION__MSG__CANCELED__BEFORE__HAVING__COPIED, msg1,
+              msg1);
+    }
+    addIgnoredExceptionsForClosingAfterCancelCommand();
+  }
+
+  private int create3WanSitesAndClient(boolean isPartitionedRegion, VM locatorSender,
+      VM locatorSenderReceiver, VM locatorReceiver, List<VM> serversInA, VM serverInB,
+      VM serverInC, VM client, String senderIdInA, String senderIdInB) {
+    // Create locators
+    int receiverLocatorPort =
+        locatorReceiver.invoke(() -> createFirstLocatorWithDSId(3));
+    int senderReceiverLocatorPort = locatorSenderReceiver
+        .invoke(() -> createFirstRemoteLocator(2, receiverLocatorPort));
+    int senderLocatorPort = locatorSender.invoke(() -> {
+      Properties props = getDistributedSystemProperties();
+      props.setProperty(DISTRIBUTED_SYSTEM_ID, "" + 1);
+      props.setProperty(REMOTE_LOCATORS, "localhost[" + senderReceiverLocatorPort + "]");
+      LocatorLauncherStartupRule launcherStartupRule =
+          new LocatorLauncherStartupRule().withProperties(props);
+      launcherStartupRule.start();
+      return launcherStartupRule.getLauncher().getPort();
+    });
+
+    // Create servers
+    Properties properties = new Properties();
+    serverInB.invoke(() -> createServer(senderReceiverLocatorPort, -1, properties));
+    serverInC.invoke(() -> createServer(receiverLocatorPort, -1, properties));
+    for (VM server : serversInA) {
+      server.invoke(() -> createServer(senderLocatorPort, -1, properties));
+    }
+
+    // Create region in servers
+    final String regionName = getRegionName(isPartitionedRegion);
+    if (isPartitionedRegion) {
+      for (VM server : serversInA) {
+        server
+            .invoke(() -> createPartitionedRegion(regionName, senderIdInA, 1, 100,
+                isOffHeap(), RegionShortcut.PARTITION, true));
+      }
+      serverInB.invoke(
+          () -> createPartitionedRegion(regionName, senderIdInB, 0, 100,
+              isOffHeap(), RegionShortcut.PARTITION, true));
+      serverInC.invoke(() -> createPartitionedRegion(regionName, null, 0, 100,
+          isOffHeap(), RegionShortcut.PARTITION, true));
+    } else {
+      for (VM server : serversInA) {
+        server.invoke(() -> createReplicatedRegion(regionName, senderIdInA,
+            Scope.GLOBAL, DataPolicy.REPLICATE,
+            isOffHeap(), true));
+      }
+      serverInB
+          .invoke(() -> createReplicatedRegion(regionName, senderIdInB,
+              Scope.GLOBAL, DataPolicy.REPLICATE,
+              isOffHeap(), true));
+      serverInC.invoke(() -> createReplicatedRegion(regionName, null,
+          Scope.GLOBAL, DataPolicy.REPLICATE, isOffHeap(), true));
+    }
+
+    // Create client
+    client.invoke(() -> createClientWithLocatorAndRegion(senderLocatorPort, "localhost",
+        regionName, ClientRegionShortcut.PROXY));
+
+    return senderLocatorPort;
+  }
+
+  private int create2WanSitesAndClient(VM locatorInA, List<VM> serversInA, String senderIdInA,
+      VM locatorInB, List<VM> serversInB, VM client, boolean usePartitionedRegion,
+      String regionName) {
+    // Create locators
+    int locatorBPort = locatorInB.invoke(() -> createFirstLocatorWithDSId(2));
+    int locatorAPort = locatorInA.invoke(() -> {
+      Properties props = getDistributedSystemProperties();
+      props.setProperty(DISTRIBUTED_SYSTEM_ID, "" + 1);
+      props.setProperty(REMOTE_LOCATORS, "localhost[" + locatorBPort + "]");
+      LocatorLauncherStartupRule launcherStartupRule =
+          new LocatorLauncherStartupRule().withProperties(props);
+      launcherStartupRule.start();
+      return launcherStartupRule.getLauncher().getPort();
+    });
+
+    // Create servers and regions
+    createServersAndRegions(locatorBPort, serversInB, usePartitionedRegion, regionName, null);
+    createServersAndRegions(locatorAPort, serversInA, usePartitionedRegion, regionName,
+        senderIdInA);
+
+    // Create client
+    client.invoke(() -> createClientWithLocatorAndRegion(locatorAPort, "localhost",
+        regionName, ClientRegionShortcut.PROXY));
+
+    return locatorAPort;
+  }
+
+  private void createSenders(boolean isParallelGatewaySender, List<VM> serversInA,
+      VM serverInB, String senderIdInA, String senderIdInB) {
+    if (serverInB != null && senderIdInB != null) {
+      serverInB.invoke(() -> createSender(senderIdInB, 3,
+          isParallelGatewaySender, 100, 10, false,
+          false, null, false));
+    }
+    for (VM server : serversInA) {
+      server.invoke(() -> createSender(senderIdInA, 2, isParallelGatewaySender,
+          100, 10, false,
+          false, null, true));
+    }
+    startSenderInVMsAsync(senderIdInA, serversInA.toArray(new VM[0]));
+  }
+
+  private void createReceivers(VM serverInB, VM serverInC) {
+    createReceiverInVMs(serverInB);
+    createReceiverInVMs(serverInC);
+  }
+
+  private void stopReceiverAndVerifyResult(boolean useParallel, boolean stopPrimarySender,
+      int entries, String regionName, VM server1InB, VM server2InB, VM server3InB,
+      Future<CommandResultAssert> commandFuture)
+      throws InterruptedException, java.util.concurrent.ExecutionException {
+    // if parallel sender: stop any receiver
+    // if serial sender: stop receiver connected to primary or secondary
+    if (useParallel) {
+      server2InB.invoke(() -> cache.close());
+    } else {
+      // Region type has no influence on which server should be stopped
+      if (stopPrimarySender) {
+        // Stop the first server which had an available receiver
+        server1InB.invoke(() -> cache.close());
+      } else {
+        server3InB.invoke(() -> cache.close());
+      }
+    }
+
+    CommandResultAssert result = commandFuture.get();
+    // Verify result
+    if (useParallel) {
+      verifyResultOfStoppingReceiverWhenUsingParallelSender(result);
+    } else {
+      verifyResultOfStoppingReceiverWhenUsingSerialSender(result);
+      server2InB.invoke(() -> validateRegionSize(regionName, entries));
+    }
+  }
+
+  private void stopSenderAndVerifyResult(boolean useParallel, boolean stopPrimarySender,
+      VM server2InA, List<VM> serversInA, String senderIdInA,
+      Future<CommandResultAssert> wanCopyCommandFuture)
+      throws InterruptedException, java.util.concurrent.ExecutionException {
+    // If parallel: stop any server
+    // If serial: stop primary or secondary
+    if (useParallel) {
+      server2InA.invoke(() -> killSender(senderIdInA));
+    } else {
+      for (VM server : serversInA) {
+        boolean senderWasStopped = server.invoke(() -> {
+          GatewaySender sender = cache.getGatewaySender(senderIdInA);
+          if (((InternalGatewaySender) sender).isPrimary() == stopPrimarySender) {
+            killSender();
+            return true;
+          }
+          return false;
+        });
+        if (senderWasStopped) {
+          break;
+        }
+      }
+    }
+
+    CommandResultAssert result = wanCopyCommandFuture.get();
+    // Verify result
+    if (useParallel) {
+      verifyResultOfStoppingParallelSender(result);
+    } else {
+      if (stopPrimarySender) {
+        verifyResultOfStoppingPrimarySerialSender(result);
+      } else {
+        verifyResultStoppingSecondarySerialSender(result);
+      }
+    }
+  }
+
+  public void verifyResultOfStoppingReceiverWhenUsingSerialSender(
+      CommandResultAssert command) {
+    verifyStatusIsOk(command);
+    Condition<String> haveEntriesCopied =
+        new Condition<>(s -> s.startsWith("Entries copied: "), "Entries copied");
+    Condition<String> senderNotPrimary = new Condition<>(
+        s -> s.equals("Sender B is serial and not primary. 0 entries copied."),
+        "sender not primary");
+
+    command
+        .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+        .hasColumn("Message")
+        .asList()
+        .haveExactly(1, haveEntriesCopied)
+        .haveExactly(2, senderNotPrimary);
+  }
+
+  public void verifyResultOfStoppingReceiverWhenUsingParallelSender(
+      CommandResultAssert command) {
+    verifyStatusIsOk(command);
+    Condition<String> haveEntriesCopied =
+        new Condition<>(s -> s.startsWith("Entries copied: "), "Entries copied");
+
+    command
+        .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+        .hasColumn("Message")
+        .asList()
+        .haveExactly(3, haveEntriesCopied);
+  }
+
+  public void verifyResultOfStoppingParallelSender(CommandResultAssert command) {
+    verifyStatusIsErrorInOneServer(command);
+    Condition<String> startsWithError = new Condition<>(
+        s -> (s.startsWith("Execution failed. Error:")
+            || s.startsWith("Error (Unknown error sending batch)")
+            || s.startsWith("Error (Region destroyed)")
+            || s.startsWith("MemberResponse got memberDeparted event for")
+            || s.equals(WAN_COPY_REGION__MSG__CANCELED__BEFORE__HAVING__COPIED)),
+        "execution error");
+    Condition<String> haveEntriesCopied =
+        new Condition<>(s -> s.startsWith("Entries copied:"), "Entries copied");
+
+    command
+        .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+        .hasColumn("Message")
+        .asList()
+        .haveExactly(1, startsWithError)
+        .haveExactly(2, haveEntriesCopied);
+  }
+
+  public void verifyResultOfStoppingPrimarySerialSender(
+      CommandResultAssert command) {
+    verifyStatusIsErrorInOneServer(command);
+
+    Condition<String> startsWithError = new Condition<>(
+        s -> (s.startsWith("Execution failed. Error:")
+            || s.startsWith("Error (Unknown error sending batch)")
+            || s.startsWith("No connection available towards receiver after having copied")
+            || s.startsWith("Error (Region destroyed)")
+            || s.startsWith("MemberResponse got memberDeparted event for")
+            || s.equals(WAN_COPY_REGION__MSG__CANCELED__BEFORE__HAVING__COPIED)),
+        "execution error");
+
+    Condition<String> senderNotPrimary = new Condition<>(
+        s -> s.equals("Sender B is serial and not primary. 0 entries copied."),
+        "sender not primary");
+
+    command
+        .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+        .hasColumn("Message")
+        .asList()
+        .haveExactly(1, startsWithError)
+        .haveExactly(2, senderNotPrimary);
+  }
+
+  public void verifyResultStoppingSecondarySerialSender(
+      CommandResultAssert command) {
+    command.statusIsSuccess();
+    command
+        .hasTableSection()
+        .hasColumns()
+        .hasSize(3);
+    command
+        .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+        .hasColumn("Member")
+        .hasSize(3);
+    command
+        .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+        .hasColumn("Message")
+        .hasSize(3);
+    command
+        .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+        .hasColumn("Status")
+        .containsExactlyInAnyOrder("OK", "OK", "OK");
+
+    Condition<String> haveEntriesCopied =
+        new Condition<>(s -> s.startsWith("Entries copied:"), "Entries copied");
+    Condition<String> senderNotPrimary = new Condition<>(
+        s -> s.equals("Sender B is serial and not primary. 0 entries copied."),
+        "sender not primary");
+
+    command
+        .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+        .hasColumn("Message")
+        .asList()
+        .haveExactly(1, haveEntriesCopied)
+        .haveExactly(2, senderNotPrimary);
+  }
+
+  private String getRegionName(boolean isPartitionedRegion) {
+    return getTestMethodName() + (isPartitionedRegion ? "_PR" : "RR");
+  }
+
+  public static void removeEntry(String regionName, long key) {
+    Region<?, ?> region = cache.getRegion(SEPARATOR + regionName);
+    assertNotNull(region);
+    region.remove(key);
+  }
+
+  public void sendRandomOpsFromClient(String regionName, Set<Long> keySet, int iterations) {
+    Region<Long, Integer> region = cache.getRegion(SEPARATOR + regionName);
+    assertNotNull(region);
+    int min = 0;
+    int max = 1000;
+    for (int i = 0; i < iterations; i++) {
+      for (Long key : keySet) {
+        long longKey = key;
+        int value = (int) (Math.random() * (max - min + 1) + min);
+        if (value < 50) {
+          region.remove(longKey);
+        } else {
+          region.put(longKey, value);
+        }
+      }
+    }
+  }
+
+  public CommandResultAssert verifyStatusIsOk(CommandResultAssert command) {
+    command.statusIsSuccess();
+    command
+        .hasTableSection()
+        .hasColumns()
+        .hasSize(3);
+    command
+        .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+        .hasColumn("Member")
+        .hasSize(3);
+    command
+        .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+        .hasColumn("Status")
+        .containsExactly("OK", "OK", "OK");
+    command
+        .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+        .hasColumn("Message")
+        .hasSize(3);
+    return command;
+  }
+
+  public CommandResultAssert verifyStatusIsError(CommandResultAssert command) {
+    command.statusIsError();
+    command
+        .hasTableSection()
+        .hasColumns()
+        .hasSize(3);
+    command
+        .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+        .hasColumn("Member")
+        .hasSize(3);
+    command
+        .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+        .hasColumn("Message")
+        .hasSize(3);
+    command
+        .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+        .hasColumn("Status")
+        .containsExactly("ERROR", "ERROR", "ERROR");
+    return command;
+  }
+
+  public CommandResultAssert verifyStatusIsErrorInOneServer(
+      CommandResultAssert command) {
+    command.statusIsError();
+    command
+        .hasTableSection()
+        .hasColumns()
+        .hasSize(3);
+    command
+        .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+        .hasColumn("Member")
+        .hasSize(3);
+    command
+        .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+        .hasColumn("Message")
+        .hasSize(3);
+    command
+        .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+        .hasColumn("Status")
+        .containsExactlyInAnyOrder("OK", "OK", "ERROR");
+    return command;
+  }
+
+  public void verifyStatusIsOkInOneServer(
+      CommandResultAssert command) {
+    command.statusIsError();
+    command
+        .hasTableSection()
+        .hasColumns()
+        .hasSize(3);
+    command
+        .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+        .hasColumn("Member")
+        .hasSize(3);
+    command
+        .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+        .hasColumn("Message")
+        .hasSize(3);
+    command
+        .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
+        .hasColumn("Status")
+        .containsExactlyInAnyOrder("OK", "ERROR", "ERROR");
+  }
+
+  public void createServersAndRegions(int locatorPort, List<VM> servers,
+      boolean usePartitionedRegion, String regionName, String senderId) {
+
+    Properties properties = new Properties();
+    for (VM server : servers) {
+      server.invoke(() -> createServer(locatorPort, -1, properties));
+      if (usePartitionedRegion) {
+        server
+            .invoke(() -> createPartitionedRegion(regionName, senderId, 1, 100,
+                isOffHeap(), RegionShortcut.PARTITION, true));
+      } else {
+        server.invoke(() -> createReplicatedRegion(regionName, senderId,
+            Scope.GLOBAL, DataPolicy.REPLICATE,
+            isOffHeap(), true));
+      }
+    }
+  }
+
+  private void waitForWanCopyRegionCommandToStart(boolean useParallel, boolean usePartitionedRegion,
+      List<VM> servers) {
+    final int executionsExpected = useParallel && usePartitionedRegion ? servers.size() : 1;
+    await().untilAsserted(
+        () -> assertThat(getNumberOfCurrentExecutionsInServers(servers))
+            .isEqualTo(executionsExpected));
+  }
+
+  private void addIgnoredExceptionsForClosingAfterCancelCommand() {
+    addIgnoredException(
+        "Error closing the connection used to wan-copy region entries");
+    addIgnoredException(
+        "Exception org.apache.geode.cache.client.internal.pooling.ConnectionDestroyedException in sendBatch. Retrying");
+    addIgnoredException(
+        "Exception org.apache.geode.cache.client.ServerConnectivityException in sendBatch. Retrying");
+  }
+
+  private void addIgnoredExceptionsForSenderInUseWentDown() {
+    addIgnoredException(
+        "Exception org.apache.geode.cache.client.internal.pooling.ConnectionDestroyedException in sendBatch. Retrying");
+    addIgnoredException(
+        "Exception org.apache.geode.cache.client.ServerConnectivityException in sendBatch. Retrying");
+    addIgnoredException("DistributedSystemDisconnectedException");
+    addIgnoredException("org.apache.geode.distributed.PoolCancelledException");
+    addIgnoredException(
+        "Exception when running wan-copy region command: ");
+    addIgnoredException(
+        "Exception when running wan-copy region command: java.util.concurrent.ExecutionException: org.apache.geode.cache.EntryDestroyedException");
+    addIgnoredException(
+        "Error closing the connection used to wan-copy region entries");
+  }
+
+  private void addIgnoredExceptionsForReceiverConnectedToSenderInUseWentDown() {
+    addIgnoredException(
+        "Exception org.apache.geode.cache.client.internal.pooling.ConnectionDestroyedException in sendBatch. Retrying");
+    addIgnoredException(
+        "Exception org.apache.geode.cache.client.ServerConnectivityException in sendBatch. Retrying");
+    addIgnoredException("DistributedSystemDisconnectedException");
+  }
+
+  private int getNumberOfCurrentExecutionsInServers(List<VM> vmList) {
+    return vmList.stream()
+        .map((vm) -> vm.invoke(() -> ((InternalCache) cache)
+            .getService(WanCopyRegionFunctionService.class).getNumberOfCurrentExecutions()))
+        .reduce(0, Integer::sum);
+  }
+}
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index 9c4c6a6..7c9bc37 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -107,6 +107,9 @@ import org.apache.geode.cache.asyncqueue.AsyncEventListener;
 import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
 import org.apache.geode.cache.asyncqueue.AsyncEventQueueFactory;
 import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
 import org.apache.geode.cache.client.Pool;
 import org.apache.geode.cache.client.PoolManager;
 import org.apache.geode.cache.client.internal.ConnectionStats;
@@ -429,8 +432,7 @@ public class WANTestBase extends DistributedTestCase {
       }
 
       fact.setOffHeap(offHeap);
-      Region r = fact.create(regionName);
-      assertNotNull(r);
+      fact.create(regionName);
     } finally {
       exp.remove();
       exp1.remove();
@@ -438,7 +440,6 @@ public class WANTestBase extends DistributedTestCase {
     }
   }
 
-
   public static void createReplicatedProxyRegion(String regionName, String senderIds,
       Boolean offHeap) {
     IgnoredException exp =
@@ -458,8 +459,7 @@ public class WANTestBase extends DistributedTestCase {
       }
 
       fact.setOffHeap(offHeap);
-      Region r = fact.create(regionName);
-      assertNotNull(r);
+      fact.create(regionName);
     } finally {
       exp.remove();
       exp1.remove();
@@ -477,8 +477,7 @@ public class WANTestBase extends DistributedTestCase {
       }
     }
 
-    Region r = fact.create(regionName);
-    assertNotNull(r);
+    fact.create(regionName);
   }
 
   public static void createPersistentReplicatedRegion(String regionName, String senderIds,
@@ -493,8 +492,7 @@ public class WANTestBase extends DistributedTestCase {
     }
 
     fact.setOffHeap(offHeap);
-    Region r = fact.create(regionName);
-    assertNotNull(r);
+    fact.create(regionName);
   }
 
   public static void createReplicatedRegionWithAsyncEventQueue(String regionName,
@@ -512,8 +510,7 @@ public class WANTestBase extends DistributedTestCase {
       }
 
       fact.setOffHeap(offHeap);
-      Region r = fact.create(regionName);
-      assertNotNull(r);
+      fact.create(regionName);
     } finally {
       exp1.remove();
     }
@@ -535,8 +532,7 @@ public class WANTestBase extends DistributedTestCase {
 
       fact.setOffHeap(offHeap);
       fact.addAsyncEventQueueId(asyncChannelId);
-      Region r = fact.create(regionName);
-      assertNotNull(r);
+      fact.create(regionName);
     } finally {
       exp.remove();
     }
@@ -544,6 +540,11 @@ public class WANTestBase extends DistributedTestCase {
 
   public static void createReplicatedRegion(String regionName, String senderIds, Scope scope,
       DataPolicy policy, Boolean offHeap) {
+    createReplicatedRegion(regionName, senderIds, scope, policy, offHeap, false);
+  }
+
+  public static void createReplicatedRegion(String regionName, String senderIds, Scope scope,
+      DataPolicy policy, Boolean offHeap, boolean statisticsEnabled) {
     RegionFactory fact = cache.createRegionFactory();
     if (senderIds != null) {
       StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
@@ -556,8 +557,8 @@ public class WANTestBase extends DistributedTestCase {
     fact.setDataPolicy(policy);
     fact.setScope(scope);
     fact.setOffHeap(offHeap);
-    Region r = fact.create(regionName);
-    assertNotNull(r);
+    fact.setStatisticsEnabled(statisticsEnabled);
+    fact.create(regionName);
   }
 
   public static void createAsyncEventQueue(String asyncChannelId, boolean isParallel,
@@ -597,6 +598,13 @@ public class WANTestBase extends DistributedTestCase {
 
   public static void createPartitionedRegion(String regionName, String senderIds,
       Integer redundantCopies, Integer totalNumBuckets, Boolean offHeap, RegionShortcut shortcut) {
+    createPartitionedRegion(regionName, senderIds, redundantCopies, totalNumBuckets, offHeap,
+        shortcut, false);
+  }
+
+  public static void createPartitionedRegion(String regionName, String senderIds,
+      Integer redundantCopies, Integer totalNumBuckets, Boolean offHeap, RegionShortcut shortcut,
+      boolean statisticsEnabled) {
     IgnoredException exp =
         IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
     IgnoredException exp1 =
@@ -616,8 +624,8 @@ public class WANTestBase extends DistributedTestCase {
       pfact.setRecoveryDelay(0);
       fact.setPartitionAttributes(pfact.create());
       fact.setOffHeap(offHeap);
-      Region r = fact.create(regionName);
-      assertNotNull(r);
+      fact.setStatisticsEnabled(statisticsEnabled);
+      fact.create(regionName);
     } finally {
       exp.remove();
       exp1.remove();
@@ -645,8 +653,7 @@ public class WANTestBase extends DistributedTestCase {
       pfact.setRedundantCopies(redundantCopies);
       pfact.setRecoveryDelay(0);
       fact.setPartitionAttributes(pfact.create());
-      Region r = fact.create(regionName);
-      assertNotNull(r);
+      fact.create(regionName);
     } finally {
       exp.remove();
       exp1.remove();
@@ -675,8 +682,7 @@ public class WANTestBase extends DistributedTestCase {
       pfact.setRecoveryDelay(0);
       pfact.setColocatedWith(colocatedWith);
       fact.setPartitionAttributes(pfact.create());
-      Region r = fact.create(regionName);
-      assertNotNull(r);
+      fact.create(regionName);
     } finally {
       exp.remove();
       exp1.remove();
@@ -684,15 +690,13 @@ public class WANTestBase extends DistributedTestCase {
   }
 
   public static void addSenderThroughAttributesMutator(String regionName, String senderIds) {
-    final Region r = cache.getRegion(SEPARATOR + regionName);
-    assertNotNull(r);
+    final Region<?, ?> r = cache.getRegion(SEPARATOR + regionName);
     AttributesMutator mutator = r.getAttributesMutator();
     mutator.addGatewaySenderId(senderIds);
   }
 
   public static void addAsyncEventQueueThroughAttributesMutator(String regionName, String queueId) {
-    final Region r = cache.getRegion(SEPARATOR + regionName);
-    assertNotNull(r);
+    final Region<?, ?> r = cache.getRegion(SEPARATOR + regionName);
     AttributesMutator mutator = r.getAttributesMutator();
     mutator.addAsyncEventQueueId(queueId);
   }
@@ -711,8 +715,7 @@ public class WANTestBase extends DistributedTestCase {
     pfact.setTotalNumBuckets(totalNumBuckets);
     pfact.setRedundantCopies(redundantCopies);
     fact.setPartitionAttributes(pfact.create());
-    Region r = fact.create(regionName);
-    assertNotNull(r);
+    fact.create(regionName);
   }
 
   public static void createPartitionedRegionWithSerialParallelSenderIds(String regionName,
@@ -736,8 +739,7 @@ public class WANTestBase extends DistributedTestCase {
     pfact.setColocatedWith(colocatedWith);
     fact.setPartitionAttributes(pfact.create());
     fact.setOffHeap(offHeap);
-    Region r = fact.create(regionName);
-    assertNotNull(r);
+    fact.create(regionName);
   }
 
   public static void createPersistentPartitionedRegion(String regionName, String senderIds,
@@ -762,8 +764,7 @@ public class WANTestBase extends DistributedTestCase {
       pfact.setRedundantCopies(redundantCopies);
       fact.setPartitionAttributes(pfact.create());
       fact.setOffHeap(offHeap);
-      Region r = fact.create(regionName);
-      assertNotNull(r);
+      fact.create(regionName);
     } finally {
       exp.remove();
       exp1.remove();
@@ -791,7 +792,6 @@ public class WANTestBase extends DistributedTestCase {
       fact.setOffHeap(offHeap);
       customerRegion =
           (PartitionedRegion) fact.create(customerRegionName);
-      assertNotNull(customerRegion);
       logger.info("Partitioned Region CUSTOMER created Successfully :" + customerRegion.toString());
 
       paf = new PartitionAttributesFactory();
@@ -810,7 +810,6 @@ public class WANTestBase extends DistributedTestCase {
       fact.setOffHeap(offHeap);
       orderRegion =
           (PartitionedRegion) fact.create(orderRegionName);
-      assertNotNull(orderRegion);
       logger.info("Partitioned Region ORDER created Successfully :" + orderRegion.toString());
 
       paf = new PartitionAttributesFactory();
@@ -829,7 +828,6 @@ public class WANTestBase extends DistributedTestCase {
       fact.setOffHeap(offHeap);
       shipmentRegion =
           (PartitionedRegion) fact.create(shipmentRegionName);
-      assertNotNull(shipmentRegion);
       logger.info("Partitioned Region SHIPMENT created Successfully :" + shipmentRegion.toString());
     } finally {
       exp.remove();
@@ -851,17 +849,14 @@ public class WANTestBase extends DistributedTestCase {
     pfact.setRedundantCopies(redundantCopies);
     fact.setPartitionAttributes(pfact.create());
     fact.setOffHeap(offHeap);
-    Region r = fact.create(regionName);
-    assertNotNull(r);
+    Region<?, ?> r = fact.create(regionName);
 
     pfact.setColocatedWith(r.getName());
     fact.setPartitionAttributes(pfact.create());
     fact.setOffHeap(offHeap);
-    Region r1 = fact.create(regionName + "_child1");
-    assertNotNull(r1);
+    fact.create(regionName + "_child1");
 
-    Region r2 = fact.create(regionName + "_child2");
-    assertNotNull(r2);
+    fact.create(regionName + "_child2");
   }
 
   public static void createColocatedPartitionedRegions2(String regionName, String senderIds,
@@ -879,18 +874,15 @@ public class WANTestBase extends DistributedTestCase {
     pfact.setRedundantCopies(redundantCopies);
     fact.setPartitionAttributes(pfact.create());
     fact.setOffHeap(offHeap);
-    Region r = fact.create(regionName);
-    assertNotNull(r);
+    Region<?, ?> r = fact.create(regionName);
 
     fact = cache.createRegionFactory(RegionShortcut.PARTITION);
     pfact.setColocatedWith(r.getName());
     fact.setPartitionAttributes(pfact.create());
     fact.setOffHeap(offHeap);
-    Region r1 = fact.create(regionName + "_child1");
-    assertNotNull(r1);
+    fact.create(regionName + "_child1");
 
-    Region r2 = fact.create(regionName + "_child2");
-    assertNotNull(r2);
+    fact.create(regionName + "_child2");
   }
 
   public static void createCacheInVMs(Integer locatorPort, VM... vms) {
@@ -1033,7 +1025,6 @@ public class WANTestBase extends DistributedTestCase {
    */
   public static Integer createCacheServer() {
     CacheServer server1 = cache.addCacheServer();
-    assertNotNull(server1);
     server1.setPort(0);
     try {
       server1.start();
@@ -1192,7 +1183,6 @@ public class WANTestBase extends DistributedTestCase {
 
     BecomePrimaryBucketResponse response = BecomePrimaryBucketMessage
         .send((InternalDistributedMember) destination, region, bucketId, true);
-    assertNotNull(response);
     assertTrue(response.waitForResponse());
   }
 
@@ -1214,7 +1204,6 @@ public class WANTestBase extends DistributedTestCase {
   public static void checkConnectionStats(String senderId) {
     AbstractGatewaySender sender =
         (AbstractGatewaySender) CacheFactory.getAnyInstance().getGatewaySender(senderId);
-    assertNotNull(sender);
 
     Collection statsCollection = sender.getProxy().getEndpointManager().getAllStats().values();
     assertTrue(statsCollection.iterator().next() instanceof ConnectionStats);
@@ -1258,7 +1247,6 @@ public class WANTestBase extends DistributedTestCase {
   public static int getGatewaySenderPoolDisconnects(String senderId) {
     AbstractGatewaySender sender =
         (AbstractGatewaySender) CacheFactory.getAnyInstance().getGatewaySender(senderId);
-    assertNotNull(sender);
 
     PoolStats poolStats = sender.getProxy().getStats();
 
@@ -1320,6 +1308,23 @@ public class WANTestBase extends DistributedTestCase {
     assertEquals(creates, gatewayReceiverStats.getCreateRequest());
   }
 
+  public static List<Long> getReceiverStats() {
+    Set<GatewayReceiver> gatewayReceivers = cache.getGatewayReceivers();
+    GatewayReceiver receiver = gatewayReceivers.iterator().next();
+    CacheServerStats stats = ((CacheServerImpl) receiver.getServer()).getAcceptor().getStats();
+    assertTrue(stats instanceof GatewayReceiverStats);
+    GatewayReceiverStats gatewayReceiverStats = (GatewayReceiverStats) stats;
+    List<Long> statsList = new ArrayList<>();
+    statsList.add(gatewayReceiverStats.getEventsReceived());
+    statsList.add(gatewayReceiverStats.getEventsRetried());
+    statsList.add(gatewayReceiverStats.getProcessBatchRequests());
+    statsList.add(gatewayReceiverStats.getDuplicateBatchesReceived());
+    statsList.add(gatewayReceiverStats.getOutoforderBatchesReceived());
+    statsList.add(gatewayReceiverStats.getEarlyAcks());
+    statsList.add(gatewayReceiverStats.getExceptionsOccurred());
+    return statsList;
+  }
+
   public static void checkMinimumGatewayReceiverStats(int processBatches, int eventsReceived) {
     Set<GatewayReceiver> gatewayReceivers = cache.getGatewayReceivers();
     GatewayReceiver receiver = gatewayReceivers.iterator().next();
@@ -1584,7 +1589,7 @@ public class WANTestBase extends DistributedTestCase {
   }
 
   private void addCacheListenerOnRegion(String regionName) {
-    Region region = cache.getRegion(regionName);
+    Region<?, ?> region = cache.getRegion(regionName);
     AttributesMutator mutator = region.getAttributesMutator();
     listener1 = new QueueListener();
     mutator.addCacheListener(listener1);
@@ -2163,7 +2168,12 @@ public class WANTestBase extends DistributedTestCase {
 
   public static int createServer(int locPort, int maximumTimeBetweenPings) {
     WANTestBase test = new WANTestBase();
-    Properties props = test.getDistributedSystemProperties();
+    Properties properties = test.getDistributedSystemProperties();
+    return createServer(locPort, maximumTimeBetweenPings, properties);
+  }
+
+  public static int createServer(int locPort, int maximumTimeBetweenPings, Properties props) {
+    WANTestBase test = new WANTestBase();
     props.setProperty(MCAST_PORT, "0");
     props.setProperty(LOCATORS, "localhost[" + locPort + "]");
     InternalDistributedSystem ds = test.getSystem(props);
@@ -2184,19 +2194,26 @@ public class WANTestBase extends DistributedTestCase {
     return port;
   }
 
-  public static void createClientWithLocator(int port0, String host, String regionName) {
-    createClientWithLocator(port0, host);
+  public static void createClientWithLocatorAndRegion(int port0, String host, String regionName,
+      ClientRegionShortcut regionType) {
+    cache = (Cache) new ClientCacheFactory().addPoolLocator(host, port0).create();
+
+    ((ClientCache) cache).createClientRegionFactory(regionType)
+        .create(regionName);
+  }
+
+  public static void createClientWithLocatorAndRegion(int port0, String host, String regionName) {
+    createClientWithLocatorAndRegion(port0, host);
 
     RegionFactory factory = cache.createRegionFactory(RegionShortcut.LOCAL);
     factory.setPoolName("pool");
 
     region = factory.create(regionName);
     region.registerInterest("ALL_KEYS");
-    assertNotNull(region);
     logger.info("Distributed Region " + regionName + " created Successfully :" + region.toString());
   }
 
-  public static void createClientWithLocator(final int port0, final String host) {
+  public static void createClientWithLocatorAndRegion(final int port0, final String host) {
     WANTestBase test = new WANTestBase();
     Properties props = test.getDistributedSystemProperties();
     props.setProperty(MCAST_PORT, "0");
@@ -2258,8 +2275,7 @@ public class WANTestBase extends DistributedTestCase {
     IgnoredException exp2 =
         IgnoredException.addIgnoredException(GatewaySenderException.class.getName());
     try {
-      Region r = cache.getRegion(SEPARATOR + regionName);
-      assertNotNull(r);
+      Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName);
       for (long i = 1; i <= numPuts; i++) {
         txMgr.begin();
         r.put(i, i);
@@ -2278,8 +2294,7 @@ public class WANTestBase extends DistributedTestCase {
     IgnoredException exp2 =
         IgnoredException.addIgnoredException(GatewaySenderException.class.getName());
     try {
-      Region r = cache.getRegion(SEPARATOR + regionName);
-      assertNotNull(r);
+      Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName);
       for (long i = 0; i < numPuts; i++) {
         r.put(i, value);
       }
@@ -2295,8 +2310,7 @@ public class WANTestBase extends DistributedTestCase {
     IgnoredException exp2 =
         IgnoredException.addIgnoredException(GatewaySenderException.class.getName());
     try {
-      Region r = cache.getRegion(SEPARATOR + regionName);
-      assertNotNull(r);
+      Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName);
       for (long i = 0; i < numPuts; i++) {
         r.put(i, "Value_" + i);
       }
@@ -2312,7 +2326,6 @@ public class WANTestBase extends DistributedTestCase {
         IgnoredException ignored1 =
             IgnoredException.addIgnoredException(GatewaySenderException.class)) {
       Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName);
-      assertNotNull(r);
       for (long i = 0; i < numPuts; i++) {
         cache.getCacheTransactionManager().begin();
         r.put(i, "Value_" + i);
@@ -2327,8 +2340,7 @@ public class WANTestBase extends DistributedTestCase {
     IgnoredException exp2 =
         IgnoredException.addIgnoredException(GatewaySenderException.class.getName());
     try {
-      Region r = cache.getRegion(SEPARATOR + regionName);
-      assertNotNull(r);
+      Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName);
       for (long i = 0; i < numPuts; i++) {
         r.put(key, "Value_" + i);
       }
@@ -2340,32 +2352,28 @@ public class WANTestBase extends DistributedTestCase {
 
 
   public static void doPutsAfter300(String regionName, int numPuts) {
-    Region r = cache.getRegion(SEPARATOR + regionName);
-    assertNotNull(r);
+    Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName);
     for (long i = 300; i < numPuts; i++) {
       r.put(i, "Value_" + i);
     }
   }
 
   public static void doPutsFrom(String regionName, int from, int numPuts) {
-    Region r = cache.getRegion(SEPARATOR + regionName);
-    assertNotNull(r);
+    Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName);
     for (long i = from; i < numPuts; i++) {
       r.put(i, "Value_" + i);
     }
   }
 
   public static void doDestroys(String regionName, int keyNum) {
-    Region r = cache.getRegion(SEPARATOR + regionName);
-    assertNotNull(r);
+    Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName);
     for (long i = 0; i < keyNum; i++) {
       r.destroy(i);
     }
   }
 
   public static void doPutAll(String regionName, int numPuts, int size) {
-    Region r = cache.getRegion(SEPARATOR + regionName);
-    assertNotNull(r);
+    Region<?, ?> r = cache.getRegion(SEPARATOR + regionName);
     for (long i = 0; i < numPuts; i++) {
       Map putAllMap = new HashMap();
       for (long j = 0; j < size; j++) {
@@ -2378,16 +2386,14 @@ public class WANTestBase extends DistributedTestCase {
 
 
   public static void doPutsWithKeyAsString(String regionName, int numPuts) {
-    Region r = cache.getRegion(SEPARATOR + regionName);
-    assertNotNull(r);
+    Region<String, String> r = cache.getRegion(SEPARATOR + regionName);
     for (long i = 0; i < numPuts; i++) {
       r.put("Object_" + i, "Value_" + i);
     }
   }
 
   public static void putGivenKeyValue(String regionName, Map keyValues) {
-    Region r = cache.getRegion(SEPARATOR + regionName);
-    assertNotNull(r);
+    Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName);
     for (Object key : keyValues.keySet()) {
       r.put(key, keyValues.get(key));
     }
@@ -2397,8 +2403,6 @@ public class WANTestBase extends DistributedTestCase {
       int eventsPerTransaction) {
     Region orderRegion = cache.getRegion(orderRegionName);
     Region shipmentRegion = cache.getRegion(shipmentRegionName);
-    assertNotNull(orderRegion);
-    assertNotNull(shipmentRegion);
     int eventInTransaction = 0;
     CacheTransactionManager cacheTransactionManager = cache.getCacheTransactionManager();
     for (Object key : keyValues.keySet()) {
@@ -2424,8 +2428,7 @@ public class WANTestBase extends DistributedTestCase {
 
   public static void doPutsInsideTransactions(String regionName, Map keyValues,
       int eventsPerTransaction) {
-    Region r = cache.getRegion(Region.SEPARATOR + regionName);
-    assertNotNull(r);
+    Region<Object, Object> r = cache.getRegion(Region.SEPARATOR + regionName);
     int eventInTransaction = 0;
     CacheTransactionManager cacheTransactionManager = cache.getCacheTransactionManager();
     for (Object key : keyValues.keySet()) {
@@ -2448,8 +2451,7 @@ public class WANTestBase extends DistributedTestCase {
   }
 
   public static void destroyRegion(String regionName, final int min) {
-    final Region r = cache.getRegion(SEPARATOR + regionName);
-    assertNotNull(r);
+    final Region<?, ?> r = cache.getRegion(SEPARATOR + regionName);
     await().until(() -> r.size() > min);
     r.destroyRegion();
   }
@@ -2458,8 +2460,7 @@ public class WANTestBase extends DistributedTestCase {
     IgnoredException exp =
         IgnoredException.addIgnoredException(PRLocallyDestroyedException.class.getName());
     try {
-      Region r = cache.getRegion(SEPARATOR + regionName);
-      assertNotNull(r);
+      Region<?, ?> r = cache.getRegion(SEPARATOR + regionName);
       r.localDestroyRegion();
     } finally {
       exp.remove();
@@ -2478,8 +2479,6 @@ public class WANTestBase extends DistributedTestCase {
   }
 
   protected static Map putCustomerPartitionedRegion(int numPuts, String valueSuffix) {
-    assertNotNull(cache);
-    assertNotNull(customerRegion);
     Map custKeyValues = new HashMap();
     for (int i = 1; i <= numPuts; i++) {
       CustId custid = new CustId(i);
@@ -2500,8 +2499,6 @@ public class WANTestBase extends DistributedTestCase {
   }
 
   public static Map putOrderPartitionedRegion(int numPuts) {
-    assertNotNull(cache);
-    assertNotNull(orderRegion);
     Map orderKeyValues = new HashMap();
     for (int i = 1; i <= numPuts; i++) {
       CustId custid = new CustId(i);
@@ -2525,8 +2522,6 @@ public class WANTestBase extends DistributedTestCase {
   }
 
   public static Map putOrderPartitionedRegionUsingCustId(int numPuts) {
-    assertNotNull(cache);
-    assertNotNull(orderRegion);
     Map orderKeyValues = new HashMap();
     for (int i = 1; i <= numPuts; i++) {
       CustId custid = new CustId(i);
@@ -2548,8 +2543,6 @@ public class WANTestBase extends DistributedTestCase {
   }
 
   public static Map updateOrderPartitionedRegion(int numPuts) {
-    assertNotNull(cache);
-    assertNotNull(orderRegion);
     Map orderKeyValues = new HashMap();
     for (int i = 1; i <= numPuts; i++) {
       CustId custid = new CustId(i);
@@ -2573,8 +2566,6 @@ public class WANTestBase extends DistributedTestCase {
   }
 
   public static Map updateOrderPartitionedRegionUsingCustId(int numPuts) {
-    assertNotNull(cache);
-    assertNotNull(orderRegion);
     Map orderKeyValues = new HashMap();
     for (int i = 1; i <= numPuts; i++) {
       CustId custid = new CustId(i);
@@ -2595,8 +2586,6 @@ public class WANTestBase extends DistributedTestCase {
   }
 
   public static Map putShipmentPartitionedRegion(int numPuts) {
-    assertNotNull(cache);
-    assertNotNull(shipmentRegion);
     Map shipmentKeyValue = new HashMap();
     for (int i = 1; i <= numPuts; i++) {
       CustId custid = new CustId(i);
@@ -2621,10 +2610,6 @@ public class WANTestBase extends DistributedTestCase {
   }
 
   public static void putcolocatedPartitionedRegion(int numPuts) {
-    assertNotNull(cache);
-    assertNotNull(customerRegion);
-    assertNotNull(orderRegion);
-    assertNotNull(shipmentRegion);
     for (int i = 1; i <= numPuts; i++) {
       CustId custid = new CustId(i);
       Customer customer = new Customer("Customer" + custid, "Address" + custid);
@@ -2641,8 +2626,6 @@ public class WANTestBase extends DistributedTestCase {
   }
 
   public static Map putShipmentPartitionedRegionUsingCustId(int numPuts) {
-    assertNotNull(cache);
-    assertNotNull(shipmentRegion);
     Map shipmentKeyValue = new HashMap();
     for (int i = 1; i <= numPuts; i++) {
       CustId custid = new CustId(i);
@@ -2663,8 +2646,6 @@ public class WANTestBase extends DistributedTestCase {
   }
 
   public static Map updateShipmentPartitionedRegion(int numPuts) {
-    assertNotNull(cache);
-    assertNotNull(shipmentRegion);
     Map shipmentKeyValue = new HashMap();
     for (int i = 1; i <= numPuts; i++) {
       CustId custid = new CustId(i);
@@ -2689,8 +2670,6 @@ public class WANTestBase extends DistributedTestCase {
   }
 
   public static Map updateShipmentPartitionedRegionUsingCustId(int numPuts) {
-    assertNotNull(cache);
-    assertNotNull(shipmentRegion);
     Map shipmentKeyValue = new HashMap();
     for (int i = 1; i <= numPuts; i++) {
       CustId custid = new CustId(i);
@@ -2711,16 +2690,14 @@ public class WANTestBase extends DistributedTestCase {
   }
 
   public static void doPutsPDXSerializable(String regionName, int numPuts) {
-    Region r = cache.getRegion(SEPARATOR + regionName);
-    assertNotNull(r);
+    Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName);
     for (int i = 0; i < numPuts; i++) {
       r.put("Key_" + i, new SimpleClass(i, (byte) i));
     }
   }
 
   public static void doPutsPDXSerializable2(String regionName, int numPuts) {
-    Region r = cache.getRegion(SEPARATOR + regionName);
-    assertNotNull(r);
+    Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName);
     for (int i = 0; i < numPuts; i++) {
       r.put("Key_" + i, new SimpleClass1(false, (short) i, "" + i, i, "" + i, "" + i, i, i));
     }
@@ -2728,8 +2705,7 @@ public class WANTestBase extends DistributedTestCase {
 
 
   public static void doTxPuts(String regionName) {
-    Region r = cache.getRegion(SEPARATOR + regionName);
-    assertNotNull(r);
+    Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName);
     CacheTransactionManager mgr = cache.getCacheTransactionManager();
 
     mgr.begin();
@@ -2741,8 +2717,7 @@ public class WANTestBase extends DistributedTestCase {
 
   public static void doTxPutsWithRetryIfError(String regionName, final long putsPerTransaction,
       final long transactions, long offset) {
-    Region r = cache.getRegion(Region.SEPARATOR + regionName);
-    assertNotNull(r);
+    Region<Object, Object> r = cache.getRegion(Region.SEPARATOR + regionName);
 
     long keyOffset = offset * ((putsPerTransaction + (10 * transactions)) * 100);
     long j = 0;
@@ -2786,8 +2761,7 @@ public class WANTestBase extends DistributedTestCase {
     IgnoredException exp =
         IgnoredException.addIgnoredException(CacheClosedException.class.getName());
     try {
-      Region r = cache.getRegion(SEPARATOR + regionName);
-      assertNotNull(r);
+      Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName);
       for (long i = start; i < numPuts; i++) {
         r.put(i, i);
       }
@@ -2889,8 +2863,7 @@ public class WANTestBase extends DistributedTestCase {
       }
     });
 
-    final Region r = cache.getRegion(SEPARATOR + regionName);
-    assertNotNull(r);
+    final Region<?, ?> r = cache.getRegion(SEPARATOR + regionName);
 
     List<Callable<Object>> tasks = new ArrayList<Callable<Object>>();
     for (long i = 0; i < 5; i++) {
@@ -2916,8 +2889,7 @@ public class WANTestBase extends DistributedTestCase {
     IgnoredException exp1 =
         IgnoredException.addIgnoredException(CacheClosedException.class.getName());
     try {
-      final Region r = cache.getRegion(SEPARATOR + regionName);
-      assertNotNull(r);
+      final Region<?, ?> r = cache.getRegion(SEPARATOR + regionName);
       if (regionSize != r.keySet().size()) {
         await()
             .untilAsserted(() -> assertEquals(
@@ -2931,6 +2903,35 @@ public class WANTestBase extends DistributedTestCase {
     }
   }
 
+  public List<Object> getKeys(String regionName) {
+    final Region<?, ?> r = cache.getRegion(SEPARATOR + regionName);
+    return new ArrayList<>(r.keySet());
+  }
+
+  public void checkEqualRegionData(String regionName, VM vm1, VM vm2) {
+    assertThat(vm1.invoke(() -> getRegionSize(regionName)))
+        .isEqualTo(vm2.invoke(() -> getRegionSize(regionName)));
+    for (Object key : vm1.invoke(() -> getKeys(regionName))) {
+      assertThat(vm1.invoke(() -> getValueForEntry((long) key, regionName)))
+          .isEqualTo(vm2.invoke(() -> getValueForEntry((long) key, regionName)));
+      assertThat(vm1.invoke(() -> getTimestampForEntry((long) key, regionName)))
+          .isEqualTo(vm2.invoke(() -> getTimestampForEntry((long) key, regionName)));
+    }
+  }
+
+  public static Object getValueForEntry(long key, String regionName) {
+    final Region<?, ?> r = cache.getRegion(SEPARATOR + regionName);
+    return r.get(key);
+  }
+
+  public static long getTimestampForEntry(long key, String regionName) {
+    final Region<?, ?> r = cache.getRegion(SEPARATOR + regionName);
+    if (r.getEntry(key) == null) {
+      return 0;
+    }
+    return r.getEntry(key).getStatistics().getLastModifiedTime();
+  }
+
   public static void validateAsyncEventListener(String asyncQueueId, final int expectedSize) {
     AsyncEventListener theListener = null;
 
@@ -2942,7 +2943,6 @@ public class WANTestBase extends DistributedTestCase {
     }
 
     final Map eventsMap = ((MyAsyncEventListener) theListener).getEventsMap();
-    assertNotNull(eventsMap);
     await()
         .untilAsserted(() -> assertEquals(
             "Expected map entries: " + expectedSize + " but actual entries: " + eventsMap.size(),
@@ -2993,14 +2993,12 @@ public class WANTestBase extends DistributedTestCase {
     }
 
     final Map eventsMap = ((MyAsyncEventListener) theListener).getEventsMap();
-    assertNotNull(eventsMap);
     logger.info("The events map size is " + eventsMap.size());
     return eventsMap.size();
   }
 
   public static void validateRegionSize_PDX(String regionName, final int regionSize) {
-    final Region r = cache.getRegion(SEPARATOR + regionName);
-    assertNotNull(r);
+    final Region<?, ?> r = cache.getRegion(SEPARATOR + regionName);
     await().untilAsserted(() -> {
       assertEquals("Expected region entries: " + regionSize + " but actual entries: "
           + r.keySet().size() + " present region keyset " + r.keySet(), true,
@@ -3018,8 +3016,7 @@ public class WANTestBase extends DistributedTestCase {
   }
 
   public static void validateRegionSizeOnly_PDX(String regionName, final int regionSize) {
-    final Region r = cache.getRegion(SEPARATOR + regionName);
-    assertNotNull(r);
+    final Region<?, ?> r = cache.getRegion(SEPARATOR + regionName);
     await()
         .untilAsserted(
             () -> assertEquals(
@@ -3053,8 +3050,7 @@ public class WANTestBase extends DistributedTestCase {
    *
    */
   public static void validateRegionSizeRemainsSame(String regionName, final int regionSizeLimit) {
-    final Region r = cache.getRegion(SEPARATOR + regionName);
-    assertNotNull(r);
+    final Region<?, ?> r = cache.getRegion(SEPARATOR + regionName);
     WaitCriterion wc = new WaitCriterion() {
       final int MIN_VERIFICATION_RUNS = 20;
       int sameRegionSizeCounter = 0;
@@ -3091,20 +3087,17 @@ public class WANTestBase extends DistributedTestCase {
   }
 
   public static String getRegionFullPath(String regionName) {
-    final Region r = cache.getRegion(SEPARATOR + regionName);
-    assertNotNull(r);
+    final Region<?, ?> r = cache.getRegion(SEPARATOR + regionName);
     return r.getFullPath();
   }
 
   public static Integer getRegionSize(String regionName) {
-    final Region r = cache.getRegion(SEPARATOR + regionName);
-    assertNotNull(r);
+    final Region<?, ?> r = cache.getRegion(SEPARATOR + regionName);
     return r.keySet().size();
   }
 
   public static void validateRegionContents(String regionName, final Map keyValues) {
-    final Region r = cache.getRegion(SEPARATOR + regionName);
-    assertNotNull(r);
+    final Region<?, ?> r = cache.getRegion(SEPARATOR + regionName);
     await().untilAsserted(() -> {
       boolean matchFlag = true;
       for (Object key : keyValues.keySet()) {
@@ -3121,8 +3114,7 @@ public class WANTestBase extends DistributedTestCase {
 
 
   public static void doHeavyPuts(String regionName, int numPuts) {
-    Region r = cache.getRegion(SEPARATOR + regionName);
-    assertNotNull(r);
+    Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName);
     // GatewaySender.DEFAULT_BATCH_SIZE * OBJECT_SIZE should be more than MAXIMUM_QUEUE_MEMORY
     // to guarantee overflow
     for (long i = 0; i < numPuts; i++) {
@@ -3131,8 +3123,7 @@ public class WANTestBase extends DistributedTestCase {
   }
 
   public static void addCacheListenerAndDestroyRegion(String regionName) {
-    final Region region = cache.getRegion(SEPARATOR + regionName);
-    assertNotNull(region);
+    final Region<?, ?> region = cache.getRegion(SEPARATOR + regionName);
     CacheListenerAdapter cl = new CacheListenerAdapter() {
       @Override
       public void afterCreate(EntryEvent event) {
@@ -3619,7 +3610,6 @@ public class WANTestBase extends DistributedTestCase {
     AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(senderId);
     PoolImpl pool = sender.getProxy();
     if (poolShouldExist) {
-      assertNotNull(pool);
       assertEquals(expectedPoolLocatorsSize, pool.getLocators().size());
     } else {
       assertNull(pool);
@@ -3627,8 +3617,7 @@ public class WANTestBase extends DistributedTestCase {
   }
 
   public static void removeSenderFromTheRegion(String senderId, String regionName) {
-    Region region = cache.getRegion(regionName);
-    assertNotNull(region);
+    Region<?, ?> region = cache.getRegion(regionName);
     region.getAttributesMutator().removeGatewaySenderId(senderId);
   }
 
@@ -3658,7 +3647,6 @@ public class WANTestBase extends DistributedTestCase {
 
   public static void destroyAsyncEventQueue(String id) {
     AsyncEventQueueImpl aeq = (AsyncEventQueueImpl) cache.getAsyncEventQueue(id);
-    assertNotNull(aeq);
     aeq.destroy();
   }
 
@@ -4174,7 +4162,6 @@ public class WANTestBase extends DistributedTestCase {
       public void run() {
         ManagementService service = ManagementService.getManagementService(cache);
         GatewaySenderMXBean bean = service.getLocalGatewaySenderMXBean("pn");
-        assertNotNull(bean);
         bean.start();
         assertTrue(bean.isRunning());
       }
@@ -4194,7 +4181,6 @@ public class WANTestBase extends DistributedTestCase {
       public void run() {
         ManagementService service = ManagementService.getManagementService(cache);
         GatewaySenderMXBean bean = service.getLocalGatewaySenderMXBean("pn");
-        assertNotNull(bean);
         bean.stop();
         assertFalse(bean.isRunning());
       }
@@ -4270,10 +4256,8 @@ public class WANTestBase extends DistributedTestCase {
         });
         ManagementService service = ManagementService.getManagementService(cache);
         final DistributedSystemMXBean dsBean = service.getDistributedSystemMXBean();
-        assertNotNull(dsBean);
         Map<String, Boolean> dsMap = dsBean.viewRemoteClusterStatus();
         logger.info("Ds Map is: " + dsMap.size());
-        assertNotNull(dsMap);
         assertEquals(true, dsMap.size() > 0);
       }
     };
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationClientServerDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationClientServerDUnitTest.java
index 33bd871..8259a12 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationClientServerDUnitTest.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationClientServerDUnitTest.java
@@ -38,7 +38,7 @@ public class ParallelWANPropagationClientServerDUnitTest extends WANTestBase {
     vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", null, 1, 100,
         isOffHeap()));
 
-    vm4.invoke(() -> WANTestBase.createClientWithLocator(nyPort, "localhost",
+    vm4.invoke(() -> WANTestBase.createClientWithLocatorAndRegion(nyPort, "localhost",
         getTestMethodName() + "_PR"));
     vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 100));
 
@@ -51,7 +51,7 @@ public class ParallelWANPropagationClientServerDUnitTest extends WANTestBase {
     vm6.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100,
         isOffHeap()));
 
-    vm7.invoke(() -> WANTestBase.createClientWithLocator(lnPort, "localhost",
+    vm7.invoke(() -> WANTestBase.createClientWithLocatorAndRegion(lnPort, "localhost",
         getTestMethodName() + "_PR"));
 
     startSenderInVMsAsync("ln", vm5, vm6);
diff --git a/geode-wan/src/integrationTest/java/org/apache/geode/cache/wan/internal/cli/commands/WanCommandAutoCompletionIntegrationTest.java b/geode-wan/src/integrationTest/java/org/apache/geode/cache/wan/internal/cli/commands/WanCommandAutoCompletionIntegrationTest.java
new file mode 100644
index 0000000..80c00a2
--- /dev/null
+++ b/geode-wan/src/integrationTest/java/org/apache/geode/cache/wan/internal/cli/commands/WanCommandAutoCompletionIntegrationTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.wan.internal.cli.commands;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.test.junit.categories.GfshTest;
+import org.apache.geode.test.junit.rules.GfshParserRule;
+import org.apache.geode.test.junit.rules.GfshParserRule.CommandCandidate;
+
+@Category(GfshTest.class)
+public class WanCommandAutoCompletionIntegrationTest {
+
+  @Rule
+  public GfshParserRule gfshParserRule = new GfshParserRule();
+
+  @Test
+  public void testCompletionOffersMandatoryOptionsInAlphabeticalOrderForWanCopyRegionWithSpace() {
+    String buffer = "wan-copy region ";
+    CommandCandidate candidate = gfshParserRule.complete(buffer);
+    assertThat(candidate.getCandidates()).hasSize(2);
+    assertThat(candidate.getFirstCandidate()).isEqualTo(buffer + "--region");
+    assertThat(candidate.getCandidate(1)).isEqualTo(buffer + "--sender-id");
+  }
+
+  @Test
+  public void testCompletionOffersTheFirstMandatoryOptionInAlphabeticalOrderForWanCopyRegionWithDash() {
+    String buffer = "wan-copy region --";
+    CommandCandidate candidate = gfshParserRule.complete(buffer);
+    assertThat(candidate.getCandidates()).hasSize(1);
+    assertThat(candidate.getFirstCandidate()).isEqualTo(buffer + "region");
+  }
+
+}
diff --git a/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/GatewaySenderEventRemoteDispatcher.java b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/GatewaySenderEventRemoteDispatcher.java
index 4e5cf4b..bbeadda 100644
--- a/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/GatewaySenderEventRemoteDispatcher.java
+++ b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/GatewaySenderEventRemoteDispatcher.java
@@ -32,8 +32,11 @@ import org.apache.geode.cache.RegionDestroyedException;
 import org.apache.geode.cache.client.ServerConnectivityException;
 import org.apache.geode.cache.client.ServerOperationException;
 import org.apache.geode.cache.client.internal.Connection;
+import org.apache.geode.cache.client.internal.ExecutablePool;
 import org.apache.geode.cache.client.internal.pooling.ConnectionDestroyedException;
+import org.apache.geode.cache.wan.GatewayQueueEvent;
 import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.cache.wan.internal.client.locator.GatewaySenderBatchOp;
 import org.apache.geode.cache.wan.internal.client.locator.SenderProxy;
 import org.apache.geode.distributed.internal.ServerLocation;
 import org.apache.geode.distributed.internal.ServerLocationAndMemberId;
@@ -978,4 +981,21 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
           || t instanceof GemFireSecurityException;
     }
   }
+
+  @Override
+  public void sendBatch(List<GatewayQueueEvent<?, ?>> events, Connection connection,
+      ExecutablePool senderPool, int batchId, boolean removeFromQueueOnException)
+      throws BatchException70 {
+    GatewaySenderBatchOp.executeOn(connection, senderPool, events, batchId,
+        removeFromQueueOnException, false);
+    GatewaySenderEventRemoteDispatcher.GatewayAck ack =
+        (GatewaySenderEventRemoteDispatcher.GatewayAck) GatewaySenderBatchOp.executeOn(connection,
+            senderPool);
+    if (ack == null) {
+      throw new BatchException70("Unknown error sending batch", null, 0, batchId);
+    }
+    if (ack.getBatchException() != null) {
+      throw ack.getBatchException();
+    }
+  }
 }
diff --git a/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/WanCopyRegionFunctionService.java b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/WanCopyRegionFunctionService.java
new file mode 100644
index 0000000..79000b4
--- /dev/null
+++ b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/WanCopyRegionFunctionService.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.cache.wan.internal;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.internal.cache.CacheService;
+import org.apache.geode.logging.internal.executors.LoggingExecutors;
+import org.apache.geode.management.internal.beans.CacheServiceMBeanBase;
+import org.apache.geode.management.internal.functions.CliFunctionResult;
+
+public class WanCopyRegionFunctionService implements CacheService {
+
+  private volatile ExecutorService wanCopyRegionFunctionExecutionPool;
+
+  /**
+   * Contains the ongoing executions of WanCopyRegionFunction
+   */
+  private final Map<String, Future<CliFunctionResult>> executions =
+      new ConcurrentHashMap<>();
+
+  @Override
+  public boolean init(Cache cache) {
+    String WAN_COPY_REGION_FUNCTION_EXECUTION_PROCESSOR_THREAD_PREFIX =
+        "WAN Copy Region Function Execution Processor";
+    int WAN_COPY_REGION_FUNCTION_MAX_CONCURRENT_THREADS = 10;
+    wanCopyRegionFunctionExecutionPool = LoggingExecutors
+        .newFixedThreadPool(WAN_COPY_REGION_FUNCTION_MAX_CONCURRENT_THREADS,
+            WAN_COPY_REGION_FUNCTION_EXECUTION_PROCESSOR_THREAD_PREFIX, true);
+    return true;
+  }
+
+  @Override
+  public Class<? extends CacheService> getInterface() {
+    return WanCopyRegionFunctionService.class;
+  }
+
+  @Override
+  public CacheServiceMBeanBase getMBean() {
+    return null;
+  }
+
+  @Override
+  public void close() {
+    wanCopyRegionFunctionExecutionPool.shutdownNow();
+    try {
+      if (!wanCopyRegionFunctionExecutionPool.awaitTermination(5, TimeUnit.SECONDS)) {
+        wanCopyRegionFunctionExecutionPool.shutdownNow();
+      }
+    } catch (InterruptedException ie) {
+      wanCopyRegionFunctionExecutionPool.shutdownNow();
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  public CliFunctionResult execute(Callable<CliFunctionResult> callable,
+      String regionName, String senderId) throws InterruptedException, ExecutionException,
+      WanCopyRegionFunctionServiceAlreadyRunningException {
+    String executionName = getExecutionName(regionName, senderId);
+    Future<CliFunctionResult> future = null;
+    try {
+      synchronized (executions) {
+        if (executions.containsKey(executionName)) {
+          throw new WanCopyRegionFunctionServiceAlreadyRunningException(
+              "There is already an execution running for " + regionName + " and " + senderId);
+        }
+        future = wanCopyRegionFunctionExecutionPool.submit(callable);
+        executions.put(executionName, future);
+      }
+      return future.get();
+    } finally {
+      if (future != null) {
+        executions.remove(executionName);
+      }
+    }
+  }
+
+  public boolean cancel(String regionName, String senderId) {
+    Future<CliFunctionResult> execution = executions.remove(getExecutionName(regionName, senderId));
+    if (execution == null) {
+      return false;
+    }
+    execution.cancel(true);
+    return true;
+  }
+
+  public String cancelAll() {
+    String executionsString = executions.keySet().toString();
+    for (Future<CliFunctionResult> execution : executions.values()) {
+      execution.cancel(true);
+    }
+    executions.clear();
+    return executionsString;
+  }
+
+  public int getNumberOfCurrentExecutions() {
+    return executions.size();
+  }
+
+  private String getExecutionName(String regionName, String senderId) {
+    return "(" + regionName + "," + senderId + ")";
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventDispatcher.java b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/WanCopyRegionFunctionServiceAlreadyRunningException.java
similarity index 68%
copy from geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventDispatcher.java
copy to geode-wan/src/main/java/org/apache/geode/cache/wan/internal/WanCopyRegionFunctionServiceAlreadyRunningException.java
index 86828ae..3796ab4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventDispatcher.java
+++ b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/WanCopyRegionFunctionServiceAlreadyRunningException.java
@@ -12,23 +12,13 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-package org.apache.geode.internal.cache.wan;
+package org.apache.geode.cache.wan.internal;
 
-import java.util.List;
+import java.io.Serializable;
 
-/**
- * @since GemFire 7.0
- *
- */
-public interface GatewaySenderEventDispatcher {
-
-  boolean dispatchBatch(List events, boolean removeFromQueueOnException, boolean isRetry);
-
-  boolean isRemoteDispatcher();
-
-  boolean isConnectedToRemote();
-
-  void stop();
-
-  void shutDownAckReaderConnection();
+public class WanCopyRegionFunctionServiceAlreadyRunningException extends Exception
+    implements Serializable {
+  public WanCopyRegionFunctionServiceAlreadyRunningException(String message) {
+    super(message);
+  }
 }
diff --git a/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/cli/commands/WanCopyRegionCommand.java b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/cli/commands/WanCopyRegionCommand.java
new file mode 100644
index 0000000..bcc2bd1
--- /dev/null
+++ b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/cli/commands/WanCopyRegionCommand.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.cache.wan.internal.cli.commands;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.springframework.shell.core.annotation.CliAvailabilityIndicator;
+import org.springframework.shell.core.annotation.CliCommand;
+import org.springframework.shell.core.annotation.CliOption;
+
+import org.apache.geode.cache.execute.FunctionInvocationTargetException;
+import org.apache.geode.cache.execute.ResultCollector;
+import org.apache.geode.management.cli.CliMetaData;
+import org.apache.geode.management.cli.ConverterHint;
+import org.apache.geode.management.cli.GfshCommand;
+import org.apache.geode.management.internal.cli.functions.WanCopyRegionFunction;
+import org.apache.geode.management.internal.cli.result.model.ResultModel;
+import org.apache.geode.management.internal.functions.CliFunctionResult;
+import org.apache.geode.management.internal.i18n.CliStrings;
+import org.apache.geode.security.ResourcePermission.Operation;
+import org.apache.geode.security.ResourcePermission.Resource;
+
+public class WanCopyRegionCommand extends GfshCommand {
+  private final WanCopyRegionFunction wanCopyRegionFunction = new WanCopyRegionFunction();
+
+  /* 'wan-copy region' command */
+  public static final String WAN_COPY_REGION = "wan-copy region";
+  public static final String WAN_COPY_REGION__HELP =
+      "Copy a region with a senderId via WAN replication";
+  public static final String WAN_COPY_REGION__REGION = "region";
+  public static final String WAN_COPY_REGION__REGION__HELP =
+      "Region from which data will be exported.";
+  public static final String WAN_COPY_REGION__SENDERID = "sender-id";
+  public static final String WAN_COPY_REGION__SENDERID__HELP =
+      "Sender Id to use to copy the region.";
+  public static final String WAN_COPY_REGION__MAXRATE = "max-rate";
+  public static final String WAN_COPY_REGION__MAXRATE__HELP =
+      "Maximum rate for copying in entries per second.";
+  public static final String WAN_COPY_REGION__BATCHSIZE = "batch-size";
+  public static final String WAN_COPY_REGION__BATCHSIZE__HELP =
+      "Number of entries to be copied in each batch.";
+  public static final String WAN_COPY_REGION__CANCEL = "cancel";
+  public static final String WAN_COPY_REGION__CANCEL__HELP =
+      "Cancel an ongoing wan-copy region command";
+  public static final String WAN_COPY_REGION__MSG__REGION__NOT__FOUND = "Region {0} not found";
+  public static final String WAN_COPY_REGION__MSG__REGION__NOT__USING_SENDER =
+      "Region {0} is not configured to use sender {1}";
+  public static final String WAN_COPY_REGION__MSG__SENDER__NOT__FOUND = "Sender {0} not found";
+  public static final String WAN_COPY_REGION__MSG__SENDER__SERIAL__AND__NOT__PRIMARY =
+      "Sender {0} is serial and not primary. 0 entries copied.";
+  public static final String WAN_COPY_REGION__MSG__SENDER__NOT__RUNNING =
+      "Sender {0} is not running";
+  public static final String WAN_COPY_REGION__MSG__EXECUTION__CANCELED = "Execution canceled";
+  public static final String WAN_COPY_REGION__MSG__EXECUTIONS__CANCELED =
+      "Executions canceled: {0}";
+  public static final String WAN_COPY_REGION__MSG__EXECUTION__FAILED =
+      "Execution failed. Error: {0}";
+  public static final String WAN_COPY_REGION__MSG__NO__CONNECTION__POOL =
+      "No connection pool available to receiver";
+  public static final String WAN_COPY_REGION__MSG__COMMAND__NOT__SUPPORTED__AT__REMOTE__SITE =
+      "Command not supported at remote site.";
+  public static final String WAN_COPY_REGION__MSG__NO__CONNECTION =
+      "No connection available to receiver after having copied {0} entries";
+  public static final String WAN_COPY_REGION__MSG__ERROR__AFTER__HAVING__COPIED =
+      "Error ({0}) in operation after having copied {1} entries";
+  public static final String WAN_COPY_REGION__MSG__CANCELED__BEFORE__HAVING__COPIED =
+      "Operation canceled before having copied all entries";
+  public static final String WAN_COPY_REGION__MSG__COPIED__ENTRIES = "Entries copied: {0}";
+  public static final String WAN_COPY_REGION__MSG__NO__RUNNING__COMMAND =
+      "No running command to be canceled for region {0} and sender {1}";
+  public static final String WAN_COPY_REGION__MSG__ALREADY__RUNNING__COMMAND =
+      "There is already a command running for region {0} and sender {1}";
+
+  @CliAvailabilityIndicator({WAN_COPY_REGION})
+  public boolean commandAvailable() {
+    return isOnlineCommandAvailable();
+  }
+
+  @CliCommand(value = WAN_COPY_REGION, help = WAN_COPY_REGION__HELP)
+  @CliMetaData(relatedTopic = {CliStrings.TOPIC_GEODE_DATA, CliStrings.TOPIC_GEODE_REGION})
+  public ResultModel wanCopyRegion(
+      @CliOption(key = WAN_COPY_REGION__REGION, mandatory = true,
+          optionContext = ConverterHint.REGION_PATH,
+          help = WAN_COPY_REGION__REGION__HELP) String regionName,
+      @CliOption(key = WAN_COPY_REGION__SENDERID, mandatory = true,
+          optionContext = ConverterHint.GATEWAY_SENDER_ID,
+          help = WAN_COPY_REGION__SENDERID__HELP) String senderId,
+      @CliOption(key = WAN_COPY_REGION__MAXRATE,
+          unspecifiedDefaultValue = "0",
+          help = WAN_COPY_REGION__MAXRATE__HELP) long maxRate,
+      @CliOption(key = WAN_COPY_REGION__BATCHSIZE,
+          unspecifiedDefaultValue = "1000",
+          help = WAN_COPY_REGION__BATCHSIZE__HELP) int batchSize,
+      @CliOption(key = WAN_COPY_REGION__CANCEL,
+          unspecifiedDefaultValue = "false",
+          specifiedDefaultValue = "true",
+          help = WAN_COPY_REGION__CANCEL__HELP) boolean isCancel) {
+
+    authorize(Resource.DATA, Operation.WRITE, regionName);
+    final Object[] args = {regionName, senderId, isCancel, maxRate, batchSize};
+    ResultCollector<?, ?> resultCollector =
+        executeFunction(wanCopyRegionFunction, args, getAllNormalMembers());
+    final List<CliFunctionResult> cliFunctionResults =
+        getCliFunctionResults((List<CliFunctionResult>) resultCollector.getResult());
+    return ResultModel.createMemberStatusResult(cliFunctionResults, false, false);
+  }
+
+  private List<CliFunctionResult> getCliFunctionResults(List<CliFunctionResult> resultsObjects) {
+    final List<CliFunctionResult> cliFunctionResults = new ArrayList<>();
+    for (Object result : resultsObjects) {
+      if (result instanceof FunctionInvocationTargetException) {
+        CliFunctionResult errorResult =
+            new CliFunctionResult(
+                ((FunctionInvocationTargetException) result).getMemberId().getName(),
+                CliFunctionResult.StatusState.ERROR,
+                ((FunctionInvocationTargetException) result).getMessage());
+        cliFunctionResults.add(errorResult);
+      } else {
+        cliFunctionResults.add((CliFunctionResult) result);
+      }
+    }
+    return cliFunctionResults;
+  }
+}
diff --git a/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/client/locator/GatewaySenderBatchOp.java b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/client/locator/GatewaySenderBatchOp.java
index 57e9468..e0cdb23 100755
--- a/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/client/locator/GatewaySenderBatchOp.java
+++ b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/client/locator/GatewaySenderBatchOp.java
@@ -95,10 +95,12 @@ public class GatewaySenderBatchOp {
           byte posDupByte = (byte) (event.getPossibleDuplicate() ? 0x01 : 0x00);
           getMessage().addBytesPart(new byte[] {posDupByte});
         }
-        if (action >= 0 && action <= 3) {
+        if (action >= 0 && action <= GatewaySenderEventImpl.UPDATE_ACTION_NO_GENERATE_CALLBACKS) {
           // 0 = create
           // 1 = update
           // 2 = destroy
+          // 3 = update timestamp
+          // 4 = update passing generatecallbacks
           String regionName = event.getRegionPath();
           EventID eventId = event.getEventId();
           Object key = event.getKey();
@@ -110,7 +112,7 @@ public class GatewaySenderBatchOp {
           getMessage().addObjPart(eventId);
           // Add key
           getMessage().addStringOrObjPart(key);
-          if (action < 2 /* it is 0 or 1 */) {
+          if (action < 2 || action == GatewaySenderEventImpl.UPDATE_ACTION_NO_GENERATE_CALLBACKS) {
             byte[] value = event.getSerializedValue();
             byte valueIsObject = event.getValueIsObject();;
             // Add value (which is already a serialized byte[])
diff --git a/geode-wan/src/main/java/org/apache/geode/management/internal/cli/functions/WanCopyRegionFunction.java b/geode-wan/src/main/java/org/apache/geode/management/internal/cli/functions/WanCopyRegionFunction.java
new file mode 100644
index 0000000..d236ac9
--- /dev/null
+++ b/geode-wan/src/main/java/org/apache/geode/management/internal/cli/functions/WanCopyRegionFunction.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.cli.functions;
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__MSG__ALREADY__RUNNING__COMMAND;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__MSG__CANCELED__BEFORE__HAVING__COPIED;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__MSG__EXECUTIONS__CANCELED;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__MSG__EXECUTION__CANCELED;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__MSG__EXECUTION__FAILED;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__MSG__NO__RUNNING__COMMAND;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__MSG__REGION__NOT__FOUND;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__MSG__REGION__NOT__USING_SENDER;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__MSG__SENDER__NOT__FOUND;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__MSG__SENDER__NOT__RUNNING;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__MSG__SENDER__SERIAL__AND__NOT__PRIMARY;
+
+import java.io.Serializable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.geode.annotations.VisibleForTesting;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.Declarable;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.cache.wan.internal.WanCopyRegionFunctionService;
+import org.apache.geode.cache.wan.internal.WanCopyRegionFunctionServiceAlreadyRunningException;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.wan.InternalGatewaySender;
+import org.apache.geode.management.cli.CliFunction;
+import org.apache.geode.management.internal.functions.CliFunctionResult;
+import org.apache.geode.management.internal.i18n.CliStrings;
+
+/**
+ * Class for copying via WAN the contents of a region
+ * It must be executed in all members of the Geode cluster that host the region
+ * to be copied. (called with onServers() or withMembers() passing the list
+ * of all members hosting the region).
+ * It also offers the possibility to cancel an ongoing execution of this function.
+ * The copying itself is executed in a new thread with a known name
+ * (parameterized with the regionName and senderId) in order to allow
+ * to cancel ongoing invocations by interrupting that thread.
+ *
+ * It accepts the following arguments in an array of objects
+ * 0: regionName (String)
+ * 1: senderId (String)
+ * 2: isCancel (Boolean): If true, it indicates that an ongoing execution of this
+ * function for the given region and senderId must be stopped. Otherwise,
+ * it indicates that the region must be copied.
+ * 3: maxRate (Long) maximum copy rate in entries per second. In the case of
+ * parallel gateway senders, the maxRate is per server hosting the region.
+ * 4: batchSize (Integer): the size of the batches. Region entries are copied in batches of the
+ * passed size. After each batch is sent, the function checks if the command
+ * must be canceled and also sleeps for some time if necessary to adjust the
+ * copy rate to the one passed as argument.
+ */
+public class WanCopyRegionFunction extends CliFunction<Object[]> implements Declarable {
+  private static final long serialVersionUID = 1L;
+
+  public static final String ID = WanCopyRegionFunction.class.getName();
+
+  private final WanCopyRegionFunctionServiceProvider serviceProvider;
+
+  public WanCopyRegionFunction() {
+    this(new WanCopyRegionFunctionServiceProviderImpl());
+  }
+
+  @VisibleForTesting
+  WanCopyRegionFunction(WanCopyRegionFunctionServiceProvider serviceProvider) {
+    this.serviceProvider = serviceProvider;
+  }
+
+  @Override
+  public String getId() {
+    return ID;
+  }
+
+  @Override
+  public boolean hasResult() {
+    return true;
+  }
+
+  @Override
+  public boolean isHA() {
+    return false;
+  }
+
+  @Override
+  public CliFunctionResult executeFunction(FunctionContext<Object[]> context) {
+    final Object[] args = context.getArguments();
+    if (args.length < 5) {
+      throw new IllegalStateException(
+          "Arguments length does not match required length.");
+    }
+    String regionName = (String) args[0];
+    final String senderId = (String) args[1];
+    final boolean isCancel = (Boolean) args[2];
+    long maxRate = (Long) args[3];
+    int batchSize = (Integer) args[4];
+
+    if (regionName.startsWith(SEPARATOR)) {
+      regionName = regionName.substring(1);
+    }
+    if (regionName.equals("*") && senderId.equals("*") && isCancel) {
+      return cancelAllWanCopyRegion(context);
+    }
+
+    if (isCancel) {
+      return cancelWanCopyRegion(context, regionName, senderId);
+    }
+    final Cache cache = context.getCache();
+
+    final Region<?, ?> region = cache.getRegion(regionName);
+    if (region == null) {
+      return new CliFunctionResult(context.getMemberName(), CliFunctionResult.StatusState.ERROR,
+          CliStrings.format(WAN_COPY_REGION__MSG__REGION__NOT__FOUND, regionName));
+    }
+    GatewaySender sender = cache.getGatewaySender(senderId);
+    if (sender == null) {
+      return new CliFunctionResult(context.getMemberName(), CliFunctionResult.StatusState.ERROR,
+          CliStrings.format(WAN_COPY_REGION__MSG__SENDER__NOT__FOUND, senderId));
+    }
+    if (!region.getAttributes().getGatewaySenderIds().contains(sender.getId())) {
+      return new CliFunctionResult(context.getMemberName(), CliFunctionResult.StatusState.ERROR,
+          CliStrings.format(WAN_COPY_REGION__MSG__REGION__NOT__USING_SENDER, regionName,
+              senderId));
+    }
+    if (!sender.isRunning()) {
+      return new CliFunctionResult(context.getMemberName(), CliFunctionResult.StatusState.ERROR,
+          CliStrings.format(WAN_COPY_REGION__MSG__SENDER__NOT__RUNNING, senderId));
+    }
+    if (!sender.isParallel() && !((InternalGatewaySender) sender).isPrimary()) {
+      return new CliFunctionResult(context.getMemberName(), CliFunctionResult.StatusState.OK,
+          CliStrings.format(WAN_COPY_REGION__MSG__SENDER__SERIAL__AND__NOT__PRIMARY,
+              senderId));
+    }
+
+    return executeFunctionInService(context, region, sender, maxRate, batchSize);
+  }
+
+  private CliFunctionResult executeFunctionInService(FunctionContext<Object[]> context,
+      Region<?, ?> region, GatewaySender sender, long maxRate, int batchSize) {
+    try {
+      return serviceProvider.get((InternalCache) context.getCache()).execute(
+          () -> new WanCopyRegionFunctionDelegate().wanCopyRegion(
+              (InternalCache) context.getCache(), context.getMemberName(), region, sender, maxRate,
+              batchSize),
+          region.getName(),
+          sender.getId());
+    } catch (InterruptedException | CancellationException e) {
+      return new CliFunctionResult(context.getMemberName(), CliFunctionResult.StatusState.ERROR,
+          WAN_COPY_REGION__MSG__CANCELED__BEFORE__HAVING__COPIED);
+    } catch (ExecutionException e) {
+      return new CliFunctionResult(context.getMemberName(), CliFunctionResult.StatusState.ERROR,
+          CliStrings.format(WAN_COPY_REGION__MSG__EXECUTION__FAILED, e.getMessage()));
+    } catch (WanCopyRegionFunctionServiceAlreadyRunningException e) {
+      return new CliFunctionResult(context.getMemberName(), CliFunctionResult.StatusState.ERROR,
+          CliStrings.format(WAN_COPY_REGION__MSG__ALREADY__RUNNING__COMMAND, region.getName(),
+              sender.getId()));
+    }
+  }
+
+  private CliFunctionResult cancelWanCopyRegion(FunctionContext<Object[]> context,
+      String regionName, String senderId) {
+    boolean canceled =
+        serviceProvider.get((InternalCache) context.getCache()).cancel(regionName, senderId);
+    if (!canceled) {
+      return new CliFunctionResult(context.getMemberName(), CliFunctionResult.StatusState.ERROR,
+          CliStrings.format(WAN_COPY_REGION__MSG__NO__RUNNING__COMMAND,
+              regionName, senderId));
+    }
+    return new CliFunctionResult(context.getMemberName(), CliFunctionResult.StatusState.OK,
+        WAN_COPY_REGION__MSG__EXECUTION__CANCELED);
+  }
+
+  private CliFunctionResult cancelAllWanCopyRegion(FunctionContext<Object[]> context) {
+    String executionsString = serviceProvider.get((InternalCache) context.getCache())
+        .cancelAll();
+    return new CliFunctionResult(context.getMemberName(), CliFunctionResult.StatusState.OK,
+        CliStrings.format(WAN_COPY_REGION__MSG__EXECUTIONS__CANCELED, executionsString));
+  }
+
+  @FunctionalInterface
+  interface WanCopyRegionFunctionServiceProvider extends Serializable {
+    WanCopyRegionFunctionService get(InternalCache cache);
+  }
+
+  static class WanCopyRegionFunctionServiceProviderImpl
+      implements WanCopyRegionFunctionServiceProvider {
+    @Override
+    public WanCopyRegionFunctionService get(InternalCache cache) {
+      return cache.getService(WanCopyRegionFunctionService.class);
+    }
+  }
+}
diff --git a/geode-wan/src/main/java/org/apache/geode/management/internal/cli/functions/WanCopyRegionFunctionDelegate.java b/geode-wan/src/main/java/org/apache/geode/management/internal/cli/functions/WanCopyRegionFunctionDelegate.java
new file mode 100644
index 0000000..50c3838
--- /dev/null
+++ b/geode-wan/src/main/java/org/apache/geode/management/internal/cli/functions/WanCopyRegionFunctionDelegate.java
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.cli.functions;
+
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__MSG__COMMAND__NOT__SUPPORTED__AT__REMOTE__SITE;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__MSG__ERROR__AFTER__HAVING__COPIED;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__MSG__NO__CONNECTION;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__MSG__NO__CONNECTION__POOL;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.annotations.VisibleForTesting;
+import org.apache.geode.cache.EntryDestroyedException;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.client.AllConnectionsInUseException;
+import org.apache.geode.cache.client.NoAvailableServersException;
+import org.apache.geode.cache.client.ServerConnectivityException;
+import org.apache.geode.cache.client.internal.Connection;
+import org.apache.geode.cache.client.internal.PoolImpl;
+import org.apache.geode.cache.client.internal.pooling.ConnectionDestroyedException;
+import org.apache.geode.cache.wan.GatewayQueueEvent;
+import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand;
+import org.apache.geode.internal.cache.BucketRegion;
+import org.apache.geode.internal.cache.DefaultEntryEventFactory;
+import org.apache.geode.internal.cache.EntryEventImpl;
+import org.apache.geode.internal.cache.EntrySnapshot;
+import org.apache.geode.internal.cache.EnumListenerEvent;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.InternalRegion;
+import org.apache.geode.internal.cache.NonTXEntry;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.cache.wan.BatchException70;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventDispatcher;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl.TransactionMetadataDisposition;
+import org.apache.geode.internal.serialization.KnownVersion;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.management.internal.functions.CliFunctionResult;
+import org.apache.geode.management.internal.i18n.CliStrings;
+
+public class WanCopyRegionFunctionDelegate implements Serializable {
+  private static final int MAX_BATCH_SEND_RETRIES = 1;
+
+  private int batchId = 0;
+  private final Clock clock;
+  private final ThreadSleeper threadSleeper;
+  private final EventCreator eventCreator;
+
+  private static final Logger logger = LogService.getLogger();
+
+  public WanCopyRegionFunctionDelegate() {
+    this(Clock.systemDefaultZone(), new ThreadSleeperImpl(), new EventCreatorImpl());
+  }
+
+  public WanCopyRegionFunctionDelegate(Clock clock, ThreadSleeper threadSleeper,
+      EventCreator eventCreator) {
+    this.clock = clock;
+    this.threadSleeper = threadSleeper;
+    this.eventCreator = eventCreator;
+  }
+
+  public CliFunctionResult wanCopyRegion(InternalCache cache, String memberName,
+      Region<?, ?> region,
+      GatewaySender sender, long maxRate, int batchSize) throws InterruptedException {
+    ConnectionState connectionState = new ConnectionState();
+    int copiedEntries = 0;
+    Iterator<?> entriesIter = getEntries(region, sender).iterator();
+    final long startTime = clock.millis();
+
+    try {
+      while (entriesIter.hasNext()) {
+        List<GatewayQueueEvent<?, ?>> batch =
+            createBatch((InternalRegion) region, sender, batchSize, cache, entriesIter);
+        if (batch.size() == 0) {
+          continue;
+        }
+        Optional<CliFunctionResult> connectionError =
+            connectionState.connectIfNeeded(memberName, sender);
+        if (connectionError.isPresent()) {
+          return connectionError.get();
+        }
+        Optional<CliFunctionResult> error =
+            sendBatch(memberName, sender, batch, connectionState, copiedEntries);
+        if (error.isPresent()) {
+          return error.get();
+        }
+        copiedEntries += batch.size();
+        doPostSendBatchActions(startTime, copiedEntries, maxRate);
+      }
+    } finally {
+      connectionState.close();
+    }
+
+    if (region.isDestroyed()) {
+      return new CliFunctionResult(memberName, CliFunctionResult.StatusState.ERROR,
+          CliStrings.format(
+              WanCopyRegionCommand.WAN_COPY_REGION__MSG__ERROR__AFTER__HAVING__COPIED,
+              "Region destroyed",
+              copiedEntries));
+    }
+
+    return new CliFunctionResult(memberName, CliFunctionResult.StatusState.OK,
+        CliStrings.format(WanCopyRegionCommand.WAN_COPY_REGION__MSG__COPIED__ENTRIES,
+            copiedEntries));
+  }
+
+  private Optional<CliFunctionResult> sendBatch(String memberName,
+      GatewaySender sender, List<GatewayQueueEvent<?, ?>> batch,
+      ConnectionState connectionState, int copiedEntries) {
+    GatewaySenderEventDispatcher dispatcher =
+        ((AbstractGatewaySender) sender).getEventProcessor().getDispatcher();
+    int retries = 0;
+
+    while (true) {
+      try {
+        dispatcher.sendBatch(batch, connectionState.getConnection(),
+            connectionState.getSenderPool(), getAndIncrementBatchId(), true);
+        return Optional.empty();
+      } catch (BatchException70 e) {
+        return Optional.of(new CliFunctionResult(memberName,
+            CliFunctionResult.StatusState.ERROR,
+            CliStrings.format(
+                WAN_COPY_REGION__MSG__ERROR__AFTER__HAVING__COPIED,
+                e.getExceptions().get(0).getCause(), copiedEntries)));
+      } catch (ConnectionDestroyedException | ServerConnectivityException e) {
+        Optional<CliFunctionResult> error =
+            connectionState.reconnect(memberName, retries++, copiedEntries, e);
+        if (error.isPresent()) {
+          return error;
+        }
+      }
+    }
+  }
+
+  private List<GatewayQueueEvent<?, ?>> createBatch(InternalRegion region, GatewaySender sender,
+      int batchSize, InternalCache cache, Iterator<?> iter) {
+    int batchIndex = 0;
+    List<GatewayQueueEvent<?, ?>> batch = new ArrayList<>();
+
+    while (iter.hasNext() && batchIndex < batchSize) {
+      GatewayQueueEvent<?, ?> event =
+          eventCreator.createGatewaySenderEvent(cache, region, sender,
+              (Region.Entry<?, ?>) iter.next());
+      if (event != null) {
+        batch.add(event);
+        batchIndex++;
+      }
+    }
+    return batch;
+  }
+
+  private Set<?> getEntries(Region<?, ?> region, GatewaySender sender) {
+    if (region instanceof PartitionedRegion && sender.isParallel()) {
+      return ((PartitionedRegion) region).getDataStore().getAllLocalBucketRegions()
+          .stream()
+          .flatMap(br -> ((Set<?>) br.entrySet()).stream()).collect(Collectors.toSet());
+    }
+    return region.entrySet();
+  }
+
+  /**
+   * It runs the actions to be done after a batch has been
+   * sent: throw an interrupted exception if the operation was canceled and
+   * adjust the rate of copying by sleeping if necessary.
+   *
+   * @param startTime time at which the entries started to be copied
+   * @param copiedEntries number of entries copied so far
+   * @param maxRate maximum copying rate
+   */
+  @VisibleForTesting
+  void doPostSendBatchActions(long startTime, int copiedEntries, long maxRate)
+      throws InterruptedException {
+    long sleepMs = getTimeToSleep(startTime, copiedEntries, maxRate);
+    if (sleepMs > 0) {
+      logger.info("{}: Sleeping for {} ms to accommodate to requested maxRate",
+          this.getClass().getSimpleName(), sleepMs);
+      threadSleeper.sleep(sleepMs);
+    } else {
+      if (Thread.currentThread().isInterrupted()) {
+        throw new InterruptedException();
+      }
+    }
+  }
+
+  private int getAndIncrementBatchId() {
+    if (batchId + 1 == Integer.MAX_VALUE) {
+      batchId = 0;
+    }
+    return batchId++;
+  }
+
+  @VisibleForTesting
+  long getTimeToSleep(long startTime, int copiedEntries, long maxRate) {
+    if (maxRate == 0) {
+      return 0;
+    }
+    final long elapsedMs = clock.millis() - startTime;
+    if (elapsedMs != 0 && (copiedEntries * 1000.0) / (double) elapsedMs <= maxRate) {
+      return 0;
+    }
+    final long targetElapsedMs = (copiedEntries * 1000L) / maxRate;
+    return targetElapsedMs - elapsedMs;
+  }
+
+
+  static class ConnectionState {
+    private volatile Connection connection = null;
+    private volatile PoolImpl senderPool = null;
+
+    public Connection getConnection() {
+      return connection;
+    }
+
+    public PoolImpl getSenderPool() {
+      return senderPool;
+    }
+
+    public Optional<CliFunctionResult> connectIfNeeded(String memberName,
+        GatewaySender sender) {
+      if (senderPool == null) {
+        senderPool = ((AbstractGatewaySender) sender).getProxy();
+        if (senderPool == null) {
+          return Optional.of(new CliFunctionResult(memberName,
+              CliFunctionResult.StatusState.ERROR,
+              WAN_COPY_REGION__MSG__NO__CONNECTION__POOL));
+        }
+        connection = senderPool.acquireConnection();
+        if (connection.getWanSiteVersion() < KnownVersion.GEODE_1_15_0.ordinal()) {
+          return Optional.of(new CliFunctionResult(memberName,
+              CliFunctionResult.StatusState.ERROR,
+              WAN_COPY_REGION__MSG__COMMAND__NOT__SUPPORTED__AT__REMOTE__SITE));
+        }
+      }
+      return Optional.empty();
+    }
+
+    public Optional<CliFunctionResult> reconnect(String memberName, int retries,
+        int copiedEntries, Exception e) {
+      close();
+      if (retries >= MAX_BATCH_SEND_RETRIES) {
+        return Optional.of(new CliFunctionResult(memberName,
+            CliFunctionResult.StatusState.ERROR,
+            CliStrings.format(
+                WAN_COPY_REGION__MSG__ERROR__AFTER__HAVING__COPIED,
+                "Connection error", copiedEntries)));
+      }
+      logger.error("Exception {} in sendBatch. Retrying", e.getClass().getName());
+      try {
+        connection = senderPool.acquireConnection();
+      } catch (NoAvailableServersException | AllConnectionsInUseException e1) {
+        return Optional.of(new CliFunctionResult(memberName,
+            CliFunctionResult.StatusState.ERROR,
+            CliStrings.format(
+                WAN_COPY_REGION__MSG__NO__CONNECTION,
+                copiedEntries)));
+      }
+      return Optional.empty();
+    }
+
+    public void close() {
+      if (senderPool != null && connection != null) {
+        try {
+          connection.close(false);
+        } catch (Exception e) {
+          logger.error("Error closing the connection used to wan-copy region entries");
+        }
+        senderPool.returnConnection(connection);
+      }
+      connection = null;
+    }
+  }
+
+
+  @FunctionalInterface
+  interface ThreadSleeper extends Serializable {
+    void sleep(long millis) throws InterruptedException;
+  }
+
+  static class ThreadSleeperImpl implements ThreadSleeper {
+    @Override
+    public void sleep(long millis) throws InterruptedException {
+      Thread.sleep(millis);
+    }
+  }
+
+
+  @FunctionalInterface
+  interface EventCreator extends Serializable {
+    GatewayQueueEvent<?, ?> createGatewaySenderEvent(InternalCache cache,
+        InternalRegion region, GatewaySender sender, Region.Entry<?, ?> entry);
+  }
+
+  static class EventCreatorImpl implements EventCreator {
+    @VisibleForTesting
+    public GatewayQueueEvent<?, ?> createGatewaySenderEvent(InternalCache cache,
+        InternalRegion region, GatewaySender sender, Region.Entry<?, ?> entry) {
+      final EntryEventImpl event;
+      if (region instanceof PartitionedRegion) {
+        event = createEventForPartitionedRegion(sender, cache, region, entry);
+      } else {
+        event = createEventForReplicatedRegion(cache, region, entry);
+      }
+      if (event == null) {
+        return null;
+      }
+      try {
+        return new GatewaySenderEventImpl(EnumListenerEvent.AFTER_UPDATE_WITH_GENERATE_CALLBACKS,
+            event, null, TransactionMetadataDisposition.EXCLUDE);
+      } catch (IOException e) {
+        logger.error("Error when creating event in wan-copy: {}", e.getMessage());
+        return null;
+      }
+    }
+
+    private EntryEventImpl createEventForReplicatedRegion(InternalCache cache,
+        InternalRegion region,
+        Region.Entry<?, ?> entry) {
+      return createEvent(cache, region, entry);
+    }
+
+    private EntryEventImpl createEventForPartitionedRegion(GatewaySender sender,
+        InternalCache cache,
+        InternalRegion region,
+        Region.Entry<?, ?> entry) {
+      EntryEventImpl event = createEvent(cache, region, entry);
+      if (event == null) {
+        return null;
+      }
+      BucketRegion bucketRegion = ((PartitionedRegion) event.getRegion()).getDataStore()
+          .getLocalBucketById(event.getKeyInfo().getBucketId());
+      if (bucketRegion != null && !bucketRegion.getBucketAdvisor().isPrimary()
+          && sender.isParallel()) {
+        return null;
+      }
+      if (bucketRegion != null) {
+        bucketRegion.handleWANEvent(event);
+      }
+      return event;
+    }
+
+    private EntryEventImpl createEvent(InternalCache cache, InternalRegion region,
+        Region.Entry<?, ?> entry) {
+      EntryEventImpl event;
+      try {
+        event = new DefaultEntryEventFactory().create(region, Operation.UPDATE,
+            entry.getKey(),
+            entry.getValue(), null, false,
+            cache.getInternalDistributedSystem().getDistributedMember(), false);
+      } catch (EntryDestroyedException e) {
+        return null;
+      }
+      if (entry instanceof NonTXEntry) {
+        event.setVersionTag(((NonTXEntry) entry).getRegionEntry().getVersionStamp().asVersionTag());
+      } else {
+        event.setVersionTag(((EntrySnapshot) entry).getVersionTag());
+      }
+      event.setNewEventId(cache.getInternalDistributedSystem());
+      return event;
+    }
+  }
+}
diff --git a/geode-wan/src/main/resources/META-INF/services/org.apache.geode.internal.cache.CacheService b/geode-wan/src/main/resources/META-INF/services/org.apache.geode.internal.cache.CacheService
new file mode 100644
index 0000000..30999e3
--- /dev/null
+++ b/geode-wan/src/main/resources/META-INF/services/org.apache.geode.internal.cache.CacheService
@@ -0,0 +1 @@
+org.apache.geode.cache.wan.internal.WanCopyRegionFunctionService
diff --git a/geode-wan/src/main/resources/META-INF/services/org.springframework.shell.core.CommandMarker b/geode-wan/src/main/resources/META-INF/services/org.springframework.shell.core.CommandMarker
new file mode 100644
index 0000000..182564b
--- /dev/null
+++ b/geode-wan/src/main/resources/META-INF/services/org.springframework.shell.core.CommandMarker
@@ -0,0 +1 @@
+org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand
diff --git a/geode-wan/src/main/resources/org/apache/geode/cache/wan/internal/sanctioned-geode-wan-serializables.txt b/geode-wan/src/main/resources/org/apache/geode/cache/wan/internal/sanctioned-geode-wan-serializables.txt
index e69de29..a93f5db 100755
--- a/geode-wan/src/main/resources/org/apache/geode/cache/wan/internal/sanctioned-geode-wan-serializables.txt
+++ b/geode-wan/src/main/resources/org/apache/geode/cache/wan/internal/sanctioned-geode-wan-serializables.txt
@@ -0,0 +1,6 @@
+org/apache/geode/cache/wan/internal/WanCopyRegionFunctionServiceAlreadyRunningException,false
+org/apache/geode/management/internal/cli/functions/WanCopyRegionFunction,true,1,serviceProvider:org/apache/geode/management/internal/cli/functions/WanCopyRegionFunction$WanCopyRegionFunctionServiceProvider
+org/apache/geode/management/internal/cli/functions/WanCopyRegionFunction$WanCopyRegionFunctionServiceProviderImpl,false
+org/apache/geode/management/internal/cli/functions/WanCopyRegionFunctionDelegate,false,batchId:int,clock:java/time/Clock,eventCreator:org/apache/geode/management/internal/cli/functions/WanCopyRegionFunctionDelegate$EventCreator,threadSleeper:org/apache/geode/management/internal/cli/functions/WanCopyRegionFunctionDelegate$ThreadSleeper
+org/apache/geode/management/internal/cli/functions/WanCopyRegionFunctionDelegate$EventCreatorImpl,false
+org/apache/geode/management/internal/cli/functions/WanCopyRegionFunctionDelegate$ThreadSleeperImpl,false
diff --git a/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/WanCopyRegionFunctionServiceTest.java b/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/WanCopyRegionFunctionServiceTest.java
new file mode 100644
index 0000000..864aa63
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/WanCopyRegionFunctionServiceTest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.wan.internal;
+
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+import static org.mockito.Mockito.mock;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.management.internal.functions.CliFunctionResult;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+
+public class WanCopyRegionFunctionServiceTest {
+
+  private WanCopyRegionFunctionService service;
+  private final InternalCache cache = mock(InternalCache.class);
+
+  @Before
+  public void setUp() throws Exception {
+    service = new WanCopyRegionFunctionService();
+    service.init(cache);
+  }
+
+  @Test
+  public void severalExecuteWithSameRegionAndSenderNotAllowed() {
+    CountDownLatch latch = new CountDownLatch(1);
+    Callable<CliFunctionResult> firstExecution = () -> {
+      latch.await(GeodeAwaitility.getTimeout().getSeconds(), TimeUnit.SECONDS);
+      return null;
+    };
+
+    String regionName = "myRegion";
+    String senderId = "mySender";
+    CompletableFuture
+        .supplyAsync(() -> {
+          try {
+            return service.execute(firstExecution, regionName, senderId);
+          } catch (Exception e) {
+            return null;
+          }
+        });
+
+    // Wait for the execute function to start
+    await().untilAsserted(() -> assertThat(service.getNumberOfCurrentExecutions()).isEqualTo(1));
+
+    // Execute another function instance for the same region and sender-id
+    Callable<CliFunctionResult> secondExecution = () -> null;
+
+    assertThatThrownBy(() -> service.execute(secondExecution, regionName, senderId))
+        .isInstanceOf(WanCopyRegionFunctionServiceAlreadyRunningException.class);
+
+    // Let first execution finish
+    latch.countDown();
+  }
+
+  @Test
+  public void cancelRunningExecutionReturnsSuccess() {
+    String regionName = "myRegion";
+    String senderId = "mySender";
+    CountDownLatch latch = new CountDownLatch(1);
+    Callable<CliFunctionResult> firstExecution = () -> {
+      latch.await(GeodeAwaitility.getTimeout().getSeconds(), TimeUnit.SECONDS);
+      return null;
+    };
+    CompletableFuture
+        .supplyAsync(() -> {
+          try {
+            return service.execute(firstExecution, regionName, senderId);
+          } catch (Exception e) {
+            return null;
+          }
+        });
+
+    // Wait for the function to start execution
+    await().untilAsserted(() -> assertThat(service.getNumberOfCurrentExecutions()).isEqualTo(1));
+
+    // Cancel the function execution
+    boolean result = service.cancel(regionName, senderId);
+
+    assertThat(result).isEqualTo(true);
+    await().untilAsserted(() -> assertThat(service.getNumberOfCurrentExecutions()).isEqualTo(0));
+  }
+
+  @Test
+  public void cancelNotRunningExecutionReturnsError() {
+    final String regionName = "myRegion";
+    final String senderId = "mySender";
+
+    boolean result = service.cancel(regionName, senderId);
+
+    assertThat(result).isEqualTo(false);
+  }
+
+  @Test
+  public void cancelAllExecutionsWithRunningExecutionsReturnsCanceledExecutions() {
+    CountDownLatch latch = new CountDownLatch(2);
+    Callable<CliFunctionResult> firstExecution = () -> {
+      latch.await(GeodeAwaitility.getTimeout().getSeconds(), TimeUnit.SECONDS);
+      return null;
+    };
+
+    CompletableFuture
+        .supplyAsync(() -> {
+          try {
+            return service.execute(firstExecution, "myRegion", "mySender1");
+          } catch (Exception e) {
+            return null;
+          }
+        });
+
+    Callable<CliFunctionResult> secondExecution = () -> {
+      latch.await(GeodeAwaitility.getTimeout().getSeconds(), TimeUnit.SECONDS);
+      return null;
+    };
+
+    CompletableFuture
+        .supplyAsync(() -> {
+          try {
+            return service.execute(secondExecution, "myRegion", "mySender");
+          } catch (Exception e) {
+            return null;
+          }
+        });
+
+    // Wait for the functions to start execution
+    await().untilAsserted(() -> assertThat(service.getNumberOfCurrentExecutions()).isEqualTo(2));
+
+    // Cancel the function execution
+    String executionsString = service.cancelAll();
+
+    assertThat(executionsString).isEqualTo("[(myRegion,mySender1), (myRegion,mySender)]");
+    await().untilAsserted(() -> assertThat(service.getNumberOfCurrentExecutions()).isEqualTo(0));
+  }
+
+  @Test
+  public void severalExecuteWithDifferentRegionOrSenderAreAllowed() {
+    int executions = 5;
+    CountDownLatch latch = new CountDownLatch(executions);
+    for (int i = 0; i < executions; i++) {
+      Callable<CliFunctionResult> firstExecution = () -> {
+        latch.await(GeodeAwaitility.getTimeout().getSeconds(), TimeUnit.SECONDS);
+        return null;
+      };
+
+      final String regionName = String.valueOf(i);
+      CompletableFuture
+          .supplyAsync(() -> {
+            try {
+              return service.execute(firstExecution, regionName, "mySender1");
+            } catch (Exception e) {
+              return null;
+            }
+          });
+    }
+
+    // Wait for the functions to start execution
+    await().untilAsserted(
+        () -> assertThat(service.getNumberOfCurrentExecutions()).isEqualTo(executions));
+
+    // End executions
+    for (int i = 0; i < executions; i++) {
+      latch.countDown();
+    }
+  }
+}
diff --git a/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/cli/commands/WanCopyRegionCommandTest.java b/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/cli/commands/WanCopyRegionCommandTest.java
new file mode 100644
index 0000000..9028a47
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/cli/commands/WanCopyRegionCommandTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.wan.internal.cli.commands;
+
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__BATCHSIZE;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__CANCEL;
+import static org.apache.geode.cache.wan.internal.cli.commands.WanCopyRegionCommand.WAN_COPY_REGION__MAXRATE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import org.apache.geode.management.internal.cli.GfshParseResult;
+import org.apache.geode.test.junit.rules.GfshParserRule;
+
+public class WanCopyRegionCommandTest {
+  @ClassRule
+  public static GfshParserRule gfsh = new GfshParserRule();
+
+  @Test
+  public void gfshParserReturnsNullIfMandatoryOptionsNotSpecified() {
+    assertThat(gfsh.parse("wan-copy region")).isNull();
+    assertThat(gfsh.parse("wan-copy region --region=myregion")).isNull();
+    assertThat(gfsh.parse("wan-copy region --sender-id=ln")).isNull();
+    assertThat(gfsh.parse("wan-copy region --batch-size=10")).isNull();
+  }
+
+  @Test
+  public void verifyDefaultValues() {
+    GfshParseResult result = gfsh.parse("wan-copy region --region=myregion --sender-id=ln");
+    assertThat(result.getParamValueAsString(WAN_COPY_REGION__MAXRATE)).isEqualTo("0");
+    assertThat(result.getParamValueAsString(WAN_COPY_REGION__BATCHSIZE))
+        .isEqualTo("1000");
+    assertThat(result.getParamValueAsString(WAN_COPY_REGION__CANCEL))
+        .isEqualTo("false");
+  }
+}
diff --git a/geode-wan/src/test/java/org/apache/geode/management/internal/cli/functions/WanCopyRegionFunctionDelegateTest.java b/geode-wan/src/test/java/org/apache/geode/management/internal/cli/functions/WanCopyRegionFunctionDelegateTest.java
new file mode 100644
index 0000000..34cebd4
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/management/internal/cli/functions/WanCopyRegionFunctionDelegateTest.java
@@ -0,0 +1,575 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.cli.functions;
+
+import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.catchThrowable;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.time.Clock;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionAttributes;
+import org.apache.geode.cache.client.NoAvailableServersException;
+import org.apache.geode.cache.client.ServerConnectivityException;
+import org.apache.geode.cache.client.internal.Connection;
+import org.apache.geode.cache.client.internal.PoolImpl;
+import org.apache.geode.cache.client.internal.pooling.ConnectionDestroyedException;
+import org.apache.geode.cache.client.internal.pooling.PooledConnection;
+import org.apache.geode.cache.wan.GatewayQueueEvent;
+import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.InternalRegion;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor;
+import org.apache.geode.internal.cache.wan.BatchException70;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventDispatcher;
+import org.apache.geode.internal.cache.wan.InternalGatewaySender;
+import org.apache.geode.internal.serialization.KnownVersion;
+import org.apache.geode.management.internal.cli.functions.WanCopyRegionFunctionDelegate.EventCreator;
+import org.apache.geode.management.internal.functions.CliFunctionResult;
+
+public class WanCopyRegionFunctionDelegateTest {
+
+  private WanCopyRegionFunctionDelegate function;
+  private long startTime;
+  private final int entries = 25;
+  private Clock clockMock;
+  private WanCopyRegionFunctionDelegate.ThreadSleeper threadSleeperMock;
+  private InternalCache internalCacheMock;
+  private GatewaySender gatewaySenderMock;
+  private PoolImpl poolMock;
+  private Connection connectionMock;
+  private GatewaySenderEventDispatcher dispatcherMock;
+
+  private final Region<?, ?> regionMock =
+      uncheckedCast(mock(InternalRegion.class));
+
+  private EventCreator eventCreatorMock;
+
+  @Before
+  public void setUp() throws InterruptedException {
+    startTime = System.currentTimeMillis();
+    clockMock = mock(Clock.class);
+    threadSleeperMock = mock(WanCopyRegionFunctionDelegate.ThreadSleeper.class);
+    gatewaySenderMock = mock(AbstractGatewaySender.class);
+    poolMock = mock(PoolImpl.class);
+    connectionMock = mock(PooledConnection.class);
+    dispatcherMock = mock(GatewaySenderEventDispatcher.class);
+    eventCreatorMock = mock(EventCreator.class);
+    AbstractGatewaySenderEventProcessor eventProcessorMock =
+        mock(AbstractGatewaySenderEventProcessor.class);
+    RegionAttributes<?, ?> attributesMock = mock(RegionAttributes.class);
+    internalCacheMock = mock(InternalCache.class);
+
+    function = new WanCopyRegionFunctionDelegate(clockMock, threadSleeperMock, eventCreatorMock);
+
+    doNothing().when(threadSleeperMock).sleep(anyLong());
+    when(gatewaySenderMock.getId()).thenReturn("mySender");
+    when(connectionMock.getWanSiteVersion()).thenReturn(KnownVersion.GEODE_1_15_0.ordinal());
+    when(eventCreatorMock.createGatewaySenderEvent(any(), any(), any(), any()))
+        .thenReturn(uncheckedCast(mock(GatewayQueueEvent.class)));
+    when(eventProcessorMock.getDispatcher()).thenReturn(dispatcherMock);
+    when(((InternalGatewaySender) gatewaySenderMock).getEventProcessor())
+        .thenReturn(eventProcessorMock);
+    when(attributesMock.getGatewaySenderIds()).thenReturn(uncheckedCast(Collections.emptySet()));
+    when(regionMock.getAttributes()).thenReturn(uncheckedCast(attributesMock));
+    when(internalCacheMock.getRegion(any())).thenReturn(uncheckedCast(regionMock));
+  }
+
+  @Test
+  public void doPostSendBatchActions_DoNotSleepIfMaxRateIsZero()
+      throws InterruptedException {
+    // arrange
+    when(internalCacheMock.getRegion(any())).thenReturn(null);
+
+    // act
+    function.wanCopyRegion(internalCacheMock, "member1", regionMock, gatewaySenderMock, 0L, 10);
+
+    // assert
+    verify(threadSleeperMock, never()).sleep(anyLong());
+  }
+
+  @Test
+  public void doPostSendBatchActions_SleepIfMaxRateIsNotZero()
+      throws InterruptedException, BatchException70 {
+    // arrange
+    Object[] options = new Object[] {"myRegion", "mySender", false, 1L, 2};
+    WanCopyRegionFunctionDelegate.ThreadSleeper sleeperMock =
+        mock(WanCopyRegionFunctionDelegate.ThreadSleeper.class);
+    doNothing().when(sleeperMock).sleep(anyLong());
+
+    // act
+    executeWanCopyRegionFunction(options, sleeperMock);
+
+    // assert
+    ArgumentCaptor<Long> captor = ArgumentCaptor.forClass(Long.class);
+    verify(sleeperMock, times(1)).sleep(captor.capture());
+    assertThat(captor.getValue()).isGreaterThan(0).isLessThanOrEqualTo(2000);
+  }
+
+  @Test
+  public void doPostSendBatchActions_ThrowInterruptedIfInterruptedTimeToSleepNotZero()
+      throws InterruptedException {
+    // arrange
+    long maxRate = 1;
+    long elapsedTime = 20L;
+    InterruptedException thrownByThreadSleeper = new InterruptedException("thrownByThreadSleeper");
+    when(clockMock.millis()).thenReturn(startTime + elapsedTime);
+    doThrow(thrownByThreadSleeper).when(threadSleeperMock).sleep(anyLong());
+
+    // act
+    Throwable thrown =
+        catchThrowable(() -> function.doPostSendBatchActions(startTime, entries, maxRate));
+
+    // assert
+    assertThat(thrown).isInstanceOf(InterruptedException.class);
+  }
+
+  @Test
+  public void doPostSendBatchActions_ThrowInterruptedIfInterruptedTimeToSleepIsZero() {
+    long maxRate = 0;
+
+    // act
+    Thread.currentThread().interrupt();
+    Throwable thrown =
+        catchThrowable(() -> function.doPostSendBatchActions(startTime, entries, maxRate));
+
+    // assert
+    assertThat(thrown).isInstanceOf(InterruptedException.class);
+  }
+
+  @Test
+  public void wanCopyRegion_verifyErrorWhenRemoteSiteDoesNotSupportCommand()
+      throws InterruptedException {
+    // arrange
+    when(((AbstractGatewaySender) gatewaySenderMock).getProxy()).thenReturn(poolMock);
+    PooledConnection oldWanSiteConn = mock(PooledConnection.class);
+    when(oldWanSiteConn.getWanSiteVersion()).thenReturn(KnownVersion.GEODE_1_14_0.ordinal());
+    when(poolMock.acquireConnection()).thenReturn(oldWanSiteConn);
+    Set<Region.Entry<Object, Object>> entries = new HashSet<>();
+    entries.add(uncheckedCast(mock(Region.Entry.class)));
+    when(regionMock.entrySet()).thenReturn(uncheckedCast(entries));
+
+    // act
+    CliFunctionResult result =
+        function.wanCopyRegion(internalCacheMock, "member1", regionMock, gatewaySenderMock, 1, 10);
+
+    // assert
+    assertThat(result.getStatus()).isEqualTo(CliFunctionResult.StatusState.ERROR.toString());
+    assertThat(result.getStatusMessage())
+        .isEqualTo("Command not supported at remote site.");
+  }
+
+  @Test
+  public void wanCopyRegion_verifyErrorWhenNoPoolAvailableAndEntriesInRegion()
+      throws InterruptedException {
+    // arrange
+    when(((AbstractGatewaySender) gatewaySenderMock).getProxy()).thenReturn(null);
+    when(poolMock.acquireConnection()).thenThrow(NoAvailableServersException.class)
+        .thenThrow(NoAvailableServersException.class);
+    Set<Region.Entry<Object, Object>> entries = new HashSet<>();
+    entries.add(uncheckedCast(mock(Region.Entry.class)));
+    when(regionMock.entrySet()).thenReturn(uncheckedCast(entries));
+
+    // act
+    CliFunctionResult result =
+        function.wanCopyRegion(internalCacheMock, "member1", regionMock, gatewaySenderMock, 1, 10);
+
+    // assert
+    assertThat(result.getStatus()).isEqualTo(CliFunctionResult.StatusState.ERROR.toString());
+    assertThat(result.getStatusMessage())
+        .isEqualTo("No connection pool available to receiver");
+  }
+
+  @Test
+  public void wanCopyRegion_verifySuccessWhenNoPoolAvailableAndNoEntriesInRegion()
+      throws InterruptedException {
+    // arrange
+    when(((AbstractGatewaySender) gatewaySenderMock).getProxy()).thenReturn(null);
+    when(poolMock.acquireConnection()).thenThrow(NoAvailableServersException.class)
+        .thenThrow(NoAvailableServersException.class);
+    when(regionMock.entrySet()).thenReturn(new HashSet<>());
+
+    // act
+    CliFunctionResult result =
+        function.wanCopyRegion(internalCacheMock, "member1", regionMock, gatewaySenderMock, 1, 10);
+
+    // assert
+    assertThat(result.getStatus()).isEqualTo(CliFunctionResult.StatusState.OK.toString());
+    assertThat(result.getStatusMessage())
+        .isEqualTo("Entries copied: 0");
+  }
+
+  @Test
+  public void wanCopyRegion_verifyErrorWhenNoConnectionAvailableAtStartAndEntriesInRegion() {
+    // arrange
+    when(((AbstractGatewaySender) gatewaySenderMock).getProxy()).thenReturn(poolMock);
+    when(poolMock.acquireConnection()).thenThrow(NoAvailableServersException.class)
+        .thenThrow(NoAvailableServersException.class);
+    Set<Region.Entry<Object, Object>> entries = new HashSet<>();
+    entries.add(uncheckedCast(mock(Region.Entry.class)));
+    when(regionMock.entrySet()).thenReturn(uncheckedCast(entries));
+
+    // act
+    Throwable thrown = catchThrowable(() -> function.wanCopyRegion(internalCacheMock, "member1",
+        regionMock, gatewaySenderMock, 1, 10));
+
+    // assert
+    assertThat(thrown).isInstanceOf(NoAvailableServersException.class);
+  }
+
+  @Test
+  public void wanCopyRegion_verifySuccessWhenNoConnectionAvailableAtStartAndNoEntriesInRegion()
+      throws InterruptedException {
+    // arrange
+    when(((AbstractGatewaySender) gatewaySenderMock).getProxy()).thenReturn(poolMock);
+    when(poolMock.acquireConnection()).thenThrow(NoAvailableServersException.class)
+        .thenThrow(NoAvailableServersException.class);
+    when(regionMock.entrySet()).thenReturn(new HashSet<>());
+
+    // act
+    CliFunctionResult result =
+        function.wanCopyRegion(internalCacheMock, "member1", regionMock, gatewaySenderMock, 1, 10);
+
+    // assert
+    assertThat(result.getStatus()).isEqualTo(CliFunctionResult.StatusState.OK.toString());
+    assertThat(result.getStatusMessage())
+        .isEqualTo("Entries copied: 0");
+  }
+
+  @Test
+  public void wanCopyRegion_verifyErrorWhenNoConnectionAvailableAfterCopyingSomeEntries()
+      throws BatchException70, InterruptedException {
+    // arrange
+    when(((AbstractGatewaySender) gatewaySenderMock).getProxy()).thenReturn(poolMock);
+    when(poolMock.acquireConnection()).thenReturn(connectionMock)
+        .thenThrow(NoAvailableServersException.class);
+    Set<Region.Entry<Object, Object>> entries = new HashSet<>();
+    entries.add(uncheckedCast(mock(Region.Entry.class)));
+    entries.add(uncheckedCast(mock(Region.Entry.class)));
+    when(regionMock.entrySet()).thenReturn(uncheckedCast(entries));
+    doNothing().doThrow(ConnectionDestroyedException.class).doNothing().when(dispatcherMock)
+        .sendBatch(anyList(), any(), any(), anyInt(), anyBoolean());
+
+    // act
+    CliFunctionResult result =
+        function.wanCopyRegion(internalCacheMock, "member1", regionMock, gatewaySenderMock, 1, 1);
+
+    // assert
+    assertThat(result.getStatus()).isEqualTo(CliFunctionResult.StatusState.ERROR.toString());
+    assertThat(result.getStatusMessage())
+        .isEqualTo("No connection available to receiver after having copied 1 entries");
+  }
+
+  @Test
+  public void wanCopyRegion_verifySuccess() throws BatchException70, InterruptedException {
+    // arrange
+    when(((AbstractGatewaySender) gatewaySenderMock).getProxy()).thenReturn(poolMock);
+    when(poolMock.acquireConnection()).thenReturn(connectionMock).thenReturn(connectionMock);
+    Set<Region.Entry<Object, Object>> entries = new HashSet<>();
+    entries.add(uncheckedCast(mock(Region.Entry.class)));
+    when(regionMock.entrySet()).thenReturn(uncheckedCast(entries));
+    doNothing().when(dispatcherMock).sendBatch(anyList(), any(), any(), anyInt(), anyBoolean());
+
+    // act
+    CliFunctionResult result =
+        function.wanCopyRegion(internalCacheMock, "member1", regionMock, gatewaySenderMock, 1, 10);
+
+    // assert
+    assertThat(result.getStatus()).isEqualTo(CliFunctionResult.StatusState.OK.toString());
+    assertThat(result.getStatusMessage())
+        .isEqualTo("Entries copied: 1");
+  }
+
+  @Test
+  public void wanCopyRegion_verifySuccessWithRetryWhenConnectionDestroyed()
+      throws BatchException70, InterruptedException {
+    // arrange
+    ConnectionDestroyedException exceptionWhenSendingBatch =
+        new ConnectionDestroyedException("My connection exception", new Exception());
+    when(((AbstractGatewaySender) gatewaySenderMock).getProxy()).thenReturn(poolMock);
+    when(poolMock.acquireConnection()).thenReturn(connectionMock).thenReturn(connectionMock);
+    Set<Region.Entry<Object, Object>> entries = new HashSet<>();
+    entries.add(uncheckedCast(mock(Region.Entry.class)));
+    when(regionMock.entrySet()).thenReturn(uncheckedCast(entries));
+    doThrow(exceptionWhenSendingBatch).doNothing().when(dispatcherMock).sendBatch(anyList(), any(),
+        any(), anyInt(), anyBoolean());
+
+    // act
+    CliFunctionResult result =
+        function.wanCopyRegion(internalCacheMock, "member1", regionMock, gatewaySenderMock, 1, 10);
+
+    // assert
+    assertThat(result.getStatus()).isEqualTo(CliFunctionResult.StatusState.OK.toString());
+    assertThat(result.getStatusMessage())
+        .isEqualTo("Entries copied: 1");
+  }
+
+  @Test
+  public void wanCopyRegion_verifyErrorWhenConnectionDestroyedTwice()
+      throws BatchException70, InterruptedException {
+    // arrange
+    ConnectionDestroyedException exceptionWhenSendingBatch =
+        new ConnectionDestroyedException("My connection exception", new Exception());
+    when(((AbstractGatewaySender) gatewaySenderMock).getProxy()).thenReturn(poolMock);
+    when(poolMock.acquireConnection()).thenReturn(connectionMock).thenReturn(connectionMock);
+    Set<Region.Entry<Object, Object>> entries = new HashSet<>();
+    entries.add(uncheckedCast(mock(Region.Entry.class)));
+    when(regionMock.entrySet()).thenReturn(uncheckedCast(entries));
+    doThrow(exceptionWhenSendingBatch).when(dispatcherMock).sendBatch(anyList(), any(), any(),
+        anyInt(), anyBoolean());
+
+    // act
+    CliFunctionResult result =
+        function.wanCopyRegion(internalCacheMock, "member1", regionMock, gatewaySenderMock, 1, 10);
+
+    // assert
+    assertThat(result.getStatus()).isEqualTo(CliFunctionResult.StatusState.ERROR.toString());
+    assertThat(result.getStatusMessage())
+        .isEqualTo("Error (Connection error) in operation after having copied 0 entries");
+  }
+
+  @Test
+  public void wanCopyRegion_verifyErrorWhenRegionDestroyed()
+      throws BatchException70, InterruptedException {
+    // arrange
+    when(((AbstractGatewaySender) gatewaySenderMock).getProxy()).thenReturn(poolMock);
+    when(poolMock.acquireConnection()).thenReturn(connectionMock).thenReturn(connectionMock);
+    Set<Region.Entry<Object, Object>> entries = new HashSet<>();
+    entries.add(uncheckedCast(mock(Region.Entry.class)));
+    when(regionMock.entrySet()).thenReturn(uncheckedCast(entries));
+    doNothing().when(dispatcherMock).sendBatch(anyList(), any(), any(), anyInt(), anyBoolean());
+    doReturn(true).when(regionMock).isDestroyed();
+
+    // act
+    CliFunctionResult result =
+        function.wanCopyRegion(internalCacheMock, "member1", regionMock, gatewaySenderMock, 1, 10);
+
+    // assert
+    assertThat(result.getStatus()).isEqualTo(CliFunctionResult.StatusState.ERROR.toString());
+    assertThat(result.getStatusMessage())
+        .isEqualTo("Error (Region destroyed) in operation after having copied 1 entries");
+  }
+
+  @Test
+  public void wanCopyRegion_verifySuccessWithRetryWhenServerConnectivityException()
+      throws BatchException70, InterruptedException {
+    // arrange
+    ServerConnectivityException exceptionWhenSendingBatch =
+        new ServerConnectivityException("My connection exception", new Exception());
+    when(((AbstractGatewaySender) gatewaySenderMock).getProxy()).thenReturn(poolMock);
+    when(poolMock.acquireConnection()).thenReturn(connectionMock).thenReturn(connectionMock);
+    Set<Region.Entry<Object, Object>> entries = new HashSet<>();
+    entries.add(uncheckedCast(mock(Region.Entry.class)));
+    when(regionMock.entrySet()).thenReturn(uncheckedCast(entries));
+    doThrow(exceptionWhenSendingBatch).doNothing().when(dispatcherMock).sendBatch(anyList(), any(),
+        any(), anyInt(), anyBoolean());
+
+    // act
+    CliFunctionResult result =
+        function.wanCopyRegion(internalCacheMock, "member1", regionMock, gatewaySenderMock, 1, 10);
+
+    // assert
+    assertThat(result.getStatus()).isEqualTo(CliFunctionResult.StatusState.OK.toString());
+    assertThat(result.getStatusMessage())
+        .isEqualTo("Entries copied: 1");
+  }
+
+  @Test
+  public void wanCopyRegion_verifyErrorWhenServerConnectivityExceptionTwice()
+      throws BatchException70, InterruptedException {
+    // arrange
+    ServerConnectivityException exceptionWhenSendingBatch =
+        new ServerConnectivityException("My connection exception", new Exception());
+    when(((AbstractGatewaySender) gatewaySenderMock).getProxy()).thenReturn(poolMock);
+    when(poolMock.acquireConnection()).thenReturn(connectionMock).thenReturn(connectionMock);
+    Set<Region.Entry<Object, Object>> entries = new HashSet<>();
+    entries.add(uncheckedCast(mock(Region.Entry.class)));
+    when(regionMock.entrySet()).thenReturn(uncheckedCast(entries));
+    doThrow(exceptionWhenSendingBatch).when(dispatcherMock).sendBatch(anyList(), any(), any(),
+        anyInt(), anyBoolean());
+
+    // act
+    CliFunctionResult result =
+        function.wanCopyRegion(internalCacheMock, "member1", regionMock, gatewaySenderMock, 1, 10);
+
+    // assert
+    assertThat(result.getStatus()).isEqualTo(CliFunctionResult.StatusState.ERROR.toString());
+    assertThat(result.getStatusMessage())
+        .isEqualTo("Error (Connection error) in operation after having copied 0 entries");
+  }
+
+  @Test
+  public void wanCopyRegion_verifyErrorWhenBatchExceptionWhileSendingBatch()
+      throws BatchException70, InterruptedException {
+    // arrange
+    BatchException70 exceptionWhenSendingBatch =
+        new BatchException70("My batch exception", new Exception("test exception"), 0, 0);
+    BatchException70 topLevelException =
+        new BatchException70(Collections.singletonList(exceptionWhenSendingBatch));
+    when(((AbstractGatewaySender) gatewaySenderMock).getProxy()).thenReturn(poolMock);
+    when(poolMock.acquireConnection()).thenReturn(connectionMock);
+    Set<Region.Entry<Object, Object>> entries = new HashSet<>();
+    entries.add(uncheckedCast(mock(Region.Entry.class)));
+    when(regionMock.entrySet()).thenReturn(uncheckedCast(entries));
+    doThrow(topLevelException).when(dispatcherMock).sendBatch(anyList(), any(), any(),
+        anyInt(), anyBoolean());
+
+    // act
+    CliFunctionResult result =
+        function.wanCopyRegion(internalCacheMock, "member1", regionMock, gatewaySenderMock, 1, 10);
+
+    // assert
+    assertThat(result.getStatus()).isEqualTo(CliFunctionResult.StatusState.ERROR.toString());
+    assertThat(result.getStatusMessage())
+        .isEqualTo(
+            "Error (java.lang.Exception: test exception) in operation after having copied 0 entries");
+  }
+
+  @Test
+  public void wanCopyRegion_verifyExceptionThrownWhenExceptionWhileSendingBatch()
+      throws BatchException70 {
+    // arrange
+    RuntimeException exceptionWhenSendingBatch =
+        new RuntimeException("Exception when sending batch");
+    when(((AbstractGatewaySender) gatewaySenderMock).getProxy()).thenReturn(poolMock);
+    when(poolMock.acquireConnection()).thenReturn(connectionMock);
+    Set<Region.Entry<Object, Object>> entries = new HashSet<>();
+    entries.add(uncheckedCast(mock(Region.Entry.class)));
+    when(regionMock.entrySet()).thenReturn(uncheckedCast(entries));
+    doThrow(exceptionWhenSendingBatch).when(dispatcherMock).sendBatch(anyList(), any(), any(),
+        anyInt(), anyBoolean());
+
+    // act
+    Throwable thrown = catchThrowable(
+        () -> function.wanCopyRegion(internalCacheMock, "member1", regionMock, gatewaySenderMock, 1,
+            10));
+
+    // assert
+    assertThat(thrown).isInstanceOf(RuntimeException.class);
+  }
+
+  @Test
+  public void getTimeToSleep_ReturnZeroWhenMaxRateIsZero() {
+    assertThat(function.getTimeToSleep(startTime, 1, 0)).isEqualTo(0);
+  }
+
+  @Test
+  public void getTimeToSleep_ReturnZeroWhenCopiedEntriesIsZero() {
+    assertThat(function.getTimeToSleep(startTime, 0, 1)).isEqualTo(0);
+  }
+
+  @Test
+  public void getTimeToSleep_ReturnZeroWhenBelowMaxRate() {
+    long elapsedTime = 2000L;
+    when(clockMock.millis()).thenReturn(startTime + elapsedTime);
+    assertThat(function.getTimeToSleep(startTime, 1, 1)).isEqualTo(0);
+  }
+
+  @Test
+  public void getTimeToSleep_ReturnZeroWhenOnMaxRate() {
+    long elapsedTime = 1000L;
+    when(clockMock.millis()).thenReturn(startTime + elapsedTime);
+    assertThat(function.getTimeToSleep(startTime, 1, 1)).isEqualTo(0);
+  }
+
+  @Test
+  public void getTimeToSleep_ReturnZeroWhenAboveMaxRate_value1() {
+    long elapsedTime = 1000L;
+    when(clockMock.millis()).thenReturn(startTime + elapsedTime);
+    assertThat(function.getTimeToSleep(startTime, 2, 1)).isEqualTo(1000);
+  }
+
+  @Test
+  public void getTimeToSleep_ReturnZeroWhenAboveMaxRate_value2() {
+    long elapsedTime = 1000L;
+    when(clockMock.millis()).thenReturn(startTime + elapsedTime);
+    assertThat(function.getTimeToSleep(startTime, 4, 1)).isEqualTo(3000);
+  }
+
+  @Test
+  public void getTimeToSleep_ReturnZeroWhenAboveMaxRate_value3() {
+    long elapsedTime = 2000L;
+    when(clockMock.millis()).thenReturn(startTime + elapsedTime);
+    assertThat(function.getTimeToSleep(startTime, 4, 1)).isEqualTo(2000);
+  }
+
+  private CliFunctionResult executeWanCopyRegionFunction(Object[] options,
+      WanCopyRegionFunctionDelegate.ThreadSleeper sleeper)
+      throws BatchException70, InterruptedException {
+    Region<?, ?> regionMock =
+        uncheckedCast(mock(InternalRegion.class));
+
+    RegionAttributes<?, ?> attributesMock = mock(RegionAttributes.class);
+    Set<?> idsMock = mock(Set.class);
+    when(idsMock.contains(anyString())).thenReturn(true);
+    when(attributesMock.getGatewaySenderIds()).thenReturn(uncheckedCast(idsMock));
+    when(regionMock.getAttributes()).thenReturn(uncheckedCast(attributesMock));
+
+    Set<Region.Entry<Object, Object>> entries = new HashSet<>();
+    entries.add(uncheckedCast(mock(Region.Entry.class)));
+    entries.add(uncheckedCast(mock(Region.Entry.class)));
+    when(regionMock.entrySet()).thenReturn(uncheckedCast(entries));
+
+    doNothing().when(dispatcherMock).sendBatch(anyList(), any(), any(), anyInt(), anyBoolean());
+
+    AbstractGatewaySender gatewaySenderMock = mock(AbstractGatewaySender.class);
+    when(gatewaySenderMock.getId()).thenReturn((String) options[1]);
+    when(gatewaySenderMock.isRunning()).thenReturn(true);
+    when(gatewaySenderMock.isParallel()).thenReturn(true);
+    when(gatewaySenderMock.isPrimary()).thenReturn(false);
+
+    when(gatewaySenderMock.getProxy()).thenReturn(poolMock);
+    when(poolMock.acquireConnection()).thenReturn(connectionMock).thenReturn(connectionMock);
+
+    AbstractGatewaySenderEventProcessor eventProcessorMock =
+        mock(AbstractGatewaySenderEventProcessor.class);
+    when(eventProcessorMock.getDispatcher()).thenReturn(dispatcherMock);
+    when(gatewaySenderMock.getEventProcessor()).thenReturn(eventProcessorMock);
+
+    InternalCache internalCacheMock = mock(InternalCache.class);
+    when(internalCacheMock.getRegion(any())).thenReturn(uncheckedCast(regionMock));
+    when(internalCacheMock.getGatewaySender(any())).thenReturn(gatewaySenderMock);
+
+    WanCopyRegionFunctionDelegate function =
+        new WanCopyRegionFunctionDelegate(Clock.systemDefaultZone(),
+            sleeper, eventCreatorMock);
+
+    return function.wanCopyRegion(internalCacheMock, "member1", regionMock, gatewaySenderMock, 10,
+        10);
+  }
+}
diff --git a/geode-wan/src/test/java/org/apache/geode/management/internal/cli/functions/WanCopyRegionFunctionTest.java b/geode-wan/src/test/java/org/apache/geode/management/internal/cli/functions/WanCopyRegionFunctionTest.java
new file mode 100644
index 0000000..1e278d2
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/management/internal/cli/functions/WanCopyRegionFunctionTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.cli.functions;
+
+import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionAttributes;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.InternalRegion;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.cache.wan.InternalGatewaySender;
+import org.apache.geode.management.internal.functions.CliFunctionResult;
+
+public class WanCopyRegionFunctionTest {
+
+  private WanCopyRegionFunction function;
+  private InternalCache internalCacheMock;
+  private GatewaySender gatewaySenderMock;
+
+  private final FunctionContext<Object[]> contextMock = uncheckedCast(mock(FunctionContext.class));
+
+  private final Region<?, ?> regionMock =
+      uncheckedCast(mock(InternalRegion.class));
+
+
+  @Before
+  public void setUp() throws InterruptedException {
+    gatewaySenderMock = mock(AbstractGatewaySender.class);
+    when(gatewaySenderMock.getId()).thenReturn("mySender");
+
+    function = new WanCopyRegionFunction();
+
+    RegionAttributes<?, ?> attributesMock = mock(RegionAttributes.class);
+    Set<?> idsMock = mock(Set.class);
+    when(idsMock.contains(anyString())).thenReturn(true);
+    when(attributesMock.getGatewaySenderIds()).thenReturn(uncheckedCast(idsMock));
+    when(regionMock.getAttributes()).thenReturn(uncheckedCast(attributesMock));
+
+    internalCacheMock = mock(InternalCache.class);
+    when(internalCacheMock.getRegion(any())).thenReturn(uncheckedCast(regionMock));
+  }
+
+  @Test
+  public void executeFunction_verifyErrorWhenRegionNotFound() {
+    Object[] options = new Object[] {"myRegion", "mySender", false, 1L, 10};
+    when(internalCacheMock.getRegion(any())).thenReturn(null);
+    when(contextMock.getArguments()).thenReturn(options);
+    when(contextMock.getCache()).thenReturn(internalCacheMock);
+
+    CliFunctionResult result = function.executeFunction(contextMock);
+
+    assertThat(result.getStatus()).isEqualTo(CliFunctionResult.StatusState.ERROR.toString());
+    assertThat(result.getStatusMessage()).isEqualTo("Region myRegion not found");
+  }
+
+  @Test
+  public void executeFunction_verifyErrorWhenSenderNotFound() {
+    Object[] options = new Object[] {"myRegion", "mySender", false, 1L, 10};
+    when(internalCacheMock.getGatewaySender(any())).thenReturn(null);
+    when(contextMock.getArguments()).thenReturn(options);
+    when(contextMock.getCache()).thenReturn(internalCacheMock);
+
+    CliFunctionResult result = function.executeFunction(contextMock);
+
+    assertThat(result.getStatus()).isEqualTo(CliFunctionResult.StatusState.ERROR.toString());
+    assertThat(result.getStatusMessage()).isEqualTo("Sender mySender not found");
+  }
+
+  @Test
+  public void executeFunction_verifyErrorWhenSenderIsNotRunning() {
+    Object[] options = new Object[] {"myRegion", "mySender", false, 1L, 10};
+    when(gatewaySenderMock.isRunning()).thenReturn(false);
+    when(internalCacheMock.getGatewaySender(any())).thenReturn(gatewaySenderMock);
+    when(contextMock.getArguments()).thenReturn(options);
+    when(contextMock.getCache()).thenReturn(internalCacheMock);
+
+    CliFunctionResult result = function.executeFunction(contextMock);
+
+    assertThat(result.getStatus()).isEqualTo(CliFunctionResult.StatusState.ERROR.toString());
+    assertThat(result.getStatusMessage()).isEqualTo("Sender mySender is not running");
+  }
+
+  @Test
+  public void executeFunction_verifySuccessWhenSenderIsSerialAndSenderIsNotPrimary() {
+    Object[] options = new Object[] {"myRegion", "mySender", false, 1L, 10};
+    when(gatewaySenderMock.isRunning()).thenReturn(true);
+    when(gatewaySenderMock.isParallel()).thenReturn(false);
+    when(((InternalGatewaySender) gatewaySenderMock).isPrimary()).thenReturn(false);
+    when(internalCacheMock.getGatewaySender(any())).thenReturn(gatewaySenderMock);
+    when(contextMock.getArguments()).thenReturn(options);
+    when(contextMock.getCache()).thenReturn(internalCacheMock);
+
+    CliFunctionResult result = function.executeFunction(contextMock);
+
+    assertThat(result.getStatus()).isEqualTo(CliFunctionResult.StatusState.OK.toString());
+    assertThat(result.getStatusMessage())
+        .isEqualTo("Sender mySender is serial and not primary. 0 entries copied.");
+  }
+
+  @Test
+  public void executeFunction_verifyErrorWhenSenderNotConfiguredWithForRegion() {
+    Object[] options = new Object[] {"myRegion", "mySender", false, 1L, 10};
+    Set<String> senders = new HashSet<>();
+    senders.add("notMySender");
+    when(gatewaySenderMock.isParallel()).thenReturn(true);
+    when(regionMock.getAttributes().getGatewaySenderIds()).thenReturn(senders);
+    when(internalCacheMock.getGatewaySender(any())).thenReturn(gatewaySenderMock);
+    when(contextMock.getArguments()).thenReturn(options);
+    when(contextMock.getCache()).thenReturn(internalCacheMock);
+
+    CliFunctionResult result = function.executeFunction(contextMock);
+    assertThat(result.getStatus()).isEqualTo(CliFunctionResult.StatusState.ERROR.toString());
+    assertThat(result.getStatusMessage())
+        .isEqualTo("Region myRegion is not configured to use sender mySender");
+  }
+}
diff --git a/geode-wan/src/test/resources/expected-pom.xml b/geode-wan/src/test/resources/expected-pom.xml
index 50f45c8..52547b2 100644
--- a/geode-wan/src/test/resources/expected-pom.xml
+++ b/geode-wan/src/test/resources/expected-pom.xml
@@ -72,6 +72,11 @@
       <scope>runtime</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.geode</groupId>
+      <artifactId>geode-gfsh</artifactId>
+      <scope>runtime</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-lang3</artifactId>
       <scope>runtime</scope>