You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bo...@apache.org on 2018/03/19 20:01:45 UTC

[geode] branch develop updated: GEODE-4815: Forced locator members to be LOCATOR_DM_TYPES

This is an automated email from the ASF dual-hosted git repository.

boglesby pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 752bc23  GEODE-4815: Forced locator members to be LOCATOR_DM_TYPES
752bc23 is described below

commit 752bc238c37003ebabe132af7598469cbbf07eb6
Author: Barry Oglesby <bo...@users.noreply.github.com>
AuthorDate: Mon Mar 19 13:01:42 2018 -0700

    GEODE-4815: Forced locator members to be LOCATOR_DM_TYPES
---
 .../cache/wan/AsyncEventQueueTestBase.java         |  15 +-
 .../internal/cache/UpdateVersionDUnitTest.java     | 668 +++++++--------------
 .../cache/wan/CacheClientNotifierDUnitTest.java    |  64 +-
 .../cache/wan/Simple2CacheServerDUnitTest.java     |  41 +-
 .../geode/internal/cache/wan/WANTestBase.java      | 105 ++--
 5 files changed, 306 insertions(+), 587 deletions(-)

diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java
index 7a956c8..8366ca7 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java
@@ -93,6 +93,7 @@ import org.apache.geode.cache.wan.GatewaySender.OrderPolicy;
 import org.apache.geode.cache.wan.GatewaySenderFactory;
 import org.apache.geode.distributed.Locator;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.InternalLocator;
 import org.apache.geode.internal.AvailablePortHelper;
 import org.apache.geode.internal.cache.ForceReattemptException;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
@@ -183,7 +184,7 @@ public class AsyncEventQueueTestBase extends JUnit4DistributedTestCase {
     props.setProperty(LOCATORS, "localhost[" + port + "]");
     props.setProperty(START_LOCATOR,
         "localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost");
-    test.getSystem(props);
+    test.startLocatorDistributedSystem(props);
     return port;
   }
 
@@ -197,10 +198,20 @@ public class AsyncEventQueueTestBase extends JUnit4DistributedTestCase {
     props.setProperty(START_LOCATOR,
         "localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost");
     props.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocPort + "]");
-    test.getSystem(props);
+    test.startLocatorDistributedSystem(props);
     return port;
   }
 
