You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bo...@apache.org on 2018/03/19 20:01:45 UTC
[geode] branch develop updated: GEODE-4815: Forced locator members
to be LOCATOR_DM_TYPES
This is an automated email from the ASF dual-hosted git repository.
boglesby pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 752bc23 GEODE-4815: Forced locator members to be LOCATOR_DM_TYPES
752bc23 is described below
commit 752bc238c37003ebabe132af7598469cbbf07eb6
Author: Barry Oglesby <bo...@users.noreply.github.com>
AuthorDate: Mon Mar 19 13:01:42 2018 -0700
GEODE-4815: Forced locator members to be LOCATOR_DM_TYPES
---
.../cache/wan/AsyncEventQueueTestBase.java | 15 +-
.../internal/cache/UpdateVersionDUnitTest.java | 668 +++++++--------------
.../cache/wan/CacheClientNotifierDUnitTest.java | 64 +-
.../cache/wan/Simple2CacheServerDUnitTest.java | 41 +-
.../geode/internal/cache/wan/WANTestBase.java | 105 ++--
5 files changed, 306 insertions(+), 587 deletions(-)
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java
index 7a956c8..8366ca7 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java
@@ -93,6 +93,7 @@ import org.apache.geode.cache.wan.GatewaySender.OrderPolicy;
import org.apache.geode.cache.wan.GatewaySenderFactory;
import org.apache.geode.distributed.Locator;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.InternalLocator;
import org.apache.geode.internal.AvailablePortHelper;
import org.apache.geode.internal.cache.ForceReattemptException;
import org.apache.geode.internal.cache.GemFireCacheImpl;
@@ -183,7 +184,7 @@ public class AsyncEventQueueTestBase extends JUnit4DistributedTestCase {
props.setProperty(LOCATORS, "localhost[" + port + "]");
props.setProperty(START_LOCATOR,
"localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost");
- test.getSystem(props);
+ test.startLocatorDistributedSystem(props);
return port;
}
@@ -197,10 +198,20 @@ public class AsyncEventQueueTestBase extends JUnit4DistributedTestCase {
props.setProperty(START_LOCATOR,
"localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost");
props.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocPort + "]");
- test.getSystem(props);
+ test.startLocatorDistributedSystem(props);
return port;
}
+ private void startLocatorDistributedSystem(Properties props) {
+ // Start the locator with a LOCATOR_DM_TYPE and not a NORMAL_DM_TYPE
+ System.setProperty(InternalLocator.FORCE_LOCATOR_DM_TYPE, "true");
+ try {
+ getSystem(props);
+ } finally {
+ System.clearProperty(InternalLocator.FORCE_LOCATOR_DM_TYPE);
+ }
+ }
+
public static void createReplicatedRegionWithAsyncEventQueue(String regionName,
String asyncQueueIds, Boolean offHeap) {
IgnoredException exp1 =
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/UpdateVersionDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/UpdateVersionDUnitTest.java
index 4e75db1..c6d97e3 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/UpdateVersionDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/UpdateVersionDUnitTest.java
@@ -25,13 +25,14 @@ import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.StringTokenizer;
+import java.util.concurrent.TimeUnit;
+import org.awaitility.Awaitility;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.Cache;
-import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.DiskStore;
@@ -49,6 +50,7 @@ import org.apache.geode.cache.wan.GatewayReceiverFactory;
import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.cache.wan.GatewaySenderFactory;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.InternalLocator;
import org.apache.geode.internal.AvailablePortHelper;
import org.apache.geode.internal.cache.LocalRegion.NonTXEntry;
import org.apache.geode.internal.cache.partitioned.PRLocallyDestroyedException;
@@ -60,11 +62,7 @@ import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.Invoke;
import org.apache.geode.test.dunit.LogWriterUtils;
-import org.apache.geode.test.dunit.SerializableCallable;
-import org.apache.geode.test.dunit.SerializableRunnable;
import org.apache.geode.test.dunit.VM;
-import org.apache.geode.test.dunit.Wait;
-import org.apache.geode.test.dunit.WaitCriterion;
import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
import org.apache.geode.test.junit.categories.DistributedTest;
import org.apache.geode.test.junit.categories.WanTest;
@@ -88,532 +86,303 @@ public class UpdateVersionDUnitTest extends JUnit4DistributedTestCase {
@Test
public void testUpdateVersionAfterCreateWithSerialSender() {
Host host = Host.getHost(0);
- VM vm0 = host.getVM(0); // server1 site1
+ VM vm0 = host.getVM(0); // locator site1
VM vm1 = host.getVM(1); // server2 site1
- VM vm2 = host.getVM(2); // server1 site2
- VM vm3 = host.getVM(3); // server2 site2
+ VM vm2 = host.getVM(2); // locator site2
+ VM vm3 = host.getVM(3); // server1 site2
+ VM vm4 = host.getVM(4); // server2 site2
final String key = "key-1";
// Site 1
Integer lnPort = (Integer) vm0.invoke(() -> this.createFirstLocatorWithDSId(1));
- vm0.invoke(() -> this.createCache(lnPort));
- vm0.invoke(() -> this.createSender("ln1", 2, false, 10, 1, false, false, null, true));
+ vm1.invoke(() -> this.createCache(lnPort));
+ vm1.invoke(() -> this.createSender("ln1", 2, false, 10, 1, false, false, null, true));
- vm0.invoke(() -> this.createPartitionedRegion(regionName, "ln1", 1, 1));
- vm0.invoke(() -> this.startSender("ln1"));
- vm0.invoke(() -> this.waitForSenderRunningState("ln1"));
+ vm1.invoke(() -> this.createPartitionedRegion(regionName, "ln1", 1, 1));
+ vm1.invoke(() -> this.startSender("ln1"));
+ vm1.invoke(() -> this.waitForSenderRunningState("ln1"));
// Site 2
Integer nyPort = (Integer) vm2.invoke(() -> this.createFirstRemoteLocator(2, lnPort));
- Integer nyRecPort = (Integer) vm2.invoke(() -> this.createReceiver(nyPort));
+ Integer nyRecPort = (Integer) vm3.invoke(() -> this.createReceiver(nyPort));
- vm2.invoke(() -> this.createPartitionedRegion(regionName, "", 1, 1));
- vm3.invoke(() -> this.createCache(nyPort));
vm3.invoke(() -> this.createPartitionedRegion(regionName, "", 1, 1));
+ vm4.invoke(() -> this.createCache(nyPort));
+ vm4.invoke(() -> this.createPartitionedRegion(regionName, "", 1, 1));
- final VersionTag tag =
- (VersionTag) vm0.invoke("Update a single entry and get its version", () -> {
- Cache cache = CacheFactory.getAnyInstance();
- Region region = cache.getRegion(regionName);
- assertTrue(region instanceof PartitionedRegion);
-
- region.put(key, "value-1");
- region.put(key, "value-2");
- Entry entry = region.getEntry(key);
- assertTrue(entry instanceof EntrySnapshot);
- RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();
-
- VersionStamp stamp = regionEntry.getVersionStamp();
-
- // Create a duplicate entry version tag from stamp with newer
- // time-stamp.
- VersionSource memberId =
- (VersionSource) cache.getDistributedSystem().getDistributedMember();
- VersionTag versionTag = VersionTag.create(memberId);
-
- int entryVersion = stamp.getEntryVersion() - 1;
- int dsid = stamp.getDistributedSystemId();
-
- // Increment the time by 1 in case the time is the same as the previous event.
- // The entry's version timestamp can be incremented by 1 in certain circumstances.
- // See AbstractRegionEntry.generateVersionTag.
- long time = System.currentTimeMillis() + 1;
-
- versionTag.setEntryVersion(entryVersion);
- versionTag.setDistributedSystemId(dsid);
- versionTag.setVersionTimeStamp(time);
- versionTag.setIsRemoteForTesting();
-
- EntryEventImpl event =
- createNewEvent((PartitionedRegion) region, versionTag, entry.getKey(), "value-3");
-
- ((LocalRegion) region).basicUpdate(event, false, true, 0L, false);
-
- // Verify the new stamp
- entry = region.getEntry(key);
- assertTrue(entry instanceof EntrySnapshot);
- regionEntry = ((EntrySnapshot) entry).getRegionEntry();
-
- stamp = regionEntry.getVersionStamp();
- assertEquals("Time stamp did NOT get updated by UPDATE_VERSION operation on LocalRegion",
- time, stamp.getVersionTimeStamp());
- assertEquals(++entryVersion, stamp.getEntryVersion());
- assertEquals(dsid, stamp.getDistributedSystemId());
-
- return stamp.asVersionTag();
- });
-
- VersionTag remoteTag = (VersionTag) vm3.invoke("Get timestamp from remote site", () -> {
- Cache cache = CacheFactory.getAnyInstance();
- final PartitionedRegion region = (PartitionedRegion) cache.getRegion(regionName);
-
- // wait for entry to be received
- WaitCriterion wc = new WaitCriterion() {
- public boolean done() {
- Entry<?, ?> entry = null;
- try {
- entry = region.getDataStore().getEntryLocally(0, key, false, false);
- } catch (EntryNotFoundException | ForceReattemptException e) {
- // expected
- } catch (PRLocallyDestroyedException e) {
- throw new RuntimeException("unexpected exception", e);
- }
- if (entry != null) {
- LogWriterUtils.getLogWriter().info("found entry " + entry);
- }
- return (entry != null);
- }
-
- public String description() {
- return "Expected " + key + " to be received on remote WAN site";
- }
- };
- Wait.waitForCriterion(wc, 30000, 500, true);
-
- wc = new WaitCriterion() {
- public boolean done() {
- Entry entry = region.getEntry(key);
- assertTrue(entry instanceof EntrySnapshot);
- RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();
- return regionEntry.getVersionStamp().getVersionTimeStamp() == tag.getVersionTimeStamp();
- }
-
- public String description() {
- return "waiting for timestamp to be updated";
- }
- };
- Wait.waitForCriterion(wc, 30000, 500, true);
+ VersionTag localTag = vm1.invoke(() -> putEntryAndGetPartitionedRegionVersionTag(key));
+ VersionTag remoteTag = vm4.invoke(() -> getPartitionedRegionVersionTag(key, localTag));
- Entry entry = region.getEntry(key);
- assertTrue("entry class is wrong: " + entry, entry instanceof EntrySnapshot);
- RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();
-
- VersionStamp stamp = regionEntry.getVersionStamp();
-
- return stamp.asVersionTag();
- });
-
- assertEquals("Local and remote site have different timestamps", tag.getVersionTimeStamp(),
+ assertEquals("Local and remote site have different timestamps", localTag.getVersionTimeStamp(),
remoteTag.getVersionTimeStamp());
}
@Test
public void testUpdateVersionAfterCreateWithSerialSenderOnDR() {
Host host = Host.getHost(0);
- VM vm0 = host.getVM(0); // server1 site1
- VM vm1 = host.getVM(1); // server2 site1
+ VM vm0 = host.getVM(0); // locator site1
+ VM vm1 = host.getVM(1); // server1 site1
- VM vm2 = host.getVM(2); // server1 site2
- VM vm3 = host.getVM(3); // server2 site2
+ VM vm2 = host.getVM(2); // locator site2
+ VM vm3 = host.getVM(3); // server1 site2
+ VM vm4 = host.getVM(4); // server2 site2
final String key = "key-1";
// Site 1
Integer lnPort = (Integer) vm0.invoke(() -> this.createFirstLocatorWithDSId(1));
- vm0.invoke(() -> this.createCache(lnPort));
- vm0.invoke(() -> this.createSender("ln1", 2, false, 10, 1, false, false, null, true));
+ vm1.invoke(() -> this.createCache(lnPort));
+ vm1.invoke(() -> this.createSender("ln1", 2, false, 10, 1, false, false, null, true));
- vm0.invoke(() -> this.createReplicatedRegion(regionName, "ln1"));
- vm0.invoke(() -> this.startSender("ln1"));
- vm0.invoke(() -> this.waitForSenderRunningState("ln1"));
+ vm1.invoke(() -> this.createReplicatedRegion(regionName, "ln1"));
+ vm1.invoke(() -> this.startSender("ln1"));
+ vm1.invoke(() -> this.waitForSenderRunningState("ln1"));
// Site 2
Integer nyPort = (Integer) vm2.invoke(() -> this.createFirstRemoteLocator(2, lnPort));
- Integer nyRecPort = (Integer) vm2.invoke(() -> this.createReceiver(nyPort));
+ Integer nyRecPort = (Integer) vm3.invoke(() -> this.createReceiver(nyPort));
- vm2.invoke(() -> this.createReplicatedRegion(regionName, ""));
- vm3.invoke(() -> this.createCache(nyPort));
vm3.invoke(() -> this.createReplicatedRegion(regionName, ""));
+ vm4.invoke(() -> this.createCache(nyPort));
+ vm4.invoke(() -> this.createReplicatedRegion(regionName, ""));
- final VersionTag tag =
- (VersionTag) vm0.invoke("Update a single entry and get its version", () -> {
- Cache cache = CacheFactory.getAnyInstance();
- Region region = cache.getRegion(regionName);
- assertTrue(region instanceof DistributedRegion);
-
- region.put(key, "value-1");
- region.put(key, "value-2");
- Entry entry = region.getEntry(key);
- assertTrue(entry instanceof NonTXEntry);
- RegionEntry regionEntry = ((NonTXEntry) entry).getRegionEntry();
-
- VersionStamp stamp = regionEntry.getVersionStamp();
-
- // Create a duplicate entry version tag from stamp with newer
- // time-stamp.
- VersionSource memberId =
- (VersionSource) cache.getDistributedSystem().getDistributedMember();
- VersionTag versionTag = VersionTag.create(memberId);
-
- int entryVersion = stamp.getEntryVersion() - 1;
- int dsid = stamp.getDistributedSystemId();
-
- // Increment the time by 1 in case the time is the same as the previous event.
- // The entry's version timestamp can be incremented by 1 in certain circumstances.
- // See AbstractRegionEntry.generateVersionTag.
- long time = System.currentTimeMillis() + 1;
-
- versionTag.setEntryVersion(entryVersion);
- versionTag.setDistributedSystemId(dsid);
- versionTag.setVersionTimeStamp(time);
- versionTag.setIsRemoteForTesting();
-
- EntryEventImpl event =
- createNewEvent((DistributedRegion) region, versionTag, entry.getKey(), "value-3");
-
- ((LocalRegion) region).basicUpdate(event, false, true, 0L, false);
-
- // Verify the new stamp
- entry = region.getEntry(key);
- assertTrue(entry instanceof NonTXEntry);
- regionEntry = ((NonTXEntry) entry).getRegionEntry();
-
- stamp = regionEntry.getVersionStamp();
- assertEquals("Time stamp did NOT get updated by UPDATE_VERSION operation on LocalRegion",
- time, stamp.getVersionTimeStamp());
- assertEquals(entryVersion + 1, stamp.getEntryVersion());
- assertEquals(dsid, stamp.getDistributedSystemId());
-
- return stamp.asVersionTag();
- });
-
- VersionTag remoteTag = (VersionTag) vm3.invoke("Get timestamp from remote site", () -> {
-
- Cache cache = CacheFactory.getAnyInstance();
- final Region region = cache.getRegion(regionName);
-
- // wait for entry to be received
- WaitCriterion wc = new WaitCriterion() {
- public boolean done() {
- return (region.getEntry(key) != null);
- }
-
- public String description() {
- return "Expected key-1 to be received on remote WAN site";
- }
- };
- Wait.waitForCriterion(wc, 30000, 500, true);
-
- wc = new WaitCriterion() {
- public boolean done() {
- Entry entry = region.getEntry(key);
- assertTrue(entry instanceof NonTXEntry);
- RegionEntry regionEntry = ((NonTXEntry) entry).getRegionEntry();
- return regionEntry.getVersionStamp().getVersionTimeStamp() == tag.getVersionTimeStamp();
- }
-
- public String description() {
- return "waiting for timestamp to be updated";
- }
- };
- Wait.waitForCriterion(wc, 30000, 500, true);
-
- Entry entry = region.getEntry(key);
- assertTrue(entry instanceof NonTXEntry);
- RegionEntry regionEntry = ((NonTXEntry) entry).getRegionEntry();
+ VersionTag localTag = vm1.invoke(() -> putEntryAndGetReplicatedRegionVersionTag(key));
+ VersionTag remoteTag = vm4.invoke(() -> getReplicatedRegionVersionTag(key, localTag));
- VersionStamp stamp = regionEntry.getVersionStamp();
-
- return stamp.asVersionTag();
- });
-
- assertEquals("Local and remote site have different timestamps", tag.getVersionTimeStamp(),
+ assertEquals("Local and remote site have different timestamps", localTag.getVersionTimeStamp(),
remoteTag.getVersionTimeStamp());
}
@Test
public void testUpdateVersionAfterCreateWithParallelSender() {
Host host = Host.getHost(0);
- VM vm0 = host.getVM(0); // server1 site1
- VM vm1 = host.getVM(1); // server2 site1
+ VM vm0 = host.getVM(0); // locator site1
+ VM vm1 = host.getVM(1); // server1 site1
- VM vm2 = host.getVM(2); // server1 site2
- VM vm3 = host.getVM(3); // server2 site2
+ VM vm2 = host.getVM(2); // locator site2
+ VM vm3 = host.getVM(3); // server1 site2
+ VM vm4 = host.getVM(4); // server2 site2
// Site 1
Integer lnPort = vm0.invoke(() -> this.createFirstLocatorWithDSId(1));
final String key = "key-1";
- vm0.invoke(() -> this.createCache(lnPort));
- vm0.invoke(() -> this.createSender("ln1", 2, true, 10, 1, false, false, null, true));
+ vm1.invoke(() -> this.createCache(lnPort));
+ vm1.invoke(() -> this.createSender("ln1", 2, true, 10, 1, false, false, null, true));
- vm0.invoke(() -> this.createPartitionedRegion(regionName, "ln1", 1, 1));
- vm0.invoke(() -> this.startSender("ln1"));
- vm0.invoke(() -> this.waitForSenderRunningState("ln1"));
+ vm1.invoke(() -> this.createPartitionedRegion(regionName, "ln1", 1, 1));
+ vm1.invoke(() -> this.startSender("ln1"));
+ vm1.invoke(() -> this.waitForSenderRunningState("ln1"));
// Site 2
Integer nyPort = (Integer) vm2.invoke(() -> this.createFirstRemoteLocator(2, lnPort));
- Integer nyRecPort = (Integer) vm2.invoke(() -> this.createReceiver(nyPort));
-
- vm2.invoke(() -> this.createPartitionedRegion(regionName, "", 1, 1));
+ Integer nyRecPort = (Integer) vm3.invoke(() -> this.createReceiver(nyPort));
- vm3.invoke(() -> this.createCache(nyPort));
vm3.invoke(() -> this.createPartitionedRegion(regionName, "", 1, 1));
+ vm4.invoke(() -> this.createCache(nyPort));
+ vm4.invoke(() -> this.createPartitionedRegion(regionName, "", 1, 1));
- final VersionTag tag = vm0.invoke("Put a single entry and get its version", () -> {
- Cache cache = CacheFactory.getAnyInstance();
- Region region = cache.getRegion(regionName);
- assertTrue(region instanceof PartitionedRegion);
+ VersionTag localTag = vm1.invoke(() -> putEntryAndGetPartitionedRegionVersionTag(key));
+ VersionTag remoteTag = vm4.invoke(() -> getPartitionedRegionVersionTag(key, localTag));
- region.put(key, "value-1");
- region.put(key, "value-2");
- Entry entry = region.getEntry(key);
- assertTrue(entry instanceof EntrySnapshot);
- RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();
+ assertEquals("Local and remote site have different timestamps", localTag.getVersionTimeStamp(),
+ remoteTag.getVersionTimeStamp());
+ }
- VersionStamp stamp = regionEntry.getVersionStamp();
+ @Test
+ public void testUpdateVersionAfterCreateWithConcurrentSerialSender() {
+ Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0); // locator site1
+ VM vm1 = host.getVM(1); // server1 site1
- // Create a duplicate entry version tag from stamp with newer
- // time-stamp.
- VersionSource memberId = (VersionSource) cache.getDistributedSystem().getDistributedMember();
- VersionTag versionTag = VersionTag.create(memberId);
+ VM vm2 = host.getVM(2); // locator site2
+ VM vm3 = host.getVM(3); // server1 site2
+ VM vm4 = host.getVM(4); // server2 site2
- int entryVersion = stamp.getEntryVersion() - 1;
- int dsid = stamp.getDistributedSystemId();
+ // Site 1
+ Integer lnPort = (Integer) vm0.invoke(() -> this.createFirstLocatorWithDSId(1));
- // Increment the time by 1 in case the time is the same as the previous event.
- // The entry's version timestamp can be incremented by 1 in certain circumstances.
- // See AbstractRegionEntry.generateVersionTag.
- long time = System.currentTimeMillis() + 1;
+ final String key = "key-1";
- versionTag.setEntryVersion(entryVersion);
- versionTag.setDistributedSystemId(dsid);
- versionTag.setVersionTimeStamp(time);
- versionTag.setIsRemoteForTesting();
+ vm1.invoke(() -> this.createCache(lnPort));
+ vm1.invoke(
+ () -> this.createConcurrentSender("ln1", 2, false, 10, 2, false, false, null, true, 2));
- EntryEventImpl event =
- createNewEvent((PartitionedRegion) region, versionTag, entry.getKey(), "value-3");
+ vm1.invoke(() -> this.createPartitionedRegion(regionName, "ln1", 1, 1));
+ vm1.invoke(() -> this.startSender("ln1"));
+ vm1.invoke(() -> this.waitForSenderRunningState("ln1"));
- ((LocalRegion) region).basicUpdate(event, false, true, 0L, false);
+ // Site 2
+ Integer nyPort = (Integer) vm2.invoke(() -> this.createFirstRemoteLocator(2, lnPort));
+ Integer nyRecPort = (Integer) vm3.invoke(() -> this.createReceiver(nyPort));
- // Verify the new stamp
- entry = region.getEntry(key);
- assertTrue(entry instanceof EntrySnapshot);
- regionEntry = ((EntrySnapshot) entry).getRegionEntry();
+ vm3.invoke(() -> this.createPartitionedRegion(regionName, "", 1, 1));
+ vm4.invoke(() -> this.createCache(nyPort));
+ vm4.invoke(() -> this.createPartitionedRegion(regionName, "", 1, 1));
- stamp = regionEntry.getVersionStamp();
- assertEquals("Time stamp did NOT get updated by UPDATE_VERSION operation on LocalRegion",
- time, stamp.getVersionTimeStamp());
- assertEquals(++entryVersion, stamp.getEntryVersion());
- assertEquals(dsid, stamp.getDistributedSystemId());
+ VersionTag localTag = vm1.invoke(() -> putEntryAndGetPartitionedRegionVersionTag(key));
+ VersionTag remoteTag = vm4.invoke(() -> getPartitionedRegionVersionTag(key, localTag));
- return stamp.asVersionTag();
- });
+ assertEquals("Local and remote site have different timestamps", localTag.getVersionTimeStamp(),
+ remoteTag.getVersionTimeStamp());
+ }
- VersionTag remoteTag = (VersionTag) vm3.invoke("Get timestamp from remote site", () -> {
- Cache cache = CacheFactory.getAnyInstance();
- final PartitionedRegion region = (PartitionedRegion) cache.getRegion(regionName);
-
- // wait for entry to be received
- WaitCriterion wc = new WaitCriterion() {
- public boolean done() {
- Entry<?, ?> entry = null;
- try {
- entry = region.getDataStore().getEntryLocally(0, key, false, false);
- } catch (EntryNotFoundException e) {
- // expected
- } catch (ForceReattemptException e) {
- // expected
- } catch (PRLocallyDestroyedException e) {
- throw new RuntimeException("unexpected exception", e);
- }
- if (entry != null) {
- LogWriterUtils.getLogWriter().info("found entry " + entry);
- }
- return (entry != null);
- }
-
- public String description() {
- return "Expected key-1 to be received on remote WAN site";
- }
- };
- Wait.waitForCriterion(wc, 30000, 500, true);
-
- wc = new WaitCriterion() {
- public boolean done() {
- Entry entry = region.getEntry(key);
- assertTrue(entry instanceof EntrySnapshot);
- RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();
- return regionEntry.getVersionStamp().getVersionTimeStamp() == tag.getVersionTimeStamp();
- }
-
- public String description() {
- return "waiting for timestamp to be updated";
- }
- };
- Wait.waitForCriterion(wc, 30000, 500, true);
+ private VersionTag putEntryAndGetReplicatedRegionVersionTag(String key) {
+ Region region = cache.getRegion(regionName);
+ assertTrue(region instanceof DistributedRegion);
- Entry entry = region.getEntry(key);
- assertTrue(entry instanceof EntrySnapshot);
- RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();
+ region.put(key, "value-1");
+ region.put(key, "value-2");
+ Entry entry = region.getEntry(key);
+ assertTrue(entry instanceof NonTXEntry);
+ RegionEntry regionEntry = ((NonTXEntry) entry).getRegionEntry();
- VersionStamp stamp = regionEntry.getVersionStamp();
+ VersionStamp stamp = regionEntry.getVersionStamp();
- return stamp.asVersionTag();
- });
+ // Create a duplicate entry version tag from stamp with newer
+ // time-stamp.
+ VersionSource memberId = (VersionSource) cache.getDistributedSystem().getDistributedMember();
+ VersionTag versionTag = VersionTag.create(memberId);
- assertEquals("Local and remote site have different timestamps", tag.getVersionTimeStamp(),
- remoteTag.getVersionTimeStamp());
- }
+ int entryVersion = stamp.getEntryVersion() - 1;
+ int dsid = stamp.getDistributedSystemId();
- @Test
- public void testUpdateVersionAfterCreateWithConcurrentSerialSender() {
- Host host = Host.getHost(0);
- VM vm0 = host.getVM(0); // server1 site1
- VM vm1 = host.getVM(1); // server2 site1
+ // Increment the time by 1 in case the time is the same as the previous event.
+ // The entry's version timestamp can be incremented by 1 in certain circumstances.
+ // See AbstractRegionEntry.generateVersionTag.
+ long time = System.currentTimeMillis() + 1;
- VM vm2 = host.getVM(2); // server1 site2
- VM vm3 = host.getVM(3); // server2 site2
+ versionTag.setEntryVersion(entryVersion);
+ versionTag.setDistributedSystemId(dsid);
+ versionTag.setVersionTimeStamp(time);
+ versionTag.setIsRemoteForTesting();
- // Site 1
- Integer lnPort = (Integer) vm0.invoke(() -> this.createFirstLocatorWithDSId(1));
+ EntryEventImpl event =
+ createNewEvent((DistributedRegion) region, versionTag, entry.getKey(), "value-3");
- final String key = "key-1";
+ ((LocalRegion) region).basicUpdate(event, false, true, 0L, false);
- vm0.invoke(() -> this.createCache(lnPort));
- vm0.invoke(
- () -> this.createConcurrentSender("ln1", 2, false, 10, 2, false, false, null, true, 2));
+ // Verify the new stamp
+ entry = region.getEntry(key);
+ assertTrue(entry instanceof NonTXEntry);
+ regionEntry = ((NonTXEntry) entry).getRegionEntry();
- vm0.invoke(() -> this.createPartitionedRegion(regionName, "ln1", 1, 1));
- vm0.invoke(() -> this.startSender("ln1"));
- vm0.invoke(() -> this.waitForSenderRunningState("ln1"));
+ stamp = regionEntry.getVersionStamp();
+ assertEquals("Time stamp did NOT get updated by UPDATE_VERSION operation on LocalRegion", time,
+ stamp.getVersionTimeStamp());
+ assertEquals(entryVersion + 1, stamp.getEntryVersion());
+ assertEquals(dsid, stamp.getDistributedSystemId());
- // Site 2
- Integer nyPort = (Integer) vm2.invoke(() -> this.createFirstRemoteLocator(2, lnPort));
- Integer nyRecPort = (Integer) vm2.invoke(() -> this.createReceiver(nyPort));
+ return stamp.asVersionTag();
+ }
- vm2.invoke(() -> this.createPartitionedRegion(regionName, "", 1, 1));
+ private VersionTag putEntryAndGetPartitionedRegionVersionTag(String key) {
+ Region region = cache.getRegion(regionName);
+ assertTrue(region instanceof PartitionedRegion);
- vm3.invoke(() -> this.createCache(nyPort));
- vm3.invoke(() -> this.createPartitionedRegion(regionName, "", 1, 1));
+ region.put(key, "value-1");
+ region.put(key, "value-2");
+ Entry entry = region.getEntry(key);
+ assertTrue(entry instanceof EntrySnapshot);
+ RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();
- final VersionTag tag = (VersionTag) vm0.invoke("Put a single entry and get its version", () -> {
- Cache cache = CacheFactory.getAnyInstance();
- Region region = cache.getRegion(regionName);
- assertTrue(region instanceof PartitionedRegion);
+ VersionStamp stamp = regionEntry.getVersionStamp();
- region.put(key, "value-1");
- region.put(key, "value-2");
- Entry entry = region.getEntry(key);
- assertTrue(entry instanceof EntrySnapshot);
- RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();
+ // Create a duplicate entry version tag from stamp with newer
+ // time-stamp.
+ VersionSource memberId = (VersionSource) cache.getDistributedSystem().getDistributedMember();
+ VersionTag versionTag = VersionTag.create(memberId);
- VersionStamp stamp = regionEntry.getVersionStamp();
+ int entryVersion = stamp.getEntryVersion() - 1;
+ int dsid = stamp.getDistributedSystemId();
- // Create a duplicate entry version tag from stamp with newer
- // time-stamp.
- VersionSource memberId = (VersionSource) cache.getDistributedSystem().getDistributedMember();
- VersionTag versionTag = VersionTag.create(memberId);
+ // Increment the time by 1 in case the time is the same as the previous event.
+ // The entry's version timestamp can be incremented by 1 in certain circumstances.
+ // See AbstractRegionEntry.generateVersionTag.
+ long time = System.currentTimeMillis() + 1;
- int entryVersion = stamp.getEntryVersion() - 1;
- int dsid = stamp.getDistributedSystemId();
+ versionTag.setEntryVersion(entryVersion);
+ versionTag.setDistributedSystemId(dsid);
+ versionTag.setVersionTimeStamp(time);
+ versionTag.setIsRemoteForTesting();
- // Increment the time by 1 in case the time is the same as the previous event.
- // The entry's version timestamp can be incremented by 1 in certain circumstances.
- // See AbstractRegionEntry.generateVersionTag.
- long time = System.currentTimeMillis() + 1;
+ EntryEventImpl event =
+ createNewEvent((PartitionedRegion) region, versionTag, entry.getKey(), "value-3");
- versionTag.setEntryVersion(entryVersion);
- versionTag.setDistributedSystemId(dsid);
- versionTag.setVersionTimeStamp(time);
- versionTag.setIsRemoteForTesting();
+ ((LocalRegion) region).basicUpdate(event, false, true, 0L, false);
- EntryEventImpl event =
- createNewEvent((PartitionedRegion) region, versionTag, entry.getKey(), "value-3");
+ // Verify the new stamp
+ entry = region.getEntry(key);
+ assertTrue(entry instanceof EntrySnapshot);
+ regionEntry = ((EntrySnapshot) entry).getRegionEntry();
- ((LocalRegion) region).basicUpdate(event, false, true, 0L, false);
+ stamp = regionEntry.getVersionStamp();
+ assertEquals("Time stamp did NOT get updated by UPDATE_VERSION operation on LocalRegion", time,
+ stamp.getVersionTimeStamp());
+ assertEquals(++entryVersion, stamp.getEntryVersion());
+ assertEquals(dsid, stamp.getDistributedSystemId());
- // Verify the new stamp
- entry = region.getEntry(key);
- assertTrue(entry instanceof EntrySnapshot);
- regionEntry = ((EntrySnapshot) entry).getRegionEntry();
+ return stamp.asVersionTag();
+ }
- stamp = regionEntry.getVersionStamp();
- assertEquals("Time stamp did NOT get updated by UPDATE_VERSION operation on LocalRegion",
- time, stamp.getVersionTimeStamp());
- assertEquals(++entryVersion, stamp.getEntryVersion());
- assertEquals(dsid, stamp.getDistributedSystemId());
+ private VersionTag getReplicatedRegionVersionTag(final String key, final VersionTag localTag) {
+ final Region region = cache.getRegion(regionName);
- return stamp.asVersionTag();
+ Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> region.getEntry(key) != null);
+
+ Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> {
+ Entry entry = region.getEntry(key);
+ assertTrue(entry instanceof NonTXEntry);
+ RegionEntry regionEntry = ((NonTXEntry) entry).getRegionEntry();
+ return regionEntry.getVersionStamp().getVersionTimeStamp() == localTag.getVersionTimeStamp();
});
- VersionTag remoteTag = (VersionTag) vm3.invoke("Get timestamp from remote site", () -> {
- Cache cache = CacheFactory.getAnyInstance();
- final PartitionedRegion region = (PartitionedRegion) cache.getRegion(regionName);
-
- // wait for entry to be received
- WaitCriterion wc = new WaitCriterion() {
- public boolean done() {
- Entry<?, ?> entry = null;
- try {
- entry = region.getDataStore().getEntryLocally(0, key, false, false);
- } catch (EntryNotFoundException | ForceReattemptException e) {
- // expected
- } catch (PRLocallyDestroyedException e) {
- throw new RuntimeException("unexpected exception", e);
- }
- if (entry != null) {
- LogWriterUtils.getLogWriter().info("found entry " + entry);
- }
- return (entry != null);
- }
-
- public String description() {
- return "Expected key-1 to be received on remote WAN site";
- }
- };
- Wait.waitForCriterion(wc, 30000, 500, true);
-
- wc = new WaitCriterion() {
- public boolean done() {
- Entry entry = region.getEntry(key);
- assertTrue(entry instanceof EntrySnapshot);
- RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();
- return regionEntry.getVersionStamp().getVersionTimeStamp() == tag.getVersionTimeStamp();
- }
-
- public String description() {
- return "waiting for timestamp to be updated";
- }
- };
- Wait.waitForCriterion(wc, 30000, 500, true);
+ Entry entry = region.getEntry(key);
+ assertTrue(entry instanceof NonTXEntry);
+ RegionEntry regionEntry = ((NonTXEntry) entry).getRegionEntry();
+
+ VersionStamp stamp = regionEntry.getVersionStamp();
+ return stamp.asVersionTag();
+ }
+
+ private VersionTag getPartitionedRegionVersionTag(final String key, final VersionTag localTag) {
+ final PartitionedRegion region = (PartitionedRegion) cache.getRegion(regionName);
+
+ Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> {
+ Entry<?, ?> entry = null;
+ try {
+ entry = region.getDataStore().getEntryLocally(0, key, false, false);
+ } catch (EntryNotFoundException | ForceReattemptException e) {
+ // expected
+ } catch (PRLocallyDestroyedException e) {
+ throw new RuntimeException("unexpected exception", e);
+ }
+ if (entry != null) {
+ LogWriterUtils.getLogWriter().info("found entry " + entry);
+ }
+ return (entry != null);
+ });
+
+ Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> {
Entry entry = region.getEntry(key);
assertTrue(entry instanceof EntrySnapshot);
RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();
+ return regionEntry.getVersionStamp().getVersionTimeStamp() == localTag.getVersionTimeStamp();
+ });
- VersionStamp stamp = regionEntry.getVersionStamp();
+ Entry entry = region.getEntry(key);
+ assertTrue(entry instanceof EntrySnapshot);
+ RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();
- return stamp.asVersionTag();
- });
+ VersionStamp stamp = regionEntry.getVersionStamp();
- assertEquals("Local and remote site have different timestamps", tag.getVersionTimeStamp(),
- remoteTag.getVersionTimeStamp());
+ return stamp.asVersionTag();
}
private VersionTagHolder createNewEvent(LocalRegion region, VersionTag tag, Object key,
@@ -752,22 +521,9 @@ public class UpdateVersionDUnitTest extends JUnit4DistributedTestCase {
}
private void waitForSenderRunningState(String senderId) {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- final GatewaySender sender = getGatewaySenderById(senders, senderId);
-
- WaitCriterion wc = new WaitCriterion() {
- public boolean done() {
- if (sender != null && sender.isRunning()) {
- return true;
- }
- return false;
- }
-
- public String description() {
- return "Expected sender isRunning state to be true but is false";
- }
- };
- Wait.waitForCriterion(wc, 300000, 500, true);
+ GatewaySender sender = cache.getGatewaySender(senderId);
+ Awaitility.await().atMost(30, TimeUnit.SECONDS)
+ .until(() -> sender != null && sender.isRunning());
}
private Integer createFirstRemoteLocator(int dsId, int remoteLocPort) {
@@ -782,10 +538,20 @@ public class UpdateVersionDUnitTest extends JUnit4DistributedTestCase {
props.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocPort + "]");
props.setProperty(USE_CLUSTER_CONFIGURATION, "false");
props.setProperty(ENABLE_CLUSTER_CONFIGURATION, "false");
- test.getSystem(props);
+ startLocatorDistributedSystem(props);
return port;
}
+ private void startLocatorDistributedSystem(Properties props) {
+ // Start the locator with a LOCATOR_DM_TYPE and not a NORMAL_DM_TYPE
+ System.setProperty(InternalLocator.FORCE_LOCATOR_DM_TYPE, "true");
+ try {
+ getSystem(props);
+ } finally {
+ System.clearProperty(InternalLocator.FORCE_LOCATOR_DM_TYPE);
+ }
+ }
+
private void createConcurrentSender(String dsName, int remoteDsId, boolean isParallel,
Integer maxMemory, Integer batchSize, boolean isConflation, boolean isPersistent,
GatewayEventFilter filter, boolean isManualStart, int concurrencyLevel) {
@@ -916,16 +682,6 @@ public class UpdateVersionDUnitTest extends JUnit4DistributedTestCase {
}
}
- private static GatewaySender getGatewaySenderById(Set<GatewaySender> senders, String senderId) {
- for (GatewaySender s : senders) {
- if (s.getId().equals(senderId)) {
- return s;
- }
- }
- // if none of the senders matches with the supplied senderId, return null
- return null;
- }
-
private Integer createFirstLocatorWithDSId(int dsId) {
UpdateVersionDUnitTest test = new UpdateVersionDUnitTest();
int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
@@ -937,7 +693,7 @@ public class UpdateVersionDUnitTest extends JUnit4DistributedTestCase {
props.setProperty(USE_CLUSTER_CONFIGURATION, "false");
props.setProperty(START_LOCATOR,
"localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost");
- test.getSystem(props);
+ startLocatorDistributedSystem(props);
return port;
}
}
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/CacheClientNotifierDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/CacheClientNotifierDUnitTest.java
index 647ed9f..d7f4a0a 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/CacheClientNotifierDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/CacheClientNotifierDUnitTest.java
@@ -50,7 +50,6 @@ import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.Wait;
import org.apache.geode.test.dunit.WaitCriterion;
import org.apache.geode.test.junit.categories.DistributedTest;
-import org.apache.geode.test.junit.categories.FlakyTest;
import org.apache.geode.test.junit.categories.WanTest;
@Category({DistributedTest.class, WanTest.class})
@@ -174,67 +173,68 @@ public class CacheClientNotifierDUnitTest extends WANTestBase {
public void doMultipleCacheServer(boolean durable) throws Exception {
/* test scenario: */
- /* create 1 GatewaySender on vm0 */
- /* create 1 GatewayReceiver on vm1 */
- /* create 2 cache servers on vm1, one with overflow. */
+ /* create 1 GatewaySender on vm5 */
+ /* create 1 GatewayReceiver on vm2 */
+ /* create 2 cache servers on vm2, one with overflow. */
/* verify if the cache server2 still has the overflow attributes */
- /* create 1 cache client1 on vm2 to register interest on cache server1 */
- /* create 1 cache client2 on vm3 to register interest on cache server1 */
- /* do some puts to GatewaySender on vm0 */
+ /* create 1 cache client1 on vm3 to register interest on cache server1 */
+ /* create 1 cache client2 on vm4 to register interest on cache server1 */
+ /* do some puts to GatewaySender on vm5 */
- // create sender at ln
+ // start locators
Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
// create receiver and cache servers will be at ny
- Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
- vm1.invoke(() -> WANTestBase.createCache(nyPort));
- int receiverPort = vm1.invoke(() -> WANTestBase.createReceiver());
- checkCacheServer(vm1, receiverPort, false, 0);
+ vm2.invoke(() -> WANTestBase.createCache(nyPort));
+ int receiverPort = vm2.invoke(() -> WANTestBase.createReceiver());
+ checkCacheServer(vm2, receiverPort, false, 0);
// create PR for receiver
- vm1.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName() + "_PR",
+ vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName() + "_PR",
null, 1, 100, isOffHeap()));
// create cache server1 with overflow
- int serverPort = createCacheServerWithCSC(vm1, true, 3, "entry", "DEFAULT");
- checkCacheServer(vm1, serverPort, true, 3);
+ int serverPort = createCacheServerWithCSC(vm2, true, 3, "entry", "DEFAULT");
+ checkCacheServer(vm2, serverPort, true, 3);
// create cache server 2
- final int serverPort2 = createCacheServerWithCSC(vm1, false, 0, null, null);
+ final int serverPort2 = createCacheServerWithCSC(vm2, false, 0, null, null);
// Currently, only the first cache server's overflow attributes will take effect
// It will be enhanced in GEODE-1102
- checkCacheServer(vm1, serverPort2, true, 3);
+ checkCacheServer(vm2, serverPort2, true, 3);
LogService.getLogger().info("receiverPort=" + receiverPort + ",serverPort=" + serverPort
+ ",serverPort2=" + serverPort2);
- vm2.invoke(() -> createClientWithLocator(nyPort, "localhost", getTestMethodName() + "_PR",
- "123", durable));
vm3.invoke(() -> createClientWithLocator(nyPort, "localhost", getTestMethodName() + "_PR",
+ "123", durable));
+ vm4.invoke(() -> createClientWithLocator(nyPort, "localhost", getTestMethodName() + "_PR",
"124", durable));
- vm0.invoke(() -> WANTestBase.createCache(lnPort));
- vm0.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 400, false, false, null, true));
- vm0.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName() + "_PR",
+ // create sender at ln
+ vm5.invoke(() -> WANTestBase.createCache(lnPort));
+ vm5.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 400, false, false, null, true));
+ vm5.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName() + "_PR",
"ln", 1, 100, isOffHeap()));
- vm0.invoke(() -> WANTestBase.startSender("ln"));
- vm0.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", NUM_KEYS));
+ vm5.invoke(() -> WANTestBase.startSender("ln"));
+ vm5.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", NUM_KEYS));
/* verify */
- verifyRegionSize(vm0, NUM_KEYS);
- verifyRegionSize(vm1, NUM_KEYS);
- verifyRegionSize(vm3, NUM_KEYS);
+ verifyRegionSize(vm5, NUM_KEYS);
verifyRegionSize(vm2, NUM_KEYS);
+ verifyRegionSize(vm4, NUM_KEYS);
+ verifyRegionSize(vm3, NUM_KEYS);
// close a cache server, then re-test
- vm1.invoke(() -> closeACacheServer(serverPort2));
+ vm2.invoke(() -> closeACacheServer(serverPort2));
- vm0.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", NUM_KEYS * 2));
+ vm5.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", NUM_KEYS * 2));
/* verify */
- verifyRegionSize(vm0, NUM_KEYS * 2);
- verifyRegionSize(vm1, NUM_KEYS * 2);
- verifyRegionSize(vm3, NUM_KEYS * 2);
+ verifyRegionSize(vm5, NUM_KEYS * 2);
verifyRegionSize(vm2, NUM_KEYS * 2);
+ verifyRegionSize(vm4, NUM_KEYS * 2);
+ verifyRegionSize(vm3, NUM_KEYS * 2);
disconnectAllFromDS();
}
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/Simple2CacheServerDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/Simple2CacheServerDUnitTest.java
index 93c7d2c..3fbd8cc 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/Simple2CacheServerDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/Simple2CacheServerDUnitTest.java
@@ -24,10 +24,6 @@ import org.awaitility.Awaitility;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.apache.geode.cache.EvictionAction;
-import org.apache.geode.cache.EvictionAttributes;
-import org.apache.geode.cache.Region;
-import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.client.internal.PoolImpl;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.distributed.internal.ServerLocation;
@@ -35,19 +31,12 @@ import org.apache.geode.internal.cache.CacheServerImpl;
import org.apache.geode.internal.cache.ClientServerObserverAdapter;
import org.apache.geode.internal.cache.ClientServerObserverHolder;
import org.apache.geode.internal.cache.GemFireCacheImpl;
-import org.apache.geode.internal.cache.ha.HAContainerRegion;
import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.test.dunit.SerializableCallable;
-import org.apache.geode.test.dunit.SerializableRunnable;
import org.apache.geode.test.dunit.VM;
-import org.apache.geode.test.dunit.Wait;
-import org.apache.geode.test.dunit.WaitCriterion;
-import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
-import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
import org.apache.geode.test.junit.categories.DistributedTest;
-import org.apache.geode.test.junit.categories.FlakyTest;
import org.apache.geode.test.junit.categories.WanTest;
@Category({DistributedTest.class, WanTest.class})
@@ -67,18 +56,18 @@ public class Simple2CacheServerDUnitTest extends WANTestBase {
public void doMultipleCacheServer(boolean durable) throws Exception {
Integer lnPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
- vm1.invoke(() -> WANTestBase.createCache(lnPort));
- vm1.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName() + "_PR",
+ vm2.invoke(() -> WANTestBase.createCache(lnPort));
+ vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName() + "_PR",
null, 1, 100, isOffHeap()));
- int serverPort = vm1.invoke(() -> WANTestBase.createCacheServer());
- int serverPort2 = vm1.invoke(() -> WANTestBase.createCacheServer());
+ int serverPort = vm2.invoke(() -> WANTestBase.createCacheServer());
+ int serverPort2 = vm2.invoke(() -> WANTestBase.createCacheServer());
if (durable) {
- vm1.invoke(() -> setCacheClientProxyTestHook());
+ vm2.invoke(() -> setCacheClientProxyTestHook());
} else {
- vm2.invoke(() -> setClientServerObserver());
+ vm3.invoke(() -> setClientServerObserver());
}
- vm2.invoke(() -> CacheClientNotifierDUnitTest.createClientWithLocator(lnPort, "localhost",
+ vm3.invoke(() -> CacheClientNotifierDUnitTest.createClientWithLocator(lnPort, "localhost",
getTestMethodName() + "_PR", "123", durable));
vm0.invoke(() -> WANTestBase.createCache(lnPort));
@@ -87,26 +76,26 @@ public class Simple2CacheServerDUnitTest extends WANTestBase {
int serverPort3 = vm0.invoke(() -> WANTestBase.createCacheServer());
if (durable) {
- vm1.invoke(() -> checkResultAndUnsetCacheClientProxyTestHook());
+ vm2.invoke(() -> checkResultAndUnsetCacheClientProxyTestHook());
} else {
- vm2.invoke(() -> checkResultAndUnsetClientServerObserver());
+ vm3.invoke(() -> checkResultAndUnsetClientServerObserver());
}
Awaitility.waitAtMost(20, TimeUnit.SECONDS).until(() -> {
- return checkProxyIsPrimary(vm0) || checkProxyIsPrimary(vm1);
+ return checkProxyIsPrimary(vm0) || checkProxyIsPrimary(vm2);
});
// close the current primary cache server, then re-test
- int serverPortAtVM1 = vm1.invoke(() -> findCacheServerForPrimaryProxy());
+ int serverPortAtVM1 = vm2.invoke(() -> findCacheServerForPrimaryProxy());
if (serverPortAtVM1 != 0) {
- vm1.invoke(() -> CacheClientNotifierDUnitTest.closeACacheServer(serverPortAtVM1));
- LogService.getLogger().info("Closed cache server on vm1:" + serverPortAtVM1);
+ vm2.invoke(() -> CacheClientNotifierDUnitTest.closeACacheServer(serverPortAtVM1));
+ LogService.getLogger().info("Closed cache server on vm2:" + serverPortAtVM1);
Awaitility.waitAtMost(20, TimeUnit.SECONDS).until(() -> {
- return checkProxyIsPrimary(vm0) || checkProxyIsPrimary(vm1);
+ return checkProxyIsPrimary(vm0) || checkProxyIsPrimary(vm2);
});
} else {
vm0.invoke(() -> CacheClientNotifierDUnitTest.closeACacheServer(serverPort3));
LogService.getLogger().info("Closed cache server on vm0:" + serverPort3);
- assertTrue(checkProxyIsPrimary(vm1));
+ assertTrue(checkProxyIsPrimary(vm2));
}
disconnectAllFromDS();
}
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index e1011ad..993133c 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -276,20 +276,38 @@ public class WANTestBase extends DistributedTestCase {
String remoteLocator = remoteLocatorBuffer.toString();
remoteLocator = remoteLocator.replace(" ", "");
props.setProperty(REMOTE_LOCATORS, remoteLocator);
- test.getSystem(props);
+ test.startLocatorDistributedSystem(props);
+ }
+
+ private void startLocator(int dsId, int locatorPort, int startLocatorPort, int remoteLocPort,
+ boolean startServerLocator) {
+ Properties props = getDistributedSystemProperties();
+ props.setProperty(MCAST_PORT, "0");
+ props.setProperty(DISTRIBUTED_SYSTEM_ID, "" + dsId);
+ props.setProperty(LOCATORS, "localhost[" + locatorPort + "]");
+ props.setProperty(START_LOCATOR, "localhost[" + startLocatorPort + "],server="
+ + startServerLocator + ",peer=true,hostname-for-clients=localhost");
+ if (remoteLocPort != -1) {
+ props.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocPort + "]");
+ }
+ startLocatorDistributedSystem(props);
+ }
+
+ private void startLocatorDistributedSystem(Properties props) {
+ // Start the locator with a LOCATOR_DM_TYPE and not a NORMAL_DM_TYPE
+ System.setProperty(InternalLocator.FORCE_LOCATOR_DM_TYPE, "true");
+ try {
+ getSystem(props);
+ } finally {
+ System.clearProperty(InternalLocator.FORCE_LOCATOR_DM_TYPE);
+ }
}
public static Integer createFirstLocatorWithDSId(int dsId) {
stopOldLocator();
WANTestBase test = new WANTestBase();
int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
- Properties props = test.getDistributedSystemProperties();
- props.setProperty(MCAST_PORT, "0");
- props.setProperty(DISTRIBUTED_SYSTEM_ID, "" + dsId);
- props.setProperty(LOCATORS, "localhost[" + port + "]");
- props.setProperty(START_LOCATOR,
- "localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost");
- test.getSystem(props);
+ test.startLocator(dsId, port, port, -1, true);
return port;
}
@@ -297,13 +315,7 @@ public class WANTestBase extends DistributedTestCase {
stopOldLocator();
WANTestBase test = new WANTestBase();
int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
- Properties props = test.getDistributedSystemProperties();
- props.setProperty(MCAST_PORT, "0");
- props.setProperty(DISTRIBUTED_SYSTEM_ID, "" + dsId);
- props.setProperty(LOCATORS, "localhost[" + port + "]");
- props.setProperty(START_LOCATOR,
- "localhost[" + port + "],server=false,peer=true,hostname-for-clients=localhost");
- test.getSystem(props);
+ test.startLocator(dsId, port, port, -1, false);
return port;
}
@@ -311,13 +323,7 @@ public class WANTestBase extends DistributedTestCase {
stopOldLocator();
WANTestBase test = new WANTestBase();
int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
- Properties props = test.getDistributedSystemProperties();
- props.setProperty(MCAST_PORT, "0");
- props.setProperty(DISTRIBUTED_SYSTEM_ID, "" + dsId);
- props.setProperty(LOCATORS, "localhost[" + locatorPort + "]");
- props.setProperty(START_LOCATOR,
- "localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost");
- test.getSystem(props);
+ test.startLocator(dsId, locatorPort, port, -1, true);
return port;
}
@@ -325,13 +331,7 @@ public class WANTestBase extends DistributedTestCase {
stopOldLocator();
WANTestBase test = new WANTestBase();
int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
- Properties props = test.getDistributedSystemProperties();
- props.setProperty(MCAST_PORT, "0");
- props.setProperty(DISTRIBUTED_SYSTEM_ID, "" + dsId);
- props.setProperty(LOCATORS, "localhost[" + locatorPort + "]");
- props.setProperty(START_LOCATOR,
- "localhost[" + port + "],server=false,peer=true,hostname-for-clients=localhost");
- test.getSystem(props);
+ test.startLocator(dsId, locatorPort, port, -1, false);
return port;
}
@@ -339,28 +339,13 @@ public class WANTestBase extends DistributedTestCase {
stopOldLocator();
WANTestBase test = new WANTestBase();
int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
- Properties props = test.getDistributedSystemProperties();
- props.setProperty(MCAST_PORT, "0");
- props.setProperty(DISTRIBUTED_SYSTEM_ID, "" + dsId);
- props.setProperty(LOCATORS, "localhost[" + port + "]");
- props.setProperty(START_LOCATOR,
- "localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost");
- props.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocPort + "]");
- test.getSystem(props);
+ test.startLocator(dsId, port, port, remoteLocPort, true);
return port;
}
public static void bringBackLocatorOnOldPort(int dsId, int remoteLocPort, int oldPort) {
WANTestBase test = new WANTestBase();
- Properties props = test.getDistributedSystemProperties();
- props.put(LOG_LEVEL, "fine");
- props.setProperty(MCAST_PORT, "0");
- props.setProperty(DISTRIBUTED_SYSTEM_ID, "" + dsId);
- props.setProperty(LOCATORS, "localhost[" + oldPort + "]");
- props.setProperty(START_LOCATOR,
- "localhost[" + oldPort + "],server=true,peer=true,hostname-for-clients=localhost");
- props.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocPort + "]");
- test.getSystem(props);
+ test.startLocator(dsId, oldPort, oldPort, remoteLocPort, true);
}
@@ -368,14 +353,7 @@ public class WANTestBase extends DistributedTestCase {
stopOldLocator();
WANTestBase test = new WANTestBase();
int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
- Properties props = test.getDistributedSystemProperties();
- props.setProperty(MCAST_PORT, "0");
- props.setProperty(DISTRIBUTED_SYSTEM_ID, "" + dsId);
- props.setProperty(LOCATORS, "localhost[" + port + "]");
- props.setProperty(START_LOCATOR,
- "localhost[" + port + "],server=false,peer=true,hostname-for-clients=localhost");
- props.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocPort + "]");
- test.getSystem(props);
+ test.startLocator(dsId, port, port, remoteLocPort, false);
return port;
}
@@ -383,14 +361,7 @@ public class WANTestBase extends DistributedTestCase {
stopOldLocator();
WANTestBase test = new WANTestBase();
int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
- Properties props = test.getDistributedSystemProperties();
- props.setProperty(MCAST_PORT, "0");
- props.setProperty(DISTRIBUTED_SYSTEM_ID, "" + dsId);
- props.setProperty(LOCATORS, "localhost[" + localPort + "]");
- props.setProperty(START_LOCATOR,
- "localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost");
- props.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocPort + "]");
- test.getSystem(props);
+ test.startLocator(dsId, localPort, port, remoteLocPort, true);
return port;
}
@@ -398,7 +369,6 @@ public class WANTestBase extends DistributedTestCase {
String hostnameForClients) throws IOException {
stopOldLocator();
WANTestBase test = new WANTestBase();
- int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
Properties props = test.getDistributedSystemProperties();
props.setProperty(MCAST_PORT, "0");
props.setProperty(DISTRIBUTED_SYSTEM_ID, "" + dsId);
@@ -413,14 +383,7 @@ public class WANTestBase extends DistributedTestCase {
stopOldLocator();
WANTestBase test = new WANTestBase();
int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
- Properties props = test.getDistributedSystemProperties();
- props.setProperty(MCAST_PORT, "0");
- props.setProperty(DISTRIBUTED_SYSTEM_ID, "" + dsId);
- props.setProperty(LOCATORS, "localhost[" + localPort + "]");
- props.setProperty(START_LOCATOR,
- "localhost[" + port + "],server=false,peer=true,hostname-for-clients=localhost");
- props.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocPort + "]");
- test.getSystem(props);
+ test.startLocator(dsId, localPort, port, remoteLocPort, false);
return port;
}
--
To stop receiving notification emails like this one, please contact
boglesby@apache.org.