You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2017/10/04 20:24:33 UTC
[geode] branch develop updated: GEODE-3749:
DeltaPropagationDUnitTest.testBug40165ClientReconnects failure
This is an automated email from the ASF dual-hosted git repository.
udo pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 77a0c76 GEODE-3749: DeltaPropagationDUnitTest.testBug40165ClientReconnects failure
77a0c76 is described below
commit 77a0c7647ccca101d9375501a6478fd7c77f3269
Author: kohlmu-pivotal <uk...@pivotal.io>
AuthorDate: Wed Oct 4 13:24:22 2017 -0700
GEODE-3749: DeltaPropagationDUnitTest.testBug40165ClientReconnects failure
---
.../internal/cache/DeltaPropagationDUnitTest.java | 661 +++++++++------------
1 file changed, 295 insertions(+), 366 deletions(-)
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/DeltaPropagationDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/DeltaPropagationDUnitTest.java
index a1860b3..e934b56 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/DeltaPropagationDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/DeltaPropagationDUnitTest.java
@@ -13,12 +13,19 @@
* the License.
*/
/**
- *
+ *
*/
package org.apache.geode.internal.cache;
-import static org.apache.geode.distributed.ConfigurationProperties.*;
-import static org.junit.Assert.*;
+import static org.apache.geode.distributed.ConfigurationProperties.CONFLATE_EVENTS;
+import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_ID;
+import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_TIMEOUT;
+import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
+import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.io.File;
import java.util.Properties;
@@ -64,7 +71,8 @@ import org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil;
import org.apache.geode.internal.cache.tier.sockets.ConflationDUnitTest;
import org.apache.geode.internal.tcp.ConnectionTable;
import org.apache.geode.test.dunit.Host;
-import org.apache.geode.test.dunit.SerializableRunnable;
+import org.apache.geode.test.dunit.SerializableCallableIF;
+import org.apache.geode.test.dunit.SerializableRunnableIF;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.Wait;
import org.apache.geode.test.dunit.WaitCriterion;
@@ -77,19 +85,17 @@ import org.apache.geode.test.junit.categories.DistributedTest;
@Category(DistributedTest.class)
public class DeltaPropagationDUnitTest extends JUnit4DistributedTestCase {
- private static final Compressor compressor = SnappyCompressor.getDefaultInstance();
+ private final Compressor compressor = SnappyCompressor.getDefaultInstance();
protected static Cache cache = null;
- protected static Pool pool = null;
-
- protected static VM VM0 = null;
+ protected VM vm0 = null;
- protected static VM VM1 = null;
+ protected VM vm1 = null;
- protected static VM VM2 = null;
+ protected VM vm2 = null;
- protected static VM VM3 = null;
+ protected VM vm3 = null;
private int PORT1;
@@ -99,110 +105,104 @@ public class DeltaPropagationDUnitTest extends JUnit4DistributedTestCase {
private static LogWriter logger = null;
- public static final int EVENTS_SIZE = 6;
+ private static final int EVENTS_SIZE = 6;
private static boolean lastKeyReceived = false;
- private static boolean markerReceived = false;
+ private boolean markerReceived = false;
- private static int numOfCreates;
+ private int numOfCreates;
- private static int numOfUpdates;
+ private int numOfUpdates;
private static int numOfInvalidates;
private static int numOfDestroys;
- private static int numOfEvents;
-
private static DeltaTestImpl[] deltaPut = new DeltaTestImpl[EVENTS_SIZE];
- private static boolean areListenerResultsValid = true;
+ private boolean areListenerResultsValid = true;
- private static boolean closeCache = false;
+ private boolean closeCache = false;
- private static StringBuffer listenerError = new StringBuffer("");
+ private StringBuffer listenerError = new StringBuffer("");
- public static String DELTA_KEY = "DELTA_KEY";
+ private static final String DELTA_KEY = "DELTA_KEY";
- public static String LAST_KEY = "LAST_KEY";
+ private static final String LAST_KEY = "LAST_KEY";
- public static final int NO_LISTENER = 0;
+ private static final int NO_LISTENER = 0;
- public static final int CLIENT_LISTENER = 1;
+ private static final int CLIENT_LISTENER = 1;
- public static final int SERVER_LISTENER = 2;
+ private static final int SERVER_LISTENER = 2;
- public static final int C2S2S_SERVER_LISTENER = 3;
+ private static final int C2S2S_SERVER_LISTENER = 3;
- public static final int LAST_KEY_LISTENER = 4;
+ private static final int LAST_KEY_LISTENER = 4;
- public static final int DURABLE_CLIENT_LISTENER = 5;
+ private static final int DURABLE_CLIENT_LISTENER = 5;
- public static final int CLIENT_LISTENER_2 = 6;
+ private static final int CLIENT_LISTENER_2 = 6;
- public static final String CREATE = "CREATE";
+ private static final String CREATE = "CREATE";
- public static final String UPDATE = "UPDATE";
+ private static final String UPDATE = "UPDATE";
- public static final String INVALIDATE = "INVALIDATE";
+ private static final String INVALIDATE = "INVALIDATE";
- public static final String DESTROY = "DESTROY";
+ private static final String DESTROY = "DESTROY";
@Override
public final void postSetUp() throws Exception {
final Host host = Host.getHost(0);
- VM0 = host.getVM(0);
- VM1 = host.getVM(1);
- VM2 = host.getVM(2);
- VM3 = host.getVM(3);
-
- VM0.invoke(() -> DeltaPropagationDUnitTest.resetAll());
- VM1.invoke(() -> DeltaPropagationDUnitTest.resetAll());
- VM2.invoke(() -> DeltaPropagationDUnitTest.resetAll());
- VM3.invoke(() -> DeltaPropagationDUnitTest.resetAll());
- DeltaPropagationDUnitTest.resetAll();
+ vm0 = host.getVM(0);
+ vm1 = host.getVM(1);
+ vm2 = host.getVM(2);
+ vm3 = host.getVM(3);
+
+ vm0.invoke(this::resetAll);
+ vm1.invoke(this::resetAll);
+ vm2.invoke(this::resetAll);
+ vm3.invoke(this::resetAll);
+ resetAll();
}
@Override
public final void preTearDown() throws Exception {
- DeltaPropagationDUnitTest.closeCache();
- VM2.invoke(() -> DeltaPropagationDUnitTest.closeCache());
- VM3.invoke(() -> DeltaPropagationDUnitTest.closeCache());
+ closeCache();
+ vm2.invoke((SerializableCallableIF) this::closeCache);
+ vm3.invoke((SerializableCallableIF) this::closeCache);
// Unset the isSlowStartForTesting flag
- VM0.invoke(() -> ConflationDUnitTest.unsetIsSlowStart());
- VM1.invoke(() -> ConflationDUnitTest.unsetIsSlowStart());
+ vm0.invoke(ConflationDUnitTest::unsetIsSlowStart);
+ vm1.invoke(ConflationDUnitTest::unsetIsSlowStart);
// then close the servers
- VM0.invoke(() -> DeltaPropagationDUnitTest.closeCache());
- VM1.invoke(() -> DeltaPropagationDUnitTest.closeCache());
+ vm0.invoke((SerializableRunnableIF) this::closeCache);
+ vm1.invoke((SerializableRunnableIF) this::closeCache);
disconnectAllFromDS();
}
@Test
public void testS2CSuccessfulDeltaPropagationWithCompression() throws Exception {
- PORT1 = ((Integer) VM0.invoke(
- () -> DeltaPropagationDUnitTest.createServerCache(HARegionQueue.HA_EVICTION_POLICY_NONE,
- new Integer(1), new Integer(NO_LISTENER), Boolean.FALSE, compressor))).intValue();
+ PORT1 = vm0.invoke(() -> createServerCache(HARegionQueue.HA_EVICTION_POLICY_NONE, 1,
+ NO_LISTENER, false, compressor));
- VM0.invoke(new SerializableRunnable() {
- public void run() {
- assertTrue(cache.getRegion(regionName).getAttributes().getCompressor() != null);
- }
- });
+ vm0.invoke(
+ () -> assertTrue(cache.getRegion(regionName).getAttributes().getCompressor() != null));
- createClientCache(new Integer(PORT1), new Integer(-1), "0", new Integer(CLIENT_LISTENER));
+ createClientCache(PORT1, -1, "0", CLIENT_LISTENER);
registerInterestListAll();
- VM0.invoke(() -> DeltaPropagationDUnitTest.prepareDeltas());
+ vm0.invoke(this::prepareDeltas);
prepareDeltas();
- VM0.invoke(() -> DeltaPropagationDUnitTest.createAndUpdateDeltas());
+ vm0.invoke(this::createAndUpdateDeltas);
waitForLastKey();
- long toDeltas = ((Long) VM0.invoke(() -> DeltaTestImpl.getToDeltaInvokations()));
+ long toDeltas = vm0.invoke(DeltaTestImpl::getToDeltaInvokations);
long fromDeltas = DeltaTestImpl.getFromDeltaInvokations();
assertTrue((EVENTS_SIZE - 1) + " deltas were to be sent but were " + toDeltas,
toDeltas == (EVENTS_SIZE - 1));
@@ -215,21 +215,19 @@ public class DeltaPropagationDUnitTest extends JUnit4DistributedTestCase {
@Test
public void testS2CSuccessfulDeltaPropagation() throws Exception {
- PORT1 = ((Integer) VM0.invoke(
- () -> DeltaPropagationDUnitTest.createServerCache(HARegionQueue.HA_EVICTION_POLICY_MEMORY)))
- .intValue();
+ PORT1 = vm0.invoke(() -> createServerCache(HARegionQueue.HA_EVICTION_POLICY_MEMORY));
- createClientCache(new Integer(PORT1), new Integer(-1), "0", new Integer(CLIENT_LISTENER));
+ createClientCache(PORT1, -1, "0", CLIENT_LISTENER);
registerInterestListAll();
- VM0.invoke(() -> DeltaPropagationDUnitTest.prepareDeltas());
+ vm0.invoke(this::prepareDeltas);
prepareDeltas();
- VM0.invoke(() -> DeltaPropagationDUnitTest.createAndUpdateDeltas());
+ vm0.invoke(this::createAndUpdateDeltas);
waitForLastKey();
- long toDeltas = ((Long) VM0.invoke(() -> DeltaTestImpl.getToDeltaInvokations()));
+ long toDeltas = vm0.invoke(DeltaTestImpl::getToDeltaInvokations);
long fromDeltas = DeltaTestImpl.getFromDeltaInvokations();
assertTrue((EVENTS_SIZE - 1) + " deltas were to be sent but were " + toDeltas,
toDeltas == (EVENTS_SIZE - 1));
@@ -242,23 +240,21 @@ public class DeltaPropagationDUnitTest extends JUnit4DistributedTestCase {
@Test
public void testS2CFailureInToDeltaMethod() throws Exception {
- PORT1 = ((Integer) VM0.invoke(
- () -> DeltaPropagationDUnitTest.createServerCache(HARegionQueue.HA_EVICTION_POLICY_MEMORY)))
- .intValue();
+ PORT1 = vm0.invoke(() -> createServerCache(HARegionQueue.HA_EVICTION_POLICY_MEMORY));
- createClientCache(new Integer(PORT1), new Integer(-1), "0", new Integer(CLIENT_LISTENER_2));
+ createClientCache(PORT1, -1, "0", CLIENT_LISTENER_2);
registerInterestListAll();
- VM0.invoke(() -> DeltaPropagationDUnitTest.prepareErroneousDeltasForToDelta());
+ vm0.invoke(this::prepareErroneousDeltasForToDelta);
prepareErroneousDeltasForToDelta();
- VM0.invoke(() -> DeltaPropagationDUnitTest.createAndUpdateDeltas());
+ vm0.invoke(this::createAndUpdateDeltas);
waitForLastKey();
- long toDeltas = ((Long) VM0.invoke(() -> DeltaTestImpl.getToDeltaInvokations()));
+ long toDeltas = vm0.invoke(DeltaTestImpl::getToDeltaInvokations);
long fromDeltas = DeltaTestImpl.getFromDeltaInvokations();
- long toDeltafailures = ((Long) VM0.invoke(() -> DeltaTestImpl.getToDeltaFailures()));
+ long toDeltafailures = vm0.invoke(DeltaTestImpl::getToDeltaFailures);
assertTrue((EVENTS_SIZE - 1) + " deltas were to be sent but were " + toDeltas,
toDeltas == (EVENTS_SIZE - 1));
assertTrue((EVENTS_SIZE - 1 - 1/*
@@ -275,21 +271,19 @@ public class DeltaPropagationDUnitTest extends JUnit4DistributedTestCase {
@Test
public void testS2CFailureInFromDeltaMethod() throws Exception {
- PORT1 = ((Integer) VM0.invoke(
- () -> DeltaPropagationDUnitTest.createServerCache(HARegionQueue.HA_EVICTION_POLICY_MEMORY)))
- .intValue();
+ PORT1 = vm0.invoke(() -> createServerCache(HARegionQueue.HA_EVICTION_POLICY_MEMORY));
- createClientCache(new Integer(PORT1), new Integer(-1), "0", new Integer(CLIENT_LISTENER));
+ createClientCache(PORT1, -1, "0", CLIENT_LISTENER);
registerInterestListAll();
- VM0.invoke(() -> DeltaPropagationDUnitTest.prepareErroneousDeltasForFromDelta());
+ vm0.invoke(this::prepareErroneousDeltasForFromDelta);
prepareErroneousDeltasForFromDelta();
- VM0.invoke(() -> DeltaPropagationDUnitTest.createAndUpdateDeltas());
+ vm0.invoke(this::createAndUpdateDeltas);
waitForLastKey();
- long toDeltas = ((Long) VM0.invoke(() -> DeltaTestImpl.getToDeltaInvokations()));
+ long toDeltas = vm0.invoke(DeltaTestImpl::getToDeltaInvokations);
long fromDeltas = DeltaTestImpl.getFromDeltaInvokations();
long fromDeltafailures = DeltaTestImpl.getFromDeltaFailures();
assertTrue((EVENTS_SIZE - 1) + " deltas were to be sent but were " + toDeltas,
@@ -305,31 +299,28 @@ public class DeltaPropagationDUnitTest extends JUnit4DistributedTestCase {
@Test
public void testS2CWithOldValueAtClientOverflownToDisk() throws Exception {
- PORT1 = ((Integer) VM0.invoke(
- () -> DeltaPropagationDUnitTest.createServerCache(HARegionQueue.HA_EVICTION_POLICY_MEMORY)))
- .intValue();
+ PORT1 = vm0.invoke(() -> createServerCache(HARegionQueue.HA_EVICTION_POLICY_MEMORY));
EvictionAttributes evAttr =
EvictionAttributes.createLRUEntryAttributes(1, EvictionAction.OVERFLOW_TO_DISK);
- createClientCache(new Integer(PORT1), new Integer(-1), "0", Boolean.TRUE/* add listener */,
- evAttr);
+ createClientCache(PORT1, -1, "0", true/* add listener */, evAttr);
registerInterestListAll();
- VM0.invoke(() -> DeltaPropagationDUnitTest.prepareDeltas());
+ vm0.invoke(this::prepareDeltas);
prepareDeltas();
- VM0.invoke(() -> DeltaPropagationDUnitTest.createDelta());
- VM0.invoke(() -> DeltaPropagationDUnitTest.createAnEntry());
+ vm0.invoke(this::createDelta);
+ vm0.invoke(this::createAnEntry);
Thread.sleep(5000); // TODO: Find a better 'n reliable alternative
// assert overflow occurred on client vm
verifyOverflowOccurred(1L, 2);
- VM0.invoke(() -> DeltaPropagationDUnitTest.updateDelta());
+ vm0.invoke(this::updateDelta);
waitForLastKey();
- long toDeltas = ((Long) VM0.invoke(() -> DeltaTestImpl.getToDeltaInvokations())).longValue();
- long fromDeltas = DeltaTestImpl.getFromDeltaInvokations().longValue();
+ long toDeltas = vm0.invoke(DeltaTestImpl::getToDeltaInvokations);
+ long fromDeltas = DeltaTestImpl.getFromDeltaInvokations();
assertTrue((EVENTS_SIZE - 1) + " deltas were to be sent but were " + toDeltas,
toDeltas == (EVENTS_SIZE - 1));
@@ -342,31 +333,28 @@ public class DeltaPropagationDUnitTest extends JUnit4DistributedTestCase {
@Test
public void testS2CWithLocallyDestroyedOldValueAtClient() throws Exception {
- PORT1 = ((Integer) VM0.invoke(
- () -> DeltaPropagationDUnitTest.createServerCache(HARegionQueue.HA_EVICTION_POLICY_MEMORY)))
- .intValue();
+ PORT1 = vm0.invoke(() -> createServerCache(HARegionQueue.HA_EVICTION_POLICY_MEMORY));
EvictionAttributes evAttr =
EvictionAttributes.createLRUEntryAttributes(1, EvictionAction.LOCAL_DESTROY);
- createClientCache(new Integer(PORT1), new Integer(-1), "0", Boolean.TRUE/* add listener */,
- evAttr);
+ createClientCache(PORT1, -1, "0", true/* add listener */, evAttr);
registerInterestListAll();
- VM0.invoke(() -> DeltaPropagationDUnitTest.prepareDeltas());
+ vm0.invoke(this::prepareDeltas);
prepareDeltas();
- VM0.invoke(() -> DeltaPropagationDUnitTest.createDelta());
- VM0.invoke(() -> DeltaPropagationDUnitTest.createAnEntry());
+ vm0.invoke(this::createDelta);
+ vm0.invoke(this::createAnEntry);
Thread.sleep(5000); // TODO: Find a better 'n reliable alternative
// assert overflow occurred on client vm
verifyOverflowOccurred(1L, 1);
- VM0.invoke(() -> DeltaPropagationDUnitTest.updateDelta());
+ vm0.invoke(this::updateDelta);
waitForLastKey();
- long toDeltas = ((Long) VM0.invoke(() -> DeltaTestImpl.getToDeltaInvokations())).longValue();
- long fromDeltas = DeltaTestImpl.getFromDeltaInvokations().longValue();
+ long toDeltas = vm0.invoke(DeltaTestImpl::getToDeltaInvokations);
+ long fromDeltas = DeltaTestImpl.getFromDeltaInvokations();
assertTrue((EVENTS_SIZE - 1) + " deltas were to be sent but were " + toDeltas,
toDeltas == (EVENTS_SIZE - 1));
@@ -379,24 +367,22 @@ public class DeltaPropagationDUnitTest extends JUnit4DistributedTestCase {
@Test
public void testS2CWithInvalidatedOldValueAtClient() throws Exception {
- PORT1 = ((Integer) VM0.invoke(
- () -> DeltaPropagationDUnitTest.createServerCache(HARegionQueue.HA_EVICTION_POLICY_MEMORY)))
- .intValue();
+ PORT1 = vm0.invoke(() -> createServerCache(HARegionQueue.HA_EVICTION_POLICY_MEMORY));
- createClientCache(new Integer(PORT1), new Integer(-1), "0", new Integer(CLIENT_LISTENER));
+ createClientCache(PORT1, -1, "0", CLIENT_LISTENER);
registerInterestListAll();
- VM0.invoke(() -> DeltaPropagationDUnitTest.prepareDeltas());
+ vm0.invoke(this::prepareDeltas);
prepareDeltas();
- VM0.invoke(() -> DeltaPropagationDUnitTest.createDelta());
- VM0.invoke(() -> DeltaPropagationDUnitTest.invalidateDelta());
- VM0.invoke(() -> DeltaPropagationDUnitTest.updateDelta());
+ vm0.invoke(this::createDelta);
+ vm0.invoke(this::invalidateDelta);
+ vm0.invoke(this::updateDelta);
waitForLastKey();
- long toDeltas = ((Long) VM0.invoke(() -> DeltaTestImpl.getToDeltaInvokations())).longValue();
- long fromDeltas = DeltaTestImpl.getFromDeltaInvokations().longValue();
+ long toDeltas = vm0.invoke(DeltaTestImpl::getToDeltaInvokations);
+ long fromDeltas = DeltaTestImpl.getFromDeltaInvokations();
assertTrue((EVENTS_SIZE - 1) + " deltas were to be sent but were " + toDeltas,
toDeltas == (EVENTS_SIZE - 1));
@@ -409,101 +395,84 @@ public class DeltaPropagationDUnitTest extends JUnit4DistributedTestCase {
@Test
public void testS2CDeltaPropagationWithClientConflationON() throws Exception {
- PORT1 = ((Integer) VM0.invoke(
- () -> DeltaPropagationDUnitTest.createServerCache(HARegionQueue.HA_EVICTION_POLICY_MEMORY)))
- .intValue();
+ PORT1 = vm0.invoke(() -> createServerCache(HARegionQueue.HA_EVICTION_POLICY_MEMORY));
- createClientCache(new Integer(PORT1), new Integer(-1), "0",
- DistributionConfig.CLIENT_CONFLATION_PROP_VALUE_ON, new Integer(LAST_KEY_LISTENER), null,
- null);
+ createClientCache(PORT1, -1, "0", DistributionConfig.CLIENT_CONFLATION_PROP_VALUE_ON,
+ LAST_KEY_LISTENER, null, null);
registerInterestListAll();
- VM0.invoke(() -> DeltaPropagationDUnitTest.prepareDeltas());
+ vm0.invoke(this::prepareDeltas);
- VM0.invoke(() -> DeltaPropagationDUnitTest.createAndUpdateDeltas());
+ vm0.invoke(this::createAndUpdateDeltas);
waitForLastKey();
// TODO: (Amogh) get CCPStats and assert 0 deltas sent.
- assertTrue("Delta Propagation feature used.",
- DeltaTestImpl.getFromDeltaInvokations().longValue() == 0);
+ assertEquals(0, DeltaTestImpl.getFromDeltaInvokations().longValue());
}
@Test
public void testS2CDeltaPropagationWithServerConflationON() throws Exception {
- VM0.invoke(() -> DeltaPropagationDUnitTest.closeCache());
- PORT1 = ((Integer) VM0.invoke(
- () -> DeltaPropagationDUnitTest.createServerCache(HARegionQueue.HA_EVICTION_POLICY_MEMORY,
- Integer.valueOf(1), Integer.valueOf(NO_LISTENER), Boolean.TRUE /* conflate */, null)))
- .intValue();
+ vm0.invoke((SerializableRunnableIF) this::closeCache);
+ PORT1 = vm0.invoke(() -> createServerCache(HARegionQueue.HA_EVICTION_POLICY_MEMORY, 1,
+ NO_LISTENER, true /* conflate */, null));
- createClientCache(new Integer(PORT1), new Integer(-1), "0",
- DistributionConfig.CLIENT_CONFLATION_PROP_VALUE_DEFAULT, new Integer(LAST_KEY_LISTENER),
- null, null);
+ createClientCache(PORT1, -1, "0", DistributionConfig.CLIENT_CONFLATION_PROP_VALUE_DEFAULT,
+ LAST_KEY_LISTENER, null, null);
- VM3.invoke(() -> DeltaPropagationDUnitTest.createClientCache(new Integer(PORT1),
- new Integer(-1), "0", DistributionConfig.CLIENT_CONFLATION_PROP_VALUE_OFF,
- new Integer(LAST_KEY_LISTENER), null, null));
+ vm3.invoke(() -> createClientCache(PORT1, -1, "0",
+ DistributionConfig.CLIENT_CONFLATION_PROP_VALUE_OFF, LAST_KEY_LISTENER, null, null));
registerInterestListAll();
- VM3.invoke(() -> DeltaPropagationDUnitTest.registerInterestListAll());
+ vm3.invoke(this::registerInterestListAll);
- VM0.invoke(() -> DeltaPropagationDUnitTest.prepareDeltas());
- VM0.invoke(() -> DeltaPropagationDUnitTest.createAndUpdateDeltas());
+ vm0.invoke(this::prepareDeltas);
+ vm0.invoke(this::createAndUpdateDeltas);
waitForLastKey();
- VM3.invoke(() -> DeltaPropagationDUnitTest.waitForLastKey());
+ vm3.invoke(this::waitForLastKey);
// TODO: (Amogh) use CCPStats.
- assertTrue("Delta Propagation feature used.",
- DeltaTestImpl.getFromDeltaInvokations().longValue() == 0);
- long fromDeltaInvocations = (Long) VM3.invoke(() -> DeltaTestImpl.getFromDeltaInvokations());
- assertTrue("Expected " + (EVENTS_SIZE - 1) + " fromDelta() invocations but found " + "",
- (fromDeltaInvocations == (EVENTS_SIZE - 1)));
+ assertEquals("Delta Propagation feature used.", 0,
+ DeltaTestImpl.getFromDeltaInvokations().longValue());
+ long fromDeltaInvocations = vm3.invoke(DeltaTestImpl::getFromDeltaInvokations);
+ assertEquals((EVENTS_SIZE - 1), fromDeltaInvocations);
}
@Test
public void testS2CDeltaPropagationWithOnlyCreateEvents() throws Exception {
- PORT1 = ((Integer) VM0.invoke(
- () -> DeltaPropagationDUnitTest.createServerCache(HARegionQueue.HA_EVICTION_POLICY_MEMORY)))
- .intValue();
+ PORT1 = vm0.invoke(() -> createServerCache(HARegionQueue.HA_EVICTION_POLICY_MEMORY));
- createClientCache(new Integer(PORT1), new Integer(-1), "0", new Integer(LAST_KEY_LISTENER));
+ createClientCache(PORT1, -1, "0", LAST_KEY_LISTENER);
registerInterestListAll();
- VM0.invoke(() -> DeltaPropagationDUnitTest.createDeltas());
+ vm0.invoke(this::createDeltas);
waitForLastKey();
- assertTrue("Delta Propagation feature used.",
- ((Long) VM0.invoke(() -> DeltaTestImpl.getToDeltaInvokations())).longValue() == 0);
- assertTrue("Delta Propagation feature used.",
- DeltaTestImpl.getFromDeltaInvokations().longValue() == 0);
+ assertEquals(0l, ((Long) vm0.invoke(DeltaTestImpl::getToDeltaInvokations)).longValue());
+ assertTrue("Delta Propagation feature used.", DeltaTestImpl.getFromDeltaInvokations() == 0);
}
/**
* Tests that an update on a server with full Delta object causes distribution of the full Delta
* instance, and not its delta bits, to other peers, even if that instance's
* <code>hasDelta()</code> returns true.
- *
- * @throws Exception
*/
@Test
public void testC2S2SDeltaPropagation() throws Exception {
prepareDeltas();
- VM0.invoke(() -> DeltaPropagationDUnitTest.prepareDeltas());
- VM1.invoke(() -> DeltaPropagationDUnitTest.prepareDeltas());
+ vm0.invoke(this::prepareDeltas);
+ vm1.invoke(this::prepareDeltas);
DeltaTestImpl val = deltaPut[1];
- VM0.invoke(() -> DeltaPropagationDUnitTest.closeCache());
+ vm0.invoke((SerializableRunnableIF) this::closeCache);
- PORT1 = ((Integer) VM0.invoke(
- () -> DeltaPropagationDUnitTest.createServerCache(HARegionQueue.HA_EVICTION_POLICY_MEMORY,
- new Integer(1), new Integer(C2S2S_SERVER_LISTENER)))).intValue();
- PORT2 = ((Integer) VM1.invoke(
- () -> DeltaPropagationDUnitTest.createServerCache(HARegionQueue.HA_EVICTION_POLICY_MEMORY,
- new Integer(1), new Integer(C2S2S_SERVER_LISTENER)))).intValue();
+ PORT1 = vm0.invoke(
+ () -> createServerCache(HARegionQueue.HA_EVICTION_POLICY_MEMORY, 1, C2S2S_SERVER_LISTENER));
+ PORT2 = vm1.invoke(
+ () -> createServerCache(HARegionQueue.HA_EVICTION_POLICY_MEMORY, 1, C2S2S_SERVER_LISTENER));
- createClientCache(new Integer(PORT1), new Integer(-1), "0", new Integer(NO_LISTENER));
+ createClientCache(PORT1, -1, "0", NO_LISTENER);
Region r = cache.getRegion("/" + regionName);
assertNotNull(r);
@@ -511,61 +480,54 @@ public class DeltaPropagationDUnitTest extends JUnit4DistributedTestCase {
r.create(DELTA_KEY, deltaPut[0]);
// Invalidate the value at both the servers.
- VM0.invoke(() -> DeltaPropagationDUnitTest.doLocalOp(INVALIDATE, regionName, DELTA_KEY));
- VM1.invoke(() -> DeltaPropagationDUnitTest.doLocalOp(INVALIDATE, regionName, DELTA_KEY));
+ vm0.invoke(() -> doLocalOp(INVALIDATE, regionName, DELTA_KEY));
+ vm1.invoke(() -> doLocalOp(INVALIDATE, regionName, DELTA_KEY));
- VM0.invoke(() -> DeltaPropagationDUnitTest.assertOp(INVALIDATE, new Integer(1)));
- VM1.invoke(() -> DeltaPropagationDUnitTest.assertOp(INVALIDATE, new Integer(1)));
+ vm0.invoke(() -> assertOp(INVALIDATE, 1));
+ vm1.invoke(() -> assertOp(INVALIDATE, 1));
r.put(DELTA_KEY, val);
Thread.sleep(5000);
- // Assert that VM0 distributed val as full value to VM1.
- VM1.invoke(() -> DeltaPropagationDUnitTest.assertValue(regionName, DELTA_KEY, val));
+ // Assert that vm0 distributed val as full value to vm1.
+ vm1.invoke(() -> assertValue(regionName, DELTA_KEY, val));
- assertTrue("Delta Propagation feature used.",
- !((Boolean) VM0.invoke(() -> DeltaTestImpl.deltaFeatureUsed())).booleanValue());
- assertTrue("Delta Propagation feature used.",
- !((Boolean) VM1.invoke(() -> DeltaTestImpl.deltaFeatureUsed())).booleanValue());
+ assertTrue("Delta Propagation feature used.", !vm0.invoke(DeltaTestImpl::deltaFeatureUsed));
+ assertTrue("Delta Propagation feature used.", !vm1.invoke(DeltaTestImpl::deltaFeatureUsed));
assertTrue("Delta Propagation feature NOT used.", DeltaTestImpl.deltaFeatureUsed());
}
@Test
public void testS2S2CDeltaPropagationWithHAOverflow() throws Exception {
prepareDeltas();
- VM0.invoke(() -> DeltaPropagationDUnitTest.prepareDeltas());
- VM1.invoke(() -> DeltaPropagationDUnitTest.prepareDeltas());
+ vm0.invoke(this::prepareDeltas);
+ vm1.invoke(this::prepareDeltas);
- VM0.invoke(() -> DeltaPropagationDUnitTest.closeCache());
+ vm0.invoke((SerializableRunnableIF) this::closeCache);
- PORT1 = ((Integer) VM0.invoke(() -> DeltaPropagationDUnitTest
- .createServerCache(HARegionQueue.HA_EVICTION_POLICY_NONE, new Integer(1)))).intValue();
- PORT2 = ((Integer) VM1.invoke(() -> DeltaPropagationDUnitTest
- .createServerCache(HARegionQueue.HA_EVICTION_POLICY_ENTRY, new Integer(1)))).intValue();
+ PORT1 = vm0.invoke(() -> createServerCache(HARegionQueue.HA_EVICTION_POLICY_NONE, 1));
+ PORT2 = vm1.invoke(() -> createServerCache(HARegionQueue.HA_EVICTION_POLICY_ENTRY, 1));
- VM0.invoke(() -> ConflationDUnitTest.setIsSlowStart("60000"));
- VM1.invoke(() -> ConflationDUnitTest.setIsSlowStart("60000"));
+ vm0.invoke(() -> ConflationDUnitTest.setIsSlowStart("60000"));
+ vm1.invoke(() -> ConflationDUnitTest.setIsSlowStart("60000"));
- createClientCache(new Integer(PORT2), new Integer(-1), "0", new Integer(CLIENT_LISTENER));
+ createClientCache(PORT2, -1, "0", CLIENT_LISTENER);
Region r = cache.getRegion("/" + regionName);
assertNotNull(r);
r.registerInterest("ALL_KEYS");
- VM0.invoke(() -> DeltaPropagationDUnitTest.createAndUpdateDeltas());
- VM1.invoke(() -> DeltaPropagationDUnitTest.confirmEviction(new Integer(PORT2)));
+ vm0.invoke(this::createAndUpdateDeltas);
+ vm1.invoke(() -> confirmEviction(PORT2));
- VM1.invoke(() -> ConflationDUnitTest.unsetIsSlowStart());
+ vm1.invoke(ConflationDUnitTest::unsetIsSlowStart);
waitForLastKey();
- long toDeltasOnServer1 =
- ((Long) VM0.invoke(() -> DeltaTestImpl.getToDeltaInvokations())).longValue();
- long fromDeltasOnServer2 =
- ((Long) VM1.invoke(() -> DeltaTestImpl.getFromDeltaInvokations())).longValue();
- long toDeltasOnServer2 =
- ((Long) VM1.invoke(() -> DeltaTestImpl.getToDeltaInvokations())).longValue();
- long fromDeltasOnClient = DeltaTestImpl.getFromDeltaInvokations().longValue();
+ long toDeltasOnServer1 = (Long) vm0.invoke(DeltaTestImpl::getToDeltaInvokations);
+ long fromDeltasOnServer2 = (Long) vm1.invoke(DeltaTestImpl::getFromDeltaInvokations);
+ long toDeltasOnServer2 = (Long) vm1.invoke(DeltaTestImpl::getToDeltaInvokations);
+ long fromDeltasOnClient = DeltaTestImpl.getFromDeltaInvokations();
assertTrue((EVENTS_SIZE - 1) + " deltas were to be sent but were " + toDeltasOnServer1,
toDeltasOnServer1 == (EVENTS_SIZE - 1));
@@ -580,76 +542,69 @@ public class DeltaPropagationDUnitTest extends JUnit4DistributedTestCase {
@Test
public void testS2CDeltaPropagationWithGIIAndFailover() throws Exception {
prepareDeltas();
- VM0.invoke(() -> DeltaPropagationDUnitTest.prepareDeltas());
- VM1.invoke(() -> DeltaPropagationDUnitTest.prepareDeltas());
- VM2.invoke(() -> DeltaPropagationDUnitTest.prepareDeltas());
-
- VM0.invoke(() -> DeltaPropagationDUnitTest.closeCache());
-
- PORT1 = ((Integer) VM0.invoke(() -> DeltaPropagationDUnitTest.createServerCache(
- HARegionQueue.HA_EVICTION_POLICY_NONE, new Integer(1), new Integer(NO_LISTENER))))
- .intValue();
- PORT2 = ((Integer) VM1.invoke(() -> DeltaPropagationDUnitTest.createServerCache(
- HARegionQueue.HA_EVICTION_POLICY_NONE, new Integer(1), new Integer(NO_LISTENER))))
- .intValue();
- int port3 = ((Integer) VM2.invoke(() -> DeltaPropagationDUnitTest.createServerCache(
- HARegionQueue.HA_EVICTION_POLICY_NONE, new Integer(1), new Integer(NO_LISTENER))))
- .intValue();
+ vm0.invoke(this::prepareDeltas);
+ vm1.invoke(this::prepareDeltas);
+ vm2.invoke(this::prepareDeltas);
+
+ vm0.invoke((SerializableRunnableIF) this::closeCache);
+
+ PORT1 =
+ vm0.invoke(() -> createServerCache(HARegionQueue.HA_EVICTION_POLICY_NONE, 1, NO_LISTENER));
+ PORT2 =
+ vm1.invoke(() -> createServerCache(HARegionQueue.HA_EVICTION_POLICY_NONE, 1, NO_LISTENER));
+ int port3 =
+ vm2.invoke(() -> createServerCache(HARegionQueue.HA_EVICTION_POLICY_NONE, 1, NO_LISTENER));
// Do puts after slowing the dispatcher.
try {
- VM0.invoke(() -> ConflationDUnitTest.setIsSlowStart("60000"));
- VM1.invoke(() -> ConflationDUnitTest.setIsSlowStart("60000"));
- VM2.invoke(() -> ConflationDUnitTest.setIsSlowStart("60000"));
+ vm0.invoke(() -> ConflationDUnitTest.setIsSlowStart("60000"));
+ vm1.invoke(() -> ConflationDUnitTest.setIsSlowStart("60000"));
+ vm2.invoke(() -> ConflationDUnitTest.setIsSlowStart("60000"));
createClientCache(new int[] {PORT1, PORT2, port3}, "1",
- DistributionConfig.CLIENT_CONFLATION_PROP_VALUE_DEFAULT, new Integer(CLIENT_LISTENER),
- null, null);
+ DistributionConfig.CLIENT_CONFLATION_PROP_VALUE_DEFAULT, CLIENT_LISTENER, null, null);
Region r = cache.getRegion("/" + regionName);
assertNotNull(r);
r.registerInterest("ALL_KEYS");
- VM primary = (((PoolImpl) pool).getPrimaryPort() == PORT1) ? VM0
- : ((((PoolImpl) pool).getPrimaryPort() == PORT2) ? VM1 : VM2);
+ Pool testPool = PoolManager.getAll().values().stream().findFirst().get();
- primary.invoke(() -> DeltaPropagationDUnitTest.createAndUpdateDeltas());
+ VM primary = (((PoolImpl) testPool).getPrimaryPort() == PORT1) ? vm0
+ : ((((PoolImpl) testPool).getPrimaryPort() == PORT2) ? vm1 : vm2);
+
+ primary.invoke(this::createAndUpdateDeltas);
Thread.sleep(5000);
- primary.invoke(() -> DeltaPropagationDUnitTest.closeCache());
+ primary.invoke((SerializableRunnableIF) this::closeCache);
Thread.sleep(5000);
- primary = (((PoolImpl) pool).getPrimaryPort() == PORT1) ? VM0
- : ((((PoolImpl) pool).getPrimaryPort() == PORT2) ? VM1 : VM2);
+ primary = (((PoolImpl) testPool).getPrimaryPort() == PORT1) ? vm0
+ : ((((PoolImpl) testPool).getPrimaryPort() == PORT2) ? vm1 : vm2);
- VM0.invoke(() -> ConflationDUnitTest.unsetIsSlowStart());
- VM1.invoke(() -> ConflationDUnitTest.unsetIsSlowStart());
- VM2.invoke(() -> ConflationDUnitTest.unsetIsSlowStart());
+ vm0.invoke(ConflationDUnitTest::unsetIsSlowStart);
+ vm1.invoke(ConflationDUnitTest::unsetIsSlowStart);
+ vm2.invoke(ConflationDUnitTest::unsetIsSlowStart);
- primary.invoke(() -> DeltaPropagationDUnitTest.closeCache());
+ primary.invoke((SerializableRunnableIF) this::closeCache);
Thread.sleep(5000);
- primary = (((PoolImpl) pool).getPrimaryPort() == PORT1) ? VM0
- : ((((PoolImpl) pool).getPrimaryPort() == PORT2) ? VM1 : VM2);
-
org.apache.geode.test.dunit.LogWriterUtils.getLogWriter()
.info("waiting for client to receive last_key");
waitForLastKey();
- long fromDeltasOnClient = DeltaTestImpl.getFromDeltaInvokations().longValue();
+ long fromDeltasOnClient = DeltaTestImpl.getFromDeltaInvokations();
assertTrue((EVENTS_SIZE - 1) + " deltas were to be received but were " + fromDeltasOnClient,
fromDeltasOnClient == (EVENTS_SIZE - 1));
} finally {
- VM0.invoke(() -> ConflationDUnitTest.unsetIsSlowStart());
- VM1.invoke(() -> ConflationDUnitTest.unsetIsSlowStart());
- VM2.invoke(() -> ConflationDUnitTest.unsetIsSlowStart());
+ vm0.invoke(ConflationDUnitTest::unsetIsSlowStart);
+ vm1.invoke(ConflationDUnitTest::unsetIsSlowStart);
+ vm2.invoke(ConflationDUnitTest::unsetIsSlowStart);
}
}
@Test
public void testBug40165ClientReconnects() throws Exception {
- PORT1 = ((Integer) VM0.invoke(
- () -> DeltaPropagationDUnitTest.createServerCache(HARegionQueue.HA_EVICTION_POLICY_MEMORY)))
- .intValue();
+ PORT1 = vm0.invoke(() -> createServerCache(HARegionQueue.HA_EVICTION_POLICY_MEMORY));
/**
* 1. Create a cache server with slow dispatcher 2. Start a durable client with a custom cache
@@ -663,11 +618,11 @@ public class DeltaPropagationDUnitTest extends JUnit4DistributedTestCase {
// Step 0
prepareDeltas();
- VM0.invoke(() -> DeltaPropagationDUnitTest.prepareDeltas());
+ vm0.invoke(this::prepareDeltas);
// Step 1
try {
- VM0.invoke(() -> ConflationDUnitTest.setIsSlowStart("60000"));
+ vm0.invoke(() -> ConflationDUnitTest.setIsSlowStart("60000"));
// Step 2
String durableClientId = getName() + "_client";
@@ -682,13 +637,13 @@ public class DeltaPropagationDUnitTest extends JUnit4DistributedTestCase {
properties.setProperty(DURABLE_CLIENT_TIMEOUT, String.valueOf(60));
createDurableCacheClient(((PoolFactoryImpl) pf).getPoolAttributes(), regionName, properties,
- new Integer(DURABLE_CLIENT_LISTENER), Boolean.TRUE);
+ DURABLE_CLIENT_LISTENER, true);
// Step 3
- VM0.invoke(() -> DeltaPropagationDUnitTest.doPuts());
+ vm0.invoke((SerializableRunnableIF) this::doPuts);
// Step 4
- VM0.invoke(() -> ConflationDUnitTest.unsetIsSlowStart());
+ vm0.invoke(ConflationDUnitTest::unsetIsSlowStart);
// Step 5
// verifyDurableClientDisconnected();
@@ -696,27 +651,25 @@ public class DeltaPropagationDUnitTest extends JUnit4DistributedTestCase {
// Step 6
createDurableCacheClient(((PoolFactoryImpl) pf).getPoolAttributes(), regionName, properties,
- new Integer(DURABLE_CLIENT_LISTENER), Boolean.FALSE);
+ DURABLE_CLIENT_LISTENER, false);
// Step 7
waitForLastKey();
// Step 8
- long fromDeltasOnClient = DeltaTestImpl.getFromDeltaInvokations().longValue();
+ long fromDeltasOnClient = DeltaTestImpl.getFromDeltaInvokations();
assertTrue("No deltas were to be received but received: " + fromDeltasOnClient,
fromDeltasOnClient < 1);
} finally {
// Step 4
- VM0.invoke(() -> ConflationDUnitTest.unsetIsSlowStart());
+ vm0.invoke(ConflationDUnitTest::unsetIsSlowStart);
}
}
@Test
public void testBug40165ClientFailsOver() throws Exception {
- PORT1 = ((Integer) VM0.invoke(
- () -> DeltaPropagationDUnitTest.createServerCache(HARegionQueue.HA_EVICTION_POLICY_MEMORY)))
- .intValue();
+ PORT1 = vm0.invoke(() -> createServerCache(HARegionQueue.HA_EVICTION_POLICY_MEMORY));
/**
* 1. Create two cache servers with slow dispatcher 2. Start a durable client with a custom
@@ -728,15 +681,14 @@ public class DeltaPropagationDUnitTest extends JUnit4DistributedTestCase {
// Step 0
prepareDeltas();
- VM0.invoke(() -> DeltaPropagationDUnitTest.prepareDeltas());
- VM1.invoke(() -> DeltaPropagationDUnitTest.prepareDeltas());
+ vm0.invoke(this::prepareDeltas);
+ vm1.invoke(this::prepareDeltas);
try {
// Step 1
- VM0.invoke(() -> ConflationDUnitTest.setIsSlowStart("60000"));
- PORT2 = ((Integer) VM1.invoke(() -> DeltaPropagationDUnitTest
- .createServerCache(HARegionQueue.HA_EVICTION_POLICY_MEMORY))).intValue();
- VM1.invoke(() -> ConflationDUnitTest.setIsSlowStart("60000"));
+ vm0.invoke(() -> ConflationDUnitTest.setIsSlowStart("60000"));
+ PORT2 = vm1.invoke(() -> createServerCache(HARegionQueue.HA_EVICTION_POLICY_MEMORY));
+ vm1.invoke(() -> ConflationDUnitTest.setIsSlowStart("60000"));
// Step 2
String durableClientId = getName() + "_client";
@@ -752,36 +704,37 @@ public class DeltaPropagationDUnitTest extends JUnit4DistributedTestCase {
properties.setProperty(DURABLE_CLIENT_TIMEOUT, String.valueOf(60));
createDurableCacheClient(((PoolFactoryImpl) pf).getPoolAttributes(), regionName, properties,
- new Integer(DURABLE_CLIENT_LISTENER), Boolean.FALSE);
+ DURABLE_CLIENT_LISTENER, false);
// Step 3
- VM0.invoke(() -> DeltaPropagationDUnitTest.doPuts());
+ vm0.invoke((SerializableRunnableIF) this::doPuts);
} finally {
// Step 4
- VM0.invoke(() -> ConflationDUnitTest.unsetIsSlowStart());
- VM1.invoke(() -> ConflationDUnitTest.unsetIsSlowStart());
+ vm0.invoke(ConflationDUnitTest::unsetIsSlowStart);
+ vm1.invoke(ConflationDUnitTest::unsetIsSlowStart);
}
// Step 5
- VM pVM = (((PoolImpl) pool).getPrimaryPort() == PORT1) ? VM0 : VM1;
+ Pool testPool = PoolManager.getAll().values().stream().findFirst().get();
+ VM pVM = (((PoolImpl) testPool).getPrimaryPort() == PORT1) ? vm0 : vm1;
while (!markerReceived) {
Thread.sleep(50);
}
// Step 6
- pVM.invoke(() -> DeltaPropagationDUnitTest.closeCache());
+ pVM.invoke((SerializableRunnableIF) this::closeCache);
Thread.sleep(5000);
// Step 7
waitForLastKey();
// Step 8
- long fromDeltasOnClient = DeltaTestImpl.getFromDeltaInvokations().longValue();
+ long fromDeltasOnClient = DeltaTestImpl.getFromDeltaInvokations();
assertTrue("Atleast 99 deltas were to be received but received: " + fromDeltasOnClient,
fromDeltasOnClient >= 99);
}
- public static void doLocalOp(String op, String rName, String key) {
+ public void doLocalOp(String op, String rName, String key) {
try {
Region r = cache.getRegion("/" + rName);
assertNotNull(r);
@@ -795,8 +748,8 @@ public class DeltaPropagationDUnitTest extends JUnit4DistributedTestCase {
}
}
- public static void assertOp(String op, Integer num) {
- final int expected = num.intValue();
+ public void assertOp(String op, Integer num) {
+ final int expected = num;
WaitCriterion wc = null;
if (INVALIDATE.equals(op)) {
wc = new WaitCriterion() {
@@ -822,7 +775,7 @@ public class DeltaPropagationDUnitTest extends JUnit4DistributedTestCase {
Wait.waitForCriterion(wc, 5 * 1000, 100, true);
}
- public static void assertValue(String rName, String key, Object expected) {
+ private void assertValue(String rName, String key, Object expected) {
try {
Region r = cache.getRegion("/" + rName);
assertNotNull(r);
@@ -834,7 +787,7 @@ public class DeltaPropagationDUnitTest extends JUnit4DistributedTestCase {
}
}
- public static void confirmEviction(Integer port) {
+ private void confirmEviction(Integer port) {
final EnableLRU cc = ((VMLRURegionMap) ((LocalRegion) cache.getRegion(
Region.SEPARATOR + CacheServerImpl.generateNameForClientMsgsRegion(port))).entries)
._getCCHelper();
@@ -851,10 +804,10 @@ public class DeltaPropagationDUnitTest extends JUnit4DistributedTestCase {
Wait.waitForCriterion(wc, 10 * 1000, 100, true);
}
- public static void waitForLastKey() {
+ private void waitForLastKey() {
WaitCriterion wc = new WaitCriterion() {
public boolean done() {
- return DeltaPropagationDUnitTest.isLastKeyReceived();
+ return isLastKeyReceived();
}
public String description() {
@@ -864,9 +817,9 @@ public class DeltaPropagationDUnitTest extends JUnit4DistributedTestCase {
Wait.waitForCriterion(wc, 10 * 1000, 100, true);
}
- public static void prepareDeltas() {
+ private void prepareDeltas() {
for (int i = 0; i < EVENTS_SIZE; i++) {
- deltaPut[i] = new DeltaTestImpl(0, "0", new Double(0), new byte[0], new TestObject1("0", 0));
+ deltaPut[i] = new DeltaTestImpl(0, "0", 0d, new byte[0], new TestObject1("0", 0));
}
deltaPut[1].setIntVar(5);
deltaPut[2].setIntVar(5);
@@ -881,9 +834,9 @@ public class DeltaPropagationDUnitTest extends JUnit4DistributedTestCase {
deltaPut[5].setByteArr(new byte[] {1, 2, 3, 4, 5});
deltaPut[3].resetDeltaStatus();
- deltaPut[3].setDoubleVar(new Double(5));
- deltaPut[4].setDoubleVar(new Double(5));
- deltaPut[5].setDoubleVar(new Double(5));
+ deltaPut[3].setDoubleVar(5d);
+ deltaPut[4].setDoubleVar(5d);
+ deltaPut[5].setDoubleVar(5d);
deltaPut[4].resetDeltaStatus();
deltaPut[4].setStr("str changed");
@@ -894,9 +847,9 @@ public class DeltaPropagationDUnitTest extends JUnit4DistributedTestCase {
deltaPut[5].setTestObj(new TestObject1("CHANGED", 100));
}
- public static void prepareErroneousDeltasForToDelta() {
+ private void prepareErroneousDeltasForToDelta() {
for (int i = 0; i < EVENTS_SIZE; i++) {
- deltaPut[i] = new DeltaTestImpl(0, "0", new Double(0), new byte[0], new TestObject1("0", 0));
+ deltaPut[i] = new DeltaTestImpl(0, "0", 0d, new byte[0], new TestObject1("0", 0));
}
deltaPut[1].setIntVar(5);
deltaPut[2].setIntVar(5);
@@ -909,9 +862,9 @@ public class DeltaPropagationDUnitTest extends JUnit4DistributedTestCase {
deltaPut[4].setByteArr(new byte[] {1, 2, 3, 4, 5});
deltaPut[5].setByteArr(new byte[] {1, 2, 3, 4, 5});
- deltaPut[3].setDoubleVar(new Double(5));
- deltaPut[4].setDoubleVar(new Double(5));
- deltaPut[5].setDoubleVar(new Double(5));
+ deltaPut[3].setDoubleVar(5d);
+ deltaPut[4].setDoubleVar(5d);
+ deltaPut[5].setDoubleVar(5d);
deltaPut[4].setStr("str changed");
deltaPut[5].setStr("str changed");
@@ -920,9 +873,9 @@ public class DeltaPropagationDUnitTest extends JUnit4DistributedTestCase {
deltaPut[5].setTestObj(new TestObject1("CHANGED", 100));
}
- public static void prepareErroneousDeltasForFromDelta() {
+ private void prepareErroneousDeltasForFromDelta() {
for (int i = 0; i < EVENTS_SIZE; i++) {
- deltaPut[i] = new DeltaTestImpl(0, "0", new Double(0), new byte[0], new TestObject1("0", 0));
+ deltaPut[i] = new DeltaTestImpl(0, "0", 0d, new byte[0], new TestObject1("0", 0));
}
deltaPut[1].setIntVar(5);
deltaPut[2].setIntVar(5);
@@ -935,9 +888,9 @@ public class DeltaPropagationDUnitTest extends JUnit4DistributedTestCase {
deltaPut[4].setByteArr(new byte[] {1, 2, 3, 4, 5});
deltaPut[5].setByteArr(new byte[] {1, 2, 3, 4, 5});
- deltaPut[3].setDoubleVar(new Double(5));
- deltaPut[4].setDoubleVar(new Double(5));
- deltaPut[5].setDoubleVar(new Double(5));
+ deltaPut[3].setDoubleVar(5d);
+ deltaPut[4].setDoubleVar(5d);
+ deltaPut[5].setDoubleVar(5d);
deltaPut[4].setStr("str changed");
deltaPut[5].setStr(DeltaTestImpl.ERRONEOUS_STRING_FOR_FROM_DELTA);
@@ -946,11 +899,11 @@ public class DeltaPropagationDUnitTest extends JUnit4DistributedTestCase {
deltaPut[5].setTestObj(new TestObject1("CHANGED", 100));
}
- public static void doPuts() {
+ private void doPuts() {
doPuts(100);
}
- public static void doPuts(Integer num) {
+ private void doPuts(Integer num) {
try {
Region r = cache.getRegion("/" + regionName);
assertNotNull(r);
@@ -966,12 +919,12 @@ public class DeltaPropagationDUnitTest extends JUnit4DistributedTestCase {
}
}
- public static void createAndUpdateDeltas() {
+ private void createAndUpdateDeltas() {
createDelta();
updateDelta();
}
- public static void createDelta() {
+ private void createDelta() {
try {
Region r = cache.getRegion("/" + regionName);
assertNotNull(r);
@@ -982,7 +935,7 @@ public class DeltaPropagationDUnitTest extends JUnit4DistributedTestCase {
}
}
- public static void updateDelta() {
+ private void updateDelta() {
try {
Region r = cache.getRegion("/" + regionName);
assertNotNull(r);
@@ -1001,7 +954,7 @@ public class DeltaPropagationDUnitTest extends JUnit4DistributedTestCase {
}
}
- public static void createDeltas() {
+ private void createDeltas() {
try {
Region r = cache.getRegion("/" + regionName);
assertNotNull(r);
@@ -1015,7 +968,7 @@ public class DeltaPropagationDUnitTest extends JUnit4DistributedTestCase {
}
}
- public static void createAnEntry() {
+ private void createAnEntry() {
try {
Region r = cache.getRegion("/" + regionName);
assertNotNull(r);
@@ -1026,18 +979,18 @@ public class DeltaPropagationDUnitTest extends JUnit4DistributedTestCase {
}
}
- public static void invalidateDelta() {
+ private void invalidateDelta() {
try {
Region r = cache.getRegion("/" + regionName);
assertNotNull(r);
r.invalidate(DELTA_KEY);
} catch (Exception ex) {
- org.apache.geode.test.dunit.Assert.fail("failed in invalidateDelta()", ex);
+ fail("failed in invalidateDelta()" + ex.getMessage());
}
}
- public static void verifyOverflowOccurred(long evictions, int regionsize) {
+ private void verifyOverflowOccurred(long evictions, int regionsize) {
EnableLRU cc =
((VMLRURegionMap) ((LocalRegion) cache.getRegion(regionName)).entries)._getCCHelper();
Assert.assertTrue(cc.getStats().getEvictions() == evictions,
@@ -1048,37 +1001,37 @@ public class DeltaPropagationDUnitTest extends JUnit4DistributedTestCase {
"Region size expected to be " + regionsize + " but was " + rSize);
}
- public static void verifyData(int creates, int updates) {
+ private void verifyData(int creates, int updates) {
assertEquals(creates, numOfCreates);
assertEquals(updates, numOfUpdates);
}
- public static Integer createServerCache(String ePolicy) throws Exception {
- return createServerCache(ePolicy, Integer.valueOf(1));
+ private Integer createServerCache(String ePolicy) throws Exception {
+ return createServerCache(ePolicy, 1);
}
- public static Integer createServerCache(String ePolicy, Integer cap) throws Exception {
- return createServerCache(ePolicy, cap, new Integer(NO_LISTENER));
+ private Integer createServerCache(String ePolicy, Integer cap) throws Exception {
+ return createServerCache(ePolicy, cap, NO_LISTENER);
}
- public static Integer createServerCache(String ePolicy, Integer cap, Integer listenerCode)
+ private Integer createServerCache(String ePolicy, Integer cap, Integer listenerCode)
throws Exception {
- return createServerCache(ePolicy, cap, listenerCode, Boolean.FALSE, null);
+ return createServerCache(ePolicy, cap, listenerCode, false, null);
}
- public static Integer createServerCache(String ePolicy, Integer cap, Integer listenerCode,
+ private Integer createServerCache(String ePolicy, Integer cap, Integer listenerCode,
Boolean conflate, Compressor compressor) throws Exception {
ConnectionTable.threadWantsSharedResources();
- new DeltaPropagationDUnitTest().createCache(new Properties());
+ createCache(new Properties());
AttributesFactory factory = new AttributesFactory();
factory.setEnableSubscriptionConflation(conflate);
- if (listenerCode.intValue() != 0) {
+ if (listenerCode != 0) {
factory.addCacheListener(getCacheListener(listenerCode));
}
if (compressor != null) {
factory.setCompressor(compressor);
}
- if (listenerCode.intValue() == C2S2S_SERVER_LISTENER) {
+ if (listenerCode == C2S2S_SERVER_LISTENER) {
factory.setScope(Scope.DISTRIBUTED_NO_ACK);
factory.setDataPolicy(DataPolicy.NORMAL);
factory.setConcurrencyChecksEnabled(false);
@@ -1106,18 +1059,18 @@ public class DeltaPropagationDUnitTest extends JUnit4DistributedTestCase {
File[] dirs1 = new File[] {overflowDirectory};
server1.getClientSubscriptionConfig().setEvictionPolicy(ePolicy);
- server1.getClientSubscriptionConfig().setCapacity(cap.intValue());
+ server1.getClientSubscriptionConfig().setCapacity(cap);
// specify diskstore for this server
server1.getClientSubscriptionConfig()
.setDiskStoreName(dsf.setDiskDirs(dirs1).create("bsi").getName());
}
server1.start();
- return new Integer(server1.getPort());
+ return server1.getPort();
}
- public static CacheListener getCacheListener(Integer code) {
+ private CacheListener getCacheListener(Integer code) {
CacheListener listener = null;
- switch (code.intValue()) {
+ switch (code) {
case 0:
break;
case SERVER_LISTENER:
@@ -1249,31 +1202,30 @@ public class DeltaPropagationDUnitTest extends JUnit4DistributedTestCase {
}
- public static void createClientCache(Integer port1, Integer port2, String rLevel)
- throws Exception {
+ private void createClientCache(Integer port1, Integer port2, String rLevel) throws Exception {
createClientCache(port1, port2, rLevel, DistributionConfig.CLIENT_CONFLATION_PROP_VALUE_DEFAULT,
- new Integer(CLIENT_LISTENER), null, null);
+ CLIENT_LISTENER, null, null);
}
- public static void createClientCache(Integer port1, Integer port2, String rLevel,
- Boolean addListener, EvictionAttributes evictAttrs) throws Exception {
+ private void createClientCache(Integer port1, Integer port2, String rLevel, Boolean addListener,
+ EvictionAttributes evictAttrs) throws Exception {
createClientCache(port1, port2, rLevel, DistributionConfig.CLIENT_CONFLATION_PROP_VALUE_DEFAULT,
- new Integer(CLIENT_LISTENER), evictAttrs, null);
+ CLIENT_LISTENER, evictAttrs, null);
}
- public static void createClientCache(Integer port1, Integer port2, String rLevel,
- Boolean addListener, ExpirationAttributes expAttrs) throws Exception {
+ private void createClientCache(Integer port1, Integer port2, String rLevel, Boolean addListener,
+ ExpirationAttributes expAttrs) throws Exception {
createClientCache(port1, port2, rLevel, DistributionConfig.CLIENT_CONFLATION_PROP_VALUE_DEFAULT,
- new Integer(CLIENT_LISTENER), null, expAttrs);
+ CLIENT_LISTENER, null, expAttrs);
}
- public static void createClientCache(Integer port1, Integer port2, String rLevel,
- Integer listener) throws Exception {
+ private void createClientCache(Integer port1, Integer port2, String rLevel, Integer listener)
+ throws Exception {
createClientCache(port1, port2, rLevel, DistributionConfig.CLIENT_CONFLATION_PROP_VALUE_DEFAULT,
listener, null, null);
}
- public static void createClientCache(Integer port1, Integer port2, String rLevel, String conflate,
+ private void createClientCache(Integer port1, Integer port2, String rLevel, String conflate,
Integer listener, EvictionAttributes evictAttrs, ExpirationAttributes expAttrs)
throws Exception {
int[] ports = null;
@@ -1286,24 +1238,23 @@ public class DeltaPropagationDUnitTest extends JUnit4DistributedTestCase {
createClientCache(ports, rLevel, conflate, listener, evictAttrs, expAttrs);
}
- public static void createClientCache(int[] ports, String rLevel, String conflate,
- Integer listener, EvictionAttributes evictAttrs, ExpirationAttributes expAttrs)
- throws Exception {
+ private void createClientCache(int[] ports, String rLevel, String conflate, Integer listener,
+ EvictionAttributes evictAttrs, ExpirationAttributes expAttrs) throws Exception {
CacheServerTestUtil.disableShufflingOfEndpoints();
Properties props = new Properties();
props.setProperty(MCAST_PORT, "0");
props.setProperty(LOCATORS, "");
props.setProperty(CONFLATE_EVENTS, conflate);
- new DeltaPropagationDUnitTest().createCache(props);
+ createCache(props);
AttributesFactory factory = new AttributesFactory();
- pool = ClientServerTestCase.configureConnectionPool(factory, "localhost", ports, true,
+ ClientServerTestCase.configureConnectionPool(factory, "localhost", ports, true,
Integer.parseInt(rLevel), 2, null, 1000, 250, false, -2);
factory.setScope(Scope.LOCAL);
- if (listener.intValue() != 0) {
- factory.addCacheListener(getCacheListener(listener.intValue()));
+ if (listener != 0) {
+ factory.addCacheListener(getCacheListener(listener));
}
if (evictAttrs != null) {
@@ -1326,7 +1277,7 @@ public class DeltaPropagationDUnitTest extends JUnit4DistributedTestCase {
}
factory.setConcurrencyChecksEnabled(false);
RegionAttributes attrs = factory.create();
- Region region = cache.createRegion(regionName, attrs);
+ cache.createRegion(regionName, attrs);
logger = cache.getLogger();
}
@@ -1339,48 +1290,28 @@ public class DeltaPropagationDUnitTest extends JUnit4DistributedTestCase {
assertNotNull(cache);
}
- public static void verifyRegionSize(Integer regionSize, Integer msgsRegionsize, Integer port) {
- try {
- // Get the clientMessagesRegion and check the size.
- Region region = (Region) cache.getRegion("/" + regionName);
- Region msgsRegion = (Region) cache
- .getRegion(CacheServerImpl.generateNameForClientMsgsRegion(port.intValue()));
- logger.fine(
- "size<serverRegion, clientMsgsRegion>: " + region.size() + ", " + msgsRegion.size());
- assertEquals(regionSize.intValue(), region.size());
- assertEquals(msgsRegionsize.intValue(), msgsRegion.size());
- } catch (Exception e) {
- fail("failed in verifyRegionSize()" + e);
- }
- }
-
- public static void createDurableCacheClient(Pool poolAttr, String regionName,
- Properties dsProperties, Integer listenerCode, Boolean close) throws Exception {
+ private void createDurableCacheClient(Pool poolAttr, String regionName, Properties dsProperties,
+ Integer listenerCode, Boolean close) throws Exception {
new DeltaPropagationDUnitTest().createCache(dsProperties);
- PoolFactoryImpl pf = (PoolFactoryImpl) PoolManager.createFactory();
- pf.init(poolAttr);
- PoolImpl p = (PoolImpl) pf.create("DeltaPropagationDUnitTest");
+ PoolFactoryImpl poolFactory = (PoolFactoryImpl) PoolManager.createFactory();
+ poolFactory.init(poolAttr);
+ PoolImpl pool = (PoolImpl) poolFactory.create("DeltaPropagationDUnitTest");
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(false);
- factory.setPoolName(p.getName());
- if (listenerCode.intValue() != 0) {
+ factory.setPoolName(pool.getName());
+ if (listenerCode != 0) {
factory.addCacheListener(getCacheListener(listenerCode));
}
RegionAttributes attrs = factory.create();
Region r = cache.createRegion(regionName, attrs);
r.registerInterest("ALL_KEYS");
- pool = p;
cache.readyForEvents();
logger = cache.getLogger();
- closeCache = close.booleanValue();
+ closeCache = close;
}
- /*
- * public static void createDeltaEntries(Long num) { }
- */
-
- public static void registerInterestListAll() {
+ private void registerInterestListAll() {
try {
Region r = cache.getRegion("/" + regionName);
assertNotNull(r);
@@ -1390,31 +1321,29 @@ public class DeltaPropagationDUnitTest extends JUnit4DistributedTestCase {
}
}
- public static void closeCache() {
+ private Object closeCache() {
if (cache != null && !cache.isClosed()) {
cache.close();
cache.getDistributedSystem().disconnect();
+ return null;
}
+ return null;
}
- public static void closeCache(boolean keepalive) {
+ private void closeCache(boolean keepalive) {
if (cache != null && !cache.isClosed()) {
cache.close(keepalive);
cache.getDistributedSystem().disconnect();
}
}
- public static boolean isLastKeyReceived() {
+ private boolean isLastKeyReceived() {
return lastKeyReceived;
}
- public static void setLastKeyReceived(boolean val) {
- lastKeyReceived = val;
- }
-
- public static void resetAll() {
+ private void resetAll() {
DeltaTestImpl.resetDeltaInvokationCounters();
- numOfCreates = numOfUpdates = numOfInvalidates = numOfDestroys = numOfEvents = 0;
+ numOfCreates = numOfUpdates = numOfInvalidates = numOfDestroys = 0;
lastKeyReceived = false;
markerReceived = false;
areListenerResultsValid = true;
--
To stop receiving notification emails like this one, please contact
['"commits@geode.apache.org" <co...@geode.apache.org>'].