+  private void startLocatorDistributedSystem(Properties props) {
+    // Start the locator with a LOCATOR_DM_TYPE and not a NORMAL_DM_TYPE
+    System.setProperty(InternalLocator.FORCE_LOCATOR_DM_TYPE, "true");
+    try {
+      getSystem(props);
+    } finally {
+      System.clearProperty(InternalLocator.FORCE_LOCATOR_DM_TYPE);
+    }
+  }
+
   public static void createReplicatedRegionWithAsyncEventQueue(String regionName,
       String asyncQueueIds, Boolean offHeap) {
     IgnoredException exp1 =
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/UpdateVersionDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/UpdateVersionDUnitTest.java
index 4e75db1..c6d97e3 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/UpdateVersionDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/UpdateVersionDUnitTest.java
@@ -25,13 +25,14 @@ import java.util.List;
 import java.util.Properties;
 import java.util.Set;
 import java.util.StringTokenizer;
+import java.util.concurrent.TimeUnit;
 
+import org.awaitility.Awaitility;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import org.apache.geode.cache.AttributesFactory;
 import org.apache.geode.cache.Cache;
-import org.apache.geode.cache.CacheException;
 import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.DataPolicy;
 import org.apache.geode.cache.DiskStore;
@@ -49,6 +50,7 @@ import org.apache.geode.cache.wan.GatewayReceiverFactory;
 import org.apache.geode.cache.wan.GatewaySender;
 import org.apache.geode.cache.wan.GatewaySenderFactory;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.InternalLocator;
 import org.apache.geode.internal.AvailablePortHelper;
 import org.apache.geode.internal.cache.LocalRegion.NonTXEntry;
 import org.apache.geode.internal.cache.partitioned.PRLocallyDestroyedException;
@@ -60,11 +62,7 @@ import org.apache.geode.test.dunit.Host;
 import org.apache.geode.test.dunit.IgnoredException;
 import org.apache.geode.test.dunit.Invoke;
 import org.apache.geode.test.dunit.LogWriterUtils;
-import org.apache.geode.test.dunit.SerializableCallable;
-import org.apache.geode.test.dunit.SerializableRunnable;
 import org.apache.geode.test.dunit.VM;
-import org.apache.geode.test.dunit.Wait;
-import org.apache.geode.test.dunit.WaitCriterion;
 import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
 import org.apache.geode.test.junit.categories.DistributedTest;
 import org.apache.geode.test.junit.categories.WanTest;
@@ -88,532 +86,303 @@ public class UpdateVersionDUnitTest extends JUnit4DistributedTestCase {
   @Test
   public void testUpdateVersionAfterCreateWithSerialSender() {
     Host host = Host.getHost(0);
-    VM vm0 = host.getVM(0); // server1 site1
+    VM vm0 = host.getVM(0); // locator site1
     VM vm1 = host.getVM(1); // server2 site1
 
-    VM vm2 = host.getVM(2); // server1 site2
-    VM vm3 = host.getVM(3); // server2 site2
+    VM vm2 = host.getVM(2); // locator site2
+    VM vm3 = host.getVM(3); // server1 site2
+    VM vm4 = host.getVM(4); // server2 site2
 
     final String key = "key-1";
 
     // Site 1
     Integer lnPort = (Integer) vm0.invoke(() -> this.createFirstLocatorWithDSId(1));
 
-    vm0.invoke(() -> this.createCache(lnPort));
-    vm0.invoke(() -> this.createSender("ln1", 2, false, 10, 1, false, false, null, true));
+    vm1.invoke(() -> this.createCache(lnPort));
+    vm1.invoke(() -> this.createSender("ln1", 2, false, 10, 1, false, false, null, true));
 
-    vm0.invoke(() -> this.createPartitionedRegion(regionName, "ln1", 1, 1));
-    vm0.invoke(() -> this.startSender("ln1"));
-    vm0.invoke(() -> this.waitForSenderRunningState("ln1"));
+    vm1.invoke(() -> this.createPartitionedRegion(regionName, "ln1", 1, 1));
+    vm1.invoke(() -> this.startSender("ln1"));
+    vm1.invoke(() -> this.waitForSenderRunningState("ln1"));
 
     // Site 2
     Integer nyPort = (Integer) vm2.invoke(() -> this.createFirstRemoteLocator(2, lnPort));
-    Integer nyRecPort = (Integer) vm2.invoke(() -> this.createReceiver(nyPort));
+    Integer nyRecPort = (Integer) vm3.invoke(() -> this.createReceiver(nyPort));
 
-    vm2.invoke(() -> this.createPartitionedRegion(regionName, "", 1, 1));
-    vm3.invoke(() -> this.createCache(nyPort));
     vm3.invoke(() -> this.createPartitionedRegion(regionName, "", 1, 1));
+    vm4.invoke(() -> this.createCache(nyPort));
+    vm4.invoke(() -> this.createPartitionedRegion(regionName, "", 1, 1));
 
-    final VersionTag tag =
-        (VersionTag) vm0.invoke("Update a single entry and get its version", () -> {
-          Cache cache = CacheFactory.getAnyInstance();
-          Region region = cache.getRegion(regionName);
-          assertTrue(region instanceof PartitionedRegion);
-
-          region.put(key, "value-1");
-          region.put(key, "value-2");
-          Entry entry = region.getEntry(key);
-          assertTrue(entry instanceof EntrySnapshot);
-          RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();
-
-          VersionStamp stamp = regionEntry.getVersionStamp();
-
-          // Create a duplicate entry version tag from stamp with newer
-          // time-stamp.
-          VersionSource memberId =
-              (VersionSource) cache.getDistributedSystem().getDistributedMember();
-          VersionTag versionTag = VersionTag.create(memberId);
-
-          int entryVersion = stamp.getEntryVersion() - 1;
-          int dsid = stamp.getDistributedSystemId();
-
-          // Increment the time by 1 in case the time is the same as the previous event.
-          // The entry's version timestamp can be incremented by 1 in certain circumstances.
-          // See AbstractRegionEntry.generateVersionTag.
-          long time = System.currentTimeMillis() + 1;
-
-          versionTag.setEntryVersion(entryVersion);
-          versionTag.setDistributedSystemId(dsid);
-          versionTag.setVersionTimeStamp(time);
-          versionTag.setIsRemoteForTesting();
-
-          EntryEventImpl event =
-              createNewEvent((PartitionedRegion) region, versionTag, entry.getKey(), "value-3");
-
-          ((LocalRegion) region).basicUpdate(event, false, true, 0L, false);
-
-          // Verify the new stamp
-          entry = region.getEntry(key);
-          assertTrue(entry instanceof EntrySnapshot);
-          regionEntry = ((EntrySnapshot) entry).getRegionEntry();
-
-          stamp = regionEntry.getVersionStamp();
-          assertEquals("Time stamp did NOT get updated by UPDATE_VERSION operation on LocalRegion",
-              time, stamp.getVersionTimeStamp());
-          assertEquals(++entryVersion, stamp.getEntryVersion());
-          assertEquals(dsid, stamp.getDistributedSystemId());
-
-          return stamp.asVersionTag();
-        });
-
-    VersionTag remoteTag = (VersionTag) vm3.invoke("Get timestamp from remote site", () -> {
-      Cache cache = CacheFactory.getAnyInstance();
-      final PartitionedRegion region = (PartitionedRegion) cache.getRegion(regionName);
-
-      // wait for entry to be received
-      WaitCriterion wc = new WaitCriterion() {
-        public boolean done() {
-          Entry<?, ?> entry = null;
-          try {
-            entry = region.getDataStore().getEntryLocally(0, key, false, false);
-          } catch (EntryNotFoundException | ForceReattemptException e) {
-            // expected
-          } catch (PRLocallyDestroyedException e) {
-            throw new RuntimeException("unexpected exception", e);
-          }
-          if (entry != null) {
-            LogWriterUtils.getLogWriter().info("found entry " + entry);
-          }
-          return (entry != null);
-        }
-
-        public String description() {
-          return "Expected " + key + " to be received on remote WAN site";
-        }
-      };
-      Wait.waitForCriterion(wc, 30000, 500, true);
-
-      wc = new WaitCriterion() {
-        public boolean done() {
-          Entry entry = region.getEntry(key);
-          assertTrue(entry instanceof EntrySnapshot);
-          RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();
-          return regionEntry.getVersionStamp().getVersionTimeStamp() == tag.getVersionTimeStamp();
-        }
-
-        public String description() {
-          return "waiting for timestamp to be updated";
-        }
-      };
-      Wait.waitForCriterion(wc, 30000, 500, true);
+    VersionTag localTag = vm1.invoke(() -> putEntryAndGetPartitionedRegionVersionTag(key));
+    VersionTag remoteTag = vm4.invoke(() -> getPartitionedRegionVersionTag(key, localTag));
 
-      Entry entry = region.getEntry(key);
-      assertTrue("entry class is wrong: " + entry, entry instanceof EntrySnapshot);
-      RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();
-
-      VersionStamp stamp = regionEntry.getVersionStamp();
-
-      return stamp.asVersionTag();
-    });
-
-    assertEquals("Local and remote site have different timestamps", tag.getVersionTimeStamp(),
+    assertEquals("Local and remote site have different timestamps", localTag.getVersionTimeStamp(),
         remoteTag.getVersionTimeStamp());
   }
 
   @Test
   public void testUpdateVersionAfterCreateWithSerialSenderOnDR() {
     Host host = Host.getHost(0);
-    VM vm0 = host.getVM(0); // server1 site1
-    VM vm1 = host.getVM(1); // server2 site1
+    VM vm0 = host.getVM(0); // locator site1
+    VM vm1 = host.getVM(1); // server1 site1
 
-    VM vm2 = host.getVM(2); // server1 site2
-    VM vm3 = host.getVM(3); // server2 site2
+    VM vm2 = host.getVM(2); // locator site2
+    VM vm3 = host.getVM(3); // server1 site2
+    VM vm4 = host.getVM(4); // server2 site2
 
     final String key = "key-1";
 
     // Site 1
     Integer lnPort = (Integer) vm0.invoke(() -> this.createFirstLocatorWithDSId(1));
 
-    vm0.invoke(() -> this.createCache(lnPort));
-    vm0.invoke(() -> this.createSender("ln1", 2, false, 10, 1, false, false, null, true));
+    vm1.invoke(() -> this.createCache(lnPort));
+    vm1.invoke(() -> this.createSender("ln1", 2, false, 10, 1, false, false, null, true));
 
-    vm0.invoke(() -> this.createReplicatedRegion(regionName, "ln1"));
-    vm0.invoke(() -> this.startSender("ln1"));
-    vm0.invoke(() -> this.waitForSenderRunningState("ln1"));
+    vm1.invoke(() -> this.createReplicatedRegion(regionName, "ln1"));
+    vm1.invoke(() -> this.startSender("ln1"));
+    vm1.invoke(() -> this.waitForSenderRunningState("ln1"));
 
     // Site 2
     Integer nyPort = (Integer) vm2.invoke(() -> this.createFirstRemoteLocator(2, lnPort));
-    Integer nyRecPort = (Integer) vm2.invoke(() -> this.createReceiver(nyPort));
+    Integer nyRecPort = (Integer) vm3.invoke(() -> this.createReceiver(nyPort));
 
-    vm2.invoke(() -> this.createReplicatedRegion(regionName, ""));
-    vm3.invoke(() -> this.createCache(nyPort));
     vm3.invoke(() -> this.createReplicatedRegion(regionName, ""));
+    vm4.invoke(() -> this.createCache(nyPort));
+    vm4.invoke(() -> this.createReplicatedRegion(regionName, ""));
 
-    final VersionTag tag =
-        (VersionTag) vm0.invoke("Update a single entry and get its version", () -> {
-          Cache cache = CacheFactory.getAnyInstance();
-          Region region = cache.getRegion(regionName);
-          assertTrue(region instanceof DistributedRegion);
-
-          region.put(key, "value-1");
-          region.put(key, "value-2");
-          Entry entry = region.getEntry(key);
-          assertTrue(entry instanceof NonTXEntry);
-          RegionEntry regionEntry = ((NonTXEntry) entry).getRegionEntry();
-
-          VersionStamp stamp = regionEntry.getVersionStamp();
-
-          // Create a duplicate entry version tag from stamp with newer
-          // time-stamp.
-          VersionSource memberId =
-              (VersionSource) cache.getDistributedSystem().getDistributedMember();
-          VersionTag versionTag = VersionTag.create(memberId);
-
-          int entryVersion = stamp.getEntryVersion() - 1;
-          int dsid = stamp.getDistributedSystemId();
-
-          // Increment the time by 1 in case the time is the same as the previous event.
-          // The entry's version timestamp can be incremented by 1 in certain circumstances.
-          // See AbstractRegionEntry.generateVersionTag.
-          long time = System.currentTimeMillis() + 1;
-
-          versionTag.setEntryVersion(entryVersion);
-          versionTag.setDistributedSystemId(dsid);
-          versionTag.setVersionTimeStamp(time);
-          versionTag.setIsRemoteForTesting();
-
-          EntryEventImpl event =
-              createNewEvent((DistributedRegion) region, versionTag, entry.getKey(), "value-3");
-
-          ((LocalRegion) region).basicUpdate(event, false, true, 0L, false);
-
-          // Verify the new stamp
-          entry = region.getEntry(key);
-          assertTrue(entry instanceof NonTXEntry);
-          regionEntry = ((NonTXEntry) entry).getRegionEntry();
-
-          stamp = regionEntry.getVersionStamp();
-          assertEquals("Time stamp did NOT get updated by UPDATE_VERSION operation on LocalRegion",
-              time, stamp.getVersionTimeStamp());
-          assertEquals(entryVersion + 1, stamp.getEntryVersion());
-          assertEquals(dsid, stamp.getDistributedSystemId());
-
-          return stamp.asVersionTag();
-        });
-
-    VersionTag remoteTag = (VersionTag) vm3.invoke("Get timestamp from remote site", () -> {
-
-      Cache cache = CacheFactory.getAnyInstance();
-      final Region region = cache.getRegion(regionName);
-
-      // wait for entry to be received
-      WaitCriterion wc = new WaitCriterion() {
-        public boolean done() {
-          return (region.getEntry(key) != null);
-        }
-
-        public String description() {
-          return "Expected key-1 to be received on remote WAN site";
-        }
-      };
-      Wait.waitForCriterion(wc, 30000, 500, true);
-
-      wc = new WaitCriterion() {
-        public boolean done() {
-          Entry entry = region.getEntry(key);
-          assertTrue(entry instanceof NonTXEntry);
-          RegionEntry regionEntry = ((NonTXEntry) entry).getRegionEntry();
-          return regionEntry.getVersionStamp().getVersionTimeStamp() == tag.getVersionTimeStamp();
-        }
-
-        public String description() {
-          return "waiting for timestamp to be updated";
-        }
-      };
-      Wait.waitForCriterion(wc, 30000, 500, true);
-
-      Entry entry = region.getEntry(key);
-      assertTrue(entry instanceof NonTXEntry);
-      RegionEntry regionEntry = ((NonTXEntry) entry).getRegionEntry();
+    VersionTag localTag = vm1.invoke(() -> putEntryAndGetReplicatedRegionVersionTag(key));
+    VersionTag remoteTag = vm4.invoke(() -> getReplicatedRegionVersionTag(key, localTag));
 
-      VersionStamp stamp = regionEntry.getVersionStamp();
-
-      return stamp.asVersionTag();
-    });
-
-    assertEquals("Local and remote site have different timestamps", tag.getVersionTimeStamp(),
+    assertEquals("Local and remote site have different timestamps", localTag.getVersionTimeStamp(),
         remoteTag.getVersionTimeStamp());
   }
 
   @Test
   public void testUpdateVersionAfterCreateWithParallelSender() {
     Host host = Host.getHost(0);
-    VM vm0 = host.getVM(0); // server1 site1
-    VM vm1 = host.getVM(1); // server2 site1
+    VM vm0 = host.getVM(0); // locator site1
+    VM vm1 = host.getVM(1); // server1 site1
 
-    VM vm2 = host.getVM(2); // server1 site2
-    VM vm3 = host.getVM(3); // server2 site2
+    VM vm2 = host.getVM(2); // locator site2
+    VM vm3 = host.getVM(3); // server1 site2
+    VM vm4 = host.getVM(4); // server2 site2
 
     // Site 1
     Integer lnPort = vm0.invoke(() -> this.createFirstLocatorWithDSId(1));
 
     final String key = "key-1";
 
-    vm0.invoke(() -> this.createCache(lnPort));
-    vm0.invoke(() -> this.createSender("ln1", 2, true, 10, 1, false, false, null, true));
+    vm1.invoke(() -> this.createCache(lnPort));
+    vm1.invoke(() -> this.createSender("ln1", 2, true, 10, 1, false, false, null, true));
 
-    vm0.invoke(() -> this.createPartitionedRegion(regionName, "ln1", 1, 1));
-    vm0.invoke(() -> this.startSender("ln1"));
-    vm0.invoke(() -> this.waitForSenderRunningState("ln1"));
+    vm1.invoke(() -> this.createPartitionedRegion(regionName, "ln1", 1, 1));
+    vm1.invoke(() -> this.startSender("ln1"));
+    vm1.invoke(() -> this.waitForSenderRunningState("ln1"));
 
     // Site 2
     Integer nyPort = (Integer) vm2.invoke(() -> this.createFirstRemoteLocator(2, lnPort));
-    Integer nyRecPort = (Integer) vm2.invoke(() -> this.createReceiver(nyPort));
-
-    vm2.invoke(() -> this.createPartitionedRegion(regionName, "", 1, 1));
+    Integer nyRecPort = (Integer) vm3.invoke(() -> this.createReceiver(nyPort));
 
-    vm3.invoke(() -> this.createCache(nyPort));
     vm3.invoke(() -> this.createPartitionedRegion(regionName, "", 1, 1));
+    vm4.invoke(() -> this.createCache(nyPort));
+    vm4.invoke(() -> this.createPartitionedRegion(regionName, "", 1, 1));
 
-    final VersionTag tag = vm0.invoke("Put a single entry and get its version", () -> {
-      Cache cache = CacheFactory.getAnyInstance();
-      Region region = cache.getRegion(regionName);
-      assertTrue(region instanceof PartitionedRegion);
+    VersionTag localTag = vm1.invoke(() -> putEntryAndGetPartitionedRegionVersionTag(key));
+    VersionTag remoteTag = vm4.invoke(() -> getPartitionedRegionVersionTag(key, localTag));
 
-      region.put(key, "value-1");
-      region.put(key, "value-2");
-      Entry entry = region.getEntry(key);
-      assertTrue(entry instanceof EntrySnapshot);
-      RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();
+    assertEquals("Local and remote site have different timestamps", localTag.getVersionTimeStamp(),
+        remoteTag.getVersionTimeStamp());
+  }
 
-      VersionStamp stamp = regionEntry.getVersionStamp();
+  @Test
+  public void testUpdateVersionAfterCreateWithConcurrentSerialSender() {
+    Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0); // locator site1
+    VM vm1 = host.getVM(1); // server1 site1
 
-      // Create a duplicate entry version tag from stamp with newer
-      // time-stamp.
-      VersionSource memberId = (VersionSource) cache.getDistributedSystem().getDistributedMember();
-      VersionTag versionTag = VersionTag.create(memberId);
+    VM vm2 = host.getVM(2); // locator site2
+    VM vm3 = host.getVM(3); // server1 site2
+    VM vm4 = host.getVM(4); // server2 site2
 
-      int entryVersion = stamp.getEntryVersion() - 1;
-      int dsid = stamp.getDistributedSystemId();
+    // Site 1
+    Integer lnPort = (Integer) vm0.invoke(() -> this.createFirstLocatorWithDSId(1));
 
-      // Increment the time by 1 in case the time is the same as the previous event.
-      // The entry's version timestamp can be incremented by 1 in certain circumstances.
-      // See AbstractRegionEntry.generateVersionTag.
-      long time = System.currentTimeMillis() + 1;
+    final String key = "key-1";
 
-      versionTag.setEntryVersion(entryVersion);
-      versionTag.setDistributedSystemId(dsid);
-      versionTag.setVersionTimeStamp(time);
-      versionTag.setIsRemoteForTesting();
+    vm1.invoke(() -> this.createCache(lnPort));
+    vm1.invoke(
+        () -> this.createConcurrentSender("ln1", 2, false, 10, 2, false, false, null, true, 2));
 
-      EntryEventImpl event =
-          createNewEvent((PartitionedRegion) region, versionTag, entry.getKey(), "value-3");
+    vm1.invoke(() -> this.createPartitionedRegion(regionName, "ln1", 1, 1));
+    vm1.invoke(() -> this.startSender("ln1"));
+    vm1.invoke(() -> this.waitForSenderRunningState("ln1"));
 
-      ((LocalRegion) region).basicUpdate(event, false, true, 0L, false);
+    // Site 2
+    Integer nyPort = (Integer) vm2.invoke(() -> this.createFirstRemoteLocator(2, lnPort));
+    Integer nyRecPort = (Integer) vm3.invoke(() -> this.createReceiver(nyPort));
 
-      // Verify the new stamp
-      entry = region.getEntry(key);
-      assertTrue(entry instanceof EntrySnapshot);
-      regionEntry = ((EntrySnapshot) entry).getRegionEntry();
+    vm3.invoke(() -> this.createPartitionedRegion(regionName, "", 1, 1));
+    vm4.invoke(() -> this.createCache(nyPort));
+    vm4.invoke(() -> this.createPartitionedRegion(regionName, "", 1, 1));
 
-      stamp = regionEntry.getVersionStamp();
-      assertEquals("Time stamp did NOT get updated by UPDATE_VERSION operation on LocalRegion",
-          time, stamp.getVersionTimeStamp());
-      assertEquals(++entryVersion, stamp.getEntryVersion());
-      assertEquals(dsid, stamp.getDistributedSystemId());
+    VersionTag localTag = vm1.invoke(() -> putEntryAndGetPartitionedRegionVersionTag(key));
+    VersionTag remoteTag = vm4.invoke(() -> getPartitionedRegionVersionTag(key, localTag));
 
-      return stamp.asVersionTag();
-    });
+    assertEquals("Local and remote site have different timestamps", localTag.getVersionTimeStamp(),
+        remoteTag.getVersionTimeStamp());
+  }
 
-    VersionTag remoteTag = (VersionTag) vm3.invoke("Get timestamp from remote site", () -> {
-      Cache cache = CacheFactory.getAnyInstance();
-      final PartitionedRegion region = (PartitionedRegion) cache.getRegion(regionName);
-
-      // wait for entry to be received
-      WaitCriterion wc = new WaitCriterion() {
-        public boolean done() {
-          Entry<?, ?> entry = null;
-          try {
-            entry = region.getDataStore().getEntryLocally(0, key, false, false);
-          } catch (EntryNotFoundException e) {
-            // expected
-          } catch (ForceReattemptException e) {
-            // expected
-          } catch (PRLocallyDestroyedException e) {
-            throw new RuntimeException("unexpected exception", e);
-          }
-          if (entry != null) {
-            LogWriterUtils.getLogWriter().info("found entry " + entry);
-          }
-          return (entry != null);
-        }
-
-        public String description() {
-          return "Expected key-1 to be received on remote WAN site";
-        }
-      };
-      Wait.waitForCriterion(wc, 30000, 500, true);
-
-      wc = new WaitCriterion() {
-        public boolean done() {
-          Entry entry = region.getEntry(key);
-          assertTrue(entry instanceof EntrySnapshot);
-          RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();
-          return regionEntry.getVersionStamp().getVersionTimeStamp() == tag.getVersionTimeStamp();
-        }
-
-        public String description() {
-          return "waiting for timestamp to be updated";
-        }
-      };
-      Wait.waitForCriterion(wc, 30000, 500, true);
+  private VersionTag putEntryAndGetReplicatedRegionVersionTag(String key) {
+    Region region = cache.getRegion(regionName);
+    assertTrue(region instanceof DistributedRegion);
 
-      Entry entry = region.getEntry(key);
-      assertTrue(entry instanceof EntrySnapshot);
-      RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();
+    region.put(key, "value-1");
+    region.put(key, "value-2");
+    Entry entry = region.getEntry(key);
+    assertTrue(entry instanceof NonTXEntry);
+    RegionEntry regionEntry = ((NonTXEntry) entry).getRegionEntry();
 
-      VersionStamp stamp = regionEntry.getVersionStamp();
+    VersionStamp stamp = regionEntry.getVersionStamp();
 
-      return stamp.asVersionTag();
-    });
+    // Create a duplicate entry version tag from stamp with newer
+    // time-stamp.
+    VersionSource memberId = (VersionSource) cache.getDistributedSystem().getDistributedMember();
+    VersionTag versionTag = VersionTag.create(memberId);
 
-    assertEquals("Local and remote site have different timestamps", tag.getVersionTimeStamp(),
-        remoteTag.getVersionTimeStamp());
-  }
+    int entryVersion = stamp.getEntryVersion() - 1;
+    int dsid = stamp.getDistributedSystemId();
 
-  @Test
-  public void testUpdateVersionAfterCreateWithConcurrentSerialSender() {
-    Host host = Host.getHost(0);
-    VM vm0 = host.getVM(0); // server1 site1
-    VM vm1 = host.getVM(1); // server2 site1
+    // Increment the time by 1 in case the time is the same as the previous event.
+    // The entry's version timestamp can be incremented by 1 in certain circumstances.
+    // See AbstractRegionEntry.generateVersionTag.
+    long time = System.currentTimeMillis() + 1;
 
-    VM vm2 = host.getVM(2); // server1 site2
-    VM vm3 = host.getVM(3); // server2 site2
+    versionTag.setEntryVersion(entryVersion);
+    versionTag.setDistributedSystemId(dsid);
+    versionTag.setVersionTimeStamp(time);
+    versionTag.setIsRemoteForTesting();
 
-    // Site 1
-    Integer lnPort = (Integer) vm0.invoke(() -> this.createFirstLocatorWithDSId(1));
+    EntryEventImpl event =
+        createNewEvent((DistributedRegion) region, versionTag, entry.getKey(), "value-3");
 
-    final String key = "key-1";
+    ((LocalRegion) region).basicUpdate(event, false, true, 0L, false);
 
-    vm0.invoke(() -> this.createCache(lnPort));
-    vm0.invoke(
-        () -> this.createConcurrentSender("ln1", 2, false, 10, 2, false, false, null, true, 2));
+    // Verify the new stamp
+    entry = region.getEntry(key);
+    assertTrue(entry instanceof NonTXEntry);
+    regionEntry = ((NonTXEntry) entry).getRegionEntry();
 
-    vm0.invoke(() -> this.createPartitionedRegion(regionName, "ln1", 1, 1));
-    vm0.invoke(() -> this.startSender("ln1"));
-    vm0.invoke(() -> this.waitForSenderRunningState("ln1"));
+    stamp = regionEntry.getVersionStamp();
+    assertEquals("Time stamp did NOT get updated by UPDATE_VERSION operation on LocalRegion", time,
+        stamp.getVersionTimeStamp());
+    assertEquals(entryVersion + 1, stamp.getEntryVersion());
+    assertEquals(dsid, stamp.getDistributedSystemId());
 
-    // Site 2
-    Integer nyPort = (Integer) vm2.invoke(() -> this.createFirstRemoteLocator(2, lnPort));
-    Integer nyRecPort = (Integer) vm2.invoke(() -> this.createReceiver(nyPort));
+    return stamp.asVersionTag();
+  }
 
-    vm2.invoke(() -> this.createPartitionedRegion(regionName, "", 1, 1));
+  private VersionTag putEntryAndGetPartitionedRegionVersionTag(String key) {
+    Region region = cache.getRegion(regionName);
+    assertTrue(region instanceof PartitionedRegion);
 
-    vm3.invoke(() -> this.createCache(nyPort));
-    vm3.invoke(() -> this.createPartitionedRegion(regionName, "", 1, 1));
+    region.put(key, "value-1");
+    region.put(key, "value-2");
+    Entry entry = region.getEntry(key);
+    assertTrue(entry instanceof EntrySnapshot);
+    RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();
 
-    final VersionTag tag = (VersionTag) vm0.invoke("Put a single entry and get its version", () -> {
-      Cache cache = CacheFactory.getAnyInstance();
-      Region region = cache.getRegion(regionName);
-      assertTrue(region instanceof PartitionedRegion);
+    VersionStamp stamp = regionEntry.getVersionStamp();
 
-      region.put(key, "value-1");
-      region.put(key, "value-2");
-      Entry entry = region.getEntry(key);
-      assertTrue(entry instanceof EntrySnapshot);
-      RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();
+    // Create a duplicate entry version tag from stamp with newer
+    // time-stamp.
+    VersionSource memberId = (VersionSource) cache.getDistributedSystem().getDistributedMember();
+    VersionTag versionTag = VersionTag.create(memberId);
 
-      VersionStamp stamp = regionEntry.getVersionStamp();
+    int entryVersion = stamp.getEntryVersion() - 1;
+    int dsid = stamp.getDistributedSystemId();
 
-      // Create a duplicate entry version tag from stamp with newer
-      // time-stamp.
-      VersionSource memberId = (VersionSource) cache.getDistributedSystem().getDistributedMember();
-      VersionTag versionTag = VersionTag.create(memberId);
+    // Increment the time by 1 in case the time is the same as the previous event.
+    // The entry's version timestamp can be incremented by 1 in certain circumstances.
+    // See AbstractRegionEntry.generateVersionTag.
+    long time = System.currentTimeMillis() + 1;
 
-      int entryVersion = stamp.getEntryVersion() - 1;
-      int dsid = stamp.getDistributedSystemId();
+    versionTag.setEntryVersion(entryVersion);
+    versionTag.setDistributedSystemId(dsid);
+    versionTag.setVersionTimeStamp(time);
+    versionTag.setIsRemoteForTesting();
 
-      // Increment the time by 1 in case the time is the same as the previous event.
-      // The entry's version timestamp can be incremented by 1 in certain circumstances.
-      // See AbstractRegionEntry.generateVersionTag.
-      long time = System.currentTimeMillis() + 1;
+    EntryEventImpl event =
+        createNewEvent((PartitionedRegion) region, versionTag, entry.getKey(), "value-3");
 
-      versionTag.setEntryVersion(entryVersion);
-      versionTag.setDistributedSystemId(dsid);
-      versionTag.setVersionTimeStamp(time);
-      versionTag.setIsRemoteForTesting();
+    ((LocalRegion) region).basicUpdate(event, false, true, 0L, false);
 
-      EntryEventImpl event =
-          createNewEvent((PartitionedRegion) region, versionTag, entry.getKey(), "value-3");
+    // Verify the new stamp
+    entry = region.getEntry(key);
+    assertTrue(entry instanceof EntrySnapshot);
+    regionEntry = ((EntrySnapshot) entry).getRegionEntry();
 
-      ((LocalRegion) region).basicUpdate(event, false, true, 0L, false);
+    stamp = regionEntry.getVersionStamp();
+    assertEquals("Time stamp did NOT get updated by UPDATE_VERSION operation on LocalRegion", time,
+        stamp.getVersionTimeStamp());
+    assertEquals(++entryVersion, stamp.getEntryVersion());
+    assertEquals(dsid, stamp.getDistributedSystemId());
 
-      // Verify the new stamp
-      entry = region.getEntry(key);
-      assertTrue(entry instanceof EntrySnapshot);
-      regionEntry = ((EntrySnapshot) entry).getRegionEntry();
+    return stamp.asVersionTag();
+  }
 
-      stamp = regionEntry.getVersionStamp();
-      assertEquals("Time stamp did NOT get updated by UPDATE_VERSION operation on LocalRegion",
-          time, stamp.getVersionTimeStamp());
-      assertEquals(++entryVersion, stamp.getEntryVersion());
-      assertEquals(dsid, stamp.getDistributedSystemId());
+  private VersionTag getReplicatedRegionVersionTag(final String key, final VersionTag localTag) {
+    final Region region = cache.getRegion(regionName);
 
-      return stamp.asVersionTag();
+    Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> region.getEntry(key) != null);
+
+    Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> {
+      Entry entry = region.getEntry(key);
+      assertTrue(entry instanceof NonTXEntry);
+      RegionEntry regionEntry = ((NonTXEntry) entry).getRegionEntry();
+      return regionEntry.getVersionStamp().getVersionTimeStamp() == localTag.getVersionTimeStamp();
     });
 
-    VersionTag remoteTag = (VersionTag) vm3.invoke("Get timestamp from remote site", () -> {
-      Cache cache = CacheFactory.getAnyInstance();
-      final PartitionedRegion region = (PartitionedRegion) cache.getRegion(regionName);
-
-      // wait for entry to be received
-      WaitCriterion wc = new WaitCriterion() {
-        public boolean done() {
-          Entry<?, ?> entry = null;
-          try {
-            entry = region.getDataStore().getEntryLocally(0, key, false, false);
-          } catch (EntryNotFoundException | ForceReattemptException e) {
-            // expected
-          } catch (PRLocallyDestroyedException e) {
-            throw new RuntimeException("unexpected exception", e);
-          }
-          if (entry != null) {
-            LogWriterUtils.getLogWriter().info("found entry " + entry);
-          }
-          return (entry != null);
-        }
-
-        public String description() {
-          return "Expected key-1 to be received on remote WAN site";
-        }
-      };
-      Wait.waitForCriterion(wc, 30000, 500, true);
-
-      wc = new WaitCriterion() {
-        public boolean done() {
-          Entry entry = region.getEntry(key);
-          assertTrue(entry instanceof EntrySnapshot);
-          RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();
-          return regionEntry.getVersionStamp().getVersionTimeStamp() == tag.getVersionTimeStamp();
-        }
-
-        public String description() {
-          return "waiting for timestamp to be updated";
-        }
-      };
-      Wait.waitForCriterion(wc, 30000, 500, true);
+    Entry entry = region.getEntry(key);
+    assertTrue(entry instanceof NonTXEntry);
+    RegionEntry regionEntry = ((NonTXEntry) entry).getRegionEntry();
+
+    VersionStamp stamp = regionEntry.getVersionStamp();
 
+    return stamp.asVersionTag();
+  }
+
+  private VersionTag getPartitionedRegionVersionTag(final String key, final VersionTag localTag) {
+    final PartitionedRegion region = (PartitionedRegion) cache.getRegion(regionName);
+
+    Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> {
+      Entry<?, ?> entry = null;
+      try {
+        entry = region.getDataStore().getEntryLocally(0, key, false, false);
+      } catch (EntryNotFoundException | ForceReattemptException e) {
+        // expected
+      } catch (PRLocallyDestroyedException e) {
+        throw new RuntimeException("unexpected exception", e);
+      }
+      if (entry != null) {
+        LogWriterUtils.getLogWriter().info("found entry " + entry);
+      }
+      return (entry != null);
+    });
+
+    Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> {
       Entry entry = region.getEntry(key);
       assertTrue(entry instanceof EntrySnapshot);
       RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();
+      return regionEntry.getVersionStamp().getVersionTimeStamp() == localTag.getVersionTimeStamp();
+    });
 
-      VersionStamp stamp = regionEntry.getVersionStamp();
+    Entry entry = region.getEntry(key);
+    assertTrue(entry instanceof EntrySnapshot);
+    RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();
 
-      return stamp.asVersionTag();
-    });
+    VersionStamp stamp = regionEntry.getVersionStamp();
 
-    assertEquals("Local and remote site have different timestamps", tag.getVersionTimeStamp(),
-        remoteTag.getVersionTimeStamp());
+    return stamp.asVersionTag();
   }
 
   private VersionTagHolder createNewEvent(LocalRegion region, VersionTag tag, Object key,
@@ -752,22 +521,9 @@ public class UpdateVersionDUnitTest extends JUnit4DistributedTestCase {
   }
 
   private void waitForSenderRunningState(String senderId) {
-    Set<GatewaySender> senders = cache.getGatewaySenders();
-    final GatewaySender sender = getGatewaySenderById(senders, senderId);
-
-    WaitCriterion wc = new WaitCriterion() {
-      public boolean done() {
-        if (sender != null && sender.isRunning()) {
-          return true;
-        }
-        return false;
-      }
-
-      public String description() {
-        return "Expected sender isRunning state to be true but is false";
-      }
-    };
-    Wait.waitForCriterion(wc, 300000, 500, true);
+    GatewaySender sender = cache.getGatewaySender(senderId);
+    Awaitility.await().atMost(30, TimeUnit.SECONDS)
+        .until(() -> sender != null && sender.isRunning());
   }
 
   private Integer createFirstRemoteLocator(int dsId, int remoteLocPort) {
@@ -782,10 +538,20 @@ public class UpdateVersionDUnitTest extends JUnit4DistributedTestCase {
     props.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocPort + "]");
     props.setProperty(USE_CLUSTER_CONFIGURATION, "false");
     props.setProperty(ENABLE_CLUSTER_CONFIGURATION, "false");
-    test.getSystem(props);
+    startLocatorDistributedSystem(props);
     return port;
   }
 
+  private void startLocatorDistributedSystem(Properties props) {
+    // Start the locator with a LOCATOR_DM_TYPE and not a NORMAL_DM_TYPE
+    System.setProperty(InternalLocator.FORCE_LOCATOR_DM_TYPE, "true");
+    try {
+      getSystem(props);
+    } finally {
+      System.clearProperty(InternalLocator.FORCE_LOCATOR_DM_TYPE);
+    }
+  }
+
   private void createConcurrentSender(String dsName, int remoteDsId, boolean isParallel,
       Integer maxMemory, Integer batchSize, boolean isConflation, boolean isPersistent,
       GatewayEventFilter filter, boolean isManualStart, int concurrencyLevel) {
@@ -916,16 +682,6 @@ public class UpdateVersionDUnitTest extends JUnit4DistributedTestCase {
     }
   }
 
-  private static GatewaySender getGatewaySenderById(Set<GatewaySender> senders, String senderId) {
-    for (GatewaySender s : senders) {
-      if (s.getId().equals(senderId)) {
-        return s;
-      }
-    }
-    // if none of the senders matches with the supplied senderId, return null
-    return null;
-  }
-
   private Integer createFirstLocatorWithDSId(int dsId) {
     UpdateVersionDUnitTest test = new UpdateVersionDUnitTest();
     int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
@@ -937,7 +693,7 @@ public class UpdateVersionDUnitTest extends JUnit4DistributedTestCase {
     props.setProperty(USE_CLUSTER_CONFIGURATION, "false");
     props.setProperty(START_LOCATOR,
         "localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost");
-    test.getSystem(props);
+    startLocatorDistributedSystem(props);
     return port;
   }
 }
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/CacheClientNotifierDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/CacheClientNotifierDUnitTest.java
index 647ed9f..d7f4a0a 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/CacheClientNotifierDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/CacheClientNotifierDUnitTest.java
@@ -50,7 +50,6 @@ import org.apache.geode.test.dunit.VM;
 import org.apache.geode.test.dunit.Wait;
 import org.apache.geode.test.dunit.WaitCriterion;
 import org.apache.geode.test.junit.categories.DistributedTest;
-import org.apache.geode.test.junit.categories.FlakyTest;
 import org.apache.geode.test.junit.categories.WanTest;
 
 @Category({DistributedTest.class, WanTest.class})
@@ -174,67 +173,68 @@ public class CacheClientNotifierDUnitTest extends WANTestBase {
 
   public void doMultipleCacheServer(boolean durable) throws Exception {
     /* test scenario: */
-    /* create 1 GatewaySender on vm0 */
-    /* create 1 GatewayReceiver on vm1 */
-    /* create 2 cache servers on vm1, one with overflow. */
+    /* create 1 GatewaySender on vm5 */
+    /* create 1 GatewayReceiver on vm2 */
+    /* create 2 cache servers on vm2, one with overflow. */
     /* verify if the cache server2 still has the overflow attributes */
-    /* create 1 cache client1 on vm2 to register interest on cache server1 */
-    /* create 1 cache client2 on vm3 to register interest on cache server1 */
-    /* do some puts to GatewaySender on vm0 */
+    /* create 1 cache client1 on vm3 to register interest on cache server1 */
+    /* create 1 cache client2 on vm4 to register interest on cache server1 */
+    /* do some puts to GatewaySender on vm5 */
 
-    // create sender at ln
+    // start locators
     Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
 
     // create receiver and cache servers will be at ny
-    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
-    vm1.invoke(() -> WANTestBase.createCache(nyPort));
-    int receiverPort = vm1.invoke(() -> WANTestBase.createReceiver());
-    checkCacheServer(vm1, receiverPort, false, 0);
+    vm2.invoke(() -> WANTestBase.createCache(nyPort));
+    int receiverPort = vm2.invoke(() -> WANTestBase.createReceiver());
+    checkCacheServer(vm2, receiverPort, false, 0);
 
     // create PR for receiver
-    vm1.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName() + "_PR",
+    vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName() + "_PR",
         null, 1, 100, isOffHeap()));
 
     // create cache server1 with overflow
-    int serverPort = createCacheServerWithCSC(vm1, true, 3, "entry", "DEFAULT");
-    checkCacheServer(vm1, serverPort, true, 3);
+    int serverPort = createCacheServerWithCSC(vm2, true, 3, "entry", "DEFAULT");
+    checkCacheServer(vm2, serverPort, true, 3);
 
     // create cache server 2
-    final int serverPort2 = createCacheServerWithCSC(vm1, false, 0, null, null);
+    final int serverPort2 = createCacheServerWithCSC(vm2, false, 0, null, null);
     // Currently, only the first cache server's overflow attributes will take effect
     // It will be enhanced in GEODE-1102
-    checkCacheServer(vm1, serverPort2, true, 3);
+    checkCacheServer(vm2, serverPort2, true, 3);
     LogService.getLogger().info("receiverPort=" + receiverPort + ",serverPort=" + serverPort
         + ",serverPort2=" + serverPort2);
 
-    vm2.invoke(() -> createClientWithLocator(nyPort, "localhost", getTestMethodName() + "_PR",
-        "123", durable));
     vm3.invoke(() -> createClientWithLocator(nyPort, "localhost", getTestMethodName() + "_PR",
+        "123", durable));
+    vm4.invoke(() -> createClientWithLocator(nyPort, "localhost", getTestMethodName() + "_PR",
         "124", durable));
 
-    vm0.invoke(() -> WANTestBase.createCache(lnPort));
-    vm0.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 400, false, false, null, true));
-    vm0.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName() + "_PR",
+    // create sender at ln
+    vm5.invoke(() -> WANTestBase.createCache(lnPort));
+    vm5.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 400, false, false, null, true));
+    vm5.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName() + "_PR",
         "ln", 1, 100, isOffHeap()));
-    vm0.invoke(() -> WANTestBase.startSender("ln"));
-    vm0.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", NUM_KEYS));
+    vm5.invoke(() -> WANTestBase.startSender("ln"));
+    vm5.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", NUM_KEYS));
 
     /* verify */
-    verifyRegionSize(vm0, NUM_KEYS);
-    verifyRegionSize(vm1, NUM_KEYS);
-    verifyRegionSize(vm3, NUM_KEYS);
+    verifyRegionSize(vm5, NUM_KEYS);
     verifyRegionSize(vm2, NUM_KEYS);
+    verifyRegionSize(vm4, NUM_KEYS);
+    verifyRegionSize(vm3, NUM_KEYS);
 
     // close a cache server, then re-test
-    vm1.invoke(() -> closeACacheServer(serverPort2));
+    vm2.invoke(() -> closeACacheServer(serverPort2));
 
-    vm0.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", NUM_KEYS * 2));
+    vm5.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", NUM_KEYS * 2));
 
     /* verify */
-    verifyRegionSize(vm0, NUM_KEYS * 2);
-    verifyRegionSize(vm1, NUM_KEYS * 2);
-    verifyRegionSize(vm3, NUM_KEYS * 2);
+    verifyRegionSize(vm5, NUM_KEYS * 2);
     verifyRegionSize(vm2, NUM_KEYS * 2);
+    verifyRegionSize(vm4, NUM_KEYS * 2);
+    verifyRegionSize(vm3, NUM_KEYS * 2);
 
     disconnectAllFromDS();
   }
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/Simple2CacheServerDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/Simple2CacheServerDUnitTest.java
index 93c7d2c..3fbd8cc 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/Simple2CacheServerDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/Simple2CacheServerDUnitTest.java
@@ -24,10 +24,6 @@ import org.awaitility.Awaitility;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import org.apache.geode.cache.EvictionAction;
-import org.apache.geode.cache.EvictionAttributes;
-import org.apache.geode.cache.Region;
-import org.apache.geode.cache.RegionAttributes;
 import org.apache.geode.cache.client.internal.PoolImpl;
 import org.apache.geode.cache.server.CacheServer;
 import org.apache.geode.distributed.internal.ServerLocation;
@@ -35,19 +31,12 @@ import org.apache.geode.internal.cache.CacheServerImpl;
 import org.apache.geode.internal.cache.ClientServerObserverAdapter;
 import org.apache.geode.internal.cache.ClientServerObserverHolder;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
-import org.apache.geode.internal.cache.ha.HAContainerRegion;
 import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
 import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.test.dunit.SerializableCallable;
-import org.apache.geode.test.dunit.SerializableRunnable;
 import org.apache.geode.test.dunit.VM;
-import org.apache.geode.test.dunit.Wait;
-import org.apache.geode.test.dunit.WaitCriterion;
-import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
-import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
 import org.apache.geode.test.junit.categories.DistributedTest;
-import org.apache.geode.test.junit.categories.FlakyTest;
 import org.apache.geode.test.junit.categories.WanTest;
 
 @Category({DistributedTest.class, WanTest.class})
@@ -67,18 +56,18 @@ public class Simple2CacheServerDUnitTest extends WANTestBase {
 
   public void doMultipleCacheServer(boolean durable) throws Exception {
     Integer lnPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
-    vm1.invoke(() -> WANTestBase.createCache(lnPort));
-    vm1.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName() + "_PR",
+    vm2.invoke(() -> WANTestBase.createCache(lnPort));
+    vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName() + "_PR",
         null, 1, 100, isOffHeap()));
-    int serverPort = vm1.invoke(() -> WANTestBase.createCacheServer());
-    int serverPort2 = vm1.invoke(() -> WANTestBase.createCacheServer());
+    int serverPort = vm2.invoke(() -> WANTestBase.createCacheServer());
+    int serverPort2 = vm2.invoke(() -> WANTestBase.createCacheServer());
 
     if (durable) {
-      vm1.invoke(() -> setCacheClientProxyTestHook());
+      vm2.invoke(() -> setCacheClientProxyTestHook());
     } else {
-      vm2.invoke(() -> setClientServerObserver());
+      vm3.invoke(() -> setClientServerObserver());
     }
-    vm2.invoke(() -> CacheClientNotifierDUnitTest.createClientWithLocator(lnPort, "localhost",
+    vm3.invoke(() -> CacheClientNotifierDUnitTest.createClientWithLocator(lnPort, "localhost",
         getTestMethodName() + "_PR", "123", durable));
 
     vm0.invoke(() -> WANTestBase.createCache(lnPort));
@@ -87,26 +76,26 @@ public class Simple2CacheServerDUnitTest extends WANTestBase {
     int serverPort3 = vm0.invoke(() -> WANTestBase.createCacheServer());
 
     if (durable) {
-      vm1.invoke(() -> checkResultAndUnsetCacheClientProxyTestHook());
+      vm2.invoke(() -> checkResultAndUnsetCacheClientProxyTestHook());
     } else {
-      vm2.invoke(() -> checkResultAndUnsetClientServerObserver());
+      vm3.invoke(() -> checkResultAndUnsetClientServerObserver());
     }
     Awaitility.waitAtMost(20, TimeUnit.SECONDS).until(() -> {
-      return checkProxyIsPrimary(vm0) || checkProxyIsPrimary(vm1);
+      return checkProxyIsPrimary(vm0) || checkProxyIsPrimary(vm2);
     });
 
     // close the current primary cache server, then re-test
-    int serverPortAtVM1 = vm1.invoke(() -> findCacheServerForPrimaryProxy());
+    int serverPortAtVM1 = vm2.invoke(() -> findCacheServerForPrimaryProxy());
     if (serverPortAtVM1 != 0) {
-      vm1.invoke(() -> CacheClientNotifierDUnitTest.closeACacheServer(serverPortAtVM1));
-      LogService.getLogger().info("Closed cache server on vm1:" + serverPortAtVM1);
+      vm2.invoke(() -> CacheClientNotifierDUnitTest.closeACacheServer(serverPortAtVM1));
+      LogService.getLogger().info("Closed cache server on vm2:" + serverPortAtVM1);
       Awaitility.waitAtMost(20, TimeUnit.SECONDS).until(() -> {
-        return checkProxyIsPrimary(vm0) || checkProxyIsPrimary(vm1);
+        return checkProxyIsPrimary(vm0) || checkProxyIsPrimary(vm2);
       });
     } else {
       vm0.invoke(() -> CacheClientNotifierDUnitTest.closeACacheServer(serverPort3));
       LogService.getLogger().info("Closed cache server on vm0:" + serverPort3);
-      assertTrue(checkProxyIsPrimary(vm1));
+      assertTrue(checkProxyIsPrimary(vm2));
     }
     disconnectAllFromDS();
   }
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index e1011ad..993133c 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -276,20 +276,38 @@ public class WANTestBase extends DistributedTestCase {
     String remoteLocator = remoteLocatorBuffer.toString();
     remoteLocator = remoteLocator.replace(" ", "");
     props.setProperty(REMOTE_LOCATORS, remoteLocator);
-    test.getSystem(props);
+    test.startLocatorDistributedSystem(props);
+  }
+
+  private void startLocator(int dsId, int locatorPort, int startLocatorPort, int remoteLocPort,
+      boolean startServerLocator) {
+    Properties props = getDistributedSystemProperties();
+    props.setProperty(MCAST_PORT, "0");
+    props.setProperty(DISTRIBUTED_SYSTEM_ID, "" + dsId);
+    props.setProperty(LOCATORS, "localhost[" + locatorPort + "]");
+    props.setProperty(START_LOCATOR, "localhost[" + startLocatorPort + "],server="
+        + startServerLocator + ",peer=true,hostname-for-clients=localhost");
+    if (remoteLocPort != -1) {
+      props.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocPort + "]");
+    }
+    startLocatorDistributedSystem(props);
+  }
+
+  private void startLocatorDistributedSystem(Properties props) {
+    // Start the locator with a LOCATOR_DM_TYPE and not a NORMAL_DM_TYPE
+    System.setProperty(InternalLocator.FORCE_LOCATOR_DM_TYPE, "true");
+    try {
+      getSystem(props);
+    } finally {
+      System.clearProperty(InternalLocator.FORCE_LOCATOR_DM_TYPE);
+    }
   }
 
   public static Integer createFirstLocatorWithDSId(int dsId) {
     stopOldLocator();
     WANTestBase test = new WANTestBase();
     int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
-    Properties props = test.getDistributedSystemProperties();
-    props.setProperty(MCAST_PORT, "0");
-    props.setProperty(DISTRIBUTED_SYSTEM_ID, "" + dsId);
-    props.setProperty(LOCATORS, "localhost[" + port + "]");
-    props.setProperty(START_LOCATOR,
-        "localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost");
-    test.getSystem(props);
+    test.startLocator(dsId, port, port, -1, true);
     return port;
   }
 
@@ -297,13 +315,7 @@ public class WANTestBase extends DistributedTestCase {
     stopOldLocator();
     WANTestBase test = new WANTestBase();
     int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
-    Properties props = test.getDistributedSystemProperties();
-    props.setProperty(MCAST_PORT, "0");
-    props.setProperty(DISTRIBUTED_SYSTEM_ID, "" + dsId);
-    props.setProperty(LOCATORS, "localhost[" + port + "]");
-    props.setProperty(START_LOCATOR,
-        "localhost[" + port + "],server=false,peer=true,hostname-for-clients=localhost");
-    test.getSystem(props);
+    test.startLocator(dsId, port, port, -1, false);
     return port;
   }
 
@@ -311,13 +323,7 @@ public class WANTestBase extends DistributedTestCase {
     stopOldLocator();
     WANTestBase test = new WANTestBase();
     int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
-    Properties props = test.getDistributedSystemProperties();
-    props.setProperty(MCAST_PORT, "0");
-    props.setProperty(DISTRIBUTED_SYSTEM_ID, "" + dsId);
-    props.setProperty(LOCATORS, "localhost[" + locatorPort + "]");
-    props.setProperty(START_LOCATOR,
-        "localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost");
-    test.getSystem(props);
+    test.startLocator(dsId, locatorPort, port, -1, true);
     return port;
   }
 
@@ -325,13 +331,7 @@ public class WANTestBase extends DistributedTestCase {
     stopOldLocator();
     WANTestBase test = new WANTestBase();
     int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
-    Properties props = test.getDistributedSystemProperties();
-    props.setProperty(MCAST_PORT, "0");
-    props.setProperty(DISTRIBUTED_SYSTEM_ID, "" + dsId);
-    props.setProperty(LOCATORS, "localhost[" + locatorPort + "]");
-    props.setProperty(START_LOCATOR,
-        "localhost[" + port + "],server=false,peer=true,hostname-for-clients=localhost");
-    test.getSystem(props);
+    test.startLocator(dsId, locatorPort, port, -1, false);
     return port;
   }
 
@@ -339,28 +339,13 @@ public class WANTestBase extends DistributedTestCase {
     stopOldLocator();
     WANTestBase test = new WANTestBase();
     int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
-    Properties props = test.getDistributedSystemProperties();
-    props.setProperty(MCAST_PORT, "0");
-    props.setProperty(DISTRIBUTED_SYSTEM_ID, "" + dsId);
-    props.setProperty(LOCATORS, "localhost[" + port + "]");
-    props.setProperty(START_LOCATOR,
-        "localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost");
-    props.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocPort + "]");
-    test.getSystem(props);
+    test.startLocator(dsId, port, port, remoteLocPort, true);
     return port;
   }
 
   public static void bringBackLocatorOnOldPort(int dsId, int remoteLocPort, int oldPort) {
     WANTestBase test = new WANTestBase();
-    Properties props = test.getDistributedSystemProperties();
-    props.put(LOG_LEVEL, "fine");
-    props.setProperty(MCAST_PORT, "0");
-    props.setProperty(DISTRIBUTED_SYSTEM_ID, "" + dsId);
-    props.setProperty(LOCATORS, "localhost[" + oldPort + "]");
-    props.setProperty(START_LOCATOR,
-        "localhost[" + oldPort + "],server=true,peer=true,hostname-for-clients=localhost");
-    props.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocPort + "]");
-    test.getSystem(props);
+    test.startLocator(dsId, oldPort, oldPort, remoteLocPort, true);
   }
 
 
@@ -368,14 +353,7 @@ public class WANTestBase extends DistributedTestCase {
     stopOldLocator();
     WANTestBase test = new WANTestBase();
     int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
-    Properties props = test.getDistributedSystemProperties();
-    props.setProperty(MCAST_PORT, "0");
-    props.setProperty(DISTRIBUTED_SYSTEM_ID, "" + dsId);
-    props.setProperty(LOCATORS, "localhost[" + port + "]");
-    props.setProperty(START_LOCATOR,
-        "localhost[" + port + "],server=false,peer=true,hostname-for-clients=localhost");
-    props.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocPort + "]");
-    test.getSystem(props);
+    test.startLocator(dsId, port, port, remoteLocPort, false);
     return port;
   }
 
@@ -383,14 +361,7 @@ public class WANTestBase extends DistributedTestCase {
     stopOldLocator();
     WANTestBase test = new WANTestBase();
     int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
-    Properties props = test.getDistributedSystemProperties();
-    props.setProperty(MCAST_PORT, "0");
-    props.setProperty(DISTRIBUTED_SYSTEM_ID, "" + dsId);
-    props.setProperty(LOCATORS, "localhost[" + localPort + "]");
-    props.setProperty(START_LOCATOR,
-        "localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost");
-    props.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocPort + "]");
-    test.getSystem(props);
+    test.startLocator(dsId, localPort, port, remoteLocPort, true);
     return port;
   }
 
@@ -398,7 +369,6 @@ public class WANTestBase extends DistributedTestCase {
       String hostnameForClients) throws IOException {
     stopOldLocator();
     WANTestBase test = new WANTestBase();
-    int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
     Properties props = test.getDistributedSystemProperties();
     props.setProperty(MCAST_PORT, "0");
     props.setProperty(DISTRIBUTED_SYSTEM_ID, "" + dsId);
@@ -413,14 +383,7 @@ public class WANTestBase extends DistributedTestCase {
     stopOldLocator();
     WANTestBase test = new WANTestBase();
     int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
-    Properties props = test.getDistributedSystemProperties();
-    props.setProperty(MCAST_PORT, "0");
-    props.setProperty(DISTRIBUTED_SYSTEM_ID, "" + dsId);
-    props.setProperty(LOCATORS, "localhost[" + localPort + "]");
-    props.setProperty(START_LOCATOR,
-        "localhost[" + port + "],server=false,peer=true,hostname-for-clients=localhost");
-    props.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocPort + "]");
-    test.getSystem(props);
+    test.startLocator(dsId, localPort, port, remoteLocPort, false);
     return port;
   }
 

-- 
To stop receiving notification emails like this one, please contact
boglesby@apache.org.