You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/01/04 18:04:37 UTC
[1/8] incubator-geode git commit: GEODE-614: Increase maximum duration
Repository: incubator-geode
Updated Branches:
refs/heads/feature/GEODE-715 49754b393 -> b3851df51
GEODE-614: Increase maximum duration
Increase the ExpectedTimeout maximum duration to prevent false failures
when the build job loses CPU for more than 2 seconds.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/935b76a3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/935b76a3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/935b76a3
Branch: refs/heads/feature/GEODE-715
Commit: 935b76a393fee86cc491afeba54e233eb0bf6a5e
Parents: 093ac12
Author: Kirk Lund <kl...@pivotal.io>
Authored: Tue Dec 22 14:28:07 2015 -0800
Committer: Kirk Lund <kl...@pivotal.io>
Committed: Mon Dec 28 10:14:53 2015 -0800
----------------------------------------------------------------------
.../com/gemstone/gemfire/internal/process/PidFileJUnitTest.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/935b76a3/gemfire-core/src/test/java/com/gemstone/gemfire/internal/process/PidFileJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/process/PidFileJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/process/PidFileJUnitTest.java
index 89f786a..5f81c2b 100755
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/process/PidFileJUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/process/PidFileJUnitTest.java
@@ -139,7 +139,7 @@ public class PidFileJUnitTest {
timeout.expect(TimeoutException.class);
timeout.expectMessage("Invalid pid 'null' found");
timeout.expectMinimumDuration(1000);
- timeout.expectMaximumDuration(2000);
+ timeout.expectMaximumDuration(10000);
timeout.expectTimeUnit(TimeUnit.MILLISECONDS);
new PidFile(file).readPid(1500, TimeUnit.MILLISECONDS);
[2/8] incubator-geode git commit: GEODE-707 cache loader not invoked
on concurrent load if first load fails with an exception
Posted by kl...@apache.org.
GEODE-707 cache loader not invoked on concurrent load if first load fails with an exception
Avoid setting the future to {null, versionTag} when the loader throws an
exception, so that a concurrent load attempt will be allowed.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/81eafccc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/81eafccc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/81eafccc
Branch: refs/heads/feature/GEODE-715
Commit: 81eafccce3cfce2500dcce786c65de1ae5b057dc
Parents: 935b76a
Author: Bruce Schuchardt <bs...@pivotal.io>
Authored: Tue Dec 29 07:54:56 2015 -0800
Committer: Bruce Schuchardt <bs...@pivotal.io>
Committed: Tue Dec 29 07:54:56 2015 -0800
----------------------------------------------------------------------
.../gemfire/internal/cache/LocalRegion.java | 45 +++--
.../internal/cache/PartitionedRegion.java | 75 --------
.../gemfire/cache30/SearchAndLoadDUnitTest.java | 177 ++++++++++++++++++-
3 files changed, 199 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/81eafccc/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
index caf07ce..2bc2f05 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
@@ -1261,7 +1261,7 @@ public class LocalRegion extends AbstractRegion
* @param disableCopyOnRead if true then disable copy on read
* @param preferCD true if the preferred result form is CachedDeserializable
* @param clientEvent client's event, if any (for version tag retrieval)
- * @param returnTombstones TODO
+ * @param returnTombstones whether destroyed entries should be returned
* @param retainResult if true then the result may be a retained off-heap reference
* @return the value for the given key
*/
@@ -1613,21 +1613,32 @@ public class LocalRegion extends AbstractRegion
throw err;
}
}
- // didn't find a future, do one more getDeserialized to catch race
- // condition where the future was just removed by another get thread
+ // didn't find a future, do one more probe for the entry to catch a race
+ // condition where the future was just removed by another thread
try {
- localValue = getDeserializedValue(null, keyInfo, isCreate, disableCopyOnRead, preferCD, clientEvent, false, false/*allowReadFromHDFS*/, false);
- // TODO verify that this method is not used for PR or BR and hence allowReadFromHDFS does not matter
- // stats have now been updated
- if (localValue != null && !Token.isInvalid(localValue)) {
- result = localValue;
- return result;
+ boolean partitioned = this.getDataPolicy().withPartitioning();
+ if (!partitioned) {
+ localValue = getDeserializedValue(null, keyInfo, isCreate, disableCopyOnRead, preferCD, clientEvent, false, false/*allowReadFromHDFS*/, false);
+
+ // stats have now been updated
+ if (localValue != null && !Token.isInvalid(localValue)) {
+ result = localValue;
+ return result;
+ }
+ isCreate = localValue == null;
+ result = findObjectInSystem(keyInfo, isCreate, null, generateCallbacks,
+ localValue, disableCopyOnRead, preferCD, null, clientEvent, returnTombstones, false/*allowReadFromHDFS*/);
+
+ } else {
+
+ // This code was moved from PartitionedRegion.nonTxnFindObject(). That method has been removed.
+ // For PRs we don't want to deserialize the value and we can't use findObjectInSystem because
+ // it can invoke code that is transactional.
+ result = getSharedDataView().findObject(keyInfo, this, true/*isCreate*/, generateCallbacks,
+ localValue, disableCopyOnRead, preferCD, null, null, false, allowReadFromHDFS);
+ // TODO why are we not passing the client event or returnTombstones in the above invocation?
}
- isCreate = localValue == null;
- result = findObjectInSystem(keyInfo, isCreate, null, generateCallbacks,
- localValue, disableCopyOnRead, preferCD, null, clientEvent, returnTombstones, false/*allowReadFromHDFS*/);
-
if (result == null && localValue != null) {
if (localValue != Token.TOMBSTONE || returnTombstones) {
result = localValue;
@@ -1636,8 +1647,12 @@ public class LocalRegion extends AbstractRegion
// findObjectInSystem does not call conditionalCopy
}
finally {
- VersionTag tag = (clientEvent==null)? null : clientEvent.getVersionTag();
- thisFuture.set(new Object[]{result, tag});
+ if (result != null) {
+ VersionTag tag = (clientEvent==null)? null : clientEvent.getVersionTag();
+ thisFuture.set(new Object[]{result, tag});
+ } else {
+ thisFuture.set(null);
+ }
this.getFutures.remove(keyInfo.getKey());
}
if (!disableCopyOnRead) {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/81eafccc/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
old mode 100644
new mode 100755
index a36d719..a14e99f
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
@@ -265,9 +265,6 @@ import com.gemstone.gemfire.i18n.StringId;
* are copied up to a configurable level (for high availability) and placed on
* multiple VMs for improved performance and increased storage capacity.
*
- * @since 5.0
- * @author Rohit Reja, Tushar Apshankar, Girish Thombare, Negi Tribhuwan, Greg
- * Passmore, Mitch Thomas, Bruce Schuchardt
*/
public class PartitionedRegion extends LocalRegion implements
CacheDistributionAdvisee, QueryExecutor {
@@ -3314,78 +3311,6 @@ public class PartitionedRegion extends LocalRegion implements
}
/**
- * override the one in LocalRegion since we don't need to do getDeserialized.
- */
- @Override Object nonTxnFindObject(KeyInfo keyInfo, boolean isCreate,
- boolean generateCallbacks, Object localValue, boolean disableCopyOnRead, boolean preferCD,
- EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS)
- throws TimeoutException, CacheLoaderException
- {
- Object result = null;
- FutureResult thisFuture = new FutureResult(getCancelCriterion());
- Future otherFuture = (Future)this.getFutures.putIfAbsent(keyInfo.getKey(), thisFuture);
- // only one thread can get their future into the map for this key at a time
- if (otherFuture != null) {
- try {
- result = otherFuture.get();
- if (result != null) {
- if (!preferCD && result instanceof CachedDeserializable) {
- CachedDeserializable cd = (CachedDeserializable)result;
- // fix for bug 43023
- if (!disableCopyOnRead && isCopyOnRead()) {
- result = cd.getDeserializedWritableCopy(null, null);
- } else {
- result = cd.getDeserializedForReading();
- }
-
- } else if (!disableCopyOnRead) {
- result = conditionalCopy(result);
- }
-
- //For sqlf since the deserialized value is nothing but chunk
- // before returning the found value increase its use count
- /* if(GemFireCacheImpl.sqlfSystem() && result instanceof Chunk) {
- if(!((Chunk)result).use()) {
- return null;
- }
- }*/
- // what was a miss is now a hit
- RegionEntry re = null;
- if (isCreate) {
- re = basicGetEntry(keyInfo.getKey());
- updateStatsForGet(re, true);
- }
- return result;
- }
- // if value == null, try our own search/load
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- // TODO check a CancelCriterion here?
- return null;
- }
- catch (ExecutionException e) {
- // unexpected since there is no background thread
- AssertionError err = new AssertionError("unexpected exception");
- err.initCause(err);
- throw err;
- }
- }
- try {
- result = getSharedDataView().findObject(keyInfo, this, true/*isCreate*/, generateCallbacks,
- localValue, disableCopyOnRead, preferCD, null, null, false, allowReadFromHDFS);
- }
- finally {
- if (result instanceof Chunk) {
- thisFuture.set(null);
- } else {
- thisFuture.set(result);
- }
- this.getFutures.remove(keyInfo.getKey());
- }
- return result;
- }
- /**
* override the one in LocalRegion since we don't need to do getDeserialized.
*/
@Override
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/81eafccc/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/SearchAndLoadDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/SearchAndLoadDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/SearchAndLoadDUnitTest.java
index b33bda2..cf9ff9c 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/SearchAndLoadDUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/SearchAndLoadDUnitTest.java
@@ -16,17 +16,17 @@
*/
package com.gemstone.gemfire.cache30;
-//import com.gemstone.gemfire.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
import com.gemstone.gemfire.cache.*;
import dunit.*;
-//import hydra.ClientMgr;
/**
* This class tests various search load and write scenarios for distributed regions
- * @author Sudhir Menon
- *
*/
+@SuppressWarnings({"deprecation", "unchecked", "rawtypes", "serial"})
public class SearchAndLoadDUnitTest extends CacheTestCase {
static boolean loaderInvoked;
@@ -48,6 +48,10 @@ public class SearchAndLoadDUnitTest extends CacheTestCase {
/** A <code>CacheWriter</code> used by a test */
protected static TestCacheWriter writer;
+ static boolean exceptionThrown;
+ static final CountDownLatch readyForExceptionLatch = new CountDownLatch(1);
+ static final CountDownLatch loaderInvokedLatch = new CountDownLatch(1);
+
public SearchAndLoadDUnitTest(String name) {
super(name);
}
@@ -171,8 +175,166 @@ public class SearchAndLoadDUnitTest extends CacheTestCase {
});
}
- public void testNetLoadNoLoaders()
- throws CacheException, InterruptedException {
+
+ /**
+ * This test is for a bug in which a cache loader threw an exception
+ * that caused the wrong value to be put in a Future in nonTxnFindObject. This
+ * in turn caused a concurrent search for the object to not invoke the loader a
+ * second time.
+ *
+ * VM0 is used to create a cache and a region having a loader that simulates the
+ * conditions that caused the bug. One async thread then does a get() which invokes
+ * the loader. Another async thread does a get() which reaches nonTxnFindObject
+ * and blocks waiting for the first thread's load to complete. The loader then
+ * throws an exception that is sent back to the first thread. The second thread
+ * should then cause the loader to be invoked again, and this time the loader will
+ * return a value. Both threads then validate that they received the expected
+ * result.
+ */
+ public void testConcurrentLoad() throws Throwable {
+
+ Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0);
+
+ final String name = this.getUniqueName() + "Region";
+ final String objectName = "theKey";
+ final Integer value = new Integer(44);
+ final String exceptionString = "causing first cache-load to fail";
+
+ remoteLoaderInvoked = false;
+ loaderInvoked = false;
+
+ vm0.invoke(new CacheSerializableRunnable("create region " + name + " in vm0") {
+ public void run2() {
+ remoteLoaderInvoked = false;
+ loaderInvoked = false;
+ AttributesFactory factory = new AttributesFactory();
+ factory.setScope(Scope.DISTRIBUTED_ACK);
+ factory.setConcurrencyChecksEnabled(true);
+ factory.setCacheLoader(new CacheLoader() {
+ boolean firstInvocation = true;
+ public synchronized Object load(LoaderHelper helper) {
+ System.out.println("invoked cache loader for " + helper.getKey());
+ loaderInvoked = true;
+ loaderInvokedLatch.countDown();
+ if (firstInvocation) {
+ firstInvocation = false;
+ try {
+ // wait for both threads to be ready for the exception to be thrown
+ System.out.println("waiting for vm0t2 to be ready before throwing exception");
+ readyForExceptionLatch.await(30, TimeUnit.SECONDS);
+ // give the second thread time to get into loader code
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ fail("interrupted");
+ }
+ System.out.println("throwing exception");
+ exceptionThrown = true;
+ throw new RuntimeException(exceptionString);
+ }
+ System.out.println("returning value="+value);
+ return value;
+ }
+
+ public void close() {
+
+ }
+ });
+
+ Region region = createRegion(name,factory.create());
+ region.create(objectName, null);
+ addExpectedException(exceptionString);
+ }
+ });
+
+ AsyncInvocation async1 = null;
+ try {
+ async1 = vm0.invokeAsync(new CacheSerializableRunnable("Concurrently invoke the remote loader on the same key - t1") {
+ public void run2() {
+ Region region = getCache().getRegion("root/"+name);
+
+ getLogWriter().info("t1 is invoking get("+objectName+")");
+ try {
+ getLogWriter().info("t1 retrieved value " + region.get(objectName));
+ fail("first load should have triggered an exception");
+ } catch (RuntimeException e) {
+ if (!e.getMessage().contains(exceptionString)) {
+ throw e;
+ }
+ }
+ }
+ });
+ vm0.invoke(new CacheSerializableRunnable("Concurrently invoke the loader on the same key - t2") {
+ public void run2() {
+ final Region region = getCache().getRegion("root/"+name);
+ final Object[] valueHolder = new Object[1];
+
+ // wait for t1 to cause the loader to be invoked
+ getLogWriter().info("t2 is waiting for loader to be invoked by t1");
+ try {
+ loaderInvokedLatch.await(30, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ fail("interrupted");
+ }
+ assertTrue(loaderInvoked);
+
+ Thread t = new Thread("invoke get()") {
+ public void run() {
+ try {
+ valueHolder[0] = region.get(objectName);
+ } catch (RuntimeException e) {
+ valueHolder[0] = e;
+ }
+ }
+ };
+
+ t.setDaemon(true);
+ t.start();
+ try {
+ // let the thread get to the point of blocking on t1's Future
+ // in LocalRegion.nonTxnFindObject()
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ fail("interrupted");
+ }
+
+ readyForExceptionLatch.countDown();
+ try {
+ t.join(30000);
+ } catch (InterruptedException e) {
+ fail("interrupted");
+ }
+ if (t.isAlive()) {
+ t.interrupt();
+ fail("get() operation blocked for too long - test needs some work");
+ }
+
+ getLogWriter().info("t2 is invoking get("+objectName+")");
+ Object value = valueHolder[0];
+ if (value instanceof RuntimeException) {
+ if ( ((Exception)value).getMessage().contains(exceptionString) ) {
+ fail("second load should not have thrown an exception");
+ } else {
+ throw (RuntimeException)value;
+ }
+ } else {
+ getLogWriter().info("t2 retrieved value " + value);
+ assertNotNull(value);
+ }
+ }
+ });
+ } finally {
+ if (async1 != null) {
+ async1.join();
+ if (async1.exceptionOccurred()) {
+ throw async1.getException();
+ }
+ }
+ }
+ }
+
+
+ public void testNetLoadNoLoaders() throws CacheException, InterruptedException {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
@@ -318,7 +480,6 @@ public class SearchAndLoadDUnitTest extends CacheTestCase {
VM vm2 = host.getVM(2);
final String name = this.getUniqueName() + "-ACK";
final String objectName = "B";
- final Integer value = new Integer(43);
loaderInvoked = false;
remoteLoaderInvoked = false;
remoteLoaderInvokedCount = 0;
@@ -369,7 +530,7 @@ public class SearchAndLoadDUnitTest extends CacheTestCase {
}
});
- Region region = createRegion(name,factory.create());
+ createRegion(name,factory.create());
}
catch (CacheException ex) {
fail("While creating ACK region", ex);
[3/8] incubator-geode git commit: GEODE-708: Add stats for Geode
membership health monitor
Posted by kl...@apache.org.
GEODE-708: Add stats for Geode membership health monitor
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/91b43897
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/91b43897
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/91b43897
Branch: refs/heads/feature/GEODE-715
Commit: 91b438971a33b444c8de826bb18a3fc2cff4f8b4
Parents: 81eafcc
Author: Jianxia Chen <jc...@pivotal.io>
Authored: Tue Dec 29 09:38:43 2015 -0800
Committer: Jianxia Chen <jc...@pivotal.io>
Committed: Tue Dec 29 09:38:43 2015 -0800
----------------------------------------------------------------------
.../gemfire/distributed/internal/DMStats.java | 80 +++++++
.../distributed/internal/DistributionStats.java | 237 +++++++++++++++++++
.../internal/LonerDistributionManager.java | 64 +++++
.../membership/gms/fd/GMSHealthMonitor.java | 93 +++++---
.../gms/fd/GMSHealthMonitorJUnitTest.java | 42 +++-
5 files changed, 482 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/91b43897/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DMStats.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DMStats.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DMStats.java
index e79a40b..7bf5b80 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DMStats.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DMStats.java
@@ -536,4 +536,84 @@ public interface DMStats {
public void endPdxInstanceDeserialization(long start);
public void incPdxInstanceCreations();
+
+ //Stats for GMSHealthMonitor
+ public long getHeartbeatRequestsSent();
+
+ public void incHeartbeatRequestsSent();
+
+ public long getHeartbeatRequestsReceived();
+
+ public void incHeartbeatRequestsReceived();
+
+ public long getHeartbeatsSent();
+
+ public void incHeartbeatsSent();
+
+ public long getHeartbeatsReceived();
+
+ public void incHeartbeatsReceived();
+
+
+ public long getSuspectsSent();
+
+ public void incSuspectsSent();
+
+ public long getSuspectsReceived();
+
+ public void incSuspectsReceived();
+
+
+ public long getFinalCheckRequestsSent();
+
+ public void incFinalCheckRequestsSent();
+
+ public long getFinalCheckRequestsReceived();
+
+ public void incFinalCheckRequestsReceived();
+
+ public long getFinalCheckResponsesSent();
+
+ public void incFinalCheckResponsesSent();
+
+ public long getFinalCheckResponsesReceived();
+
+ public void incFinalCheckResponsesReceived();
+
+
+ public long getTcpFinalCheckRequestsSent();
+
+ public void incTcpFinalCheckRequestsSent();
+
+ public long getTcpFinalCheckRequestsReceived();
+
+ public void incTcpFinalCheckRequestsReceived();
+
+ public long getTcpFinalCheckResponsesSent();
+
+ public void incTcpFinalCheckResponsesSent();
+
+ public long getTcpFinalCheckResponsesReceived();
+
+ public void incTcpFinalCheckResponsesReceived();
+
+
+ public long getUdpFinalCheckRequestsSent();
+
+ public void incUdpFinalCheckRequestsSent();
+
+// UDP final check is implemented using HeartbeatRequestMessage and HeartbeatMessage
+// So the following code is commented out.
+
+// public long getUdpFinalCheckRequestsReceived();
+//
+// public void incUdpFinalCheckRequestsReceived();
+//
+// public long getUdpFinalCheckResponsesSent();
+//
+// public void incUdpFinalCheckResponsesSent();
+
+ public long getUdpFinalCheckResponsesReceived();
+
+ public void incUdpFinalCheckResponsesReceived();
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/91b43897/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionStats.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionStats.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionStats.java
index 804b507..92b0fcb 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionStats.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionStats.java
@@ -215,6 +215,26 @@ public class DistributionStats implements DMStats {
private final static int eldersId;
private final static int initialImageMessagesInFlightId;
private final static int initialImageRequestsInProgressId;
+
+ //For GMSHealthMonitor
+ private final static int heartbeatRequestsSentId;
+ private final static int heartbeatRequestsReceivedId;
+ private final static int heartbeatsSentId;
+ private final static int heartbeatsReceivedId;
+ private final static int suspectsSentId;
+ private final static int suspectsReceivedId;
+ private final static int finalCheckRequestsSentId;
+ private final static int finalCheckRequestsReceivedId;
+ private final static int finalCheckResponsesSentId;
+ private final static int finalCheckResponsesReceivedId;
+ private final static int tcpFinalCheckRequestsSentId;
+ private final static int tcpFinalCheckRequestsReceivedId;
+ private final static int tcpFinalCheckResponsesSentId;
+ private final static int tcpFinalCheckResponsesReceivedId;
+ private final static int udpFinalCheckRequestsSentId;
+ private final static int udpFinalCheckRequestsReceivedId;
+ private final static int udpFinalCheckResponsesSentId;
+ private final static int udpFinalCheckResponsesReceivedId;
static {
String statName = "DistributionStats";
@@ -317,6 +337,33 @@ public class DistributionStats implements DMStats {
final String initialImageMessagesInFlightDesc = "The number of messages with initial image data sent from this member that have not yet been acknowledged.";
final String initialImageRequestsInProgressDesc = "The number of initial images this member is currently receiving.";
+ //For GMSHealthMonitor
+ final String heartbeatRequestsSentDesc = "The number of heartbeat request messages that this member has sent.";
+ final String heartbeatRequestsReceivedDesc = "The number of heartbeat request messages that this member has received.";
+
+ final String heartbeatsSentDesc = "The number of heartbeat messages that this member has sent.";
+ final String heartbeatsReceivedDesc = "The number of heartbeat messages that this member has received.";
+
+ final String suspectsSentDesc = "The number of suspect member messages that this member has sent.";
+ final String suspectsReceivedDesc = "The number of suspect member messages that this member has received.";
+
+ final String finalCheckRequestsSentDesc = "The number of final check requests that this member has sent.";
+ final String finalCheckRequestsReceivedDesc = "The number of final check requests that this member has received.";
+
+ final String finalCheckResponsesSentDesc = "The number of final check responses that this member has sent.";
+ final String finalCheckResponsesReceivedDesc = "The number of final check responses that this member has received.";
+
+ final String tcpFinalCheckRequestsSentDesc = "The number of TCP final check requests that this member has sent.";
+ final String tcpFinalCheckRequestsReceivedDesc = "The number of TCP final check requests that this member has received.";
+
+ final String tcpFinalCheckResponsesSentDesc = "The number of TCP final check responses that this member has sent.";
+ final String tcpFinalCheckResponsesReceivedDesc = "The number of TCP final check responses that this member has received.";
+
+ final String udpFinalCheckRequestsSentDesc = "The number of UDP final checks that this member has sent.";
+ final String udpFinalCheckRequestsReceivedDesc = "The number of UDP final check requests that this member has received.";
+
+ final String udpFinalCheckResponsesSentDesc = "The number of UDP final check responses that this member has sent.";
+ final String udpFinalCheckResponsesReceivedDesc = "The number of UDP final check responses that this member has received.";
StatisticsTypeFactory f = StatisticsTypeFactoryImpl.singleton();
@@ -486,6 +533,26 @@ public class DistributionStats implements DMStats {
f.createIntGauge("elders", eldersDesc, "elders"),
f.createIntGauge("initialImageMessagesInFlight", initialImageMessagesInFlightDesc, "messages"),
f.createIntGauge("initialImageRequestsInProgress", initialImageRequestsInProgressDesc, "requests"),
+
+ //For GMSHealthMonitor
+ f.createLongCounter("heartbeatRequestsSent", heartbeatRequestsSentDesc, "messages"),
+ f.createLongCounter("heartbeatRequestsReceived", heartbeatRequestsReceivedDesc, "messages"),
+ f.createLongCounter("heartbeatsSent", heartbeatsSentDesc, "messages"),
+ f.createLongCounter("heartbeatsReceived", heartbeatsReceivedDesc, "messages"),
+ f.createLongCounter("suspectsSent", suspectsSentDesc, "messages"),
+ f.createLongCounter("suspectsReceived", suspectsReceivedDesc, "messages"),
+ f.createLongCounter("finalCheckRequestsSent", finalCheckRequestsSentDesc, "messages"),
+ f.createLongCounter("finalCheckRequestsReceived", finalCheckRequestsReceivedDesc, "messages"),
+ f.createLongCounter("finalCheckResponsesSent", finalCheckResponsesSentDesc, "messages"),
+ f.createLongCounter("finalCheckResponsesReceived", finalCheckResponsesReceivedDesc, "messages"),
+ f.createLongCounter("tcpFinalCheckRequestsSent", tcpFinalCheckRequestsSentDesc, "nanoseconds", false),
+ f.createLongCounter("tcpFinalCheckRequestsReceived", tcpFinalCheckRequestsReceivedDesc, "nanoseconds", false),
+ f.createLongCounter("tcpFinalCheckResponsesSent", tcpFinalCheckResponsesSentDesc, "nanoseconds", false),
+ f.createLongCounter("tcpFinalCheckResponsesReceived", tcpFinalCheckResponsesReceivedDesc, "nanoseconds", false),
+ f.createLongCounter("udpFinalCheckRequestsSent", udpFinalCheckRequestsSentDesc, "messages"),
+ f.createLongCounter("udpFinalCheckRequestsReceived", udpFinalCheckRequestsReceivedDesc, "messages"),
+ f.createLongCounter("udpFinalCheckResponsesSent", udpFinalCheckResponsesSentDesc, "messages"),
+ f.createLongCounter("udpFinalCheckResponsesReceived", udpFinalCheckResponsesReceivedDesc, "messages"),
}
);
@@ -654,6 +721,26 @@ public class DistributionStats implements DMStats {
eldersId = type.nameToId("elders");
initialImageMessagesInFlightId = type.nameToId("initialImageMessagesInFlight");
initialImageRequestsInProgressId = type.nameToId("initialImageRequestsInProgress");
+
+ //For GMSHealthMonitor
+ heartbeatRequestsSentId = type.nameToId("heartbeatRequestsSent");
+ heartbeatRequestsReceivedId = type.nameToId("heartbeatRequestsReceived");
+ heartbeatsSentId = type.nameToId("heartbeatsSent");
+ heartbeatsReceivedId = type.nameToId("heartbeatsReceived");
+ suspectsSentId = type.nameToId("suspectsSent");
+ suspectsReceivedId = type.nameToId("suspectsReceived");
+ finalCheckRequestsSentId = type.nameToId("finalCheckRequestsSent");
+ finalCheckRequestsReceivedId = type.nameToId("finalCheckRequestsReceived");
+ finalCheckResponsesSentId = type.nameToId("finalCheckResponsesSent");
+ finalCheckResponsesReceivedId = type.nameToId("finalCheckResponsesReceived");
+ tcpFinalCheckRequestsSentId = type.nameToId("tcpFinalCheckRequestsSent");
+ tcpFinalCheckRequestsReceivedId = type.nameToId("tcpFinalCheckRequestsReceived");
+ tcpFinalCheckResponsesSentId = type.nameToId("tcpFinalCheckResponsesSent");
+ tcpFinalCheckResponsesReceivedId = type.nameToId("tcpFinalCheckResponsesReceived");
+ udpFinalCheckRequestsSentId = type.nameToId("udpFinalCheckRequestsSent");
+ udpFinalCheckRequestsReceivedId = type.nameToId("udpFinalCheckRequestsReceived");
+ udpFinalCheckResponsesSentId = type.nameToId("udpFinalCheckResponsesSent");
+ udpFinalCheckResponsesReceivedId = type.nameToId("udpFinalCheckResponsesReceived");
}
/** The Statistics object that we delegate most behavior to */
@@ -1811,4 +1898,154 @@ public class DistributionStats implements DMStats {
public Statistics getStats(){
return stats;
}
+
+ //For GMSHealthMonitor
+ public long getHeartbeatRequestsSent() {
+ return this.stats.getLong(heartbeatRequestsSentId);
+ }
+
+ public void incHeartbeatRequestsSent() {
+ this.stats.incLong(heartbeatRequestsSentId, 1L);
+ }
+
+ public long getHeartbeatRequestsReceived() {
+ return this.stats.getLong(heartbeatRequestsReceivedId);
+ }
+
+ public void incHeartbeatRequestsReceived() {
+ this.stats.incLong(heartbeatRequestsReceivedId, 1L);
+ }
+
+ public long getHeartbeatsSent() {
+ return this.stats.getLong(heartbeatsSentId);
+ }
+
+ public void incHeartbeatsSent() {
+ this.stats.incLong(heartbeatsSentId, 1L);
+ }
+
+ public long getHeartbeatsReceived() {
+ return this.stats.getLong(heartbeatsReceivedId);
+ }
+
+ public void incHeartbeatsReceived() {
+ this.stats.incLong(heartbeatsReceivedId, 1L);
+ }
+
+ public long getSuspectsSent() {
+ return this.stats.getLong(suspectsSentId);
+ }
+
+ public void incSuspectsSent() {
+ this.stats.incLong(suspectsSentId, 1L);
+ }
+
+ public long getSuspectsReceived() {
+ return this.stats.getLong(suspectsReceivedId);
+ }
+
+ public void incSuspectsReceived() {
+ this.stats.incLong(suspectsReceivedId, 1L);
+ }
+
+ public long getFinalCheckRequestsSent() {
+ return this.stats.getLong(finalCheckRequestsSentId);
+ }
+
+ public void incFinalCheckRequestsSent() {
+ this.stats.incLong(finalCheckRequestsSentId, 1L);
+ }
+
+ public long getFinalCheckRequestsReceived() {
+ return this.stats.getLong(finalCheckRequestsReceivedId);
+ }
+
+ public void incFinalCheckRequestsReceived() {
+ this.stats.incLong(finalCheckRequestsReceivedId, 1L);
+ }
+
+ public long getFinalCheckResponsesSent() {
+ return this.stats.getLong(finalCheckResponsesSentId);
+ }
+
+ public void incFinalCheckResponsesSent() {
+ this.stats.incLong(finalCheckResponsesSentId, 1L);
+ }
+
+ public long getFinalCheckResponsesReceived() {
+ return this.stats.getLong(finalCheckResponsesReceivedId);
+ }
+
+ public void incFinalCheckResponsesReceived() {
+ this.stats.incLong(finalCheckResponsesReceivedId, 1L);
+ }
+
+ // TCP final check statistics
+ public long getTcpFinalCheckRequestsSent() {
+ return this.stats.getLong(tcpFinalCheckRequestsSentId);
+ }
+
+ public void incTcpFinalCheckRequestsSent() {
+ this.stats.incLong(tcpFinalCheckRequestsSentId, 1L);
+ }
+
+ public long getTcpFinalCheckRequestsReceived() {
+ return this.stats.getLong(tcpFinalCheckRequestsReceivedId);
+ }
+
+ public void incTcpFinalCheckRequestsReceived() {
+ this.stats.incLong(tcpFinalCheckRequestsReceivedId, 1L);
+ }
+
+ public long getTcpFinalCheckResponsesSent() {
+ return this.stats.getLong(tcpFinalCheckResponsesSentId);
+ }
+
+ public void incTcpFinalCheckResponsesSent() {
+ this.stats.incLong(tcpFinalCheckResponsesSentId, 1L);
+ }
+
+ public long getTcpFinalCheckResponsesReceived() {
+ return this.stats.getLong(tcpFinalCheckResponsesReceivedId);
+ }
+
+ public void incTcpFinalCheckResponsesReceived() {
+ this.stats.incLong(tcpFinalCheckResponsesReceivedId, 1L);
+ }
+
+ // UDP final check statistics
+ public long getUdpFinalCheckRequestsSent() {
+ return this.stats.getLong(udpFinalCheckRequestsSentId);
+ }
+
+ public void incUdpFinalCheckRequestsSent() {
+ this.stats.incLong(udpFinalCheckRequestsSentId, 1L);
+ }
+
+// The UDP final check is implemented with HeartbeatRequestMessage and HeartbeatMessage,
+// so the receiver-side accessors below (requests received, responses sent) are commented out.
+// public long getUdpFinalCheckRequestsReceived() {
+// return this.stats.getLong(udpFinalCheckRequestsReceivedId);
+// }
+//
+// public void incUdpFinalCheckRequestsReceived() {
+// this.stats.incLong(udpFinalCheckRequestsReceivedId, 1L);
+// }
+//
+// public long getUdpFinalCheckResponsesSent() {
+// return this.stats.getLong(udpFinalCheckResponsesSentId);
+// }
+//
+// public void incUdpFinalCheckResponsesSent() {
+// this.stats.incLong(udpFinalCheckResponsesSentId, 1L);
+// }
+
+ public long getUdpFinalCheckResponsesReceived() {
+ return this.stats.getLong(udpFinalCheckResponsesReceivedId);
+ }
+
+ public void incUdpFinalCheckResponsesReceived() {
+ this.stats.incLong(udpFinalCheckResponsesReceivedId, 1L);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/91b43897/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/LonerDistributionManager.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/LonerDistributionManager.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/LonerDistributionManager.java
index 60158d1..b55fe88 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/LonerDistributionManager.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/LonerDistributionManager.java
@@ -573,6 +573,70 @@ public class LonerDistributionManager implements DM {
@Override
public void incThreadOwnedReceivers(long value, int dominoCount) {
}
+ @Override
+ public long getHeartbeatRequestsSent() {return 0;}
+ @Override
+ public void incHeartbeatRequestsSent() {}
+ @Override
+ public long getHeartbeatRequestsReceived() {return 0;}
+ @Override
+ public void incHeartbeatRequestsReceived() {}
+ @Override
+ public long getHeartbeatsSent() {return 0;}
+ @Override
+ public void incHeartbeatsSent() {}
+ @Override
+ public long getHeartbeatsReceived() {return 0;}
+ @Override
+ public void incHeartbeatsReceived() {}
+ @Override
+ public long getSuspectsSent() {return 0;}
+ @Override
+ public void incSuspectsSent() {}
+ @Override
+ public long getSuspectsReceived() {return 0;}
+ @Override
+ public void incSuspectsReceived() {}
+ @Override
+ public long getFinalCheckRequestsSent() {return 0;}
+ @Override
+ public void incFinalCheckRequestsSent() {}
+ @Override
+ public long getFinalCheckRequestsReceived() {return 0;}
+ @Override
+ public void incFinalCheckRequestsReceived() {}
+ @Override
+ public long getFinalCheckResponsesSent() {return 0;}
+ @Override
+ public void incFinalCheckResponsesSent() {}
+ @Override
+ public long getFinalCheckResponsesReceived() {return 0;}
+ @Override
+ public void incFinalCheckResponsesReceived() {}
+ @Override
+ public long getTcpFinalCheckRequestsSent() {return 0;}
+ @Override
+ public void incTcpFinalCheckRequestsSent() {}
+ @Override
+ public long getTcpFinalCheckRequestsReceived() {return 0;}
+ @Override
+ public void incTcpFinalCheckRequestsReceived() {}
+ @Override
+ public long getTcpFinalCheckResponsesSent() {return 0;}
+ @Override
+ public void incTcpFinalCheckResponsesSent() {}
+ @Override
+ public long getTcpFinalCheckResponsesReceived() {return 0;}
+ @Override
+ public void incTcpFinalCheckResponsesReceived() {}
+ @Override
+ public long getUdpFinalCheckRequestsSent() {return 0;}
+ @Override
+ public void incUdpFinalCheckRequestsSent() {}
+ @Override
+ public long getUdpFinalCheckResponsesReceived() {return 0;}
+ @Override
+ public void incUdpFinalCheckResponsesReceived() {}
}
protected static class DummyExecutor implements ExecutorService {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/91b43897/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
index 005b0ed..b6f6c12 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
@@ -32,7 +32,6 @@ import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -59,6 +58,7 @@ import com.gemstone.gemfire.GemFireConfigException;
import com.gemstone.gemfire.SystemConnectException;
import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException;
+import com.gemstone.gemfire.distributed.internal.DMStats;
import com.gemstone.gemfire.distributed.internal.DistributionMessage;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.distributed.internal.membership.NetView;
@@ -139,10 +139,10 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
*/
final private ConcurrentHashMap<InternalDistributedMember, NetView> suspectedMemberInView = new ConcurrentHashMap<>();
- /**
- * Members undergoing final checks
- */
- final private List<InternalDistributedMember> membersInFinalCheck = Collections.synchronizedList(new ArrayList<>(30));
+// /**
+// * Members undergoing final checks
+// */
+// final private List<InternalDistributedMember> membersInFinalCheck = Collections.synchronizedList(new ArrayList<>(30));
/**
* Replies to messages
@@ -175,6 +175,9 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
static final int ERROR = 0x00;
private volatile int socketPort;
private volatile ServerSocket serverSocket;
+
+ /** Statistics about health monitor */
+ protected DMStats stats;
/**
* this class is to avoid garbage
@@ -282,6 +285,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
int vmViewId = in.readInt();
long uuidLSBs = in.readLong();
long uuidMSBs = in.readLong();
+ GMSHealthMonitor.this.stats.incFinalCheckRequestsReceived();
+ GMSHealthMonitor.this.stats.incTcpFinalCheckRequestsReceived();
boolean debug = logger.isDebugEnabled();
GMSMember gmbr = (GMSMember) GMSHealthMonitor.this.localAddress.getNetMember();
UUID myUUID = gmbr.getUUID();
@@ -311,6 +316,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
out.write(OK);
out.flush();
socket.shutdownOutput();
+ GMSHealthMonitor.this.stats.incFinalCheckResponsesSent();
+ GMSHealthMonitor.this.stats.incTcpFinalCheckResponsesSent();
if (debug) {
logger.debug("GMSHealthMonitor server socket replied OK.");
}
@@ -320,6 +327,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
out.write(ERROR);
out.flush();
socket.shutdownOutput();
+ GMSHealthMonitor.this.stats.incFinalCheckResponsesSent();
+ GMSHealthMonitor.this.stats.incTcpFinalCheckResponsesSent();
if (debug) {
logger.debug("GMSHealthMonitor server socket replied ERROR.");
}
@@ -358,9 +367,9 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
public void contactedBy(InternalDistributedMember sender) {
TimeStamp cTS = new TimeStamp(currentTimeStamp);
cTS = memberTimeStamps.putIfAbsent(sender, cTS);
- if (cTS != null) {
- cTS.setTimeStamp(currentTimeStamp);
- }
+// if (cTS != null) {
+// cTS.setTimeStamp(currentTimeStamp);
+// }
if (suspectedMemberInView.remove(sender) != null) {
logger.info("No longer suspecting {}", sender);
}
@@ -369,10 +378,10 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
private HeartbeatRequestMessage constructHeartbeatRequestMessage(final InternalDistributedMember mbr) {
final int reqId = requestId.getAndIncrement();
- final HeartbeatRequestMessage prm = new HeartbeatRequestMessage(mbr, reqId);
- prm.setRecipient(mbr);
+ final HeartbeatRequestMessage hrm = new HeartbeatRequestMessage(mbr, reqId);
+ hrm.setRecipient(mbr);
- return prm;
+ return hrm;
}
private void checkMember(final InternalDistributedMember mbr) {
@@ -424,11 +433,12 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
return true;
}
logger.trace("Checking member {}", member);
- final HeartbeatRequestMessage prm = constructHeartbeatRequestMessage(member);
+ final HeartbeatRequestMessage hrm = constructHeartbeatRequestMessage(member);
final Response pingResp = new Response();
- requestIdVsResponse.put(prm.getRequestId(), pingResp);
+ requestIdVsResponse.put(hrm.getRequestId(), pingResp);
try {
- Set<InternalDistributedMember> membersNotReceivedMsg = this.services.getMessenger().send(prm);
+ Set<InternalDistributedMember> membersNotReceivedMsg = this.services.getMessenger().send(hrm);
+ this.stats.incHeartbeatRequestsSent();
if (membersNotReceivedMsg != null && membersNotReceivedMsg.contains(member)) {
// member is not part of current view.
logger.trace("Member {} is not part of current view.", member);
@@ -446,6 +456,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
return false;
} else {
logger.trace("received heartbeat from {}", member);
+ this.stats.incHeartbeatsReceived();
if (ts != null) {
ts.setTimeStamp(System.currentTimeMillis());
}
@@ -456,7 +467,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
} catch (InterruptedException e) {
logger.debug("GMSHealthMonitor checking thread interrupted, while waiting for response from member: {} .", member);
} finally {
- requestIdVsResponse.remove(prm.getRequestId());
+ requestIdVsResponse.remove(hrm.getRequestId());
}
return false;
}
@@ -502,8 +513,12 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
GMSMember gmbr = (GMSMember) suspectMember.getNetMember();
writeMemberToStream(gmbr, out);
clientSocket.shutdownOutput();
+ this.stats.incFinalCheckRequestsSent();
+ this.stats.incTcpFinalCheckRequestsSent();
logger.debug("Connected - reading response from suspect member {}", suspectMember);
int b = in.read();
+ this.stats.incFinalCheckResponsesReceived();
+ this.stats.incTcpFinalCheckResponsesReceived();
logger.debug("Received {}", (b == OK ? "OK" : (b == ERROR ? "ERROR" : b)), suspectMember);
if (b == OK) {
TimeStamp ts = memberTimeStamps.get(suspectMember);
@@ -535,12 +550,6 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
out.flush();
}
- /*
- * (non-Javadoc)
- *
- * @see com.gemstone.gemfire.distributed.internal.membership.gms.fd.HealthMonitor#suspectMember(com.gemstone.gemfire.distributed.DistributedMember,
- * java.lang.String)
- */
@Override
public void suspect(InternalDistributedMember mbr, String reason) {
initiateSuspicion(mbr, reason);
@@ -722,6 +731,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
return;
}
services.getMessenger().sendUnreliably(message);
+ GMSHealthMonitor.this.stats.incHeartbeatsSent();
} catch (CancelException e) {
return;
}
@@ -748,6 +758,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
message.setRecipient(mbr);
try {
services.getMessenger().sendUnreliably(message);
+ GMSHealthMonitor.this.stats.incHeartbeatsSent();
numSent++;
if (numSent >= NUM_HEARTBEATS) {
break;
@@ -843,11 +854,12 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
isStopping = false;
services = s;
memberTimeout = s.getConfig().getMemberTimeout();
+ this.stats = services.getStatistics();
services.getMessenger().addHandler(HeartbeatRequestMessage.class, this);
services.getMessenger().addHandler(HeartbeatMessage.class, this);
services.getMessenger().addHandler(SuspectMembersMessage.class, this);
}
-
+
@Override
public void started() {
setLocalAddress( services.getMessenger().getMemberID());
@@ -993,6 +1005,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
private void processHeartbeatRequest(HeartbeatRequestMessage m) {
+ this.stats.incHeartbeatRequestsReceived();
+
if (this.isStopping || this.playingDead) {
return;
}
@@ -1001,9 +1015,10 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
InternalDistributedMember me = localAddress;
if (me.getVmViewId() >= 0 && m.getTarget().equals(me)) {
- HeartbeatMessage prm = new HeartbeatMessage(m.getRequestId());
- prm.setRecipient(m.getSender());
- Set<InternalDistributedMember> membersNotReceivedMsg = services.getMessenger().send(prm);
+ HeartbeatMessage hm = new HeartbeatMessage(m.getRequestId());
+ hm.setRecipient(m.getSender());
+ Set<InternalDistributedMember> membersNotReceivedMsg = services.getMessenger().send(hm);
+ this.stats.incHeartbeatsSent();
if (membersNotReceivedMsg != null && membersNotReceivedMsg.contains(m.getSender())) {
logger.debug("Unable to send heartbeat to member: {}", m.getSender());
}
@@ -1013,6 +1028,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
}
private void processHeartbeat(HeartbeatMessage m) {
+ this.stats.incHeartbeatsReceived();
if (m.getRequestId() < 0) {
// a periodic heartbeat
contactedBy(m.getSender());
@@ -1037,6 +1053,9 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
* @param incomingRequest
*/
private void processSuspectMembersRequest(SuspectMembersMessage incomingRequest) {
+
+ this.stats.incSuspectsReceived();
+
NetView cv = currentView;
if (cv == null) {
@@ -1062,6 +1081,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
message.setRecipient(sender);
try {
services.getMessenger().send(message);
+ this.stats.incHeartbeatsSent();
it.remove();
} catch (CancelException e) {
return;
@@ -1132,13 +1152,13 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
private void doFinalCheck(final InternalDistributedMember initiator,
List<SuspectRequest> sMembers, NetView cv, InternalDistributedMember localAddress) {
- List<InternalDistributedMember> membersChecked = new ArrayList<>(10);
+// List<InternalDistributedMember> membersChecked = new ArrayList<>(10);
try {
for (int i = 0; i < sMembers.size(); i++) {
final SuspectRequest sr = sMembers.get(i);
final InternalDistributedMember mbr = sr.getSuspectMember();
- if (!cv.contains(mbr) || membersInFinalCheck.contains(mbr)) {
+ if (!cv.contains(mbr) /*|| membersInFinalCheck.contains(mbr)*/) {
continue;
}
@@ -1146,7 +1166,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
continue;// self
}
- membersChecked.add(mbr);
+// membersChecked.add(mbr);
// suspectMemberInView is now set by the heartbeat monitoring code
// to allow us to move on from watching members we've already
@@ -1181,6 +1201,12 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
logger.debug("\ncurrent view: {}\nports: {}", view, Arrays.toString(view.getFailureDetectionPorts()));
}
pinged = GMSHealthMonitor.this.doCheckMember(mbr);
+ GMSHealthMonitor.this.stats.incFinalCheckRequestsSent();
+ GMSHealthMonitor.this.stats.incUdpFinalCheckRequestsSent();
+ if (pinged) {
+ GMSHealthMonitor.this.stats.incFinalCheckResponsesReceived();
+ GMSHealthMonitor.this.stats.incUdpFinalCheckResponsesReceived();
+ }
} else {
pinged = GMSHealthMonitor.this.doTCPCheckMember(mbr, port);
}
@@ -1214,7 +1240,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
// }// scheduling for final check and removing it..
}
} finally {
- membersInFinalCheck.removeAll(membersChecked);
+// membersInFinalCheck.removeAll(membersChecked);
}
}
@Override
@@ -1253,10 +1279,11 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
recipients = currentView.getMembers();
// }
- SuspectMembersMessage rmm = new SuspectMembersMessage(recipients, requests);
+ SuspectMembersMessage smm = new SuspectMembersMessage(recipients, requests);
Set<InternalDistributedMember> failedRecipients;
try {
- failedRecipients = services.getMessenger().send(rmm);
+ failedRecipients = services.getMessenger().send(smm);
+ this.stats.incSuspectsSent();
} catch (CancelException e) {
return;
}
@@ -1299,4 +1326,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
}
}
+
+ public DMStats getStats() {
+ return this.stats;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/91b43897/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
index d539374..82ac316 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
@@ -39,6 +39,7 @@ import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
+import java.util.Properties;
import org.jgroups.util.UUID;
import org.junit.After;
@@ -49,8 +50,11 @@ import org.junit.experimental.categories.Category;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import com.gemstone.gemfire.distributed.internal.DM;
import com.gemstone.gemfire.distributed.internal.DistributionConfig;
import com.gemstone.gemfire.distributed.internal.DistributionManager;
+import com.gemstone.gemfire.distributed.internal.DistributionStats;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.distributed.internal.membership.NetView;
import com.gemstone.gemfire.distributed.internal.membership.gms.GMSMember;
@@ -80,6 +84,7 @@ public class GMSHealthMonitorJUnitTest {
private JoinLeave joinLeave;
private GMSHealthMonitor gmsHealthMonitor;
private Manager manager;
+ private long statsId = 123;
final long memberTimeout = 1000l;
private int[] portRange= new int[]{0, 65535};
@@ -92,7 +97,20 @@ public class GMSHealthMonitorJUnitTest {
joinLeave = mock(JoinLeave.class);
manager = mock(Manager.class);
services = mock(Services.class);
- Stopper stopper = mock(Stopper.class);
+ Stopper stopper = mock(Stopper.class);
+
+ Properties nonDefault = new Properties();
+ nonDefault.put(DistributionConfig.ACK_WAIT_THRESHOLD_NAME, "1");
+ nonDefault.put(DistributionConfig.ACK_SEVERE_ALERT_THRESHOLD_NAME, "10");
+ nonDefault.put(DistributionConfig.DISABLE_TCP_NAME, "true");
+ nonDefault.put(DistributionConfig.MCAST_PORT_NAME, "0");
+ nonDefault.put(DistributionConfig.MCAST_TTL_NAME, "0");
+ nonDefault.put(DistributionConfig.LOG_FILE_NAME, "");
+ nonDefault.put(DistributionConfig.LOG_LEVEL_NAME, "fine");
+ nonDefault.put(DistributionConfig.MEMBER_TIMEOUT_NAME, "2000");
+ nonDefault.put(DistributionConfig.LOCATORS_NAME, "localhost[10344]");
+ DM dm = mock(DM.class);
+ InternalDistributedSystem system = InternalDistributedSystem.newInstanceForTesting(dm, nonDefault);
when(mockConfig.getDistributionConfig()).thenReturn(mockDistConfig);
when(mockConfig.getMemberTimeout()).thenReturn(memberTimeout);
@@ -102,8 +120,8 @@ public class GMSHealthMonitorJUnitTest {
when(services.getJoinLeave()).thenReturn(joinLeave);
when(services.getCancelCriterion()).thenReturn(stopper);
when(services.getManager()).thenReturn(manager);
+ when(services.getStatistics()).thenReturn(new DistributionStats(system, statsId));
when(stopper.isCancelInProgress()).thenReturn(false);
-
if (mockMembers == null) {
mockMembers = new ArrayList<InternalDistributedMember>();
@@ -142,6 +160,8 @@ public class GMSHealthMonitorJUnitTest {
gmsHealthMonitor.processMessage(new HeartbeatRequestMessage(mbr, 1));
verify(messenger, atLeastOnce()).send(any(HeartbeatMessage.class));
+ Assert.assertEquals(1, gmsHealthMonitor.getStats().getHeartbeatRequestsReceived());
+ Assert.assertEquals(1, gmsHealthMonitor.getStats().getHeartbeatsSent());
}
/**
@@ -187,7 +207,7 @@ public class GMSHealthMonitorJUnitTest {
// neighbor should change to 5th
System.out.println("testHMNextNeighborAfterTimeout ending");
Assert.assertEquals("expected " + expected + " but found " + neighbor
- + ". view="+v, expected, neighbor);
+ + ". view="+v, expected, neighbor);
}
/**
@@ -239,6 +259,8 @@ public class GMSHealthMonitorJUnitTest {
System.out.println("testSuspectMembersCalledThroughMemberCheckThread ending");
assertTrue(gmsHealthMonitor.isSuspectMember(mockMembers.get(4)));
+ Assert.assertTrue(gmsHealthMonitor.getStats().getHeartbeatRequestsSent() > 0);
+ Assert.assertTrue(gmsHealthMonitor.getStats().getSuspectsSent() > 0);
}
/***
@@ -282,6 +304,8 @@ public class GMSHealthMonitorJUnitTest {
Thread.sleep(GMSHealthMonitor.MEMBER_SUSPECT_COLLECTION_INTERVAL + 1000);
verify(messenger, atLeastOnce()).send(any(SuspectMembersMessage.class));
+
+ Assert.assertTrue(gmsHealthMonitor.getStats().getSuspectsSent() > 0);
}
/***
@@ -306,6 +330,8 @@ public class GMSHealthMonitorJUnitTest {
}
verify(messenger, atLeastOnce()).send(isA(SuspectMembersMessage.class));
+
+ Assert.assertTrue(gmsHealthMonitor.getStats().getSuspectsSent() > 0);
}
/***
@@ -338,6 +364,7 @@ public class GMSHealthMonitorJUnitTest {
System.out.println("testRemoveMemberCalled ending");
verify(joinLeave, atLeastOnce()).remove(any(InternalDistributedMember.class), any(String.class));
+ Assert.assertTrue(gmsHealthMonitor.getStats().getSuspectsReceived() > 0);
}
/***
@@ -373,6 +400,7 @@ public class GMSHealthMonitorJUnitTest {
System.out.println("testRemoveMemberNotCalledBeforeTimeout ending");
verify(joinLeave, never()).remove(any(InternalDistributedMember.class), any(String.class));
+ Assert.assertTrue(gmsHealthMonitor.getStats().getSuspectsReceived() > 0);
}
/***
@@ -407,6 +435,7 @@ public class GMSHealthMonitorJUnitTest {
Thread.sleep(memberTimeout + 200);
verify(joinLeave, atLeastOnce()).remove(any(InternalDistributedMember.class), any(String.class));
+ Assert.assertTrue(gmsHealthMonitor.getStats().getSuspectsReceived() > 0);
}
/***
@@ -548,6 +577,9 @@ public class GMSHealthMonitorJUnitTest {
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(outputStream.toByteArray()));
int byteReply = dis.read();
Assert.assertEquals(expectedResult, byteReply);
+
+ Assert.assertTrue(gmsHealthMonitor.getStats().getFinalCheckResponsesSent() > 0);
+ Assert.assertTrue(gmsHealthMonitor.getStats().getTcpFinalCheckResponsesSent() > 0);
}
@Test
@@ -610,6 +642,10 @@ public class GMSHealthMonitorJUnitTest {
when(fakeSocket.isConnected()).thenReturn(true);
Assert.assertEquals(expectedResult, gmsHealthMonitor.doTCPCheckMember(otherMember, fakeSocket));
+ Assert.assertTrue(gmsHealthMonitor.getStats().getFinalCheckRequestsSent() > 0);
+ Assert.assertTrue(gmsHealthMonitor.getStats().getTcpFinalCheckRequestsSent() > 0);
+ Assert.assertTrue(gmsHealthMonitor.getStats().getFinalCheckResponsesReceived() > 0);
+ Assert.assertTrue(gmsHealthMonitor.getStats().getTcpFinalCheckResponsesReceived() > 0);
//we can check to see if the gms member information was written out by the tcp check
byte[] bytesWritten = outputStream.toByteArray();
[8/8] incubator-geode git commit: Merge remote-tracking branch
'origin/develop' into feature/GEODE-715
Posted by kl...@apache.org.
Merge remote-tracking branch 'origin/develop' into feature/GEODE-715
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/b3851df5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/b3851df5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/b3851df5
Branch: refs/heads/feature/GEODE-715
Commit: b3851df511eb0fa37ffbe9d3f28114009f592ce6
Parents: 49754b3 ca6148a
Author: Kirk Lund <kl...@pivotal.io>
Authored: Mon Jan 4 09:04:14 2016 -0800
Committer: Kirk Lund <kl...@pivotal.io>
Committed: Mon Jan 4 09:04:14 2016 -0800
----------------------------------------------------------------------
build.gradle | 21 +-
gemfire-assembly/build.gradle | 23 +
.../gemfire/distributed/internal/DMStats.java | 80 ++
.../distributed/internal/DistributionStats.java | 237 ++++
.../internal/LonerDistributionManager.java | 64 ++
.../membership/gms/fd/GMSHealthMonitor.java | 93 +-
.../gemfire/internal/cache/LocalRegion.java | 45 +-
.../internal/cache/PartitionedRegion.java | 75 --
.../test/java/com/gemstone/gemfire/BadTest.java | 42 -
.../cache30/Bug40255JUnitDisabledTest.java | 139 ---
.../gemfire/cache30/Bug40255JUnitTest.java | 143 +++
.../cache30/Bug40662JUnitDisabledTest.java | 92 --
.../gemfire/cache30/Bug40662JUnitTest.java | 93 ++
.../gemfire/cache30/SearchAndLoadDUnitTest.java | 177 ++-
.../locks/CollaborationJUnitDisabledTest.java | 562 ----------
.../internal/locks/CollaborationJUnitTest.java | 617 +++++++++++
.../gms/fd/GMSHealthMonitorJUnitTest.java | 42 +-
.../DiskRegionPerfJUnitPerformanceTest.java | 6 +-
...HARegionQueueStartStopJUnitDisabledTest.java | 123 ---
.../cache/tier/sockets/HAInterestBaseTest.java | 1015 -----------------
.../tier/sockets/HAInterestPart1DUnitTest.java | 102 +-
.../tier/sockets/HAInterestPart2DUnitTest.java | 116 +-
.../cache/tier/sockets/HAInterestTestCase.java | 1018 ++++++++++++++++++
.../tier/sockets/command/CommitCommandTest.java | 6 +
.../logging/LogWriterPerformanceTest.java | 6 +
.../log4j/Log4J2DisabledPerformanceTest.java | 6 +
.../logging/log4j/Log4J2PerformanceTest.java | 9 +-
.../LogWriterLoggerDisabledPerformanceTest.java | 6 +
.../log4j/LogWriterLoggerPerformanceTest.java | 9 +-
.../internal/process/PidFileJUnitTest.java | 2 +-
.../test/java/dunit/DistributedTestCase.java | 3 +
...IndexRepositoryImplJUnitPerformanceTest.java | 437 --------
.../IndexRepositoryImplPerformanceTest.java | 439 ++++++++
33 files changed, 3178 insertions(+), 2670 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b3851df5/gemfire-core/src/test/java/dunit/DistributedTestCase.java
----------------------------------------------------------------------
diff --cc gemfire-core/src/test/java/dunit/DistributedTestCase.java
index 7aa8e6f,6fa560f..6af972a
--- a/gemfire-core/src/test/java/dunit/DistributedTestCase.java
+++ b/gemfire-core/src/test/java/dunit/DistributedTestCase.java
@@@ -86,8 -87,10 +87,9 @@@ import com.gemstone.gemfire.internal.lo
import com.gemstone.gemfire.internal.logging.ManagerLogWriter;
import com.gemstone.gemfire.internal.logging.log4j.LogWriterLogger;
import com.gemstone.gemfire.management.internal.cli.LogWrapper;
+import com.gemstone.gemfire.test.dunit.standalone.DUnitLauncher;
+ import com.gemstone.gemfire.test.junit.categories.DistributedTest;
-import dunit.standalone.DUnitLauncher;
-
/**
* This class is the superclass of all distributed unit tests.
*
[4/8] incubator-geode git commit: GEODE-627: Add source distribution
Posted by kl...@apache.org.
GEODE-627: Add source distribution
Add another distribution to create source archives, without gradle
wrapper binaries included.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/bba76560
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/bba76560
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/bba76560
Branch: refs/heads/feature/GEODE-715
Commit: bba7656013d4ca5b5f97de24f2957049273bef09
Parents: 91b4389
Author: Anthony Baker <ab...@pivotal.io>
Authored: Wed Dec 30 09:01:17 2015 -0600
Committer: Anthony Baker <ab...@pivotal.io>
Committed: Thu Dec 31 06:21:34 2015 -0600
----------------------------------------------------------------------
gemfire-assembly/build.gradle | 23 +++++++++++++++++++++++
1 file changed, 23 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bba76560/gemfire-assembly/build.gradle
----------------------------------------------------------------------
diff --git a/gemfire-assembly/build.gradle b/gemfire-assembly/build.gradle
index 373d77a..ad466de 100755
--- a/gemfire-assembly/build.gradle
+++ b/gemfire-assembly/build.gradle
@@ -178,6 +178,29 @@ task gfshDepsJar (type: Jar, dependsOn: ':gemfire-core:classes') {
}
distributions {
+ src {
+ baseName = 'apache-geode-src'
+ contents {
+ from (rootDir) {
+ exclude 'gradlew'
+ exclude 'gradlew.bat'
+ exclude 'gradle/wrapper/gradle-wrapper.jar'
+ exclude 'gradle/wrapper/gradle-wrapper.properties'
+ exclude '.gradle'
+ exclude '**/build/**'
+ exclude '**/.project'
+ exclude '**/.classpath'
+ exclude '**/.settings/**'
+ exclude '**/build-eclipse/**'
+ exclude '.idea/**'
+ exclude '**/*.iml'
+ exclude '**/*.ipr'
+ exclude '**/*.iws'
+ exclude '**/tags'
+ }
+ }
+ }
+
main {
baseName = 'apache-geode' //TODO rootProject.name
contents {
[5/8] incubator-geode git commit: GEODE-714: Modify all tests to use
JUnit Categories
Posted by kl...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ca6148aa/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/HAInterestTestCase.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/HAInterestTestCase.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/HAInterestTestCase.java
new file mode 100755
index 0000000..481863c
--- /dev/null
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/HAInterestTestCase.java
@@ -0,0 +1,1018 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.tier.sockets;
+
+import com.gemstone.gemfire.cache.AttributesFactory;
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.InterestResultPolicy;
+import com.gemstone.gemfire.cache.MirrorType;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionAttributes;
+import com.gemstone.gemfire.cache.Scope;
+import com.gemstone.gemfire.cache.client.PoolManager;
+import com.gemstone.gemfire.cache.client.internal.Connection;
+import com.gemstone.gemfire.cache.client.internal.PoolImpl;
+import com.gemstone.gemfire.cache.client.internal.RegisterInterestTracker;
+import com.gemstone.gemfire.cache.client.internal.ServerRegionProxy;
+import com.gemstone.gemfire.cache.server.CacheServer;
+import com.gemstone.gemfire.distributed.DistributedSystem;
+import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.AvailablePort;
+import com.gemstone.gemfire.internal.cache.ClientServerObserverAdapter;
+import com.gemstone.gemfire.internal.cache.ClientServerObserverHolder;
+import com.gemstone.gemfire.internal.cache.CacheServerImpl;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.tier.InterestType;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
+
+import dunit.DistributedTestCase;
+import dunit.Host;
+import dunit.VM;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests Interest Registration Functionality
+ */
+@SuppressWarnings({"deprecation", "rawtypes", "serial", "unchecked"})
+public class HAInterestTestCase extends DistributedTestCase {
+
+ protected static final int TIMEOUT_MILLIS = 60 * 1000;
+ protected static final int INTERVAL_MILLIS = 10;
+
+ protected static final String REGION_NAME = "HAInterestBaseTest_region";
+
+ protected static final String k1 = "k1";
+ protected static final String k2 = "k2";
+ protected static final String client_k1 = "client-k1";
+ protected static final String client_k2 = "client-k2";
+ protected static final String server_k1 = "server-k1";
+ protected static final String server_k2 = "server-k2";
+ protected static final String server_k1_updated = "server_k1_updated";
+
+ protected static Cache cache = null;
+ protected static PoolImpl pool = null;
+ protected static Connection conn = null;
+
+ protected static int PORT1;
+ protected static int PORT2;
+ protected static int PORT3;
+
+ protected static boolean isBeforeRegistrationCallbackCalled = false;
+ protected static boolean isBeforeInterestRecoveryCallbackCalled = false;
+ protected static boolean isAfterRegistrationCallbackCalled = false;
+
+ protected static Host host = null;
+ protected static VM server1 = null;
+ protected static VM server2 = null;
+ protected static VM server3 = null;
+
+ protected volatile static boolean exceptionOccured = false;
+
+ public HAInterestTestCase(String name) {
+ super(name);
+ }
+
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ host = Host.getHost(0);
+ server1 = host.getVM(0);
+ server2 = host.getVM(1);
+ server3 = host.getVM(2);
+ CacheServerTestUtil.disableShufflingOfEndpoints();
+ // start servers first
+ PORT1 = ((Integer) server1.invoke(HAInterestTestCase.class, "createServerCache")).intValue();
+ PORT2 = ((Integer) server2.invoke(HAInterestTestCase.class, "createServerCache")).intValue();
+ PORT3 = ((Integer) server3.invoke(HAInterestTestCase.class, "createServerCache")).intValue();
+ exceptionOccured = false;
+ addExpectedException("java.net.ConnectException: Connection refused: connect");
+ }
+
+ @Override
+ public void tearDown2() throws Exception {
+ // close the clients first
+ closeCache();
+
+ // then close the servers
+ server1.invoke(HAInterestTestCase.class, "closeCache");
+ server2.invoke(HAInterestTestCase.class, "closeCache");
+ server3.invoke(HAInterestTestCase.class, "closeCache");
+ CacheServerTestUtil.resetDisableShufflingOfEndpointsFlag();
+ }
+
+ public static void closeCache() {
+ PoolImpl.AFTER_REGISTER_CALLBACK_FLAG = false;
+ PoolImpl.BEFORE_PRIMARY_IDENTIFICATION_FROM_BACKUP_CALLBACK_FLAG = false;
+ PoolImpl.BEFORE_RECOVER_INTEREST_CALLBACK_FLAG = false;
+ PoolImpl.BEFORE_REGISTER_CALLBACK_FLAG = false;
+ HAInterestTestCase.isAfterRegistrationCallbackCalled = false;
+ HAInterestTestCase.isBeforeInterestRecoveryCallbackCalled = false;
+ HAInterestTestCase.isBeforeRegistrationCallbackCalled = false;
+ if (cache != null && !cache.isClosed()) {
+ cache.close();
+ cache.getDistributedSystem().disconnect();
+ }
+ cache = null;
+ pool = null;
+ conn = null;
+ }
+
+ /**
+ * Returns the current primary VM, waiting for a primary to exist; delegates to getPrimaryVM(VM) with a null oldPrimary.
+ *
+ * @since 5.7
+ */
+ public static VM getPrimaryVM() {
+ return getPrimaryVM(null);
+ }
+
+ /**
+ * Return the current primary waiting for a primary to exist and for it not to
+ * be the oldPrimary (if oldPrimary is NOT null).
+ *
+ * @since 5.7
+ */
+ public static VM getPrimaryVM(final VM oldPrimary) {
+ WaitCriterion wc = new WaitCriterion() {
+ @Override
+ public boolean done() {
+ int primaryPort = pool.getPrimaryPort();
+ if (primaryPort == -1) {
+ return false;
+ }
+ // we have a primary
+ VM currentPrimary = getServerVM(primaryPort);
+ if (currentPrimary != oldPrimary) {
+ return true;
+ }
+ return false;
+ }
+ @Override
+ public String description() {
+ return "waiting for primary";
+ }
+ };
+ DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
+
+ int primaryPort = pool.getPrimaryPort();
+ assertTrue(primaryPort != -1);
+ VM currentPrimary = getServerVM(primaryPort);
+ assertTrue(currentPrimary != oldPrimary);
+ return currentPrimary;
+ }
+
+ public static VM getBackupVM() {
+ return getBackupVM(null);
+ }
+
+ public static VM getBackupVM(VM stoppedBackup) {
+ VM currentPrimary = getPrimaryVM(null);
+ if (currentPrimary != server2 && server2 != stoppedBackup) {
+ return server2;
+ } else if (currentPrimary != server3 && server3 != stoppedBackup) {
+ return server3;
+ } else if (currentPrimary != server1 && server1 != stoppedBackup) {
+ return server1;
+ } else {
+ fail("expected currentPrimary " + currentPrimary + " to be " + server1 + ", or " + server2 + ", or " + server3);
+ return null;
+ }
+ }
+
+ /**
+ * Given a server vm (server1, server2, or server3) return its port.
+ *
+ * @since 5.7
+ */
+ public static int getServerPort(VM vm) {
+ if (vm == server1) {
+ return PORT1;
+ } else if (vm == server2) {
+ return PORT2;
+ } else if (vm == server3) {
+ return PORT3;
+ } else {
+ fail("expected vm " + vm + " to be " + server1 + ", or " + server2 + ", or " + server3);
+ return -1;
+ }
+ }
+
+ /**
+ * Given a server port (PORT1, PORT2, or PORT3) return its vm.
+ *
+ * @since 5.7
+ */
+ public static VM getServerVM(int port) {
+ if (port == PORT1) {
+ return server1;
+ } else if (port == PORT2) {
+ return server2;
+ } else if (port == PORT3) {
+ return server3;
+ } else {
+ fail("expected port " + port + " to be " + PORT1 + ", or " + PORT2 + ", or " + PORT3);
+ return null;
+ }
+ }
+
+ public static void verifyRefreshedEntriesFromServer() {
+ final Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME);
+ assertNotNull(r1);
+
+ WaitCriterion wc = new WaitCriterion() {
+ @Override
+ public boolean done() {
+ Region.Entry re = r1.getEntry(k1);
+ if (re == null)
+ return false;
+ Object val = re.getValue();
+ return client_k1.equals(val);
+ }
+ @Override
+ public String description() {
+ return "waiting for client_k1 refresh from server";
+ }
+ };
+ DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
+
+ wc = new WaitCriterion() {
+ @Override
+ public boolean done() {
+ Region.Entry re = r1.getEntry(k2);
+ if (re == null)
+ return false;
+ Object val = re.getValue();
+ return client_k2.equals(val);
+ }
+ @Override
+ public String description() {
+ return "waiting for client_k2 refresh from server";
+ }
+ };
+ DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
+ }
+
+ public static void verifyDeadAndLiveServers(final int expectedDeadServers, final int expectedLiveServers) { // expectedDeadServers is accepted but not checked here
+ WaitCriterion wc = new WaitCriterion() {
+ @Override
+ public boolean done() {
+ return pool.getConnectedServerCount() == expectedLiveServers; // only the live (connected) count is polled
+ }
+ @Override
+ public String description() {
+ return "waiting for pool.getConnectedServerCount() == " + expectedLiveServers; // report the expected value, not a variable name
+ }
+ };
+ DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
+ }
+
+ public static void putK1andK2() {
+ Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME);
+ assertNotNull(r1);
+ r1.put(k1, server_k1);
+ r1.put(k2, server_k2);
+ }
+
+ public static void setClientServerObserverForBeforeInterestRecoveryFailure() { // installs an observer that starts the backup and stops the primary before interest recovery
+ PoolImpl.BEFORE_RECOVER_INTEREST_CALLBACK_FLAG = true;
+ ClientServerObserverHolder.setInstance(new ClientServerObserverAdapter() {
+ public void beforeInterestRecovery() {
+ synchronized (HAInterestTestCase.class) {
+ Thread t = new Thread() {
+ public void run() {
+ getBackupVM().invoke(HAInterestTestCase.class, "startServer");
+ getPrimaryVM().invoke(HAInterestTestCase.class, "stopServer");
+ }
+ };
+ t.start();
+ try {
+ DistributedTestCase.join(t, 30 * 1000, getLogWriter());
+ } catch (Exception e) { // not ignored: the failure is recorded in exceptionOccured rather than thrown from the callback
+ exceptionOccured = true;
+ }
+ HAInterestTestCase.isBeforeInterestRecoveryCallbackCalled = true;
+ HAInterestTestCase.class.notifyAll(); // the wait* helpers share this class monitor for different flags, so wake every waiter
+ PoolImpl.BEFORE_RECOVER_INTEREST_CALLBACK_FLAG = false;
+ }
+ }
+ });
+ }
+
+ public static void setClientServerObserverForBeforeInterestRecovery() { // installs an observer that updates k1 from a separate thread before interest recovery
+ PoolImpl.BEFORE_RECOVER_INTEREST_CALLBACK_FLAG = true;
+ ClientServerObserverHolder.setInstance(new ClientServerObserverAdapter() {
+ public void beforeInterestRecovery() {
+ synchronized (HAInterestTestCase.class) {
+ Thread t = new Thread() {
+ public void run() {
+ Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME);
+ assertNotNull(r1);
+ r1.put(k1, server_k1_updated);
+ }
+ };
+ t.start(); // the put thread is started but not joined here
+
+ HAInterestTestCase.isBeforeInterestRecoveryCallbackCalled = true;
+ HAInterestTestCase.class.notifyAll(); // the wait* helpers share this class monitor for different flags, so wake every waiter
+ PoolImpl.BEFORE_RECOVER_INTEREST_CALLBACK_FLAG = false;
+ }
+ }
+ });
+ }
+
+ public static void waitForBeforeInterestRecoveryCallBack() throws InterruptedException {
+ assertNotNull(cache);
+ synchronized (HAInterestTestCase.class) {
+ while (!isBeforeInterestRecoveryCallbackCalled) {
+ HAInterestTestCase.class.wait();
+ }
+ }
+ }
+
+ public static void setClientServerObserverForBeforeRegistration(final VM vm) { // installs an observer that starts the server in vm before interest registration
+ PoolImpl.BEFORE_REGISTER_CALLBACK_FLAG = true;
+ ClientServerObserverHolder.setInstance(new ClientServerObserverAdapter() {
+ public void beforeInterestRegistration() {
+ synchronized (HAInterestTestCase.class) {
+ vm.invoke(HAInterestTestCase.class, "startServer");
+ HAInterestTestCase.isBeforeRegistrationCallbackCalled = true;
+ HAInterestTestCase.class.notifyAll(); // the wait* helpers share this class monitor for different flags, so wake every waiter
+ PoolImpl.BEFORE_REGISTER_CALLBACK_FLAG = false;
+ }
+ }
+ });
+ }
+
+ public static void waitForBeforeRegistrationCallback() throws InterruptedException {
+ assertNotNull(cache);
+ synchronized (HAInterestTestCase.class) {
+ while (!isBeforeRegistrationCallbackCalled) {
+ HAInterestTestCase.class.wait();
+ }
+ }
+ }
+
+ public static void setClientServerObserverForAfterRegistration(final VM vm) { // installs an observer that starts the server in vm after interest registration
+ PoolImpl.AFTER_REGISTER_CALLBACK_FLAG = true;
+ ClientServerObserverHolder.setInstance(new ClientServerObserverAdapter() {
+ public void afterInterestRegistration() {
+ synchronized (HAInterestTestCase.class) {
+ vm.invoke(HAInterestTestCase.class, "startServer");
+ HAInterestTestCase.isAfterRegistrationCallbackCalled = true;
+ HAInterestTestCase.class.notifyAll(); // the wait* helpers share this class monitor for different flags, so wake every waiter
+ PoolImpl.AFTER_REGISTER_CALLBACK_FLAG = false;
+ }
+ }
+ });
+ }
+
+ public static void waitForAfterRegistrationCallback() throws InterruptedException { // blocks until isAfterRegistrationCallbackCalled is true
+ assertNotNull(cache);
+ if (!isAfterRegistrationCallbackCalled) { // unsynchronized pre-check; the flag is re-read under the lock below
+ synchronized (HAInterestTestCase.class) {
+ while (!isAfterRegistrationCallbackCalled) { // loop so a wakeup without the flag set simply waits again
+ HAInterestTestCase.class.wait();
+ }
+ }
+ }
+ }
+
+ public static void unSetClientServerObserverForRegistrationCallback() {
+ synchronized (HAInterestTestCase.class) {
+ PoolImpl.BEFORE_REGISTER_CALLBACK_FLAG = false;
+ PoolImpl.AFTER_REGISTER_CALLBACK_FLAG = false;
+ HAInterestTestCase.isBeforeRegistrationCallbackCalled = false;
+ HAInterestTestCase.isAfterRegistrationCallbackCalled = false;
+ }
+ }
+
+ public static void verifyDispatcherIsAlive() {
+ assertEquals("More than one BridgeServer", 1, cache.getCacheServers().size());
+
+ WaitCriterion wc = new WaitCriterion() {
+ @Override
+ public boolean done() {
+ return cache.getCacheServers().size() == 1;
+ }
+ @Override
+ public String description() {
+ return "waiting for cache.getCacheServers().size() == 1";
+ }
+ };
+ DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
+
+ CacheServerImpl bs = (CacheServerImpl) cache.getCacheServers().iterator().next();
+ assertNotNull(bs);
+ assertNotNull(bs.getAcceptor());
+ assertNotNull(bs.getAcceptor().getCacheClientNotifier());
+ final CacheClientNotifier ccn = bs.getAcceptor().getCacheClientNotifier();
+
+ wc = new WaitCriterion() {
+ @Override
+ public boolean done() {
+ return ccn.getClientProxies().size() > 0;
+ }
+ @Override
+ public String description() {
+ return "waiting for ccn.getClientProxies().size() > 0";
+ }
+ };
+ DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
+
+ wc = new WaitCriterion() {
+ Iterator iter_prox;
+ CacheClientProxy proxy;
+
+ @Override
+ public boolean done() {
+ iter_prox = ccn.getClientProxies().iterator();
+ if (iter_prox.hasNext()) {
+ proxy = (CacheClientProxy) iter_prox.next();
+ return proxy._messageDispatcher.isAlive();
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public String description() {
+ return "waiting for CacheClientProxy _messageDispatcher to be alive";
+ }
+ };
+ DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
+ }
+
+ public static void verifyDispatcherIsNotAlive() {
+ WaitCriterion wc = new WaitCriterion() {
+ @Override
+ public boolean done() {
+ return cache.getCacheServers().size() == 1;
+ }
+ @Override
+ public String description() {
+ return "cache.getCacheServers().size() == 1";
+ }
+ };
+ DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
+
+ CacheServerImpl bs = (CacheServerImpl) cache.getCacheServers().iterator().next();
+ assertNotNull(bs);
+ assertNotNull(bs.getAcceptor());
+ assertNotNull(bs.getAcceptor().getCacheClientNotifier());
+ final CacheClientNotifier ccn = bs.getAcceptor().getCacheClientNotifier();
+
+ wc = new WaitCriterion() {
+ @Override
+ public boolean done() {
+ return ccn.getClientProxies().size() > 0;
+ }
+ @Override
+ public String description() {
+ return "waiting for ccn.getClientProxies().size() > 0";
+ }
+ };
+ DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
+
+ Iterator iter_prox = ccn.getClientProxies().iterator();
+ if (iter_prox.hasNext()) {
+ CacheClientProxy proxy = (CacheClientProxy) iter_prox.next();
+ assertFalse("Dispatcher on secondary should not be alive", proxy._messageDispatcher.isAlive());
+ }
+ }
+
+ public static void createEntriesK1andK2OnServer() { // creates k1/k2 with the server values if absent, then verifies them
+ Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME);
+ assertNotNull(r1);
+ if (!r1.containsKey(k1)) {
+ r1.create(k1, server_k1);
+ }
+ if (!r1.containsKey(k2)) {
+ r1.create(k2, server_k2);
+ }
+ assertEquals(server_k1, r1.getEntry(k1).getValue()); // (expected, actual) so a failure message reads correctly
+ assertEquals(server_k2, r1.getEntry(k2).getValue());
+ }
+
+ public static void createEntriesK1andK2() { // creates k1/k2 with the client values if absent, then verifies them
+ Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME);
+ assertNotNull(r1);
+ if (!r1.containsKey(k1)) {
+ r1.create(k1, client_k1);
+ }
+ if (!r1.containsKey(k2)) {
+ r1.create(k2, client_k2);
+ }
+ assertEquals(client_k1, r1.getEntry(k1).getValue()); // (expected, actual) so a failure message reads correctly
+ assertEquals(client_k2, r1.getEntry(k2).getValue());
+ }
+
+ public static void createServerEntriesK1andK2() { // creates k1/k2 with the server values if absent, then verifies them
+ Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME);
+ assertNotNull(r1);
+ if (!r1.containsKey(k1)) {
+ r1.create(k1, server_k1);
+ }
+ if (!r1.containsKey(k2)) {
+ r1.create(k2, server_k2);
+ }
+ assertEquals(server_k1, r1.getEntry(k1).getValue()); // (expected, actual) so a failure message reads correctly
+ assertEquals(server_k2, r1.getEntry(k2).getValue());
+ }
+
+ public static void registerK1AndK2() {
+ Region r = cache.getRegion(Region.SEPARATOR + REGION_NAME);
+ assertNotNull(r);
+ List list = new ArrayList();
+ list.add(k1);
+ list.add(k2);
+ r.registerInterest(list, InterestResultPolicy.KEYS_VALUES);
+ }
+
+ public static void reRegisterK1AndK2() {
+ Region r = cache.getRegion(Region.SEPARATOR + REGION_NAME);
+ assertNotNull(r);
+ List list = new ArrayList();
+ list.add(k1);
+ list.add(k2);
+ r.registerInterest(list);
+ }
+
+ public static void startServer() throws IOException {
+ Cache c = CacheFactory.getAnyInstance();
+ assertEquals("More than one BridgeServer", 1, c.getCacheServers().size());
+ CacheServerImpl bs = (CacheServerImpl) c.getCacheServers().iterator().next();
+ assertNotNull(bs);
+ bs.start();
+ }
+
+ public static void stopServer() {
+ assertEquals("More than one BridgeServer", 1, cache.getCacheServers().size());
+ CacheServerImpl bs = (CacheServerImpl) cache.getCacheServers().iterator().next();
+ assertNotNull(bs);
+ bs.stop();
+ }
+
+ public static void stopPrimaryAndRegisterK1AndK2AndVerifyResponse() {
+ LocalRegion r = (LocalRegion) cache.getRegion(Region.SEPARATOR + REGION_NAME);
+ assertNotNull(r);
+ ServerRegionProxy srp = new ServerRegionProxy(r);
+
+ WaitCriterion wc = new WaitCriterion() {
+ @Override
+ public boolean done() {
+ return pool.getConnectedServerCount() == 3;
+ }
+ @Override
+ public String description() {
+ return "connected server count never became 3";
+ }
+ };
+ DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
+
+ // close primaryEP
+ getPrimaryVM().invoke(HAInterestTestCase.class, "stopServer");
+ List list = new ArrayList();
+ list.add(k1);
+ list.add(k2);
+ List serverKeys = srp.registerInterest(list, InterestType.KEY, InterestResultPolicy.KEYS, false, r.getAttributes().getDataPolicy().ordinal);
+ assertNotNull(serverKeys);
+ List resultKeys = (List) serverKeys.get(0);
+ assertEquals(2, resultKeys.size());
+ assertTrue(resultKeys.contains(k1));
+ assertTrue(resultKeys.contains(k2));
+ }
+
+ public static void stopPrimaryAndUnregisterRegisterK1() {
+ LocalRegion r = (LocalRegion) cache.getRegion(Region.SEPARATOR + REGION_NAME);
+ assertNotNull(r);
+ ServerRegionProxy srp = new ServerRegionProxy(r);
+
+ WaitCriterion wc = new WaitCriterion() {
+ @Override
+ public boolean done() {
+ return pool.getConnectedServerCount() == 3;
+ }
+ @Override
+ public String description() {
+ return "connected server count never became 3";
+ }
+ };
+ DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
+
+ // close primaryEP
+ getPrimaryVM().invoke(HAInterestTestCase.class, "stopServer");
+ List list = new ArrayList();
+ list.add(k1);
+ srp.unregisterInterest(list, InterestType.KEY, false, false);
+ }
+
+ public static void stopBothPrimaryAndSecondaryAndRegisterK1AndK2AndVerifyResponse() {
+ LocalRegion r = (LocalRegion) cache.getRegion(Region.SEPARATOR + REGION_NAME);
+ assertNotNull(r);
+ ServerRegionProxy srp = new ServerRegionProxy(r);
+
+ WaitCriterion wc = new WaitCriterion() {
+ @Override
+ public boolean done() {
+ return pool.getConnectedServerCount() == 3;
+ }
+ @Override
+ public String description() {
+ return "connected server count never became 3";
+ }
+ };
+ DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
+
+ // close primaryEP
+ VM backup = getBackupVM();
+ getPrimaryVM().invoke(HAInterestTestCase.class, "stopServer");
+ // close secondary
+ backup.invoke(HAInterestTestCase.class, "stopServer");
+ List list = new ArrayList();
+ list.add(k1);
+ list.add(k2);
+ List serverKeys = srp.registerInterest(list, InterestType.KEY, InterestResultPolicy.KEYS, false, r.getAttributes().getDataPolicy().ordinal);
+
+ assertNotNull(serverKeys);
+ List resultKeys = (List) serverKeys.get(0);
+ assertEquals(2, resultKeys.size());
+ assertTrue(resultKeys.contains(k1));
+ assertTrue(resultKeys.contains(k2));
+ }
+
+ /**
+ * returns the secondary that was stopped
+ */
+ public static VM stopSecondaryAndRegisterK1AndK2AndVerifyResponse() {
+ LocalRegion r = (LocalRegion) cache.getRegion(Region.SEPARATOR + REGION_NAME);
+ assertNotNull(r);
+ ServerRegionProxy srp = new ServerRegionProxy(r);
+
+ WaitCriterion wc = new WaitCriterion() {
+ @Override
+ public boolean done() {
+ return pool.getConnectedServerCount() == 3;
+ }
+ @Override
+ public String description() {
+ return "Never got three connected servers";
+ }
+ };
+ DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
+
+ // close secondary EP
+ VM result = getBackupVM();
+ result.invoke(HAInterestTestCase.class, "stopServer");
+ List list = new ArrayList();
+ list.add(k1);
+ list.add(k2);
+ List serverKeys = srp.registerInterest(list, InterestType.KEY, InterestResultPolicy.KEYS, false, r.getAttributes().getDataPolicy().ordinal);
+
+ assertNotNull(serverKeys);
+ List resultKeys = (List) serverKeys.get(0);
+ assertEquals(2, resultKeys.size());
+ assertTrue(resultKeys.contains(k1));
+ assertTrue(resultKeys.contains(k2));
+ return result;
+ }
+
+ /**
+ * returns the secondary that was stopped
+ */
+ public static VM stopSecondaryAndUNregisterK1() {
+ LocalRegion r = (LocalRegion) cache.getRegion(Region.SEPARATOR + REGION_NAME);
+ assertNotNull(r);
+ ServerRegionProxy srp = new ServerRegionProxy(r);
+
+ WaitCriterion wc = new WaitCriterion() {
+ @Override
+ public boolean done() {
+ return pool.getConnectedServerCount() == 3;
+ }
+ @Override
+ public String description() {
+ return "connected server count never became 3";
+ }
+ };
+ DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
+
+ // close secondary EP
+ VM result = getBackupVM();
+ result.invoke(HAInterestTestCase.class, "stopServer");
+ List list = new ArrayList();
+ list.add(k1);
+ srp.unregisterInterest(list, InterestType.KEY, false, false);
+ return result;
+ }
+
+ public static void registerK1AndK2OnPrimaryAndSecondaryAndVerifyResponse() {
+ ServerLocation primary = pool.getPrimary();
+ ServerLocation secondary = (ServerLocation) pool.getRedundants().get(0);
+ LocalRegion r = (LocalRegion) cache.getRegion(Region.SEPARATOR + REGION_NAME);
+ assertNotNull(r);
+ ServerRegionProxy srp = new ServerRegionProxy(r);
+ List list = new ArrayList();
+ list.add(k1);
+ list.add(k2);
+
+ // Primary server
+ List serverKeys1 = srp.registerInterestOn(primary, list, InterestType.KEY, InterestResultPolicy.KEYS, false, r.getAttributes().getDataPolicy().ordinal);
+ assertNotNull(serverKeys1);
+ // expect serverKeys in response from primary
+ List resultKeys = (List) serverKeys1.get(0);
+ assertEquals(2, resultKeys.size());
+ assertTrue(resultKeys.contains(k1));
+ assertTrue(resultKeys.contains(k2));
+
+ // Secondary server
+ List serverKeys2 = srp.registerInterestOn(secondary, list, InterestType.KEY, InterestResultPolicy.KEYS, false, r.getAttributes().getDataPolicy().ordinal);
+ // if the list is null then it is empty
+ if (serverKeys2 != null) {
+ // no serverKeys in response from secondary
+ assertTrue(serverKeys2.isEmpty());
+ }
+ }
+
+ public static void verifyInterestRegistration() {
+ WaitCriterion wc = new WaitCriterion() {
+ @Override
+ public boolean done() {
+ return cache.getCacheServers().size() == 1;
+ }
+ @Override
+ public String description() {
+ return "waiting for cache.getCacheServers().size() == 1";
+ }
+ };
+ DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
+
+ CacheServerImpl bs = (CacheServerImpl) cache.getCacheServers().iterator().next();
+ assertNotNull(bs);
+ assertNotNull(bs.getAcceptor());
+ assertNotNull(bs.getAcceptor().getCacheClientNotifier());
+ final CacheClientNotifier ccn = bs.getAcceptor().getCacheClientNotifier();
+
+ wc = new WaitCriterion() {
+ @Override
+ public boolean done() {
+ return ccn.getClientProxies().size() > 0;
+ }
+ @Override
+ public String description() {
+ return "waiting for ccn.getClientProxies().size() > 0";
+ }
+ };
+ DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
+
+ Iterator iter_prox = ccn.getClientProxies().iterator();
+
+ if (iter_prox.hasNext()) {
+ final CacheClientProxy ccp = (CacheClientProxy) iter_prox.next();
+
+ wc = new WaitCriterion() {
+ @Override
+ public boolean done() {
+ Set keysMap = (Set) ccp.cils[RegisterInterestTracker.interestListIndex]
+ .getProfile(Region.SEPARATOR + REGION_NAME)
+ .getKeysOfInterestFor(ccp.getProxyID());
+ return keysMap != null && keysMap.size() == 2;
+ }
+ @Override
+ public String description() {
+ return "waiting for keys of interest to include 2 keys";
+ }
+ };
+ DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
+
+ Set keysMap = (Set) ccp.cils[RegisterInterestTracker.interestListIndex].getProfile(Region.SEPARATOR + REGION_NAME)
+ .getKeysOfInterestFor(ccp.getProxyID());
+ assertNotNull(keysMap);
+ assertEquals(2, keysMap.size());
+ assertTrue(keysMap.contains(k1));
+ assertTrue(keysMap.contains(k2));
+ }
+ }
+
+ public static void verifyInterestUNRegistration() {
+ WaitCriterion wc = new WaitCriterion() {
+ @Override
+ public boolean done() {
+ return cache.getCacheServers().size() == 1;
+ }
+ @Override
+ public String description() {
+ return "waiting for cache.getCacheServers().size() == 1";
+ }
+ };
+ DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
+
+ CacheServerImpl bs = (CacheServerImpl) cache.getCacheServers().iterator().next();
+ assertNotNull(bs);
+ assertNotNull(bs.getAcceptor());
+ assertNotNull(bs.getAcceptor().getCacheClientNotifier());
+ final CacheClientNotifier ccn = bs.getAcceptor().getCacheClientNotifier();
+
+ wc = new WaitCriterion() {
+ @Override
+ public boolean done() {
+ return ccn.getClientProxies().size() > 0;
+ }
+ @Override
+ public String description() {
+ return "waiting for ccn.getClientProxies().size() > 0";
+ }
+ };
+ DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
+
+ Iterator iter_prox = ccn.getClientProxies().iterator();
+ if (iter_prox.hasNext()) {
+ final CacheClientProxy ccp = (CacheClientProxy) iter_prox.next();
+
+ wc = new WaitCriterion() {
+ @Override
+ public boolean done() {
+ Set keysMap = (Set) ccp.cils[RegisterInterestTracker.interestListIndex]
+ .getProfile(Region.SEPARATOR + REGION_NAME)
+ .getKeysOfInterestFor(ccp.getProxyID());
+ return keysMap != null;
+ }
+ @Override
+ public String description() {
+ return "waiting for keys of interest to not be null";
+ }
+ };
+ DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
+
+ Set keysMap = (Set) ccp.cils[RegisterInterestTracker.interestListIndex]
+ .getProfile(Region.SEPARATOR + REGION_NAME)
+ .getKeysOfInterestFor(ccp.getProxyID());
+ assertNotNull(keysMap);
+ assertEquals(1, keysMap.size());
+ assertFalse(keysMap.contains(k1));
+ assertTrue(keysMap.contains(k2));
+ }
+ }
+
+ private void createCache(Properties props) throws Exception {
+ DistributedSystem ds = getSystem(props);
+ assertNotNull(ds);
+ ds.disconnect();
+ ds = getSystem(props);
+ cache = CacheFactory.create(ds);
+ assertNotNull(cache);
+ }
+
+ public static void createClientPoolCache(String testName, String host) throws Exception { // testName is not used in this method
+ Properties props = new Properties();
+ props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
+ props.setProperty(DistributionConfig.LOCATORS_NAME, "");
+ new HAInterestTestCase("temp").createCache(props);
+ CacheServerTestUtil.disableShufflingOfEndpoints();
+ PoolImpl p;
+ try {
+ p = (PoolImpl) PoolManager.createFactory()
+ .addServer(host, PORT1)
+ .addServer(host, PORT2)
+ .addServer(host, PORT3)
+ .setSubscriptionEnabled(true)
+ .setSubscriptionRedundancy(-1)
+ .setReadTimeout(1000)
+ .setPingInterval(1000)
+ // retryInterval should be larger so that only the registerInterest thread
+ // initiates failover (the setting below is currently disabled)
+ // .setRetryInterval(20000)
+ .create("HAInterestBaseTestPool");
+ } finally {
+ CacheServerTestUtil.enableShufflingOfEndpoints();
+ }
+ AttributesFactory factory = new AttributesFactory();
+ factory.setScope(Scope.LOCAL);
+ factory.setConcurrencyChecksEnabled(true);
+ factory.setPoolName(p.getName());
+
+ cache.createRegion(REGION_NAME, factory.create());
+ pool = p;
+ conn = pool.acquireConnection();
+ assertNotNull(conn);
+ }
+
+ public static void createClientPoolCacheWithSmallRetryInterval(String testName, String host) throws Exception {
+ Properties props = new Properties();
+ props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
+ props.setProperty(DistributionConfig.LOCATORS_NAME, "");
+ new HAInterestTestCase("temp").createCache(props);
+ CacheServerTestUtil.disableShufflingOfEndpoints();
+ PoolImpl p;
+ try {
+ p = (PoolImpl) PoolManager.createFactory()
+ .addServer(host, PORT1)
+ .addServer(host, PORT2)
+ .setSubscriptionEnabled(true)
+ .setSubscriptionRedundancy(-1)
+ .setReadTimeout(1000)
+ .setSocketBufferSize(32768)
+ .setMinConnections(6)
+ .setPingInterval(200)
+ // .setRetryInterval(200)
+ // retryAttempts 3
+ .create("HAInterestBaseTestPool");
+ } finally {
+ CacheServerTestUtil.enableShufflingOfEndpoints();
+ }
+ AttributesFactory factory = new AttributesFactory();
+ factory.setScope(Scope.LOCAL);
+ factory.setConcurrencyChecksEnabled(true);
+ factory.setPoolName(p.getName());
+
+ cache.createRegion(REGION_NAME, factory.create());
+
+ pool = p;
+ conn = pool.acquireConnection();
+ assertNotNull(conn);
+ }
+
+ public static void createClientPoolCacheConnectionToSingleServer(String testName, String hostName) throws Exception {
+ Properties props = new Properties();
+ props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
+ props.setProperty(DistributionConfig.LOCATORS_NAME, "");
+ new HAInterestTestCase("temp").createCache(props);
+ PoolImpl p = (PoolImpl) PoolManager.createFactory()
+ .addServer(hostName, PORT1)
+ .setSubscriptionEnabled(true)
+ .setSubscriptionRedundancy(-1)
+ .setReadTimeout(1000)
+ // .setRetryInterval(20)
+ .create("HAInterestBaseTestPool");
+ AttributesFactory factory = new AttributesFactory();
+ factory.setScope(Scope.LOCAL);
+ factory.setConcurrencyChecksEnabled(true);
+ factory.setPoolName(p.getName());
+
+ cache.createRegion(REGION_NAME, factory.create());
+
+ pool = p;
+ conn = pool.acquireConnection();
+ assertNotNull(conn);
+ }
+
+ public static Integer createServerCache() throws Exception { // creates the cache, region and cache server; returns the server's port
+ new HAInterestTestCase("temp").createCache(new Properties());
+ AttributesFactory factory = new AttributesFactory();
+ factory.setScope(Scope.DISTRIBUTED_ACK);
+ factory.setEnableBridgeConflation(true);
+ factory.setMirrorType(MirrorType.KEYS_VALUES);
+ factory.setConcurrencyChecksEnabled(true);
+ cache.createRegion(REGION_NAME, factory.create());
+
+ CacheServer server = cache.addCacheServer();
+ int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+ server.setPort(port);
+ server.setMaximumTimeBetweenPings(180000);
+ // ensures updates to be sent instead of invalidations
+ server.setNotifyBySubscription(true);
+ server.start();
+ return Integer.valueOf(server.getPort()); // valueOf instead of the allocating Integer constructor
+ }
+
+ public static Integer createServerCacheWithLocalRegion() throws Exception { // as createServerCache, but the region uses Scope.LOCAL
+ new HAInterestTestCase("temp").createCache(new Properties());
+ AttributesFactory factory = new AttributesFactory();
+ factory.setScope(Scope.LOCAL);
+ factory.setConcurrencyChecksEnabled(true);
+ RegionAttributes attrs = factory.create();
+ cache.createRegion(REGION_NAME, attrs);
+
+ CacheServer server = cache.addCacheServer();
+ int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+ server.setPort(port);
+ // ensures updates to be sent instead of invalidations
+ server.setNotifyBySubscription(true);
+ server.setMaximumTimeBetweenPings(180000);
+ server.start();
+ return Integer.valueOf(server.getPort()); // valueOf instead of the allocating Integer constructor
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ca6148aa/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/CommitCommandTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/CommitCommandTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/CommitCommandTest.java
index b12f55b..b6bfe22 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/CommitCommandTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/CommitCommandTest.java
@@ -22,12 +22,18 @@ import static org.mockito.Mockito.when;
import java.io.IOException;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import com.gemstone.gemfire.CancelCriterion;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
import com.gemstone.gemfire.internal.cache.tier.sockets.ServerConnection;
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+/**
+ * Exposes GEODE-537: NPE in JTA AFTER_COMPLETION command processing
+ */
+@Category(UnitTest.class)
public class CommitCommandTest {
/**
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ca6148aa/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/LogWriterPerformanceTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/LogWriterPerformanceTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/LogWriterPerformanceTest.java
index 77d7995..1f72a6b 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/LogWriterPerformanceTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/LogWriterPerformanceTest.java
@@ -21,16 +21,22 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Properties;
+import org.junit.Ignore;
+import org.junit.experimental.categories.Category;
+
import com.gemstone.gemfire.LogWriter;
import com.gemstone.gemfire.distributed.internal.DistributionConfig;
import com.gemstone.gemfire.distributed.internal.DistributionConfigImpl;
import com.gemstone.gemfire.internal.util.IOUtils;
+import com.gemstone.gemfire.test.junit.categories.PerformanceTest;
/**
* Tests performance of logging when level is OFF.
*
* @author Kirk Lund
*/
+@Category(PerformanceTest.class)
+@Ignore("Tests have no assertions")
public class LogWriterPerformanceTest extends LoggingPerformanceTestCase {
public LogWriterPerformanceTest(String name) {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ca6148aa/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/Log4J2DisabledPerformanceTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/Log4J2DisabledPerformanceTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/Log4J2DisabledPerformanceTest.java
index f98868b..caedadc 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/Log4J2DisabledPerformanceTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/Log4J2DisabledPerformanceTest.java
@@ -20,7 +20,13 @@ import java.io.IOException;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.Logger;
+import org.junit.Ignore;
+import org.junit.experimental.categories.Category;
+import com.gemstone.gemfire.test.junit.categories.PerformanceTest;
+
+@Category(PerformanceTest.class)
+@Ignore("Tests have no assertions")
public class Log4J2DisabledPerformanceTest extends Log4J2PerformanceTest {
public Log4J2DisabledPerformanceTest(String name) {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ca6148aa/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/Log4J2PerformanceTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/Log4J2PerformanceTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/Log4J2PerformanceTest.java
index a002389..ddf106d 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/Log4J2PerformanceTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/Log4J2PerformanceTest.java
@@ -29,12 +29,17 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.config.ConfigurationFactory;
import org.apache.logging.log4j.util.PropertiesUtil;
+import org.junit.Ignore;
+import org.junit.experimental.categories.Category;
import com.gemstone.gemfire.internal.FileUtil;
import com.gemstone.gemfire.internal.logging.LoggingPerformanceTestCase;
import com.gemstone.gemfire.internal.util.IOUtils;
+import com.gemstone.gemfire.test.junit.categories.PerformanceTest;
import com.gemstone.org.apache.logging.log4j.core.config.xml.GemFireXmlConfigurationFactory;
+@Category(PerformanceTest.class)
+@Ignore("Tests have no assertions")
public class Log4J2PerformanceTest extends LoggingPerformanceTestCase {
protected static final int DEFAULT_LOG_FILE_SIZE_LIMIT = Integer.MAX_VALUE;
@@ -44,13 +49,11 @@ public class Log4J2PerformanceTest extends LoggingPerformanceTestCase {
protected static final String SYS_LOG_FILE_SIZE_LIMIT = "gemfire-log-file-size-limit";
protected static final String SYS_LOG_FILE_COUNT_LIMIT = "gemfire-log-file-count-limit";
- static {
+ private static void init() { // was a static initializer
// set log4j.configurationFactory to be our optimized version
final String factory = GemFireXmlConfigurationFactory.class.getName();
System.setProperty(ConfigurationFactory.CONFIGURATION_FACTORY_PROPERTY, factory);
- System.out.println("Set "+ConfigurationFactory.CONFIGURATION_FACTORY_PROPERTY+" to "+factory);
final String factoryClass = PropertiesUtil.getProperties().getStringProperty(ConfigurationFactory.CONFIGURATION_FACTORY_PROPERTY);
- System.out.println("KIRK: factoryClass is " + factoryClass);
}
private File config = null;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ca6148aa/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/LogWriterLoggerDisabledPerformanceTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/LogWriterLoggerDisabledPerformanceTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/LogWriterLoggerDisabledPerformanceTest.java
index f964208..4be34c7 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/LogWriterLoggerDisabledPerformanceTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/LogWriterLoggerDisabledPerformanceTest.java
@@ -20,7 +20,13 @@ import java.io.IOException;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.Logger;
+import org.junit.Ignore;
+import org.junit.experimental.categories.Category;
+import com.gemstone.gemfire.test.junit.categories.PerformanceTest;
+
+@Category(PerformanceTest.class)
+@Ignore("Tests have no assertions")
public class LogWriterLoggerDisabledPerformanceTest extends LogWriterLoggerPerformanceTest {
public LogWriterLoggerDisabledPerformanceTest(String name) {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ca6148aa/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/LogWriterLoggerPerformanceTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/LogWriterLoggerPerformanceTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/LogWriterLoggerPerformanceTest.java
index 61b5131..926bc75 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/LogWriterLoggerPerformanceTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/LogWriterLoggerPerformanceTest.java
@@ -28,12 +28,17 @@ import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.config.ConfigurationFactory;
import org.apache.logging.log4j.util.PropertiesUtil;
+import org.junit.Ignore;
+import org.junit.experimental.categories.Category;
import com.gemstone.gemfire.internal.FileUtil;
import com.gemstone.gemfire.internal.logging.LoggingPerformanceTestCase;
import com.gemstone.gemfire.internal.util.IOUtils;
+import com.gemstone.gemfire.test.junit.categories.PerformanceTest;
import com.gemstone.org.apache.logging.log4j.core.config.xml.GemFireXmlConfigurationFactory;
+@Category(PerformanceTest.class)
+@Ignore("Tests have no assertions")
public class LogWriterLoggerPerformanceTest extends LoggingPerformanceTestCase {
protected static final int DEFAULT_LOG_FILE_SIZE_LIMIT = Integer.MAX_VALUE;
@@ -43,13 +48,11 @@ public class LogWriterLoggerPerformanceTest extends LoggingPerformanceTestCase {
protected static final String SYS_LOG_FILE_SIZE_LIMIT = "gemfire-log-file-size-limit";
protected static final String SYS_LOG_FILE_COUNT_LIMIT = "gemfire-log-file-count-limit";
- static {
+ private static void init() { // was a static initializer
// set log4j.configurationFactory to be our optimized version
final String factory = GemFireXmlConfigurationFactory.class.getName();
System.setProperty(ConfigurationFactory.CONFIGURATION_FACTORY_PROPERTY, factory);
- System.out.println("Set "+ConfigurationFactory.CONFIGURATION_FACTORY_PROPERTY+" to "+factory);
final String factoryClass = PropertiesUtil.getProperties().getStringProperty(ConfigurationFactory.CONFIGURATION_FACTORY_PROPERTY);
- System.out.println("KIRK: factoryClass is " + factoryClass);
}
private File config = null;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ca6148aa/gemfire-core/src/test/java/dunit/DistributedTestCase.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/dunit/DistributedTestCase.java b/gemfire-core/src/test/java/dunit/DistributedTestCase.java
index a3d4785..6fa560f 100755
--- a/gemfire-core/src/test/java/dunit/DistributedTestCase.java
+++ b/gemfire-core/src/test/java/dunit/DistributedTestCase.java
@@ -34,6 +34,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import org.junit.experimental.categories.Category;
import org.springframework.data.gemfire.support.GemfireCache;
import junit.framework.TestCase;
@@ -86,6 +87,7 @@ import com.gemstone.gemfire.internal.logging.LogWriterImpl;
import com.gemstone.gemfire.internal.logging.ManagerLogWriter;
import com.gemstone.gemfire.internal.logging.log4j.LogWriterLogger;
import com.gemstone.gemfire.management.internal.cli.LogWrapper;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
import dunit.standalone.DUnitLauncher;
@@ -101,6 +103,7 @@ import dunit.standalone.DUnitLauncher;
*
* @author David Whitlock
*/
+@Category(DistributedTest.class)
@SuppressWarnings("serial")
public abstract class DistributedTestCase extends TestCase implements java.io.Serializable {
private static final Logger logger = LogService.getLogger();
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ca6148aa/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplJUnitPerformanceTest.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplJUnitPerformanceTest.java b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplJUnitPerformanceTest.java
deleted file mode 100644
index ab2db78..0000000
--- a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplJUnitPerformanceTest.java
+++ /dev/null
@@ -1,437 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.gemstone.gemfire.cache.lucene.internal.repository;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.lucene.analysis.standard.StandardAnalyzer;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.document.Field.Store;
-import org.apache.lucene.document.TextField;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.Query;
-import org.apache.lucene.search.SearcherManager;
-import org.apache.lucene.search.TermQuery;
-import org.apache.lucene.store.RAMDirectory;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import com.gemstone.gemfire.DataSerializable;
-import com.gemstone.gemfire.DataSerializer;
-import com.gemstone.gemfire.cache.Cache;
-import com.gemstone.gemfire.cache.CacheFactory;
-import com.gemstone.gemfire.cache.PartitionAttributesFactory;
-import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.cache.RegionShortcut;
-import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue;
-import com.gemstone.gemfire.cache.lucene.LuceneIndex;
-import com.gemstone.gemfire.cache.lucene.LuceneQuery;
-import com.gemstone.gemfire.cache.lucene.LuceneQueryProvider;
-import com.gemstone.gemfire.cache.lucene.LuceneQueryResults;
-import com.gemstone.gemfire.cache.lucene.LuceneService;
-import com.gemstone.gemfire.cache.lucene.LuceneServiceProvider;
-import com.gemstone.gemfire.cache.lucene.internal.LuceneServiceImpl;
-import com.gemstone.gemfire.cache.lucene.internal.directory.RegionDirectory;
-import com.gemstone.gemfire.cache.lucene.internal.distributed.TopEntriesCollector;
-import com.gemstone.gemfire.cache.lucene.internal.filesystem.ChunkKey;
-import com.gemstone.gemfire.cache.lucene.internal.filesystem.File;
-import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.HeterogenousLuceneSerializer;
-import com.gemstone.gemfire.cache.query.QueryException;
-import com.gemstone.gemfire.test.junit.categories.PerformanceTest;
-
-/**
- * Microbenchmark of the IndexRepository to compare an
- * IndexRepository built on top of cache with a
- * stock lucene IndexWriter with a RAMDirectory.
- */
-@Category(PerformanceTest.class)
-public class IndexRepositoryImplJUnitPerformanceTest {
-
- private static final int NUM_WORDS = 1000;
- private static int[] COMMIT_INTERVAL = new int[] {100, 1000, 5000};
- private static int NUM_ENTRIES = 500_000;
- private static int NUM_QUERIES = 500_000;
-
- private StandardAnalyzer analyzer = new StandardAnalyzer();
-
- @Test
- public void testIndexRepository() throws Exception {
-
-
- doTest("IndexRepository", new TestCallbacks() {
-
- private Cache cache;
- private IndexRepositoryImpl repo;
- private IndexWriter writer;
-
- @Override
- public void addObject(String key, String text) throws Exception {
- repo.create(key, new TestObject(text));
- }
-
- @Override
- public void commit() throws Exception {
- repo.commit();
- }
-
- @Override
- public void init() throws Exception {
- cache = new CacheFactory().set("mcast-port", "0")
- .set("log-level", "error")
- .create();
- Region<String, File> fileRegion = cache.<String, File>createRegionFactory(RegionShortcut.REPLICATE).create("files");
- Region<ChunkKey, byte[]> chunkRegion = cache.<ChunkKey, byte[]>createRegionFactory(RegionShortcut.REPLICATE).create("chunks");
-
- RegionDirectory dir = new RegionDirectory(fileRegion, chunkRegion);
-
-
- IndexWriterConfig config = new IndexWriterConfig(analyzer);
- writer = new IndexWriter(dir, config);
- String[] indexedFields= new String[] {"text"};
- HeterogenousLuceneSerializer mapper = new HeterogenousLuceneSerializer(indexedFields);
- repo = new IndexRepositoryImpl(fileRegion, writer, mapper);
- }
-
- @Override
- public void cleanup() throws IOException {
- writer.close();
- cache.close();
- }
-
- @Override
- public void waitForAsync() throws Exception {
- //do nothing
- }
-
- @Override
- public int query(Query query) throws IOException {
- TopEntriesCollector collector = new TopEntriesCollector();
- repo.query(query, 100, collector);
- return collector.size();
- }
- });
- }
-
- /**
- * Test our full lucene index implementation
- * @throws Exception
- */
- @Test
- public void testLuceneIndex() throws Exception {
-
-
- doTest("LuceneIndex", new TestCallbacks() {
-
- private Cache cache;
- private Region<String, TestObject> region;
- private LuceneService service;
-
- @Override
- public void addObject(String key, String text) throws Exception {
- region.create(key, new TestObject(text));
- }
-
- @Override
- public void commit() throws Exception {
- //NA
- }
-
- @Override
- public void init() throws Exception {
- cache = new CacheFactory().set("mcast-port", "0")
- .set("log-level", "warning")
- .create();
- service = LuceneServiceProvider.get(cache);
- service.createIndex("index", "/region", "text");
- region = cache.<String, TestObject>createRegionFactory(RegionShortcut.PARTITION)
- .setPartitionAttributes(new PartitionAttributesFactory<>().setTotalNumBuckets(1).create())
- .create("region");
- }
-
- @Override
- public void cleanup() throws IOException {
- cache.close();
- }
-
- @Override
- public void waitForAsync() throws Exception {
- AsyncEventQueue aeq = cache.getAsyncEventQueue(LuceneServiceImpl.getUniqueIndexName("index", "/region"));
-
- //We will be at most 10 ms off
- while(aeq.size() > 0) {
- Thread.sleep(10);
- }
- }
-
- @Override
- public int query(final Query query) throws Exception {
- LuceneQuery<Object, Object> luceneQuery = service.createLuceneQueryFactory().create("index", "/region", new LuceneQueryProvider() {
-
- @Override
- public Query getQuery(LuceneIndex index) throws QueryException {
- return query;
- }
- });
-
- LuceneQueryResults<Object, Object> results = luceneQuery.search();
- return results.size();
- }
- });
- }
-
- @Test
- public void testLuceneWithRegionDirectory() throws Exception {
- doTest("RegionDirectory", new TestCallbacks() {
-
- private IndexWriter writer;
- private SearcherManager searcherManager;
-
- @Override
- public void init() throws Exception {
- RegionDirectory dir = new RegionDirectory(new ConcurrentHashMap<String, File>(), new ConcurrentHashMap<ChunkKey, byte[]>());
- IndexWriterConfig config = new IndexWriterConfig(analyzer);
- writer = new IndexWriter(dir, config);
- searcherManager = new SearcherManager(writer, true, null);
- }
-
- @Override
- public void addObject(String key, String text) throws Exception {
- Document doc = new Document();
- doc.add(new TextField("key", key, Store.YES));
- doc.add(new TextField("text", text, Store.NO));
- writer.addDocument(doc);
- }
-
- @Override
- public void commit() throws Exception {
- writer.commit();
- searcherManager.maybeRefresh();
- }
-
- @Override
- public void cleanup() throws Exception {
- writer.close();
- }
-
- @Override
- public void waitForAsync() throws Exception {
- //do nothing
- }
-
- @Override
- public int query(Query query) throws Exception {
- IndexSearcher searcher = searcherManager.acquire();
- try {
- return searcher.count(query);
- } finally {
- searcherManager.release(searcher);
- }
- }
-
- });
-
- }
-
- @Test
- public void testLucene() throws Exception {
- doTest("Lucene", new TestCallbacks() {
-
- private IndexWriter writer;
- private SearcherManager searcherManager;
-
- @Override
- public void init() throws Exception {
- RAMDirectory dir = new RAMDirectory();
- IndexWriterConfig config = new IndexWriterConfig(analyzer);
- writer = new IndexWriter(dir, config);
- searcherManager = new SearcherManager(writer, true, null);
- }
-
- @Override
- public void addObject(String key, String text) throws Exception {
- Document doc = new Document();
- doc.add(new TextField("key", key, Store.YES));
- doc.add(new TextField("text", text, Store.NO));
- writer.addDocument(doc);
- }
-
- @Override
- public void commit() throws Exception {
- writer.commit();
- searcherManager.maybeRefresh();
- }
-
- @Override
- public void cleanup() throws Exception {
- writer.close();
- }
-
- @Override
- public void waitForAsync() throws Exception {
- //do nothing
- }
-
- @Override
- public int query(Query query) throws Exception {
- IndexSearcher searcher = searcherManager.acquire();
- try {
- return searcher.count(query);
- } finally {
- searcherManager.release(searcher);
- }
- }
-
- });
-
- }
-
- private void doTest(String testName, TestCallbacks callbacks) throws Exception {
-
- //Create some random words. We need to be careful
- //to make sure we get NUM_WORDS distinct words here
- Set<String> wordSet = new HashSet<String>();
- Random rand = new Random();
- while(wordSet.size() < NUM_WORDS) {
- int length = rand.nextInt(12) + 3;
- char[] text = new char[length];
- for(int i = 0; i < length; i++) {
- text[i] = (char) (rand.nextInt(26) + 97);
- }
- wordSet.add(new String(text));
- }
- List<String> words = new ArrayList<String>(wordSet.size());
- words.addAll(wordSet);
-
-
-
- //warm up
- writeRandomWords(callbacks, words, rand, NUM_ENTRIES / 10, NUM_QUERIES / 10, COMMIT_INTERVAL[0]);
-
- //Do the actual test
-
- for(int i = 0; i < COMMIT_INTERVAL.length; i++) {
- Results results = writeRandomWords(callbacks, words, rand, NUM_ENTRIES, NUM_QUERIES / 10, COMMIT_INTERVAL[i]);
-
- System.out.println(testName + " writes(entries=" + NUM_ENTRIES + ", commit=" + COMMIT_INTERVAL[i] + "): " + TimeUnit.NANOSECONDS.toMillis(results.writeTime));
- System.out.println(testName + " queries(entries=" + NUM_ENTRIES + ", commit=" + COMMIT_INTERVAL[i] + "): " + TimeUnit.NANOSECONDS.toMillis(results.queryTime));
- }
- }
-
- private Results writeRandomWords(TestCallbacks callbacks, List<String> words,
- Random rand, int numEntries, int numQueries, int commitInterval) throws Exception {
- Results results = new Results();
- callbacks.init();
- int[] counts = new int[words.size()];
- long start = System.nanoTime();
- try {
- for(int i =0; i < numEntries; i++) {
- int word1 = rand.nextInt(words.size());
- int word2 = rand.nextInt(words.size());
- counts[word1]++;
- counts[word2]++;
- String value = words.get(word1) + " " + words.get(word2);
- callbacks.addObject("key" + i, value);
-
- if(i % commitInterval == 0 && i != 0) {
- callbacks.commit();
- }
- }
- callbacks.commit();
- callbacks.waitForAsync();
- long end = System.nanoTime();
- results.writeTime = end - start;
-
-
- start = System.nanoTime();
- for(int i=0; i < numQueries; i++) {
- int wordIndex = rand.nextInt(words.size());
- String word = words.get(wordIndex);
- Query query = new TermQuery(new Term("text", word));
- int size = callbacks.query(query);
-// int size = callbacks.query(parser.parse(word));
- //All of my tests sometimes seem to be missing a couple of words, including the stock lucene
-// assertEquals("Error on query " + i + " word=" + word, counts[wordIndex], size);
- }
- end = System.nanoTime();
- results.queryTime = end - start;
-
- return results;
- } finally {
- callbacks.cleanup();
- }
- }
-
- private static class TestObject implements DataSerializable {
- private String text;
-
- public TestObject() {
-
- }
-
- public TestObject(String text) {
- super();
- this.text = text;
- }
-
- @Override
- public void toData(DataOutput out) throws IOException {
- DataSerializer.writeString(text, out);
- }
-
- @Override
- public void fromData(DataInput in)
- throws IOException, ClassNotFoundException {
- text = DataSerializer.readString(in);
- }
-
- @Override
- public String toString() {
- return text;
- }
-
-
- }
-
- private interface TestCallbacks {
- public void init() throws Exception;
- public int query(Query query) throws Exception;
- public void addObject(String key, String text) throws Exception;
- public void commit() throws Exception;
- public void waitForAsync() throws Exception;
- public void cleanup() throws Exception;
- }
-
- private static class Results {
- long writeTime;
- long queryTime;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ca6148aa/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplPerformanceTest.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplPerformanceTest.java b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplPerformanceTest.java
new file mode 100644
index 0000000..74f3742
--- /dev/null
+++ b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplPerformanceTest.java
@@ -0,0 +1,439 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.gemstone.gemfire.cache.lucene.internal.repository;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.document.TextField;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.SearcherManager;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.store.RAMDirectory;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.DataSerializable;
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.PartitionAttributesFactory;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionShortcut;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue;
+import com.gemstone.gemfire.cache.lucene.LuceneIndex;
+import com.gemstone.gemfire.cache.lucene.LuceneQuery;
+import com.gemstone.gemfire.cache.lucene.LuceneQueryProvider;
+import com.gemstone.gemfire.cache.lucene.LuceneQueryResults;
+import com.gemstone.gemfire.cache.lucene.LuceneService;
+import com.gemstone.gemfire.cache.lucene.LuceneServiceProvider;
+import com.gemstone.gemfire.cache.lucene.internal.LuceneServiceImpl;
+import com.gemstone.gemfire.cache.lucene.internal.directory.RegionDirectory;
+import com.gemstone.gemfire.cache.lucene.internal.distributed.TopEntriesCollector;
+import com.gemstone.gemfire.cache.lucene.internal.filesystem.ChunkKey;
+import com.gemstone.gemfire.cache.lucene.internal.filesystem.File;
+import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.HeterogenousLuceneSerializer;
+import com.gemstone.gemfire.cache.query.QueryException;
+import com.gemstone.gemfire.test.junit.categories.PerformanceTest;
+
+/**
+ * Microbenchmark of the IndexRepository to compare an
+ * IndexRepository built on top of cache with a
+ * stock lucene IndexWriter with a RAMDirectory.
+ */
+@Category(PerformanceTest.class)
+@Ignore("Tests have no assertions")
+public class IndexRepositoryImplPerformanceTest {
+
+ private static final int NUM_WORDS = 1000;
+ private static int[] COMMIT_INTERVAL = new int[] {100, 1000, 5000};
+ private static int NUM_ENTRIES = 500_000;
+ private static int NUM_QUERIES = 500_000;
+
+ private StandardAnalyzer analyzer = new StandardAnalyzer();
+
+ @Test
+ public void testIndexRepository() throws Exception {
+
+
+ doTest("IndexRepository", new TestCallbacks() {
+
+ private Cache cache;
+ private IndexRepositoryImpl repo;
+ private IndexWriter writer;
+
+ @Override
+ public void addObject(String key, String text) throws Exception {
+ repo.create(key, new TestObject(text));
+ }
+
+ @Override
+ public void commit() throws Exception {
+ repo.commit();
+ }
+
+ @Override
+ public void init() throws Exception {
+ cache = new CacheFactory().set("mcast-port", "0")
+ .set("log-level", "error")
+ .create();
+ Region<String, File> fileRegion = cache.<String, File>createRegionFactory(RegionShortcut.REPLICATE).create("files");
+ Region<ChunkKey, byte[]> chunkRegion = cache.<ChunkKey, byte[]>createRegionFactory(RegionShortcut.REPLICATE).create("chunks");
+
+ RegionDirectory dir = new RegionDirectory(fileRegion, chunkRegion);
+
+
+ IndexWriterConfig config = new IndexWriterConfig(analyzer);
+ writer = new IndexWriter(dir, config);
+ String[] indexedFields= new String[] {"text"};
+ HeterogenousLuceneSerializer mapper = new HeterogenousLuceneSerializer(indexedFields);
+ repo = new IndexRepositoryImpl(fileRegion, writer, mapper);
+ }
+
+ @Override
+ public void cleanup() throws IOException {
+ writer.close();
+ cache.close();
+ }
+
+ @Override
+ public void waitForAsync() throws Exception {
+ //do nothing
+ }
+
+ @Override
+ public int query(Query query) throws IOException {
+ TopEntriesCollector collector = new TopEntriesCollector();
+ repo.query(query, 100, collector);
+ return collector.size();
+ }
+ });
+ }
+
+  /**
+   * Benchmarks the full Lucene index implementation: a partitioned region
+   * with a single bucket and an index named "index" on the "text" field,
+   * created through the {@link LuceneService}.
+   *
+   * @throws Exception if any of the callbacks fails
+   */
+  @Test
+  public void testLuceneIndex() throws Exception {
+    final TestCallbacks fullIndexCallbacks = new TestCallbacks() {
+
+      private Cache cache;
+      private LuceneService service;
+      private Region<String, TestObject> region;
+
+      @Override
+      public void init() throws Exception {
+        //The index is created before the region it indexes
+        cache = new CacheFactory().set("mcast-port", "0")
+            .set("log-level", "warning")
+            .create();
+        service = LuceneServiceProvider.get(cache);
+        service.createIndex("index", "/region", "text");
+        region = cache.<String, TestObject>createRegionFactory(RegionShortcut.PARTITION)
+            .setPartitionAttributes(new PartitionAttributesFactory<>().setTotalNumBuckets(1).create())
+            .create("region");
+      }
+
+      @Override
+      public void addObject(String key, String text) throws Exception {
+        final TestObject value = new TestObject(text);
+        region.create(key, value);
+      }
+
+      @Override
+      public void commit() throws Exception {
+        //NA
+      }
+
+      @Override
+      public void waitForAsync() throws Exception {
+        final String queueId = LuceneServiceImpl.getUniqueIndexName("index", "/region");
+        final AsyncEventQueue queue = cache.getAsyncEventQueue(queueId);
+
+        //Poll until the queue has drained. We will be at most 10 ms off
+        while (queue.size() > 0) {
+          Thread.sleep(10);
+        }
+      }
+
+      @Override
+      public int query(final Query query) throws Exception {
+        //The provider simply hands back the already-built Lucene query
+        final LuceneQueryProvider provider = new LuceneQueryProvider() {
+
+          @Override
+          public Query getQuery(LuceneIndex index) throws QueryException {
+            return query;
+          }
+        };
+        LuceneQuery<Object, Object> luceneQuery =
+            service.createLuceneQueryFactory().create("index", "/region", provider);
+
+        return luceneQuery.search().size();
+      }
+
+      @Override
+      public void cleanup() throws IOException {
+        cache.close();
+      }
+    };
+
+    doTest("LuceneIndex", fullIndexCallbacks);
+  }
+
+  /**
+   * Benchmarks a plain Lucene IndexWriter/SearcherManager pair writing
+   * through a {@link RegionDirectory} backed by two in-memory concurrent maps.
+   *
+   * @throws Exception if any of the callbacks fails
+   */
+  @Test
+  public void testLuceneWithRegionDirectory() throws Exception {
+    doTest("RegionDirectory", new TestCallbacks() {
+
+      private IndexWriter writer;
+      private SearcherManager searcherManager;
+
+      @Override
+      public void init() throws Exception {
+        RegionDirectory dir = new RegionDirectory(new ConcurrentHashMap<String, File>(), new ConcurrentHashMap<ChunkKey, byte[]>());
+        IndexWriterConfig config = new IndexWriterConfig(analyzer);
+        writer = new IndexWriter(dir, config);
+        searcherManager = new SearcherManager(writer, true, null);
+      }
+
+      @Override
+      public void addObject(String key, String text) throws Exception {
+        //The key is stored with the document; the text is indexed only
+        Document doc = new Document();
+        doc.add(new TextField("key", key, Store.YES));
+        doc.add(new TextField("text", text, Store.NO));
+        writer.addDocument(doc);
+      }
+
+      @Override
+      public void commit() throws Exception {
+        writer.commit();
+        //Make the committed documents visible to subsequently acquired searchers
+        searcherManager.maybeRefresh();
+      }
+
+      @Override
+      public void cleanup() throws Exception {
+        //Close the searcher manager as well as the writer; previously only
+        //the writer was closed, so the manager was never released.
+        //The finally block guarantees the writer is closed even if
+        //closing the manager fails.
+        try {
+          searcherManager.close();
+        } finally {
+          writer.close();
+        }
+      }
+
+      @Override
+      public void waitForAsync() throws Exception {
+        //do nothing
+      }
+
+      @Override
+      public int query(Query query) throws Exception {
+        IndexSearcher searcher = searcherManager.acquire();
+        try {
+          return searcher.count(query);
+        } finally {
+          //Every acquire must be paired with a release
+          searcherManager.release(searcher);
+        }
+      }
+
+    });
+
+  }
+
+  /**
+   * Baseline benchmark: a plain Lucene IndexWriter/SearcherManager pair on
+   * top of a RAMDirectory.
+   *
+   * @throws Exception if any of the callbacks fails
+   */
+  @Test
+  public void testLucene() throws Exception {
+    doTest("Lucene", new TestCallbacks() {
+
+      private IndexWriter writer;
+      private SearcherManager searcherManager;
+
+      @Override
+      public void init() throws Exception {
+        RAMDirectory dir = new RAMDirectory();
+        IndexWriterConfig config = new IndexWriterConfig(analyzer);
+        writer = new IndexWriter(dir, config);
+        searcherManager = new SearcherManager(writer, true, null);
+      }
+
+      @Override
+      public void addObject(String key, String text) throws Exception {
+        //The key is stored with the document; the text is indexed only
+        Document doc = new Document();
+        doc.add(new TextField("key", key, Store.YES));
+        doc.add(new TextField("text", text, Store.NO));
+        writer.addDocument(doc);
+      }
+
+      @Override
+      public void commit() throws Exception {
+        writer.commit();
+        //Make the committed documents visible to subsequently acquired searchers
+        searcherManager.maybeRefresh();
+      }
+
+      @Override
+      public void cleanup() throws Exception {
+        //Close the searcher manager as well as the writer; previously only
+        //the writer was closed, so the manager was never released.
+        //The finally block guarantees the writer is closed even if
+        //closing the manager fails.
+        try {
+          searcherManager.close();
+        } finally {
+          writer.close();
+        }
+      }
+
+      @Override
+      public void waitForAsync() throws Exception {
+        //do nothing
+      }
+
+      @Override
+      public int query(Query query) throws Exception {
+        IndexSearcher searcher = searcherManager.acquire();
+        try {
+          return searcher.count(query);
+        } finally {
+          //Every acquire must be paired with a release
+          searcherManager.release(searcher);
+        }
+      }
+
+    });
+
+  }
+
+ private void doTest(String testName, TestCallbacks callbacks) throws Exception {
+
+ //Create some random words. We need to be careful
+ //to make sure we get NUM_WORDS distinct words here
+ Set<String> wordSet = new HashSet<String>();
+ Random rand = new Random();
+ while(wordSet.size() < NUM_WORDS) {
+ int length = rand.nextInt(12) + 3;
+ char[] text = new char[length];
+ for(int i = 0; i < length; i++) {
+ text[i] = (char) (rand.nextInt(26) + 97);
+ }
+ wordSet.add(new String(text));
+ }
+ List<String> words = new ArrayList<String>(wordSet.size());
+ words.addAll(wordSet);
+
+
+
+ //warm up
+ writeRandomWords(callbacks, words, rand, NUM_ENTRIES / 10, NUM_QUERIES / 10, COMMIT_INTERVAL[0]);
+
+ //Do the actual test
+
+ for(int i = 0; i < COMMIT_INTERVAL.length; i++) {
+ Results results = writeRandomWords(callbacks, words, rand, NUM_ENTRIES, NUM_QUERIES / 10, COMMIT_INTERVAL[i]);
+
+ System.out.println(testName + " writes(entries=" + NUM_ENTRIES + ", commit=" + COMMIT_INTERVAL[i] + "): " + TimeUnit.NANOSECONDS.toMillis(results.writeTime));
+ System.out.println(testName + " queries(entries=" + NUM_ENTRIES + ", commit=" + COMMIT_INTERVAL[i] + "): " + TimeUnit.NANOSECONDS.toMillis(results.queryTime));
+ }
+ }
+
+ /**
+ * Runs one write phase followed by one query phase against the given
+ * callbacks and returns the elapsed time of each phase in nanoseconds.
+ * init() runs before the first timer is started and cleanup() runs in the
+ * finally block, so neither is included in the measurements.
+ *
+ * @param callbacks the implementation being measured
+ * @param words vocabulary that entry values and query terms are drawn from
+ * @param rand source of randomness for picking words
+ * @param numEntries number of entries to add; each value is two words joined by a space
+ * @param numQueries number of single-term queries to run against the "text" field
+ * @param commitInterval commit() is called every this many entries, and once more after the last entry
+ * @return the measured write and query times
+ * @throws Exception if any callback fails
+ */
+ private Results writeRandomWords(TestCallbacks callbacks, List<String> words,
+ Random rand, int numEntries, int numQueries, int commitInterval) throws Exception {
+ Results results = new Results();
+ callbacks.init();
+ //Per-word occurrence counts; only consumed by the disabled assertion below
+ int[] counts = new int[words.size()];
+ long start = System.nanoTime();
+ try {
+ for(int i =0; i < numEntries; i++) {
+ int word1 = rand.nextInt(words.size());
+ int word2 = rand.nextInt(words.size());
+ counts[word1]++;
+ counts[word2]++;
+ String value = words.get(word1) + " " + words.get(word2);
+ callbacks.addObject("key" + i, value);
+
+ //Periodic commit; entry 0 is skipped so we do not commit after a single add
+ if(i % commitInterval == 0 && i != 0) {
+ callbacks.commit();
+ }
+ }
+ //Final commit, then wait for asynchronous work so it counts as write time
+ callbacks.commit();
+ callbacks.waitForAsync();
+ long end = System.nanoTime();
+ results.writeTime = end - start;
+
+
+ start = System.nanoTime();
+ for(int i=0; i < numQueries; i++) {
+ int wordIndex = rand.nextInt(words.size());
+ String word = words.get(wordIndex);
+ Query query = new TermQuery(new Term("text", word));
+ //The hit count is currently unused because the assertion below is disabled
+ int size = callbacks.query(query);
+// int size = callbacks.query(parser.parse(word));
+ //All of my tests sometimes seem to be missing a couple of words, including the stock lucene
+// assertEquals("Error on query " + i + " word=" + word, counts[wordIndex], size);
+ }
+ end = System.nanoTime();
+ results.queryTime = end - start;
+
+ return results;
+ } finally {
+ callbacks.cleanup();
+ }
+ }
+
+  /**
+   * Region value used by the full-index benchmark. It carries a single
+   * text field, which is written and read with the DataSerializer string
+   * helpers and is also what toString() returns.
+   */
+  private static class TestObject implements DataSerializable {
+    private String text;
+
+    /** Creates an instance with no text; fromData fills the field in. */
+    public TestObject() {
+    }
+
+    /**
+     * @param text the text this object carries
+     */
+    public TestObject(String text) {
+      this.text = text;
+    }
+
+    @Override
+    public void toData(DataOutput out) throws IOException {
+      DataSerializer.writeString(this.text, out);
+    }
+
+    @Override
+    public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+      this.text = DataSerializer.readString(in);
+    }
+
+    @Override
+    public String toString() {
+      return this.text;
+    }
+  }
+
+  /**
+   * Hooks that adapt one index implementation to the benchmark loop in
+   * writeRandomWords. Interface methods are implicitly public.
+   */
+  private interface TestCallbacks {
+    /** Called once per pass, before the write timer is started. */
+    void init() throws Exception;
+
+    /** Adds one entry; called once per entry during the write phase. */
+    void addObject(String key, String text) throws Exception;
+
+    /** Called every commit interval and once after the last entry. */
+    void commit() throws Exception;
+
+    /** Called after the final commit, before the write timer is stopped. */
+    void waitForAsync() throws Exception;
+
+    /** Runs the query and returns an int result (the hit count in the visible implementations). */
+    int query(Query query) throws Exception;
+
+    /** Called once per pass from a finally block, after the timed phases. */
+    void cleanup() throws Exception;
+  }
+
+ /** Timings produced by one writeRandomWords pass. */
+ private static class Results {
+ //Elapsed nanoseconds of the write phase (adds, commits and waitForAsync)
+ long writeTime;
+ //Elapsed nanoseconds of the query phase
+ long queryTime;
+ }
+}
[7/8] incubator-geode git commit: GEODE-714: Modify all tests to use
JUnit Categories
Posted by kl...@apache.org.
GEODE-714: Modify all tests to use JUnit Categories
* Add DistributedTest @Category to DistributedTestCase
* Rename disabled tests and use @Ignore instead.
* Add PerformanceTest @Category to performance tests.
* Disable performance tests that perform no assertions.
* Modify build to check all tests for categories.
* Modify build to use **/*Test.class pattern for all testing tasks.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/ca6148aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/ca6148aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/ca6148aa
Branch: refs/heads/feature/GEODE-715
Commit: ca6148aa99faf8fcb278fd7a840f69f622e6fc3f
Parents: bba7656
Author: Kirk Lund <kl...@pivotal.io>
Authored: Mon Dec 28 13:44:38 2015 -0800
Committer: Kirk Lund <kl...@pivotal.io>
Committed: Mon Jan 4 08:43:08 2016 -0800
----------------------------------------------------------------------
build.gradle | 21 +-
.../test/java/com/gemstone/gemfire/BadTest.java | 42 -
.../cache30/Bug40255JUnitDisabledTest.java | 139 ---
.../gemfire/cache30/Bug40255JUnitTest.java | 143 +++
.../cache30/Bug40662JUnitDisabledTest.java | 92 --
.../gemfire/cache30/Bug40662JUnitTest.java | 93 ++
.../locks/CollaborationJUnitDisabledTest.java | 562 ----------
.../internal/locks/CollaborationJUnitTest.java | 617 +++++++++++
.../DiskRegionPerfJUnitPerformanceTest.java | 6 +-
...HARegionQueueStartStopJUnitDisabledTest.java | 123 ---
.../cache/tier/sockets/HAInterestBaseTest.java | 1015 -----------------
.../tier/sockets/HAInterestPart1DUnitTest.java | 102 +-
.../tier/sockets/HAInterestPart2DUnitTest.java | 116 +-
.../cache/tier/sockets/HAInterestTestCase.java | 1018 ++++++++++++++++++
.../tier/sockets/command/CommitCommandTest.java | 6 +
.../logging/LogWriterPerformanceTest.java | 6 +
.../log4j/Log4J2DisabledPerformanceTest.java | 6 +
.../logging/log4j/Log4J2PerformanceTest.java | 9 +-
.../LogWriterLoggerDisabledPerformanceTest.java | 6 +
.../log4j/LogWriterLoggerPerformanceTest.java | 9 +-
.../test/java/dunit/DistributedTestCase.java | 3 +
...IndexRepositoryImplJUnitPerformanceTest.java | 437 --------
.../IndexRepositoryImplPerformanceTest.java | 439 ++++++++
23 files changed, 2473 insertions(+), 2537 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ca6148aa/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index f464dc3..4563590 100755
--- a/build.gradle
+++ b/build.gradle
@@ -345,7 +345,7 @@ subprojects {
}
test {
- include '**/*JUnitTest.class'
+ include '**/*Test.class'
useJUnit {
includeCategories 'com.gemstone.gemfire.test.junit.categories.UnitTest'
excludeCategories 'com.gemstone.gemfire.test.junit.categories.IntegrationTest'
@@ -364,10 +364,12 @@ subprojects {
//This target does not run any tests. Rather, it validates that there are no
//tests that are missing a category annotation
task checkMissedTests(type: Test) {
- include '**/*JUnitTest.class'
+ include '**/*Test.class'
useJUnit {
excludeCategories 'com.gemstone.gemfire.test.junit.categories.UnitTest'
excludeCategories 'com.gemstone.gemfire.test.junit.categories.IntegrationTest'
+ excludeCategories 'com.gemstone.gemfire.test.junit.categories.DistributedTest'
+ excludeCategories 'com.gemstone.gemfire.test.junit.categories.PerformanceTest'
}
beforeTest { descriptor ->
@@ -377,7 +379,7 @@ subprojects {
}
task integrationTest(type:Test) {
- include '**/*JUnitTest.class'
+ include '**/*Test.class'
useJUnit {
excludeCategories 'com.gemstone.gemfire.test.junit.categories.UnitTest'
includeCategories 'com.gemstone.gemfire.test.junit.categories.IntegrationTest'
@@ -392,17 +394,16 @@ subprojects {
}
task distributedTest(type:Test) {
- include '**/*DUnitTest.class'
+ include '**/*Test.class'
// maxParallelForks = 2
// maxParallelForks = Runtime.runtime.availableProcessors()
-// TODO add @Category(DistributedTest.class) to dunit tests
-// useJUnit {
-// excludeCategories 'com.gemstone.gemfire.test.junit.categories.UnitTest'
-// excludeCategories 'com.gemstone.gemfire.test.junit.categories.IntegrationTest'
-// includeCategories 'com.gemstone.gemfire.test.junit.categories.DistributedTest'
-// }
+ useJUnit {
+ excludeCategories 'com.gemstone.gemfire.test.junit.categories.UnitTest'
+ excludeCategories 'com.gemstone.gemfire.test.junit.categories.IntegrationTest'
+ includeCategories 'com.gemstone.gemfire.test.junit.categories.DistributedTest'
+ }
//I'm hoping this might deal with SOME OOMEs I've seen
forkEvery 30
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ca6148aa/gemfire-core/src/test/java/com/gemstone/gemfire/BadTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/BadTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/BadTest.java
deleted file mode 100644
index 46dc799..0000000
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/BadTest.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire;
-
-import junit.framework.*;
-
-/**
- * This test provides examples of a test failing and a test getting an
- * error. We use it to test JUnit failure reporting.
- */
-public class BadTest extends TestCase {
-
- public BadTest(String name) {
- super(name);
- }
-
- //////// Test Methods
-
- public void testFailure() {
- fail("I'm failing away...");
- }
-
- public void testError() {
- String s = "I've failed";
- throw new RuntimeException(s);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ca6148aa/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/Bug40255JUnitDisabledTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/Bug40255JUnitDisabledTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/Bug40255JUnitDisabledTest.java
deleted file mode 100644
index 4fe8a49..0000000
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/Bug40255JUnitDisabledTest.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache30;
-
-import java.io.File;
-import java.util.Properties;
-
-import junit.framework.TestCase;
-
-import com.gemstone.gemfire.cache.Cache;
-import com.gemstone.gemfire.cache.CacheFactory;
-import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.cache.RegionAttributes;
-import com.gemstone.gemfire.distributed.DistributedSystem;
-import com.gemstone.gemfire.distributed.internal.DistributionConfig;
-
-/**
- * @author Shobhit Agarwal
- *
- */
-public class Bug40255JUnitDisabledTest extends TestCase{
-
- private static final String BUG_40255_XML = Bug40255JUnitDisabledTest.class.getResource("bug40255xmlparameterization.xml").getFile();
- private static final String BUG_40255_PROPS = Bug40255JUnitDisabledTest.class.getResource("bug40255_gemfire.properties").getFile();
-
- private static final String ATTR_PROPERTY_STRING = "region.disk.store";
-
- private static final String ATTR_PROPERTY_VALUE = "teststore";
-
- private static final String NESTED_ATTR_PROPERTY_STRING = "custom-nested.test";
-
- private static final String NESTED_ATTR_PROPERTY_VALUE = "disk";
-
- private static final String ELEMENT_PROPERTY_STRING = "custom-string.element";
-
- private static final String ELEMENT_PROPERTY_VALUE = "example-string";
-
- private static final String CONCAT_ELEMENT_PROPERTY_STRING = "concat.test";
-
- private static final String CONCAT_ELEMENT_PROPERTY_VALUE = "-name";
-
- private static final String ELEMENT_KEY_VALUE = "example-value";
-
- DistributedSystem ds;
- Cache cache;
-
- @Override
- public void setName(String name) {
- super.setName(name);
- }
-
- public void testResolveReplacePropertyStringForLonerCache(){
- Properties props = new Properties();
- props.setProperty("mcast-port", "0");
- props.setProperty("locators", "");
- System.setProperty("gemfirePropertyFile", BUG_40255_PROPS);
- props.setProperty(DistributionConfig.CACHE_XML_FILE_NAME, BUG_40255_XML);
- System.setProperty(NESTED_ATTR_PROPERTY_STRING, NESTED_ATTR_PROPERTY_VALUE);
- System.setProperty(ATTR_PROPERTY_STRING, ATTR_PROPERTY_VALUE);
- System.setProperty(ELEMENT_PROPERTY_STRING, ELEMENT_PROPERTY_VALUE);
- System.setProperty(CONCAT_ELEMENT_PROPERTY_STRING, CONCAT_ELEMENT_PROPERTY_VALUE);
-
- // create the directory where data is going to be stored
- File dir = new File("persistData1");
- dir.mkdir();
-
- this.ds = DistributedSystem.connect(props);
- this.cache = CacheFactory.create(this.ds);
-
- Region exampleRegion = this.cache.getRegion("example-region");
- RegionAttributes<Object, Object> attrs = exampleRegion.getAttributes();
-
- //Check if disk store got same name as passed in system properties in setup().
- assertEquals(attrs.getDiskStoreName(), System.getProperty(ATTR_PROPERTY_STRING));
- assertNotNull(exampleRegion.get(ELEMENT_PROPERTY_VALUE+CONCAT_ELEMENT_PROPERTY_VALUE));
- assertEquals(exampleRegion.get(ELEMENT_PROPERTY_VALUE+CONCAT_ELEMENT_PROPERTY_VALUE), ELEMENT_KEY_VALUE);
- assertNotNull(exampleRegion.get(ELEMENT_PROPERTY_VALUE));
- assertEquals(exampleRegion.get(ELEMENT_PROPERTY_VALUE), CONCAT_ELEMENT_PROPERTY_VALUE);
- }
-
- public void testResolveReplacePropertyStringForNonLonerCache(){
- Properties props = new Properties();
- props.setProperty("mcast-port", "10333");
- props.setProperty("locators", "");
- System.setProperty("gemfirePropertyFile", BUG_40255_PROPS);
- props.setProperty(DistributionConfig.CACHE_XML_FILE_NAME, BUG_40255_XML);
- System.setProperty(NESTED_ATTR_PROPERTY_STRING, NESTED_ATTR_PROPERTY_VALUE);
- System.setProperty(ATTR_PROPERTY_STRING, ATTR_PROPERTY_VALUE);
- System.setProperty(ELEMENT_PROPERTY_STRING, ELEMENT_PROPERTY_VALUE);
- System.setProperty(CONCAT_ELEMENT_PROPERTY_STRING, CONCAT_ELEMENT_PROPERTY_VALUE);
-
- // create the directory where data is going to be stored
- File dir = new File("persistData1");
- dir.mkdir();
-
- this.ds = DistributedSystem.connect(props);
- this.cache = CacheFactory.create(this.ds);
-
- Region exampleRegion = this.cache.getRegion("example-region");
- RegionAttributes<Object, Object> attrs = exampleRegion.getAttributes();
-
- //Check if disk store got same name as passed in system properties in setup().
- assertEquals(attrs.getDiskStoreName(), System.getProperty(ATTR_PROPERTY_STRING));
- assertNotNull(exampleRegion.get(ELEMENT_PROPERTY_VALUE+CONCAT_ELEMENT_PROPERTY_VALUE));
- assertEquals(exampleRegion.get(ELEMENT_PROPERTY_VALUE+CONCAT_ELEMENT_PROPERTY_VALUE), ELEMENT_KEY_VALUE);
- }
-
- @Override
- protected void tearDown() throws Exception {
- super.tearDown();
- if (this.cache != null) {
- this.cache.close();
- this.cache = null;
- }
- if (this.ds != null) {
- this.ds.disconnect();
- this.ds = null;
- }
- }
-
- @Override
- protected void setUp() throws Exception {
- super.setUp();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ca6148aa/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/Bug40255JUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/Bug40255JUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/Bug40255JUnitTest.java
new file mode 100644
index 0000000..92bfbe7
--- /dev/null
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/Bug40255JUnitTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache30;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.util.Properties;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionAttributes;
+import com.gemstone.gemfire.distributed.DistributedSystem;
+import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
+
+/**
+ * @author Shobhit Agarwal
+ *
+ */
+@Category(IntegrationTest.class)
+@Ignore("Test is broken and was named Bug40255JUnitDisabledTest")
+public class Bug40255JUnitTest {
+
+ private static final String BUG_40255_XML = Bug40255JUnitTest.class.getResource("bug40255xmlparameterization.xml").getFile();
+ private static final String BUG_40255_PROPS = Bug40255JUnitTest.class.getResource("bug40255_gemfire.properties").getFile();
+
+ private static final String ATTR_PROPERTY_STRING = "region.disk.store";
+
+ private static final String ATTR_PROPERTY_VALUE = "teststore";
+
+ private static final String NESTED_ATTR_PROPERTY_STRING = "custom-nested.test";
+
+ private static final String NESTED_ATTR_PROPERTY_VALUE = "disk";
+
+ private static final String ELEMENT_PROPERTY_STRING = "custom-string.element";
+
+ private static final String ELEMENT_PROPERTY_VALUE = "example-string";
+
+ private static final String CONCAT_ELEMENT_PROPERTY_STRING = "concat.test";
+
+ private static final String CONCAT_ELEMENT_PROPERTY_VALUE = "-name";
+
+ private static final String ELEMENT_KEY_VALUE = "example-value";
+
+ DistributedSystem ds;
+ Cache cache;
+
+ @Test
+ public void testResolveReplacePropertyStringForLonerCache(){
+ Properties props = new Properties();
+ props.setProperty("mcast-port", "0");
+ props.setProperty("locators", "");
+ System.setProperty("gemfirePropertyFile", BUG_40255_PROPS);
+ props.setProperty(DistributionConfig.CACHE_XML_FILE_NAME, BUG_40255_XML);
+ System.setProperty(NESTED_ATTR_PROPERTY_STRING, NESTED_ATTR_PROPERTY_VALUE);
+ System.setProperty(ATTR_PROPERTY_STRING, ATTR_PROPERTY_VALUE);
+ System.setProperty(ELEMENT_PROPERTY_STRING, ELEMENT_PROPERTY_VALUE);
+ System.setProperty(CONCAT_ELEMENT_PROPERTY_STRING, CONCAT_ELEMENT_PROPERTY_VALUE);
+
+ // create the directory where data is going to be stored
+ File dir = new File("persistData1");
+ dir.mkdir();
+
+ this.ds = DistributedSystem.connect(props);
+ this.cache = CacheFactory.create(this.ds);
+
+ Region exampleRegion = this.cache.getRegion("example-region");
+ RegionAttributes<Object, Object> attrs = exampleRegion.getAttributes();
+
+ //Check if disk store got same name as passed in system properties in setup().
+ assertEquals(attrs.getDiskStoreName(), System.getProperty(ATTR_PROPERTY_STRING));
+ assertNotNull(exampleRegion.get(ELEMENT_PROPERTY_VALUE+CONCAT_ELEMENT_PROPERTY_VALUE));
+ assertEquals(exampleRegion.get(ELEMENT_PROPERTY_VALUE+CONCAT_ELEMENT_PROPERTY_VALUE), ELEMENT_KEY_VALUE);
+ assertNotNull(exampleRegion.get(ELEMENT_PROPERTY_VALUE));
+ assertEquals(exampleRegion.get(ELEMENT_PROPERTY_VALUE), CONCAT_ELEMENT_PROPERTY_VALUE);
+ }
+
+ @Test
+ public void testResolveReplacePropertyStringForNonLonerCache(){
+ Properties props = new Properties();
+ props.setProperty("mcast-port", "10333");
+ props.setProperty("locators", "");
+ System.setProperty("gemfirePropertyFile", BUG_40255_PROPS);
+ props.setProperty(DistributionConfig.CACHE_XML_FILE_NAME, BUG_40255_XML);
+ System.setProperty(NESTED_ATTR_PROPERTY_STRING, NESTED_ATTR_PROPERTY_VALUE);
+ System.setProperty(ATTR_PROPERTY_STRING, ATTR_PROPERTY_VALUE);
+ System.setProperty(ELEMENT_PROPERTY_STRING, ELEMENT_PROPERTY_VALUE);
+ System.setProperty(CONCAT_ELEMENT_PROPERTY_STRING, CONCAT_ELEMENT_PROPERTY_VALUE);
+
+ // create the directory where data is going to be stored
+ File dir = new File("persistData1");
+ dir.mkdir();
+
+ this.ds = DistributedSystem.connect(props);
+ this.cache = CacheFactory.create(this.ds);
+
+ Region exampleRegion = this.cache.getRegion("example-region");
+ RegionAttributes<Object, Object> attrs = exampleRegion.getAttributes();
+
+ //Check if disk store got same name as passed in system properties in setup().
+ assertEquals(attrs.getDiskStoreName(), System.getProperty(ATTR_PROPERTY_STRING));
+ assertNotNull(exampleRegion.get(ELEMENT_PROPERTY_VALUE+CONCAT_ELEMENT_PROPERTY_VALUE));
+ assertEquals(exampleRegion.get(ELEMENT_PROPERTY_VALUE+CONCAT_ELEMENT_PROPERTY_VALUE), ELEMENT_KEY_VALUE);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (this.cache != null) {
+ this.cache.close();
+ this.cache = null;
+ }
+ if (this.ds != null) {
+ this.ds.disconnect();
+ this.ds = null;
+ }
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ca6148aa/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/Bug40662JUnitDisabledTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/Bug40662JUnitDisabledTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/Bug40662JUnitDisabledTest.java
deleted file mode 100644
index 9455641..0000000
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/Bug40662JUnitDisabledTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- *
- */
-package com.gemstone.gemfire.cache30;
-
-import java.util.Properties;
-
-import junit.framework.TestCase;
-
-import com.gemstone.gemfire.cache.Cache;
-import com.gemstone.gemfire.cache.CacheFactory;
-import com.gemstone.gemfire.cache.EvictionAction;
-import com.gemstone.gemfire.cache.EvictionAttributes;
-import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.cache.RegionAttributes;
-import com.gemstone.gemfire.distributed.DistributedSystem;
-import com.gemstone.gemfire.distributed.internal.DistributionConfig;
-
-/**
- * Test for Bug no. 40662. To verify the default action being set in eviction
- * attributes by CacheXmlParser when cache.xml has eviction attributes with no
- * eviction action specified. which was being set to EvictionAction.NONE
- *
- * @author shoagarwal
- * @since 6.6
- */
-public class Bug40662JUnitDisabledTest extends TestCase {
-
- private static final String BUG_40662_XML = Bug40662JUnitDisabledTest.class.getResource("bug40662noevictionaction.xml").getFile();
-
- DistributedSystem ds;
- Cache cache;
-
- @Override
- public void setName(String name) {
- super.setName(name);
- }
-
- /**
- * Test for checking eviction action in eviction attributes if no evicition
- * action is specified in cache.xml
- */
- public void testEvictionActionSetLocalDestroyPass() {
- Region exampleRegion = this.cache.getRegion("example-region");
- RegionAttributes<Object, Object> attrs = exampleRegion.getAttributes();
- EvictionAttributes evicAttrs = attrs.getEvictionAttributes();
-
- //Default eviction action is LOCAL_DESTROY always.
- assertEquals(EvictionAction.LOCAL_DESTROY, evicAttrs.getAction());
- }
-
- @Override
- protected void tearDown() throws Exception {
- super.tearDown();
- if (this.cache != null) {
- this.cache.close();
- this.cache = null;
- }
- if (this.ds != null) {
- this.ds.disconnect();
- this.ds = null;
- }
- }
-
- @Override
- protected void setUp() throws Exception {
- super.setUp();
- Properties props = new Properties();
- props.setProperty("mcast-port", "0");
- props.setProperty("locators", "");
- props.setProperty(DistributionConfig.CACHE_XML_FILE_NAME, BUG_40662_XML);
- this.ds = DistributedSystem.connect(props);
- this.cache = CacheFactory.create(this.ds);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ca6148aa/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/Bug40662JUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/Bug40662JUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/Bug40662JUnitTest.java
new file mode 100644
index 0000000..0d62127
--- /dev/null
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/Bug40662JUnitTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ *
+ */
+package com.gemstone.gemfire.cache30;
+
+import static org.junit.Assert.*;
+
+import java.util.Properties;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.EvictionAction;
+import com.gemstone.gemfire.cache.EvictionAttributes;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionAttributes;
+import com.gemstone.gemfire.distributed.DistributedSystem;
+import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
+
+/**
+ * Test for Bug no. 40662. To verify the default action being set in eviction
+ * attributes by CacheXmlParser when cache.xml has eviction attributes with no
+ * eviction action specified. which was being set to EvictionAction.NONE
+ *
+ * @author shoagarwal
+ * @since 6.6
+ */
+@Category(IntegrationTest.class)
+@Ignore("Test is broken and was named Bug40662JUnitDisabledTest")
+public class Bug40662JUnitTest {
+
+  private static final String BUG_40662_XML = Bug40662JUnitTest.class.getResource("bug40662noevictionaction.xml").getFile();
+
+  DistributedSystem ds;
+  Cache cache;
+
+  /**
+   * Connects a loner distributed system (mcast-port 0, no locators) and
+   * creates the cache from the bug 40662 cache.xml.
+   * JUnit 4 only accepts public lifecycle methods, so this must be public.
+   */
+  @Before
+  public void setUp() throws Exception {
+    Properties props = new Properties();
+    props.setProperty("mcast-port", "0");
+    props.setProperty("locators", "");
+    props.setProperty(DistributionConfig.CACHE_XML_FILE_NAME, BUG_40662_XML);
+    this.ds = DistributedSystem.connect(props);
+    this.cache = CacheFactory.create(this.ds);
+  }
+
+  /**
+   * Closes the cache and disconnects the distributed system if they were created.
+   * JUnit 4 only accepts public lifecycle methods, so this must be public.
+   */
+  @After
+  public void tearDown() throws Exception {
+    if (this.cache != null) {
+      this.cache.close();
+      this.cache = null;
+    }
+    if (this.ds != null) {
+      this.ds.disconnect();
+      this.ds = null;
+    }
+  }
+
+  /**
+   * Test for checking eviction action in eviction attributes if no eviction
+   * action is specified in cache.xml
+   */
+  @Test
+  public void testEvictionActionSetLocalDestroyPass() {
+    Region<Object, Object> exampleRegion = this.cache.getRegion("example-region");
+    RegionAttributes<Object, Object> attrs = exampleRegion.getAttributes();
+    EvictionAttributes evicAttrs = attrs.getEvictionAttributes();
+
+    //Default eviction action is LOCAL_DESTROY always.
+    assertEquals(EvictionAction.LOCAL_DESTROY, evicAttrs.getAction());
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ca6148aa/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/locks/CollaborationJUnitDisabledTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/locks/CollaborationJUnitDisabledTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/locks/CollaborationJUnitDisabledTest.java
deleted file mode 100755
index 7b93a36..0000000
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/locks/CollaborationJUnitDisabledTest.java
+++ /dev/null
@@ -1,562 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.distributed.internal.locks;
-
-import junit.framework.*;
-import java.util.*;
-
-import com.gemstone.gemfire.CancelCriterion;
-import com.gemstone.gemfire.LogWriter;
-import com.gemstone.gemfire.SystemFailure;
-import com.gemstone.gemfire.internal.logging.LocalLogWriter;
-import com.gemstone.gemfire.internal.logging.InternalLogWriter;
-
-import dunit.DistributedTestCase;
-import dunit.DistributedTestCase.WaitCriterion;
-
-/**
- * Tests the Collaboration Lock used internally by dlock service.
- *
- * @author Kirk Lund
- * @since 4.1.1
- */
-public class CollaborationJUnitDisabledTest extends TestCase {
-
- protected LogWriter log = new LocalLogWriter(InternalLogWriter.INFO_LEVEL);
- protected Collaboration collaboration;
-
- public CollaborationJUnitDisabledTest(String name) {
- super(name);
- }
-
- public void setUp() throws Exception {
- this.collaboration = new Collaboration(new CancelCriterion() {
-
- public String cancelInProgress() {
- // TODO Auto-generated method stub
- return null;
- }
-
- public RuntimeException generateCancelledException(Throwable e) {
- // TODO Auto-generated method stub
- return null;
- }
-
- });
- }
-
- public void tearDown() throws Exception {
- this.collaboration = null;
- }
-
- protected volatile boolean flagTestBlocksUntilRelease = false;
- protected volatile boolean threadBStartedTestBlocksUntilRelease = false;
- public void testBlocksUntilRelease() throws Exception {
- this.log.info("[testBlocksUntilRelease]");
- Thread threadA = new Thread(group, new Runnable() {
- public void run() {
- collaboration.acquireUninterruptibly("topicA");
- try {
- flagTestBlocksUntilRelease = true;
- while(flagTestBlocksUntilRelease) {
- try {
- Thread.sleep(10);
- }
- catch (InterruptedException ignore) {fail("interrupted");}
- }
- }
- finally {
- collaboration.release();
- }
- }
- });
-
- // thread one acquires
- threadA.start();
- WaitCriterion ev = new WaitCriterion() {
- public boolean done() {
- return CollaborationJUnitDisabledTest.this.flagTestBlocksUntilRelease;
- }
- public String description() {
- return "waiting for thread";
- }
- };
- DistributedTestCase.waitForCriterion(ev, 5 * 1000, 200, true);
- assertTrue(this.collaboration.hasCurrentTopic(threadA));
-
- // thread two blocks until one releeases
- Thread threadB = new Thread(group, new Runnable() {
- public void run() {
- threadBStartedTestBlocksUntilRelease = true;
- collaboration.acquireUninterruptibly("topicB");
- try {
- flagTestBlocksUntilRelease = true;
- WaitCriterion ev2 = new WaitCriterion() {
- public boolean done() {
- return !flagTestBlocksUntilRelease;
- }
- public String description() {
- return "waiting for release";
- }
- };
- DistributedTestCase.waitForCriterion(ev2, 20 * 1000, 200, true);
- }
- finally {
- collaboration.release();
- }
- }
- });
-
- // start up threadB
- threadB.start();
- ev = new WaitCriterion() {
- public boolean done() {
- return threadBStartedTestBlocksUntilRelease;
- }
- public String description() {
- return "waiting for thread b";
- }
- };
- DistributedTestCase.waitForCriterion(ev, 5 * 1000, 200, true);
-
- // threadA holds topic and threadB is waiting...
- assertTrue(this.collaboration.hasCurrentTopic(threadA));
- assertFalse(this.collaboration.hasCurrentTopic(threadB));
-
- // let threadA release so that threadB gets lock
- this.flagTestBlocksUntilRelease = false;
- DistributedTestCase.join(threadA, 30 * 1000, null);
-
- // make sure threadB is doing what it's supposed to do...
- ev = new WaitCriterion() {
- public boolean done() {
- return flagTestBlocksUntilRelease;
- }
- public String description() {
- return "threadB";
- }
- };
- DistributedTestCase.waitForCriterion(ev, 5 * 1000, 200, true);
- // threadB must have lock now... let threadB release
- assertTrue(this.collaboration.hasCurrentTopic(threadB));
- this.flagTestBlocksUntilRelease = false;
- DistributedTestCase.join(threadB, 30 * 1000, null);
-
- // collaboration should be free now
- assertFalse(this.collaboration.hasCurrentTopic(threadA));
- assertFalse(this.collaboration.hasCurrentTopic(threadB));
- assertFalse(this.collaboration.hasCurrentTopic());
- }
-
- protected volatile boolean threadAFlag_TestLateComerJoinsIn = false;
- protected volatile boolean threadBFlag_TestLateComerJoinsIn = false;
- protected volatile boolean threadCFlag_TestLateComerJoinsIn = true;
- protected volatile boolean threadDFlag_TestLateComerJoinsIn = false;
- public void testLateComerJoinsIn() throws Exception {
- this.log.info("[testLateComerJoinsIn]");
-
- final Object topicA = "topicA";
- final Object topicB = "topicB";
-
- // threads one and two acquire
- Thread threadA = new Thread(group, new Runnable() {
- public void run() {
- collaboration.acquireUninterruptibly(topicA);
- try {
- threadAFlag_TestLateComerJoinsIn = true;
- WaitCriterion ev = new WaitCriterion() {
- public boolean done() {
- return !threadAFlag_TestLateComerJoinsIn;
- }
- public String description() {
- return null;
- }
- };
- DistributedTestCase.waitForCriterion(ev, 60 * 1000, 200, true);
- }
- finally {
- collaboration.release();
- }
- }
- });
- threadA.start();
- WaitCriterion ev = new WaitCriterion() {
- public boolean done() {
- return threadAFlag_TestLateComerJoinsIn;
- }
- public String description() {
- return "wait for ThreadA";
- }
- };
- DistributedTestCase.waitForCriterion(ev, 30 * 1000, 200, true);
- assertTrue(this.collaboration.hasCurrentTopic(threadA));
- assertTrue(this.collaboration.isCurrentTopic(topicA));
-
- Thread threadB = new Thread(group, new Runnable() {
- public void run() {
- collaboration.acquireUninterruptibly(topicA);
- try {
- threadBFlag_TestLateComerJoinsIn = true;
- WaitCriterion ev2 = new WaitCriterion() {
- public boolean done() {
- return !threadBFlag_TestLateComerJoinsIn;
- }
- public String description() {
- return null;
- }
- };
- DistributedTestCase.waitForCriterion(ev2, 60 * 1000, 200, true);
- }
- finally {
- collaboration.release();
- }
- }
- });
- threadB.start();
- ev = new WaitCriterion() {
- public boolean done() {
- return threadBFlag_TestLateComerJoinsIn;
- }
- public String description() {
- return "";
- }
- };
- DistributedTestCase.waitForCriterion(ev, 60 * 1000, 200, true);
- assertTrue(this.collaboration.hasCurrentTopic(threadB));
-
- // thread three blocks for new topic
- Thread threadC = new Thread(group, new Runnable() {
- public void run() {
- threadCFlag_TestLateComerJoinsIn = false;
- collaboration.acquireUninterruptibly(topicB);
- try {
- threadCFlag_TestLateComerJoinsIn = true;
- WaitCriterion ev2 = new WaitCriterion() {
- public boolean done() {
- return !threadCFlag_TestLateComerJoinsIn;
- }
- public String description() {
- return null;
- }
- };
- DistributedTestCase.waitForCriterion(ev2, 60 * 1000, 200, true);
- }
- finally {
- collaboration.release();
- }
- }
- });
- threadC.start();
- ev = new WaitCriterion() {
- public boolean done() {
- return threadCFlag_TestLateComerJoinsIn;
- }
- public String description() {
- return null;
- }
- };
- DistributedTestCase.waitForCriterion(ev, 60 * 1000, 200, true);
- assertFalse(this.collaboration.hasCurrentTopic(threadC));
- assertFalse(this.collaboration.isCurrentTopic(topicB));
-
- // thread four (lateComer) acquires current topic immediately
- Thread threadD = new Thread(group, new Runnable() {
- public void run() {
- collaboration.acquireUninterruptibly(topicA);
- try {
- threadDFlag_TestLateComerJoinsIn = true;
- while(threadDFlag_TestLateComerJoinsIn) {
- try {
- Thread.sleep(10);
- }
- catch (InterruptedException ignore) {fail("interrupted");}
- }
- }
- finally {
- collaboration.release();
- }
- }
- });
- threadD.start();
- ev = new WaitCriterion() {
- public boolean done() {
- return threadDFlag_TestLateComerJoinsIn;
- }
- public String description() {
- return null;
- }
- };
- DistributedTestCase.waitForCriterion(ev, 60 * 1000, 200, true);
- assertTrue(this.collaboration.hasCurrentTopic(threadD));
-
- // release threadA
- this.threadAFlag_TestLateComerJoinsIn = false;
- DistributedTestCase.join(threadA, 30 * 1000, null);
- assertFalse(this.collaboration.hasCurrentTopic(threadA));
- assertTrue(this.collaboration.hasCurrentTopic(threadB));
- assertFalse(this.collaboration.hasCurrentTopic(threadC));
- assertTrue(this.collaboration.hasCurrentTopic(threadD));
- assertTrue(this.collaboration.isCurrentTopic(topicA));
- assertFalse(this.collaboration.isCurrentTopic(topicB));
-
- // release threadB
- this.threadBFlag_TestLateComerJoinsIn = false;
- DistributedTestCase.join(threadB, 30 * 1000, null);
- assertFalse(this.collaboration.hasCurrentTopic(threadB));
- assertFalse(this.collaboration.hasCurrentTopic(threadC));
- assertTrue(this.collaboration.hasCurrentTopic(threadD));
- assertTrue(this.collaboration.isCurrentTopic(topicA));
- assertFalse(this.collaboration.isCurrentTopic(topicB));
-
- // release threadD
- this.threadDFlag_TestLateComerJoinsIn = false;
- DistributedTestCase.join(threadD, 30 * 1000, null);
- ev = new WaitCriterion() {
- public boolean done() {
- return threadCFlag_TestLateComerJoinsIn;
- }
- public String description() {
- return null;
- }
- };
- DistributedTestCase.waitForCriterion(ev, 60 * 1000, 200, true);
- assertTrue(this.collaboration.hasCurrentTopic(threadC));
- assertFalse(this.collaboration.hasCurrentTopic(threadD));
- assertFalse(this.collaboration.isCurrentTopic(topicA));
- assertTrue(this.collaboration.isCurrentTopic(topicB));
-
- // release threadC
- this.threadCFlag_TestLateComerJoinsIn = false;
- DistributedTestCase.join(threadC, 30 * 1000, null);
- assertFalse(this.collaboration.hasCurrentTopic(threadC));
- assertFalse(this.collaboration.isCurrentTopic(topicA));
- assertFalse(this.collaboration.isCurrentTopic(topicB));
- }
-
- protected List waitingList = Collections.synchronizedList(new ArrayList());
- protected List fairnessList = Collections.synchronizedList(new ArrayList());
- protected volatile boolean runTestFairnessStressfully = true;
- public void testFairnessStressfully() throws Exception {
- this.log.info("[testFairnessStressfully]");
- final int numThreads = 20;
- Thread threads[] = new Thread[numThreads];
-
- Runnable run = new Runnable() {
- public void run() {
- boolean released = false;
- try {
- String uniqueTopic = Thread.currentThread().getName();
- while(runTestFairnessStressfully) {
- waitingList.add(uniqueTopic);
- collaboration.acquireUninterruptibly(uniqueTopic);
- try {
- released = false;
- fairnessList.add(uniqueTopic);
- waitingList.remove(uniqueTopic);
- }
- finally {
- // wait for the other threads to line up...
- WaitCriterion ev = new WaitCriterion() {
- public boolean done() {
- return !runTestFairnessStressfully || waitingList.size() >= numThreads - 1;
- }
- public String description() {
- return "other threads lining up";
- }
- };
- DistributedTestCase.waitForCriterion(ev, 60 * 1000, 200, true);
- collaboration.release();
- released = true;
- }
- }
- }
- finally {
- if (!released) {
- collaboration.release();
- }
- }
- }
- };
-
- try {
- // many threads loop: acquire and release with unique topic
- for (int t = 0; t < threads.length; t++) {
- threads[t] = new Thread(group, run, String.valueOf(t));
- threads[t].start();
- }
-
- log.info("Started all threads... waiting for test to complete.");
-
- // wait for numThreads * 10
- WaitCriterion ev = new WaitCriterion() {
- public boolean done() {
- return fairnessList.size() >= numThreads * 20;
- }
- public String description() {
- return "waiting for numThreads * 10";
- }
- };
- DistributedTestCase.waitForCriterion(ev, 5 * 60 * 1000, 200, true);
- }
- finally {
- if (this.runTestFairnessStressfully) {
- this.runTestFairnessStressfully = false;
- }
- }
-
- for (int t = 0; t < threads.length; t++) {
- DistributedTestCase.join(threads[t], 30 * 1000, null);
- }
-
- // assert that all topics are acquired in order
- // count number of occurrences of each thread
- int count[] = new int[numThreads];
- for (int i = 0; i < count.length; i++) { // shouldn't be necessary
- count[i] = 0;
- }
- synchronized(this.fairnessList) {
- for (Iterator iter = this.fairnessList.iterator(); iter.hasNext();) {
- int id = Integer.valueOf((String)iter.next()).intValue();
- count[id] = count[id]+1;
- }
- }
-
- int totalLocks = 0;
- int minLocks = Integer.MAX_VALUE;
- int maxLocks = 0;
- for (int i = 0; i < count.length; i++) {
- int locks = count[i];
- this.log.fine("testFairnessStressfully thread-" + i + " acquired topic " +
- locks + " times.");
- if (locks < minLocks) minLocks = locks;
- if (locks > maxLocks) maxLocks = locks;
- totalLocks = totalLocks + locks;
- }
-
- this.log.info("[testFairnessStressfully] totalLocks=" + totalLocks +
- " minLocks=" + minLocks +
- " maxLocks=" + maxLocks);
-
- int expectedLocks = (totalLocks / numThreads) + 1;
-
- // NOTE: if you turn on fine logs, this deviation may be too small...
- // slower machines may also fail depending on thread scheduling
- int deviation = (int)(expectedLocks * 0.25);
- int lowThreshold = expectedLocks - deviation;
- int highThreshold = expectedLocks + deviation;
-
- this.log.info("[testFairnessStressfully] deviation=" + deviation +
- " expectedLocks=" + expectedLocks +
- " lowThreshold=" + lowThreshold +
- " highThreshold=" + highThreshold);
-
- // if these assertions keep failing we'll have to rewrite the test
- // to handle scheduling of the threads...
-
- assertTrue("minLocks is less than lowThreshold",
- minLocks >= lowThreshold);
- assertTrue("maxLocks is greater than highThreshold",
- maxLocks <= highThreshold);
- }
-
- public void testHasCurrentTopic() throws Exception {
- this.log.info("[testHasCurrentTopic]");
- assertTrue(!this.collaboration.hasCurrentTopic());
- this.collaboration.acquireUninterruptibly("testHasCurrentTopic");
- try {
- assertTrue(this.collaboration.hasCurrentTopic());
- }
- finally {
- this.collaboration.release();
- }
- assertTrue(!this.collaboration.hasCurrentTopic());
- }
-
- protected volatile boolean flagTestThreadHasCurrentTopic = false;
- public void testThreadHasCurrentTopic() throws Exception {
- this.log.info("[testThreadHasCurrentTopic]");
- Thread thread = new Thread(group, new Runnable() {
- public void run() {
- collaboration.acquireUninterruptibly("testThreadHasCurrentTopic");
- try {
- flagTestThreadHasCurrentTopic = true;
- WaitCriterion ev = new WaitCriterion() {
- public boolean done() {
- return !flagTestThreadHasCurrentTopic;
- }
- public String description() {
- return null;
- }
- };
- DistributedTestCase.waitForCriterion(ev, 60 * 1000, 200, true);
- }
- finally {
- collaboration.release();
- }
- }
- });
-
- // before starting thread, hasCurrentTopic(thread) returns false
- assertTrue(!this.collaboration.hasCurrentTopic(thread));
- thread.start();
- WaitCriterion ev = new WaitCriterion() {
- public boolean done() {
- return flagTestThreadHasCurrentTopic;
- }
- public String description() {
- return null;
- }
- };
- DistributedTestCase.waitForCriterion(ev, 60 * 1000, 200, true);
-
- // after starting thread, hasCurrentTopic(thread) returns true
- assertTrue(this.collaboration.hasCurrentTopic(thread));
- this.flagTestThreadHasCurrentTopic = false;
- DistributedTestCase.join(thread, 30 * 1000, null);
-
- // after thread finishes, hasCurrentTopic(thread) returns false
- assertTrue(!this.collaboration.hasCurrentTopic(thread));
- }
-
- public void testIsCurrentTopic() throws Exception {
- this.log.info("[testIsCurrentTopic]");
- Object topic = "testIsCurrentTopic";
- assertTrue(!this.collaboration.isCurrentTopic(topic));
- this.collaboration.acquireUninterruptibly(topic);
- try {
- assertTrue(this.collaboration.isCurrentTopic(topic));
- }
- finally {
- this.collaboration.release();
- }
- assertTrue(!this.collaboration.isCurrentTopic(topic));
- }
-
- protected final ThreadGroup group =
- new ThreadGroup("CollaborationJUnitTest Threads") {
- public void uncaughtException(Thread t, Throwable e)
- {
- if (e instanceof VirtualMachineError) {
- SystemFailure.setFailure((VirtualMachineError)e); // don't throw
- }
- String s = "Uncaught exception in thread " + t;
- log.error(s, e);
- fail(s);
- }
- };
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ca6148aa/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/locks/CollaborationJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/locks/CollaborationJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/locks/CollaborationJUnitTest.java
new file mode 100755
index 0000000..711500e
--- /dev/null
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/locks/CollaborationJUnitTest.java
@@ -0,0 +1,617 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.distributed.internal.locks;
+
+import static org.junit.Assert.*;
+
+import java.util.*;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.CancelCriterion;
+import com.gemstone.gemfire.LogWriter;
+import com.gemstone.gemfire.SystemFailure;
+import com.gemstone.gemfire.internal.logging.LocalLogWriter;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
+import com.gemstone.gemfire.internal.logging.InternalLogWriter;
+
+import dunit.DistributedTestCase;
+import dunit.DistributedTestCase.WaitCriterion;
+
+/**
+ * Tests the Collaboration Lock used internally by dlock service.
+ *
+ * @author Kirk Lund
+ * @since 4.1.1
+ */
+@Category(IntegrationTest.class)
+@Ignore("Test is broken and was named CollaborationJUnitDisabledTest")
+public class CollaborationJUnitTest {
+
+ protected LogWriter log = new LocalLogWriter(InternalLogWriter.INFO_LEVEL);
+ protected Collaboration collaboration;
+
+  /**
+   * Builds a fresh {@link Collaboration} for each test, handing it a stub
+   * {@link CancelCriterion} whose two callbacks both return {@code null}.
+   */
+  @Before
+  public void setUp() throws Exception {
+    final CancelCriterion stubCriterion = new CancelCriterion() {
+      @Override
+      public String cancelInProgress() {
+        return null;
+      }
+
+      @Override
+      public RuntimeException generateCancelledException(Throwable e) {
+        return null;
+      }
+    };
+    this.collaboration = new Collaboration(stubCriterion);
+  }
+
+  /** Drops the reference to the per-test {@link Collaboration} instance. */
+  @After
+  public void tearDown() throws Exception {
+    collaboration = null;
+  }
+
+ protected volatile boolean flagTestBlocksUntilRelease = false;
+ protected volatile boolean threadBStartedTestBlocksUntilRelease = false;
+
+ @Test
+ public void testBlocksUntilRelease() throws Exception {
+ this.log.info("[testBlocksUntilRelease]");
+ Thread threadA = new Thread(group, new Runnable() {
+ @Override
+ public void run() {
+ collaboration.acquireUninterruptibly("topicA");
+ try {
+ flagTestBlocksUntilRelease = true;
+ while(flagTestBlocksUntilRelease) {
+ try {
+ Thread.sleep(10);
+ }
+ catch (InterruptedException ignore) {fail("interrupted");}
+ }
+ }
+ finally {
+ collaboration.release();
+ }
+ }
+ });
+
+ // thread one acquires
+ threadA.start();
+ WaitCriterion ev = new WaitCriterion() {
+ @Override
+ public boolean done() {
+ return CollaborationJUnitTest.this.flagTestBlocksUntilRelease;
+ }
+ @Override
+ public String description() {
+ return "waiting for thread";
+ }
+ };
+ DistributedTestCase.waitForCriterion(ev, 5 * 1000, 200, true);
+ assertTrue(this.collaboration.hasCurrentTopic(threadA));
+
+ // thread two blocks until one releases
+ Thread threadB = new Thread(group, new Runnable() {
+ @Override
+ public void run() {
+ threadBStartedTestBlocksUntilRelease = true;
+ collaboration.acquireUninterruptibly("topicB");
+ try {
+ flagTestBlocksUntilRelease = true;
+ WaitCriterion ev2 = new WaitCriterion() {
+ @Override
+ public boolean done() {
+ return !flagTestBlocksUntilRelease;
+ }
+ @Override
+ public String description() {
+ return "waiting for release";
+ }
+ };
+ DistributedTestCase.waitForCriterion(ev2, 20 * 1000, 200, true);
+ }
+ finally {
+ collaboration.release();
+ }
+ }
+ });
+
+ // start up threadB
+ threadB.start();
+ ev = new WaitCriterion() {
+ @Override
+ public boolean done() {
+ return threadBStartedTestBlocksUntilRelease;
+ }
+ @Override
+ public String description() {
+ return "waiting for thread b";
+ }
+ };
+ DistributedTestCase.waitForCriterion(ev, 5 * 1000, 200, true);
+
+ // threadA holds topic and threadB is waiting...
+ assertTrue(this.collaboration.hasCurrentTopic(threadA));
+ assertFalse(this.collaboration.hasCurrentTopic(threadB));
+
+ // let threadA release so that threadB gets lock
+ this.flagTestBlocksUntilRelease = false;
+ DistributedTestCase.join(threadA, 30 * 1000, null);
+
+ // make sure threadB is doing what it's supposed to do...
+ ev = new WaitCriterion() {
+ @Override
+ public boolean done() {
+ return flagTestBlocksUntilRelease;
+ }
+ @Override
+ public String description() {
+ return "threadB";
+ }
+ };
+ DistributedTestCase.waitForCriterion(ev, 5 * 1000, 200, true);
+ // threadB must have lock now... let threadB release
+ assertTrue(this.collaboration.hasCurrentTopic(threadB));
+ this.flagTestBlocksUntilRelease = false;
+ DistributedTestCase.join(threadB, 30 * 1000, null);
+
+ // collaboration should be free now
+ assertFalse(this.collaboration.hasCurrentTopic(threadA));
+ assertFalse(this.collaboration.hasCurrentTopic(threadB));
+ assertFalse(this.collaboration.hasCurrentTopic());
+ }
+
+ protected volatile boolean threadAFlag_TestLateComerJoinsIn = false;
+ protected volatile boolean threadBFlag_TestLateComerJoinsIn = false;
+ protected volatile boolean threadCFlag_TestLateComerJoinsIn = true;
+ protected volatile boolean threadDFlag_TestLateComerJoinsIn = false;
+
+ @Test
+ public void testLateComerJoinsIn() throws Exception {
+ this.log.info("[testLateComerJoinsIn]");
+
+ final Object topicA = "topicA";
+ final Object topicB = "topicB";
+
+ // threads one and two acquire
+ Thread threadA = new Thread(group, new Runnable() {
+ @Override
+ public void run() {
+ collaboration.acquireUninterruptibly(topicA);
+ try {
+ threadAFlag_TestLateComerJoinsIn = true;
+ WaitCriterion ev = new WaitCriterion() {
+ @Override
+ public boolean done() {
+ return !threadAFlag_TestLateComerJoinsIn;
+ }
+ @Override
+ public String description() {
+ return null;
+ }
+ };
+ DistributedTestCase.waitForCriterion(ev, 60 * 1000, 200, true);
+ }
+ finally {
+ collaboration.release();
+ }
+ }
+ });
+ threadA.start();
+ WaitCriterion ev = new WaitCriterion() {
+ @Override
+ public boolean done() {
+ return threadAFlag_TestLateComerJoinsIn;
+ }
+ @Override
+ public String description() {
+ return "wait for ThreadA";
+ }
+ };
+ DistributedTestCase.waitForCriterion(ev, 30 * 1000, 200, true);
+ assertTrue(this.collaboration.hasCurrentTopic(threadA));
+ assertTrue(this.collaboration.isCurrentTopic(topicA));
+
+ Thread threadB = new Thread(group, new Runnable() {
+ @Override
+ public void run() {
+ collaboration.acquireUninterruptibly(topicA);
+ try {
+ threadBFlag_TestLateComerJoinsIn = true;
+ WaitCriterion ev2 = new WaitCriterion() {
+ @Override
+ public boolean done() {
+ return !threadBFlag_TestLateComerJoinsIn;
+ }
+ @Override
+ public String description() {
+ return null;
+ }
+ };
+ DistributedTestCase.waitForCriterion(ev2, 60 * 1000, 200, true);
+ }
+ finally {
+ collaboration.release();
+ }
+ }
+ });
+ threadB.start();
+ ev = new WaitCriterion() {
+ @Override
+ public boolean done() {
+ return threadBFlag_TestLateComerJoinsIn;
+ }
+ @Override
+ public String description() {
+ return "";
+ }
+ };
+ DistributedTestCase.waitForCriterion(ev, 60 * 1000, 200, true);
+ assertTrue(this.collaboration.hasCurrentTopic(threadB));
+
+ // thread three blocks for new topic
+ Thread threadC = new Thread(group, new Runnable() {
+ @Override
+ public void run() {
+ threadCFlag_TestLateComerJoinsIn = false;
+ collaboration.acquireUninterruptibly(topicB);
+ try {
+ threadCFlag_TestLateComerJoinsIn = true;
+ WaitCriterion ev2 = new WaitCriterion() {
+ @Override
+ public boolean done() {
+ return !threadCFlag_TestLateComerJoinsIn;
+ }
+ @Override
+ public String description() {
+ return null;
+ }
+ };
+ DistributedTestCase.waitForCriterion(ev2, 60 * 1000, 200, true);
+ }
+ finally {
+ collaboration.release();
+ }
+ }
+ });
+ threadC.start();
+ ev = new WaitCriterion() {
+ @Override
+ public boolean done() {
+ return threadCFlag_TestLateComerJoinsIn;
+ }
+ @Override
+ public String description() {
+ return null;
+ }
+ };
+ DistributedTestCase.waitForCriterion(ev, 60 * 1000, 200, true);
+ assertFalse(this.collaboration.hasCurrentTopic(threadC));
+ assertFalse(this.collaboration.isCurrentTopic(topicB));
+
+ // thread four (lateComer) acquires current topic immediately
+ Thread threadD = new Thread(group, new Runnable() {
+ @Override
+ public void run() {
+ collaboration.acquireUninterruptibly(topicA);
+ try {
+ threadDFlag_TestLateComerJoinsIn = true;
+ while(threadDFlag_TestLateComerJoinsIn) {
+ try {
+ Thread.sleep(10);
+ }
+ catch (InterruptedException ignore) {fail("interrupted");}
+ }
+ }
+ finally {
+ collaboration.release();
+ }
+ }
+ });
+ threadD.start();
+ ev = new WaitCriterion() {
+ @Override
+ public boolean done() {
+ return threadDFlag_TestLateComerJoinsIn;
+ }
+ @Override
+ public String description() {
+ return null;
+ }
+ };
+ DistributedTestCase.waitForCriterion(ev, 60 * 1000, 200, true);
+ assertTrue(this.collaboration.hasCurrentTopic(threadD));
+
+ // release threadA
+ this.threadAFlag_TestLateComerJoinsIn = false;
+ DistributedTestCase.join(threadA, 30 * 1000, null);
+ assertFalse(this.collaboration.hasCurrentTopic(threadA));
+ assertTrue(this.collaboration.hasCurrentTopic(threadB));
+ assertFalse(this.collaboration.hasCurrentTopic(threadC));
+ assertTrue(this.collaboration.hasCurrentTopic(threadD));
+ assertTrue(this.collaboration.isCurrentTopic(topicA));
+ assertFalse(this.collaboration.isCurrentTopic(topicB));
+
+ // release threadB
+ this.threadBFlag_TestLateComerJoinsIn = false;
+ DistributedTestCase.join(threadB, 30 * 1000, null);
+ assertFalse(this.collaboration.hasCurrentTopic(threadB));
+ assertFalse(this.collaboration.hasCurrentTopic(threadC));
+ assertTrue(this.collaboration.hasCurrentTopic(threadD));
+ assertTrue(this.collaboration.isCurrentTopic(topicA));
+ assertFalse(this.collaboration.isCurrentTopic(topicB));
+
+ // release threadD
+ this.threadDFlag_TestLateComerJoinsIn = false;
+ DistributedTestCase.join(threadD, 30 * 1000, null);
+ ev = new WaitCriterion() {
+ @Override
+ public boolean done() {
+ return threadCFlag_TestLateComerJoinsIn;
+ }
+ @Override
+ public String description() {
+ return null;
+ }
+ };
+ DistributedTestCase.waitForCriterion(ev, 60 * 1000, 200, true);
+ assertTrue(this.collaboration.hasCurrentTopic(threadC));
+ assertFalse(this.collaboration.hasCurrentTopic(threadD));
+ assertFalse(this.collaboration.isCurrentTopic(topicA));
+ assertTrue(this.collaboration.isCurrentTopic(topicB));
+
+ // release threadC
+ this.threadCFlag_TestLateComerJoinsIn = false;
+ DistributedTestCase.join(threadC, 30 * 1000, null);
+ assertFalse(this.collaboration.hasCurrentTopic(threadC));
+ assertFalse(this.collaboration.isCurrentTopic(topicA));
+ assertFalse(this.collaboration.isCurrentTopic(topicB));
+ }
+
+ protected List waitingList = Collections.synchronizedList(new ArrayList());
+ protected List fairnessList = Collections.synchronizedList(new ArrayList());
+ protected volatile boolean runTestFairnessStressfully = true;
+
+ @Test
+ public void testFairnessStressfully() throws Exception {
+ this.log.info("[testFairnessStressfully]");
+ final int numThreads = 20;
+ Thread threads[] = new Thread[numThreads];
+
+ Runnable run = new Runnable() {
+ public void run() {
+ boolean released = false;
+ try {
+ String uniqueTopic = Thread.currentThread().getName();
+ while(runTestFairnessStressfully) {
+ waitingList.add(uniqueTopic);
+ collaboration.acquireUninterruptibly(uniqueTopic);
+ try {
+ released = false;
+ fairnessList.add(uniqueTopic);
+ waitingList.remove(uniqueTopic);
+ }
+ finally {
+ // wait for the other threads to line up...
+ WaitCriterion ev = new WaitCriterion() {
+ @Override
+ public boolean done() {
+ return !runTestFairnessStressfully || waitingList.size() >= numThreads - 1;
+ }
+ @Override
+ public String description() {
+ return "other threads lining up";
+ }
+ };
+ DistributedTestCase.waitForCriterion(ev, 60 * 1000, 200, true);
+ collaboration.release();
+ released = true;
+ }
+ }
+ }
+ finally {
+ if (!released) {
+ collaboration.release();
+ }
+ }
+ }
+ };
+
+ try {
+ // many threads loop: acquire and release with unique topic
+ for (int t = 0; t < threads.length; t++) {
+ threads[t] = new Thread(group, run, String.valueOf(t));
+ threads[t].start();
+ }
+
+ log.info("Started all threads... waiting for test to complete.");
+
+ // wait until fairnessList holds at least numThreads * 20 recorded acquisitions
+ WaitCriterion ev = new WaitCriterion() {
+ @Override
+ public boolean done() {
+ return fairnessList.size() >= numThreads * 20;
+ }
+ @Override
+ public String description() {
+ return "waiting for numThreads * 10";
+ }
+ };
+ DistributedTestCase.waitForCriterion(ev, 5 * 60 * 1000, 200, true);
+ }
+ finally {
+ if (this.runTestFairnessStressfully) {
+ this.runTestFairnessStressfully = false;
+ }
+ }
+
+ for (int t = 0; t < threads.length; t++) {
+ DistributedTestCase.join(threads[t], 30 * 1000, null);
+ }
+
+ // assert that all topics are acquired in order
+ // count number of occurrences of each thread
+ int count[] = new int[numThreads];
+ for (int i = 0; i < count.length; i++) { // shouldn't be necessary
+ count[i] = 0;
+ }
+ synchronized(this.fairnessList) {
+ for (Iterator iter = this.fairnessList.iterator(); iter.hasNext();) {
+ int id = Integer.valueOf((String)iter.next()).intValue();
+ count[id] = count[id]+1;
+ }
+ }
+
+ int totalLocks = 0;
+ int minLocks = Integer.MAX_VALUE;
+ int maxLocks = 0;
+ for (int i = 0; i < count.length; i++) {
+ int locks = count[i];
+ this.log.fine("testFairnessStressfully thread-" + i + " acquired topic " +
+ locks + " times.");
+ if (locks < minLocks) minLocks = locks;
+ if (locks > maxLocks) maxLocks = locks;
+ totalLocks = totalLocks + locks;
+ }
+
+ this.log.info("[testFairnessStressfully] totalLocks=" + totalLocks +
+ " minLocks=" + minLocks +
+ " maxLocks=" + maxLocks);
+
+ int expectedLocks = (totalLocks / numThreads) + 1;
+
+ // NOTE: if you turn on fine logs, this deviation may be too small...
+ // slower machines may also fail depending on thread scheduling
+ int deviation = (int)(expectedLocks * 0.25);
+ int lowThreshold = expectedLocks - deviation;
+ int highThreshold = expectedLocks + deviation;
+
+ this.log.info("[testFairnessStressfully] deviation=" + deviation +
+ " expectedLocks=" + expectedLocks +
+ " lowThreshold=" + lowThreshold +
+ " highThreshold=" + highThreshold);
+
+ // if these assertions keep failing we'll have to rewrite the test
+ // to handle scheduling of the threads...
+
+ assertTrue("minLocks is less than lowThreshold",
+ minLocks >= lowThreshold);
+ assertTrue("maxLocks is greater than highThreshold",
+ maxLocks <= highThreshold);
+ }
+
+ @Test
+ public void testHasCurrentTopic() throws Exception {
+ this.log.info("[testHasCurrentTopic]");
+ assertTrue(!this.collaboration.hasCurrentTopic());
+ this.collaboration.acquireUninterruptibly("testHasCurrentTopic");
+ try {
+ assertTrue(this.collaboration.hasCurrentTopic());
+ }
+ finally {
+ this.collaboration.release();
+ }
+ assertTrue(!this.collaboration.hasCurrentTopic());
+ }
+
+ protected volatile boolean flagTestThreadHasCurrentTopic = false;
+
+ @Test
+ public void testThreadHasCurrentTopic() throws Exception {
+ this.log.info("[testThreadHasCurrentTopic]");
+ Thread thread = new Thread(group, new Runnable() {
+ @Override
+ public void run() {
+ collaboration.acquireUninterruptibly("testThreadHasCurrentTopic");
+ try {
+ flagTestThreadHasCurrentTopic = true;
+ WaitCriterion ev = new WaitCriterion() {
+ @Override
+ public boolean done() {
+ return !flagTestThreadHasCurrentTopic;
+ }
+ @Override
+ public String description() {
+ return null;
+ }
+ };
+ DistributedTestCase.waitForCriterion(ev, 60 * 1000, 200, true);
+ }
+ finally {
+ collaboration.release();
+ }
+ }
+ });
+
+ // before starting thread, hasCurrentTopic(thread) returns false
+ assertTrue(!this.collaboration.hasCurrentTopic(thread));
+ thread.start();
+ WaitCriterion ev = new WaitCriterion() {
+ @Override
+ public boolean done() {
+ return flagTestThreadHasCurrentTopic;
+ }
+ @Override
+ public String description() {
+ return null;
+ }
+ };
+ DistributedTestCase.waitForCriterion(ev, 60 * 1000, 200, true);
+
+ // after starting thread, hasCurrentTopic(thread) returns true
+ assertTrue(this.collaboration.hasCurrentTopic(thread));
+ this.flagTestThreadHasCurrentTopic = false;
+ DistributedTestCase.join(thread, 30 * 1000, null);
+
+ // after thread finishes, hasCurrentTopic(thread) returns false
+ assertTrue(!this.collaboration.hasCurrentTopic(thread));
+ }
+
+ @Test
+ public void testIsCurrentTopic() throws Exception {
+ this.log.info("[testIsCurrentTopic]");
+ Object topic = "testIsCurrentTopic";
+ assertTrue(!this.collaboration.isCurrentTopic(topic));
+ this.collaboration.acquireUninterruptibly(topic);
+ try {
+ assertTrue(this.collaboration.isCurrentTopic(topic));
+ }
+ finally {
+ this.collaboration.release();
+ }
+ assertTrue(!this.collaboration.isCurrentTopic(topic));
+ }
+
+ protected final ThreadGroup group =
+ new ThreadGroup("CollaborationJUnitTest Threads") {
+ @Override
+ public void uncaughtException(Thread t, Throwable e)
+ {
+ if (e instanceof VirtualMachineError) {
+ SystemFailure.setFailure((VirtualMachineError)e); // don't throw
+ }
+ String s = "Uncaught exception in thread " + t;
+ log.error(s, e);
+ fail(s);
+ }
+ };
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ca6148aa/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/diskPerf/DiskRegionPerfJUnitPerformanceTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/diskPerf/DiskRegionPerfJUnitPerformanceTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/diskPerf/DiskRegionPerfJUnitPerformanceTest.java
index 0ee9d4f..5ec4af8 100755
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/diskPerf/DiskRegionPerfJUnitPerformanceTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/diskPerf/DiskRegionPerfJUnitPerformanceTest.java
@@ -21,6 +21,7 @@ import java.util.Arrays;
import org.junit.After;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -31,14 +32,15 @@ import com.gemstone.gemfire.cache.Scope;
import com.gemstone.gemfire.internal.cache.DiskRegionHelperFactory;
import com.gemstone.gemfire.internal.cache.DiskRegionProperties;
import com.gemstone.gemfire.internal.cache.DiskRegionTestingBase;
-import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
+import com.gemstone.gemfire.test.junit.categories.PerformanceTest;
/**
* Consolidated Disk Region Perftest. Overflow, Persist, OverflowWithPersist
* modes are tested for Sync, AsyncWithBuffer and AsyncWithoutBufer writes.
*
*/
-@Category(IntegrationTest.class)
+@Category(PerformanceTest.class)
+@Ignore("Tests have no assertions")
public class DiskRegionPerfJUnitPerformanceTest extends DiskRegionTestingBase
{
LogWriter log = null;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ca6148aa/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/ha/HARegionQueueStartStopJUnitDisabledTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/ha/HARegionQueueStartStopJUnitDisabledTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/ha/HARegionQueueStartStopJUnitDisabledTest.java
deleted file mode 100755
index 4cfc9ba..0000000
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/ha/HARegionQueueStartStopJUnitDisabledTest.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- *
- */
-package com.gemstone.gemfire.internal.cache.ha;
-
-import java.io.IOException;
-import java.util.Properties;
-
-import com.gemstone.gemfire.cache.Cache;
-import com.gemstone.gemfire.cache.CacheException;
-import com.gemstone.gemfire.cache.CacheFactory;
-import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
-import com.gemstone.gemfire.internal.cache.RegionQueue;
-import com.gemstone.gemfire.distributed.DistributedSystem;
-import com.gemstone.gemfire.internal.Assert;
-
-import junit.framework.TestCase;
-
-/**
- * @author Mitul Bid
- *
- */
-public class HARegionQueueStartStopJUnitDisabledTest extends TestCase
-{
-
- /**
- * Creates the cache instance for the test
- *
- * @return the cache instance
- * @throws CacheException -
- * thrown if any exception occurs in cache creation
- */
- private Cache createCache() throws CacheException
- {
- return CacheFactory.create(DistributedSystem.connect(new Properties()));
- }
-
- /**
- * Creates HA region-queue object
- *
- * @return HA region-queue object
- * @throws IOException
- * @throws ClassNotFoundException
- * @throws CacheException
- * @throws InterruptedException
- */
- private RegionQueue createHARegionQueue(String name, Cache cache)
- throws IOException, ClassNotFoundException, CacheException, InterruptedException
- {
- RegionQueue regionqueue =HARegionQueue.getHARegionQueueInstance(name, cache,HARegionQueue.NON_BLOCKING_HA_QUEUE, false);
- return regionqueue;
- }
-
- public void testStartStop()
- {
- try {
- boolean exceptionOccured = false;
- Cache cache = createCache();
- createHARegionQueue("test", cache);
- Assert
- .assertTrue(HARegionQueue.getDispatchedMessagesMapForTesting() != null);
- HARegionQueue.stopHAServices();
- try {
- HARegionQueue.getDispatchedMessagesMapForTesting();
- }
- catch (NullPointerException e) {
- exceptionOccured = true;
- }
- if (!exceptionOccured) {
- fail("Expected exception to occur but did not occur");
- }
- HARegionQueue.startHAServices((GemFireCacheImpl)cache);
- Assert
- .assertTrue(HARegionQueue.getDispatchedMessagesMapForTesting() != null);
- cache.close();
- try {
- HARegionQueue.getDispatchedMessagesMapForTesting();
- }
- catch (NullPointerException e) {
- exceptionOccured = true;
- }
- if (!exceptionOccured) {
- fail("Expected exception to occur but did not occur");
- }
-
- cache = createCache();
-
- try {
- HARegionQueue.getDispatchedMessagesMapForTesting();
- }
- catch (NullPointerException e) {
- exceptionOccured = true;
- }
- if (!exceptionOccured) {
- fail("Expected exception to occur but did not occur");
- }
-
- }
- catch (Exception e) {
- e.printStackTrace();
- fail("Test failed due to " + e);
- }
-
- }
-
-
-}
[6/8] incubator-geode git commit: GEODE-714: Modify all tests to use
JUnit Categories
Posted by kl...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ca6148aa/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/HAInterestBaseTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/HAInterestBaseTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/HAInterestBaseTest.java
deleted file mode 100755
index 90679d9..0000000
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/HAInterestBaseTest.java
+++ /dev/null
@@ -1,1015 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.tier.sockets;
-
-import com.gemstone.gemfire.cache.AttributesFactory;
-import com.gemstone.gemfire.cache.Cache;
-import com.gemstone.gemfire.cache.CacheFactory;
-import com.gemstone.gemfire.cache.InterestResultPolicy;
-import com.gemstone.gemfire.cache.MirrorType;
-import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.cache.RegionAttributes;
-import com.gemstone.gemfire.cache.Scope;
-import com.gemstone.gemfire.cache.client.PoolManager;
-import com.gemstone.gemfire.cache.client.internal.Connection;
-import com.gemstone.gemfire.cache.client.internal.PoolImpl;
-import com.gemstone.gemfire.cache.client.internal.RegisterInterestTracker;
-import com.gemstone.gemfire.cache.client.internal.ServerRegionProxy;
-import com.gemstone.gemfire.cache.server.CacheServer;
-import com.gemstone.gemfire.distributed.DistributedSystem;
-import com.gemstone.gemfire.distributed.internal.DistributionConfig;
-import com.gemstone.gemfire.distributed.internal.ServerLocation;
-import com.gemstone.gemfire.internal.AvailablePort;
-import com.gemstone.gemfire.internal.cache.ClientServerObserverAdapter;
-import com.gemstone.gemfire.internal.cache.ClientServerObserverHolder;
-import com.gemstone.gemfire.internal.cache.CacheServerImpl;
-import com.gemstone.gemfire.internal.cache.LocalRegion;
-import com.gemstone.gemfire.internal.cache.tier.InterestType;
-
-import dunit.DistributedTestCase;
-import dunit.Host;
-import dunit.VM;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-
-/**
- * Tests Interest Registration Functionality
- */
-@SuppressWarnings({"deprecation", "rawtypes", "serial", "unchecked"})
-public class HAInterestBaseTest extends DistributedTestCase {
-
- protected static final int TIMEOUT_MILLIS = 60 * 1000;
- protected static final int INTERVAL_MILLIS = 10;
-
- protected static final String REGION_NAME = "HAInterestBaseTest_region";
-
- protected static final String k1 = "k1";
- protected static final String k2 = "k2";
- protected static final String client_k1 = "client-k1";
- protected static final String client_k2 = "client-k2";
- protected static final String server_k1 = "server-k1";
- protected static final String server_k2 = "server-k2";
- protected static final String server_k1_updated = "server_k1_updated";
-
- protected static Cache cache = null;
- protected static PoolImpl pool = null;
- protected static Connection conn = null;
-
- protected static int PORT1;
- protected static int PORT2;
- protected static int PORT3;
-
- protected static boolean isBeforeRegistrationCallbackCalled = false;
- protected static boolean isBeforeInterestRecoveryCallbackCalled = false;
- protected static boolean isAfterRegistrationCallbackCalled = false;
-
- protected static Host host = null;
- protected static VM server1 = null;
- protected static VM server2 = null;
- protected static VM server3 = null;
-
- protected volatile static boolean exceptionOccured = false;
-
- public HAInterestBaseTest(String name) {
- super(name);
- }
-
- @Override
- public void setUp() throws Exception {
- super.setUp();
- host = Host.getHost(0);
- server1 = host.getVM(0);
- server2 = host.getVM(1);
- server3 = host.getVM(2);
- CacheServerTestUtil.disableShufflingOfEndpoints();
- // start servers first
- PORT1 = ((Integer) server1.invoke(HAInterestBaseTest.class, "createServerCache")).intValue();
- PORT2 = ((Integer) server2.invoke(HAInterestBaseTest.class, "createServerCache")).intValue();
- PORT3 = ((Integer) server3.invoke(HAInterestBaseTest.class, "createServerCache")).intValue();
- exceptionOccured = false;
- addExpectedException("java.net.ConnectException: Connection refused: connect");
- }
-
- @Override
- public void tearDown2() throws Exception {
- // close the clients first
- closeCache();
-
- // then close the servers
- server1.invoke(HAInterestBaseTest.class, "closeCache");
- server2.invoke(HAInterestBaseTest.class, "closeCache");
- server3.invoke(HAInterestBaseTest.class, "closeCache");
- CacheServerTestUtil.resetDisableShufflingOfEndpointsFlag();
- }
-
- public static void closeCache() {
- PoolImpl.AFTER_REGISTER_CALLBACK_FLAG = false;
- PoolImpl.BEFORE_PRIMARY_IDENTIFICATION_FROM_BACKUP_CALLBACK_FLAG = false;
- PoolImpl.BEFORE_RECOVER_INTEREST_CALLBACK_FLAG = false;
- PoolImpl.BEFORE_REGISTER_CALLBACK_FLAG = false;
- HAInterestBaseTest.isAfterRegistrationCallbackCalled = false;
- HAInterestBaseTest.isBeforeInterestRecoveryCallbackCalled = false;
- HAInterestBaseTest.isBeforeRegistrationCallbackCalled = false;
- if (cache != null && !cache.isClosed()) {
- cache.close();
- cache.getDistributedSystem().disconnect();
- }
- cache = null;
- pool = null;
- conn = null;
- }
-
- /**
- * Return the current primary waiting for a primary to exist.
- *
- * @since 5.7
- */
- public static VM getPrimaryVM() {
- return getPrimaryVM(null);
- }
-
- /**
- * Return the current primary waiting for a primary to exist and for it not to
- * be the oldPrimary (if oldPrimary is NOT null).
- *
- * @since 5.7
- */
- public static VM getPrimaryVM(final VM oldPrimary) {
- WaitCriterion wc = new WaitCriterion() {
- @Override
- public boolean done() {
- int primaryPort = pool.getPrimaryPort();
- if (primaryPort == -1) {
- return false;
- }
- // we have a primary
- VM currentPrimary = getServerVM(primaryPort);
- if (currentPrimary != oldPrimary) {
- return true;
- }
- return false;
- }
- @Override
- public String description() {
- return "waiting for primary";
- }
- };
- DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
-
- int primaryPort = pool.getPrimaryPort();
- assertTrue(primaryPort != -1);
- VM currentPrimary = getServerVM(primaryPort);
- assertTrue(currentPrimary != oldPrimary);
- return currentPrimary;
- }
-
- public static VM getBackupVM() {
- return getBackupVM(null);
- }
-
- public static VM getBackupVM(VM stoppedBackup) {
- VM currentPrimary = getPrimaryVM(null);
- if (currentPrimary != server2 && server2 != stoppedBackup) {
- return server2;
- } else if (currentPrimary != server3 && server3 != stoppedBackup) {
- return server3;
- } else if (currentPrimary != server1 && server1 != stoppedBackup) {
- return server1;
- } else {
- fail("expected currentPrimary " + currentPrimary + " to be " + server1 + ", or " + server2 + ", or " + server3);
- return null;
- }
- }
-
- /**
- * Given a server vm (server1, server2, or server3) return its port.
- *
- * @since 5.7
- */
- public static int getServerPort(VM vm) {
- if (vm == server1) {
- return PORT1;
- } else if (vm == server2) {
- return PORT2;
- } else if (vm == server3) {
- return PORT3;
- } else {
- fail("expected vm " + vm + " to be " + server1 + ", or " + server2 + ", or " + server3);
- return -1;
- }
- }
-
- /**
- * Given a server port (PORT1, PORT2, or PORT3) return its vm.
- *
- * @since 5.7
- */
- public static VM getServerVM(int port) {
- if (port == PORT1) {
- return server1;
- } else if (port == PORT2) {
- return server2;
- } else if (port == PORT3) {
- return server3;
- } else {
- fail("expected port " + port + " to be " + PORT1 + ", or " + PORT2 + ", or " + PORT3);
- return null;
- }
- }
-
- public static void verifyRefreshedEntriesFromServer() {
- final Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME);
- assertNotNull(r1);
-
- WaitCriterion wc = new WaitCriterion() {
- @Override
- public boolean done() {
- Region.Entry re = r1.getEntry(k1);
- if (re == null)
- return false;
- Object val = re.getValue();
- return client_k1.equals(val);
- }
- @Override
- public String description() {
- return "waiting for client_k1 refresh from server";
- }
- };
- DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
-
- wc = new WaitCriterion() {
- @Override
- public boolean done() {
- Region.Entry re = r1.getEntry(k2);
- if (re == null)
- return false;
- Object val = re.getValue();
- return client_k2.equals(val);
- }
- @Override
- public String description() {
- return "waiting for client_k2 refresh from server";
- }
- };
- DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
- }
-
- public static void verifyDeadAndLiveServers(final int expectedDeadServers, final int expectedLiveServers) {
- WaitCriterion wc = new WaitCriterion() {
- @Override
- public boolean done() {
- return pool.getConnectedServerCount() == expectedLiveServers;
- }
- @Override
- public String description() {
- return "waiting for pool.getConnectedServerCount() == expectedLiveServer";
- }
- };
- DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
- }
-
- public static void putK1andK2() {
- Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME);
- assertNotNull(r1);
- r1.put(k1, server_k1);
- r1.put(k2, server_k2);
- }
-
- public static void setClientServerObserverForBeforeInterestRecoveryFailure() {
- PoolImpl.BEFORE_RECOVER_INTEREST_CALLBACK_FLAG = true;
- ClientServerObserverHolder.setInstance(new ClientServerObserverAdapter() {
- public void beforeInterestRecovery() {
- synchronized (HAInterestBaseTest.class) {
- Thread t = new Thread() {
- public void run() {
- getBackupVM().invoke(HAInterestBaseTest.class, "startServer");
- getPrimaryVM().invoke(HAInterestBaseTest.class, "stopServer");
- }
- };
- t.start();
- try {
- DistributedTestCase.join(t, 30 * 1000, getLogWriter());
- } catch (Exception ignore) {
- exceptionOccured = true;
- }
- HAInterestBaseTest.isBeforeInterestRecoveryCallbackCalled = true;
- HAInterestBaseTest.class.notify();
- PoolImpl.BEFORE_RECOVER_INTEREST_CALLBACK_FLAG = false;
- }
- }
- });
- }
-
- public static void setClientServerObserverForBeforeInterestRecovery() {
- PoolImpl.BEFORE_RECOVER_INTEREST_CALLBACK_FLAG = true;
- ClientServerObserverHolder.setInstance(new ClientServerObserverAdapter() {
- public void beforeInterestRecovery() {
- synchronized (HAInterestBaseTest.class) {
- Thread t = new Thread() {
- public void run() {
- Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME);
- assertNotNull(r1);
- r1.put(k1, server_k1_updated);
- }
- };
- t.start();
-
- HAInterestBaseTest.isBeforeInterestRecoveryCallbackCalled = true;
- HAInterestBaseTest.class.notify();
- PoolImpl.BEFORE_RECOVER_INTEREST_CALLBACK_FLAG = false;
- }
- }
- });
- }
-
- public static void waitForBeforeInterestRecoveryCallBack() throws InterruptedException {
- assertNotNull(cache);
- synchronized (HAInterestBaseTest.class) {
- while (!isBeforeInterestRecoveryCallbackCalled) {
- HAInterestBaseTest.class.wait();
- }
- }
- }
-
- public static void setClientServerObserverForBeforeRegistration(final VM vm) {
- PoolImpl.BEFORE_REGISTER_CALLBACK_FLAG = true;
- ClientServerObserverHolder.setInstance(new ClientServerObserverAdapter() {
- public void beforeInterestRegistration() {
- synchronized (HAInterestBaseTest.class) {
- vm.invoke(HAInterestBaseTest.class, "startServer");
- HAInterestBaseTest.isBeforeRegistrationCallbackCalled = true;
- HAInterestBaseTest.class.notify();
- PoolImpl.BEFORE_REGISTER_CALLBACK_FLAG = false;
- }
- }
- });
- }
-
- public static void waitForBeforeRegistrationCallback() throws InterruptedException {
- assertNotNull(cache);
- synchronized (HAInterestBaseTest.class) {
- while (!isBeforeRegistrationCallbackCalled) {
- HAInterestBaseTest.class.wait();
- }
- }
- }
-
- public static void setClientServerObserverForAfterRegistration(final VM vm) {
- PoolImpl.AFTER_REGISTER_CALLBACK_FLAG = true;
- ClientServerObserverHolder.setInstance(new ClientServerObserverAdapter() {
- public void afterInterestRegistration() {
- synchronized (HAInterestBaseTest.class) {
- vm.invoke(HAInterestBaseTest.class, "startServer");
- HAInterestBaseTest.isAfterRegistrationCallbackCalled = true;
- HAInterestBaseTest.class.notify();
- PoolImpl.AFTER_REGISTER_CALLBACK_FLAG = false;
- }
- }
- });
- }
-
- public static void waitForAfterRegistrationCallback() throws InterruptedException {
- assertNotNull(cache);
- if (!isAfterRegistrationCallbackCalled) {
- synchronized (HAInterestBaseTest.class) {
- while (!isAfterRegistrationCallbackCalled) {
- HAInterestBaseTest.class.wait();
- }
- }
- }
- }
-
- public static void unSetClientServerObserverForRegistrationCallback() {
- synchronized (HAInterestBaseTest.class) {
- PoolImpl.BEFORE_REGISTER_CALLBACK_FLAG = false;
- PoolImpl.AFTER_REGISTER_CALLBACK_FLAG = false;
- HAInterestBaseTest.isBeforeRegistrationCallbackCalled = false;
- HAInterestBaseTest.isAfterRegistrationCallbackCalled = false;
- }
- }
-
- public static void verifyDispatcherIsAlive() {
- assertEquals("More than one BridgeServer", 1, cache.getCacheServers().size());
-
- WaitCriterion wc = new WaitCriterion() {
- @Override
- public boolean done() {
- return cache.getCacheServers().size() == 1;
- }
- @Override
- public String description() {
- return "waiting for cache.getCacheServers().size() == 1";
- }
- };
- DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
-
- CacheServerImpl bs = (CacheServerImpl) cache.getCacheServers().iterator().next();
- assertNotNull(bs);
- assertNotNull(bs.getAcceptor());
- assertNotNull(bs.getAcceptor().getCacheClientNotifier());
- final CacheClientNotifier ccn = bs.getAcceptor().getCacheClientNotifier();
-
- wc = new WaitCriterion() {
- @Override
- public boolean done() {
- return ccn.getClientProxies().size() > 0;
- }
- @Override
- public String description() {
- return "waiting for ccn.getClientProxies().size() > 0";
- }
- };
- DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
-
- wc = new WaitCriterion() {
- Iterator iter_prox;
- CacheClientProxy proxy;
-
- @Override
- public boolean done() {
- iter_prox = ccn.getClientProxies().iterator();
- if (iter_prox.hasNext()) {
- proxy = (CacheClientProxy) iter_prox.next();
- return proxy._messageDispatcher.isAlive();
- } else {
- return false;
- }
- }
-
- @Override
- public String description() {
- return "waiting for CacheClientProxy _messageDispatcher to be alive";
- }
- };
- DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
- }
-
- public static void verifyDispatcherIsNotAlive() {
- WaitCriterion wc = new WaitCriterion() {
- @Override
- public boolean done() {
- return cache.getCacheServers().size() == 1;
- }
- @Override
- public String description() {
- return "cache.getCacheServers().size() == 1";
- }
- };
- DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
-
- CacheServerImpl bs = (CacheServerImpl) cache.getCacheServers().iterator().next();
- assertNotNull(bs);
- assertNotNull(bs.getAcceptor());
- assertNotNull(bs.getAcceptor().getCacheClientNotifier());
- final CacheClientNotifier ccn = bs.getAcceptor().getCacheClientNotifier();
-
- wc = new WaitCriterion() {
- @Override
- public boolean done() {
- return ccn.getClientProxies().size() > 0;
- }
- @Override
- public String description() {
- return "waiting for ccn.getClientProxies().size() > 0";
- }
- };
- DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
-
- Iterator iter_prox = ccn.getClientProxies().iterator();
- if (iter_prox.hasNext()) {
- CacheClientProxy proxy = (CacheClientProxy) iter_prox.next();
- assertFalse("Dispatcher on secondary should not be alive", proxy._messageDispatcher.isAlive());
- }
- }
-
- public static void createEntriesK1andK2OnServer() {
- Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME);
- assertNotNull(r1);
- if (!r1.containsKey(k1)) {
- r1.create(k1, server_k1);
- }
- if (!r1.containsKey(k2)) {
- r1.create(k2, server_k2);
- }
- assertEquals(r1.getEntry(k1).getValue(), server_k1);
- assertEquals(r1.getEntry(k2).getValue(), server_k2);
- }
-
- public static void createEntriesK1andK2() {
- Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME);
- assertNotNull(r1);
- if (!r1.containsKey(k1)) {
- r1.create(k1, client_k1);
- }
- if (!r1.containsKey(k2)) {
- r1.create(k2, client_k2);
- }
- assertEquals(r1.getEntry(k1).getValue(), client_k1);
- assertEquals(r1.getEntry(k2).getValue(), client_k2);
- }
-
- public static void createServerEntriesK1andK2() {
- Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME);
- assertNotNull(r1);
- if (!r1.containsKey(k1)) {
- r1.create(k1, server_k1);
- }
- if (!r1.containsKey(k2)) {
- r1.create(k2, server_k2);
- }
- assertEquals(r1.getEntry(k1).getValue(), server_k1);
- assertEquals(r1.getEntry(k2).getValue(), server_k2);
- }
-
- public static void registerK1AndK2() {
- Region r = cache.getRegion(Region.SEPARATOR + REGION_NAME);
- assertNotNull(r);
- List list = new ArrayList();
- list.add(k1);
- list.add(k2);
- r.registerInterest(list, InterestResultPolicy.KEYS_VALUES);
- }
-
- public static void reRegisterK1AndK2() {
- Region r = cache.getRegion(Region.SEPARATOR + REGION_NAME);
- assertNotNull(r);
- List list = new ArrayList();
- list.add(k1);
- list.add(k2);
- r.registerInterest(list);
- }
-
- public static void startServer() throws IOException {
- Cache c = CacheFactory.getAnyInstance();
- assertEquals("More than one BridgeServer", 1, c.getCacheServers().size());
- CacheServerImpl bs = (CacheServerImpl) c.getCacheServers().iterator().next();
- assertNotNull(bs);
- bs.start();
- }
-
- public static void stopServer() {
- assertEquals("More than one BridgeServer", 1, cache.getCacheServers().size());
- CacheServerImpl bs = (CacheServerImpl) cache.getCacheServers().iterator().next();
- assertNotNull(bs);
- bs.stop();
- }
-
- public static void stopPrimaryAndRegisterK1AndK2AndVerifyResponse() {
- LocalRegion r = (LocalRegion) cache.getRegion(Region.SEPARATOR + REGION_NAME);
- assertNotNull(r);
- ServerRegionProxy srp = new ServerRegionProxy(r);
-
- WaitCriterion wc = new WaitCriterion() {
- @Override
- public boolean done() {
- return pool.getConnectedServerCount() == 3;
- }
- @Override
- public String description() {
- return "connected server count never became 3";
- }
- };
- DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
-
- // close primaryEP
- getPrimaryVM().invoke(HAInterestBaseTest.class, "stopServer");
- List list = new ArrayList();
- list.add(k1);
- list.add(k2);
- List serverKeys = srp.registerInterest(list, InterestType.KEY, InterestResultPolicy.KEYS, false, r.getAttributes().getDataPolicy().ordinal);
- assertNotNull(serverKeys);
- List resultKeys = (List) serverKeys.get(0);
- assertEquals(2, resultKeys.size());
- assertTrue(resultKeys.contains(k1));
- assertTrue(resultKeys.contains(k2));
- }
-
- public static void stopPrimaryAndUnregisterRegisterK1() {
- LocalRegion r = (LocalRegion) cache.getRegion(Region.SEPARATOR + REGION_NAME);
- assertNotNull(r);
- ServerRegionProxy srp = new ServerRegionProxy(r);
-
- WaitCriterion wc = new WaitCriterion() {
- @Override
- public boolean done() {
- return pool.getConnectedServerCount() == 3;
- }
- @Override
- public String description() {
- return "connected server count never became 3";
- }
- };
- DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
-
- // close primaryEP
- getPrimaryVM().invoke(HAInterestBaseTest.class, "stopServer");
- List list = new ArrayList();
- list.add(k1);
- srp.unregisterInterest(list, InterestType.KEY, false, false);
- }
-
- public static void stopBothPrimaryAndSecondaryAndRegisterK1AndK2AndVerifyResponse() {
- LocalRegion r = (LocalRegion) cache.getRegion(Region.SEPARATOR + REGION_NAME);
- assertNotNull(r);
- ServerRegionProxy srp = new ServerRegionProxy(r);
-
- WaitCriterion wc = new WaitCriterion() {
- @Override
- public boolean done() {
- return pool.getConnectedServerCount() == 3;
- }
- @Override
- public String description() {
- return "connected server count never became 3";
- }
- };
- DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
-
- // close primaryEP
- VM backup = getBackupVM();
- getPrimaryVM().invoke(HAInterestBaseTest.class, "stopServer");
- // close secondary
- backup.invoke(HAInterestBaseTest.class, "stopServer");
- List list = new ArrayList();
- list.add(k1);
- list.add(k2);
- List serverKeys = srp.registerInterest(list, InterestType.KEY, InterestResultPolicy.KEYS, false, r.getAttributes().getDataPolicy().ordinal);
-
- assertNotNull(serverKeys);
- List resultKeys = (List) serverKeys.get(0);
- assertEquals(2, resultKeys.size());
- assertTrue(resultKeys.contains(k1));
- assertTrue(resultKeys.contains(k2));
- }
-
- /**
- * returns the secondary that was stopped
- */
- public static VM stopSecondaryAndRegisterK1AndK2AndVerifyResponse() {
- LocalRegion r = (LocalRegion) cache.getRegion(Region.SEPARATOR + REGION_NAME);
- assertNotNull(r);
- ServerRegionProxy srp = new ServerRegionProxy(r);
-
- WaitCriterion wc = new WaitCriterion() {
- @Override
- public boolean done() {
- return pool.getConnectedServerCount() == 3;
- }
- @Override
- public String description() {
- return "Never got three connected servers";
- }
- };
- DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
-
- // close secondary EP
- VM result = getBackupVM();
- result.invoke(HAInterestBaseTest.class, "stopServer");
- List list = new ArrayList();
- list.add(k1);
- list.add(k2);
- List serverKeys = srp.registerInterest(list, InterestType.KEY, InterestResultPolicy.KEYS, false, r.getAttributes().getDataPolicy().ordinal);
-
- assertNotNull(serverKeys);
- List resultKeys = (List) serverKeys.get(0);
- assertEquals(2, resultKeys.size());
- assertTrue(resultKeys.contains(k1));
- assertTrue(resultKeys.contains(k2));
- return result;
- }
-
- /**
- * returns the secondary that was stopped
- */
- public static VM stopSecondaryAndUNregisterK1() {
- LocalRegion r = (LocalRegion) cache.getRegion(Region.SEPARATOR + REGION_NAME);
- assertNotNull(r);
- ServerRegionProxy srp = new ServerRegionProxy(r);
-
- WaitCriterion wc = new WaitCriterion() {
- @Override
- public boolean done() {
- return pool.getConnectedServerCount() == 3;
- }
- @Override
- public String description() {
- return "connected server count never became 3";
- }
- };
- DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
-
- // close secondary EP
- VM result = getBackupVM();
- result.invoke(HAInterestBaseTest.class, "stopServer");
- List list = new ArrayList();
- list.add(k1);
- srp.unregisterInterest(list, InterestType.KEY, false, false);
- return result;
- }
-
- public static void registerK1AndK2OnPrimaryAndSecondaryAndVerifyResponse() {
- ServerLocation primary = pool.getPrimary();
- ServerLocation secondary = (ServerLocation) pool.getRedundants().get(0);
- LocalRegion r = (LocalRegion) cache.getRegion(Region.SEPARATOR + REGION_NAME);
- assertNotNull(r);
- ServerRegionProxy srp = new ServerRegionProxy(r);
- List list = new ArrayList();
- list.add(k1);
- list.add(k2);
-
- // Primary server
- List serverKeys1 = srp.registerInterestOn(primary, list, InterestType.KEY, InterestResultPolicy.KEYS, false, r.getAttributes().getDataPolicy().ordinal);
- assertNotNull(serverKeys1);
- // expect serverKeys in response from primary
- List resultKeys = (List) serverKeys1.get(0);
- assertEquals(2, resultKeys.size());
- assertTrue(resultKeys.contains(k1));
- assertTrue(resultKeys.contains(k2));
-
- // Secondary server
- List serverKeys2 = srp.registerInterestOn(secondary, list, InterestType.KEY, InterestResultPolicy.KEYS, false, r.getAttributes().getDataPolicy().ordinal);
- // if the list is null then it is empty
- if (serverKeys2 != null) {
- // no serverKeys in response from secondary
- assertTrue(serverKeys2.isEmpty());
- }
- }
-
- public static void verifyInterestRegistration() {
- WaitCriterion wc = new WaitCriterion() {
- @Override
- public boolean done() {
- return cache.getCacheServers().size() == 1;
- }
- @Override
- public String description() {
- return "waiting for cache.getCacheServers().size() == 1";
- }
- };
- DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
-
- CacheServerImpl bs = (CacheServerImpl) cache.getCacheServers().iterator().next();
- assertNotNull(bs);
- assertNotNull(bs.getAcceptor());
- assertNotNull(bs.getAcceptor().getCacheClientNotifier());
- final CacheClientNotifier ccn = bs.getAcceptor().getCacheClientNotifier();
-
- wc = new WaitCriterion() {
- @Override
- public boolean done() {
- return ccn.getClientProxies().size() > 0;
- }
- @Override
- public String description() {
- return "waiting for ccn.getClientProxies().size() > 0";
- }
- };
- DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
-
- Iterator iter_prox = ccn.getClientProxies().iterator();
-
- if (iter_prox.hasNext()) {
- final CacheClientProxy ccp = (CacheClientProxy) iter_prox.next();
-
- wc = new WaitCriterion() {
- @Override
- public boolean done() {
- Set keysMap = (Set) ccp.cils[RegisterInterestTracker.interestListIndex]
- .getProfile(Region.SEPARATOR + REGION_NAME)
- .getKeysOfInterestFor(ccp.getProxyID());
- return keysMap != null && keysMap.size() == 2;
- }
- @Override
- public String description() {
- return "waiting for keys of interest to include 2 keys";
- }
- };
- DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
-
- Set keysMap = (Set) ccp.cils[RegisterInterestTracker.interestListIndex].getProfile(Region.SEPARATOR + REGION_NAME)
- .getKeysOfInterestFor(ccp.getProxyID());
- assertNotNull(keysMap);
- assertEquals(2, keysMap.size());
- assertTrue(keysMap.contains(k1));
- assertTrue(keysMap.contains(k2));
- }
- }
-
- public static void verifyInterestUNRegistration() {
- WaitCriterion wc = new WaitCriterion() {
- @Override
- public boolean done() {
- return cache.getCacheServers().size() == 1;
- }
- @Override
- public String description() {
- return "waiting for cache.getCacheServers().size() == 1";
- }
- };
- DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
-
- CacheServerImpl bs = (CacheServerImpl) cache.getCacheServers().iterator().next();
- assertNotNull(bs);
- assertNotNull(bs.getAcceptor());
- assertNotNull(bs.getAcceptor().getCacheClientNotifier());
- final CacheClientNotifier ccn = bs.getAcceptor().getCacheClientNotifier();
-
- wc = new WaitCriterion() {
- @Override
- public boolean done() {
- return ccn.getClientProxies().size() > 0;
- }
- @Override
- public String description() {
- return "waiting for ccn.getClientProxies().size() > 0";
- }
- };
- DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
-
- Iterator iter_prox = ccn.getClientProxies().iterator();
- if (iter_prox.hasNext()) {
- final CacheClientProxy ccp = (CacheClientProxy) iter_prox.next();
-
- wc = new WaitCriterion() {
- @Override
- public boolean done() {
- Set keysMap = (Set) ccp.cils[RegisterInterestTracker.interestListIndex]
- .getProfile(Region.SEPARATOR + REGION_NAME)
- .getKeysOfInterestFor(ccp.getProxyID());
- return keysMap != null;
- }
- @Override
- public String description() {
- return "waiting for keys of interest to not be null";
- }
- };
- DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
-
- Set keysMap = (Set) ccp.cils[RegisterInterestTracker.interestListIndex]
- .getProfile(Region.SEPARATOR + REGION_NAME)
- .getKeysOfInterestFor(ccp.getProxyID());
- assertNotNull(keysMap);
- assertEquals(1, keysMap.size());
- assertFalse(keysMap.contains(k1));
- assertTrue(keysMap.contains(k2));
- }
- }
-
- private void createCache(Properties props) throws Exception {
- DistributedSystem ds = getSystem(props);
- assertNotNull(ds);
- ds.disconnect();
- ds = getSystem(props);
- cache = CacheFactory.create(ds);
- assertNotNull(cache);
- }
-
- public static void createClientPoolCache(String testName, String host) throws Exception {
- Properties props = new Properties();
- props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
- props.setProperty(DistributionConfig.LOCATORS_NAME, "");
- new HAInterestBaseTest("temp").createCache(props);
- CacheServerTestUtil.disableShufflingOfEndpoints();
- PoolImpl p;
- try {
- p = (PoolImpl) PoolManager.createFactory()
- .addServer(host, PORT1)
- .addServer(host, PORT2)
- .addServer(host, PORT3)
- .setSubscriptionEnabled(true)
- .setSubscriptionRedundancy(-1)
- .setReadTimeout(1000)
- .setPingInterval(1000)
- // retryInterval should be more so that only registerInterste thread
- // will initiate failover
- // .setRetryInterval(20000)
- .create("HAInterestBaseTestPool");
- } finally {
- CacheServerTestUtil.enableShufflingOfEndpoints();
- }
- AttributesFactory factory = new AttributesFactory();
- factory.setScope(Scope.LOCAL);
- factory.setConcurrencyChecksEnabled(true);
- factory.setPoolName(p.getName());
-
- cache.createRegion(REGION_NAME, factory.create());
- pool = p;
- conn = pool.acquireConnection();
- assertNotNull(conn);
- }
-
- public static void createClientPoolCacheWithSmallRetryInterval(String testName, String host) throws Exception {
- Properties props = new Properties();
- props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
- props.setProperty(DistributionConfig.LOCATORS_NAME, "");
- new HAInterestBaseTest("temp").createCache(props);
- CacheServerTestUtil.disableShufflingOfEndpoints();
- PoolImpl p;
- try {
- p = (PoolImpl) PoolManager.createFactory()
- .addServer(host, PORT1)
- .addServer(host, PORT2)
- .setSubscriptionEnabled(true)
- .setSubscriptionRedundancy(-1)
- .setReadTimeout(1000)
- .setSocketBufferSize(32768)
- .setMinConnections(6)
- .setPingInterval(200)
- // .setRetryInterval(200)
- // retryAttempts 3
- .create("HAInterestBaseTestPool");
- } finally {
- CacheServerTestUtil.enableShufflingOfEndpoints();
- }
- AttributesFactory factory = new AttributesFactory();
- factory.setScope(Scope.LOCAL);
- factory.setConcurrencyChecksEnabled(true);
- factory.setPoolName(p.getName());
-
- cache.createRegion(REGION_NAME, factory.create());
-
- pool = p;
- conn = pool.acquireConnection();
- assertNotNull(conn);
- }
-
- public static void createClientPoolCacheConnectionToSingleServer(String testName, String hostName) throws Exception {
- Properties props = new Properties();
- props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
- props.setProperty(DistributionConfig.LOCATORS_NAME, "");
- new HAInterestBaseTest("temp").createCache(props);
- PoolImpl p = (PoolImpl) PoolManager.createFactory()
- .addServer(hostName, PORT1)
- .setSubscriptionEnabled(true)
- .setSubscriptionRedundancy(-1)
- .setReadTimeout(1000)
- // .setRetryInterval(20)
- .create("HAInterestBaseTestPool");
- AttributesFactory factory = new AttributesFactory();
- factory.setScope(Scope.LOCAL);
- factory.setConcurrencyChecksEnabled(true);
- factory.setPoolName(p.getName());
-
- cache.createRegion(REGION_NAME, factory.create());
-
- pool = p;
- conn = pool.acquireConnection();
- assertNotNull(conn);
- }
-
- public static Integer createServerCache() throws Exception {
- new HAInterestBaseTest("temp").createCache(new Properties());
- AttributesFactory factory = new AttributesFactory();
- factory.setScope(Scope.DISTRIBUTED_ACK);
- factory.setEnableBridgeConflation(true);
- factory.setMirrorType(MirrorType.KEYS_VALUES);
- factory.setConcurrencyChecksEnabled(true);
- cache.createRegion(REGION_NAME, factory.create());
-
- CacheServer server = cache.addCacheServer();
- int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
- server.setPort(port);
- server.setMaximumTimeBetweenPings(180000);
- // ensures updates to be sent instead of invalidations
- server.setNotifyBySubscription(true);
- server.start();
- return new Integer(server.getPort());
- }
-
- public static Integer createServerCacheWithLocalRegion() throws Exception {
- new HAInterestBaseTest("temp").createCache(new Properties());
- AttributesFactory factory = new AttributesFactory();
- factory.setScope(Scope.LOCAL);
- factory.setConcurrencyChecksEnabled(true);
- RegionAttributes attrs = factory.create();
- cache.createRegion(REGION_NAME, attrs);
-
- CacheServer server = cache.addCacheServer();
- int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
- server.setPort(port);
- // ensures updates to be sent instead of invalidations
- server.setNotifyBySubscription(true);
- server.setMaximumTimeBetweenPings(180000);
- server.start();
- return new Integer(server.getPort());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ca6148aa/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/HAInterestPart1DUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/HAInterestPart1DUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/HAInterestPart1DUnitTest.java
index 482fca9..27779a6 100755
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/HAInterestPart1DUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/HAInterestPart1DUnitTest.java
@@ -19,7 +19,7 @@ package com.gemstone.gemfire.internal.cache.tier.sockets;
import dunit.VM;
@SuppressWarnings("serial")
-public class HAInterestPart1DUnitTest extends HAInterestBaseTest {
+public class HAInterestPart1DUnitTest extends HAInterestTestCase {
public HAInterestPart1DUnitTest(String name) {
super(name);
@@ -31,14 +31,14 @@ public class HAInterestPart1DUnitTest extends HAInterestBaseTest {
public void testInterestRegistrationOnBothPrimaryAndSecondary() throws Exception {
createClientPoolCache(this.getName(), getServerHostName(server1.getHost()));
createEntriesK1andK2();
- server1.invoke(HAInterestBaseTest.class, "createEntriesK1andK2");
- server2.invoke(HAInterestBaseTest.class, "createEntriesK1andK2");
- server3.invoke(HAInterestBaseTest.class, "createEntriesK1andK2");
+ server1.invoke(HAInterestTestCase.class, "createEntriesK1andK2");
+ server2.invoke(HAInterestTestCase.class, "createEntriesK1andK2");
+ server3.invoke(HAInterestTestCase.class, "createEntriesK1andK2");
// register K1 and K2
registerK1AndK2();
- server1.invoke(HAInterestBaseTest.class, "verifyInterestRegistration");
- server2.invoke(HAInterestBaseTest.class, "verifyInterestRegistration");
- server3.invoke(HAInterestBaseTest.class, "verifyInterestRegistration");
+ server1.invoke(HAInterestTestCase.class, "verifyInterestRegistration");
+ server2.invoke(HAInterestTestCase.class, "verifyInterestRegistration");
+ server3.invoke(HAInterestTestCase.class, "verifyInterestRegistration");
}
/**
@@ -48,9 +48,9 @@ public class HAInterestPart1DUnitTest extends HAInterestBaseTest {
public void testInterestRegistrationResponseOnBothPrimaryAndSecondary() throws Exception {
createClientPoolCache(this.getName(), getServerHostName(server1.getHost()));
createEntriesK1andK2();
- server1.invoke(HAInterestBaseTest.class, "createEntriesK1andK2");
- server2.invoke(HAInterestBaseTest.class, "createEntriesK1andK2");
- server3.invoke(HAInterestBaseTest.class, "createEntriesK1andK2");
+ server1.invoke(HAInterestTestCase.class, "createEntriesK1andK2");
+ server2.invoke(HAInterestTestCase.class, "createEntriesK1andK2");
+ server3.invoke(HAInterestTestCase.class, "createEntriesK1andK2");
// register interest and verify response
registerK1AndK2OnPrimaryAndSecondaryAndVerifyResponse();
}
@@ -62,15 +62,15 @@ public class HAInterestPart1DUnitTest extends HAInterestBaseTest {
public void testRERegistrationWillNotCreateDuplicateKeysOnServerInterstMaps() throws Exception {
createClientPoolCache(this.getName(), getServerHostName(server1.getHost()));
createEntriesK1andK2();
- server1.invoke(HAInterestBaseTest.class, "createEntriesK1andK2");
- server2.invoke(HAInterestBaseTest.class, "createEntriesK1andK2");
- server3.invoke(HAInterestBaseTest.class, "createEntriesK1andK2");
+ server1.invoke(HAInterestTestCase.class, "createEntriesK1andK2");
+ server2.invoke(HAInterestTestCase.class, "createEntriesK1andK2");
+ server3.invoke(HAInterestTestCase.class, "createEntriesK1andK2");
// register multiple times
reRegisterK1AndK2();
- server1.invoke(HAInterestBaseTest.class, "verifyInterestRegistration");
- server2.invoke(HAInterestBaseTest.class, "verifyInterestRegistration");
- server3.invoke(HAInterestBaseTest.class, "verifyInterestRegistration");
+ server1.invoke(HAInterestTestCase.class, "verifyInterestRegistration");
+ server2.invoke(HAInterestTestCase.class, "verifyInterestRegistration");
+ server3.invoke(HAInterestTestCase.class, "verifyInterestRegistration");
}
/**
@@ -81,9 +81,9 @@ public class HAInterestPart1DUnitTest extends HAInterestBaseTest {
public void testPrimaryFailureInRegisterInterest() throws Exception {
createClientPoolCache(this.getName(), getServerHostName(server1.getHost()));
createEntriesK1andK2();
- server1.invoke(HAInterestBaseTest.class, "createEntriesK1andK2");
- server2.invoke(HAInterestBaseTest.class, "createEntriesK1andK2");
- server3.invoke(HAInterestBaseTest.class, "createEntriesK1andK2");
+ server1.invoke(HAInterestTestCase.class, "createEntriesK1andK2");
+ server2.invoke(HAInterestTestCase.class, "createEntriesK1andK2");
+ server3.invoke(HAInterestTestCase.class, "createEntriesK1andK2");
// stop primary
VM oldPrimary = getPrimaryVM();
stopPrimaryAndRegisterK1AndK2AndVerifyResponse();
@@ -91,8 +91,8 @@ public class HAInterestPart1DUnitTest extends HAInterestBaseTest {
verifyDeadAndLiveServers(1, 2);
// new primary
VM newPrimary = getPrimaryVM(oldPrimary);
- newPrimary.invoke(HAInterestBaseTest.class, "verifyDispatcherIsAlive");
- newPrimary.invoke(HAInterestBaseTest.class, "verifyInterestRegistration");
+ newPrimary.invoke(HAInterestTestCase.class, "verifyDispatcherIsAlive");
+ newPrimary.invoke(HAInterestTestCase.class, "verifyInterestRegistration");
}
/**
@@ -102,17 +102,17 @@ public class HAInterestPart1DUnitTest extends HAInterestBaseTest {
public void testSecondaryFailureInRegisterInterest() throws Exception {
createClientPoolCache(this.getName(), getServerHostName(server1.getHost()));
createEntriesK1andK2();
- server1.invoke(HAInterestBaseTest.class, "createEntriesK1andK2");
- server2.invoke(HAInterestBaseTest.class, "createEntriesK1andK2");
- server3.invoke(HAInterestBaseTest.class, "createEntriesK1andK2");
+ server1.invoke(HAInterestTestCase.class, "createEntriesK1andK2");
+ server2.invoke(HAInterestTestCase.class, "createEntriesK1andK2");
+ server3.invoke(HAInterestTestCase.class, "createEntriesK1andK2");
VM primary = getPrimaryVM();
stopSecondaryAndRegisterK1AndK2AndVerifyResponse();
verifyDeadAndLiveServers(1, 2);
// still primary
- primary.invoke(HAInterestBaseTest.class, "verifyDispatcherIsAlive");
- primary.invoke(HAInterestBaseTest.class, "verifyInterestRegistration");
+ primary.invoke(HAInterestTestCase.class, "verifyDispatcherIsAlive");
+ primary.invoke(HAInterestTestCase.class, "verifyInterestRegistration");
}
/**
@@ -124,17 +124,17 @@ public class HAInterestPart1DUnitTest extends HAInterestBaseTest {
public void testBothPrimaryAndSecondaryFailureInRegisterInterest() throws Exception {
createClientPoolCache(this.getName(), getServerHostName(server1.getHost()));
createEntriesK1andK2();
- server1.invoke(HAInterestBaseTest.class, "createEntriesK1andK2");
- server2.invoke(HAInterestBaseTest.class, "createEntriesK1andK2");
- server3.invoke(HAInterestBaseTest.class, "createEntriesK1andK2");
+ server1.invoke(HAInterestTestCase.class, "createEntriesK1andK2");
+ server2.invoke(HAInterestTestCase.class, "createEntriesK1andK2");
+ server3.invoke(HAInterestTestCase.class, "createEntriesK1andK2");
// stop server1 and server2
VM oldPrimary = getPrimaryVM();
stopBothPrimaryAndSecondaryAndRegisterK1AndK2AndVerifyResponse();
verifyDeadAndLiveServers(2, 1);
VM newPrimary = getPrimaryVM(oldPrimary);
- newPrimary.invoke(HAInterestBaseTest.class, "verifyDispatcherIsAlive");
- newPrimary.invoke(HAInterestBaseTest.class, "verifyInterestRegistration");
+ newPrimary.invoke(HAInterestTestCase.class, "verifyDispatcherIsAlive");
+ newPrimary.invoke(HAInterestTestCase.class, "verifyInterestRegistration");
}
/**
@@ -148,17 +148,17 @@ public class HAInterestPart1DUnitTest extends HAInterestBaseTest {
public void testProbablePrimaryFailureInRegisterInterest() throws Exception {
createClientPoolCache(this.getName(), getServerHostName(server1.getHost()));
createEntriesK1andK2();
- server1.invoke(HAInterestBaseTest.class, "createEntriesK1andK2");
- server2.invoke(HAInterestBaseTest.class, "createEntriesK1andK2");
- server3.invoke(HAInterestBaseTest.class, "createEntriesK1andK2");
+ server1.invoke(HAInterestTestCase.class, "createEntriesK1andK2");
+ server2.invoke(HAInterestTestCase.class, "createEntriesK1andK2");
+ server3.invoke(HAInterestTestCase.class, "createEntriesK1andK2");
VM oldPrimary = getPrimaryVM();
stopPrimaryAndRegisterK1AndK2AndVerifyResponse();
verifyDeadAndLiveServers(1, 2);
VM newPrimary = getPrimaryVM(oldPrimary);
- newPrimary.invoke(HAInterestBaseTest.class, "verifyDispatcherIsAlive");
- newPrimary.invoke(HAInterestBaseTest.class, "verifyInterestRegistration");
+ newPrimary.invoke(HAInterestTestCase.class, "verifyDispatcherIsAlive");
+ newPrimary.invoke(HAInterestTestCase.class, "verifyInterestRegistration");
}
/**
@@ -172,40 +172,40 @@ public class HAInterestPart1DUnitTest extends HAInterestBaseTest {
createClientPoolCache(this.getName(), getServerHostName(server1.getHost()));
createEntriesK1andK2();
registerK1AndK2();
- server1.invoke(HAInterestBaseTest.class, "createEntriesK1andK2");
- server2.invoke(HAInterestBaseTest.class, "createEntriesK1andK2");
- server3.invoke(HAInterestBaseTest.class, "createEntriesK1andK2");
+ server1.invoke(HAInterestTestCase.class, "createEntriesK1andK2");
+ server2.invoke(HAInterestTestCase.class, "createEntriesK1andK2");
+ server3.invoke(HAInterestTestCase.class, "createEntriesK1andK2");
- server1.invoke(HAInterestBaseTest.class, "stopServer");
- server2.invoke(HAInterestBaseTest.class, "stopServer");
- server3.invoke(HAInterestBaseTest.class, "stopServer");
+ server1.invoke(HAInterestTestCase.class, "stopServer");
+ server2.invoke(HAInterestTestCase.class, "stopServer");
+ server3.invoke(HAInterestTestCase.class, "stopServer");
// All servers are dead at this point , no primary in the system.
verifyDeadAndLiveServers(3, 0);
// now start one of the servers
- server2.invoke(HAInterestBaseTest.class, "startServer");
+ server2.invoke(HAInterestTestCase.class, "startServer");
verifyDeadAndLiveServers(2, 1);
// verify that is it primary , and dispatcher is running
- server2.invoke(HAInterestBaseTest.class, "verifyDispatcherIsAlive");
+ server2.invoke(HAInterestTestCase.class, "verifyDispatcherIsAlive");
// verify that interest is registered on this recovered EP
- server2.invoke(HAInterestBaseTest.class, "verifyInterestRegistration");
+ server2.invoke(HAInterestTestCase.class, "verifyInterestRegistration");
// now start one more server ; this should be now secondary
- server1.invoke(HAInterestBaseTest.class, "startServer");
+ server1.invoke(HAInterestTestCase.class, "startServer");
verifyDeadAndLiveServers(1, 2);
// verify that is it secondary , dispatcher should not be runnig
- server1.invoke(HAInterestBaseTest.class, "verifyDispatcherIsNotAlive");
+ server1.invoke(HAInterestTestCase.class, "verifyDispatcherIsNotAlive");
// verify that interest is registered on this recovered EP as well
- server1.invoke(HAInterestBaseTest.class, "verifyInterestRegistration");
+ server1.invoke(HAInterestTestCase.class, "verifyInterestRegistration");
// now start one more server ; this should be now secondary
- server3.invoke(HAInterestBaseTest.class, "startServer");
+ server3.invoke(HAInterestTestCase.class, "startServer");
verifyDeadAndLiveServers(0, 3);
// verify that is it secondary , dispatcher should not be runnig
- server3.invoke(HAInterestBaseTest.class, "verifyDispatcherIsNotAlive");
+ server3.invoke(HAInterestTestCase.class, "verifyDispatcherIsNotAlive");
// verify that interest is registered on this recovered EP as well
- server3.invoke(HAInterestBaseTest.class, "verifyInterestRegistration");
+ server3.invoke(HAInterestTestCase.class, "verifyInterestRegistration");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ca6148aa/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/HAInterestPart2DUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/HAInterestPart2DUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/HAInterestPart2DUnitTest.java
index eaa1ca1..31a2811 100755
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/HAInterestPart2DUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/HAInterestPart2DUnitTest.java
@@ -24,7 +24,7 @@ import dunit.DistributedTestCase;
import dunit.VM;
@SuppressWarnings({"rawtypes", "serial"})
-public class HAInterestPart2DUnitTest extends HAInterestBaseTest {
+public class HAInterestPart2DUnitTest extends HAInterestTestCase {
public HAInterestPart2DUnitTest(String name) {
super(name);
@@ -37,9 +37,9 @@ public class HAInterestPart2DUnitTest extends HAInterestBaseTest {
public void testPrimaryFailureInUNregisterInterest() throws Exception {
createClientPoolCache(this.getName(), getServerHostName(server1.getHost()));
createEntriesK1andK2();
- server1.invoke(HAInterestBaseTest.class, "createEntriesK1andK2");
- server2.invoke(HAInterestBaseTest.class, "createEntriesK1andK2");
- server3.invoke(HAInterestBaseTest.class, "createEntriesK1andK2");
+ server1.invoke(HAInterestTestCase.class, "createEntriesK1andK2");
+ server2.invoke(HAInterestTestCase.class, "createEntriesK1andK2");
+ server3.invoke(HAInterestTestCase.class, "createEntriesK1andK2");
registerK1AndK2();
@@ -49,11 +49,11 @@ public class HAInterestPart2DUnitTest extends HAInterestBaseTest {
verifyDeadAndLiveServers(1, 2);
VM newPrimary = getPrimaryVM(oldPrimary);
- newPrimary.invoke(HAInterestBaseTest.class, "verifyDispatcherIsAlive");
+ newPrimary.invoke(HAInterestTestCase.class, "verifyDispatcherIsAlive");
// primary
- newPrimary.invoke(HAInterestBaseTest.class, "verifyInterestUNRegistration");
+ newPrimary.invoke(HAInterestTestCase.class, "verifyInterestUNRegistration");
// secondary
- getBackupVM().invoke(HAInterestBaseTest.class, "verifyInterestUNRegistration");
+ getBackupVM().invoke(HAInterestTestCase.class, "verifyInterestUNRegistration");
}
/**
@@ -63,18 +63,18 @@ public class HAInterestPart2DUnitTest extends HAInterestBaseTest {
public void testSecondaryFailureInUNRegisterInterest() throws Exception {
createClientPoolCache(this.getName(), getServerHostName(server1.getHost()));
createEntriesK1andK2();
- server1.invoke(HAInterestBaseTest.class, "createEntriesK1andK2");
- server2.invoke(HAInterestBaseTest.class, "createEntriesK1andK2");
- server3.invoke(HAInterestBaseTest.class, "createEntriesK1andK2");
+ server1.invoke(HAInterestTestCase.class, "createEntriesK1andK2");
+ server2.invoke(HAInterestTestCase.class, "createEntriesK1andK2");
+ server3.invoke(HAInterestTestCase.class, "createEntriesK1andK2");
registerK1AndK2();
VM stoppedBackup = stopSecondaryAndUNregisterK1();
verifyDeadAndLiveServers(1, 2);
// still primary
- getPrimaryVM().invoke(HAInterestBaseTest.class, "verifyDispatcherIsAlive");
+ getPrimaryVM().invoke(HAInterestTestCase.class, "verifyDispatcherIsAlive");
// primary
- getPrimaryVM().invoke(HAInterestBaseTest.class, "verifyInterestUNRegistration");
+ getPrimaryVM().invoke(HAInterestTestCase.class, "verifyInterestUNRegistration");
// secondary
- getBackupVM(stoppedBackup).invoke(HAInterestBaseTest.class, "verifyInterestUNRegistration");
+ getBackupVM(stoppedBackup).invoke(HAInterestTestCase.class, "verifyInterestUNRegistration");
}
/**
@@ -85,11 +85,11 @@ public class HAInterestPart2DUnitTest extends HAInterestBaseTest {
public void testDSMDetectsServerLiveJustBeforeInterestRegistration() throws Exception {
createClientPoolCache(this.getName(), getServerHostName(server1.getHost()));
createEntriesK1andK2();
- server1.invoke(HAInterestBaseTest.class, "createEntriesK1andK2");
- server2.invoke(HAInterestBaseTest.class, "createEntriesK1andK2");
- server3.invoke(HAInterestBaseTest.class, "createEntriesK1andK2");
+ server1.invoke(HAInterestTestCase.class, "createEntriesK1andK2");
+ server2.invoke(HAInterestTestCase.class, "createEntriesK1andK2");
+ server3.invoke(HAInterestTestCase.class, "createEntriesK1andK2");
VM backup = getBackupVM();
- backup.invoke(HAInterestBaseTest.class, "stopServer");
+ backup.invoke(HAInterestTestCase.class, "stopServer");
verifyDeadAndLiveServers(1, 2);
setClientServerObserverForBeforeRegistration(backup);
try {
@@ -98,9 +98,9 @@ public class HAInterestPart2DUnitTest extends HAInterestBaseTest {
} finally {
unSetClientServerObserverForRegistrationCallback();
}
- server1.invoke(HAInterestBaseTest.class, "verifyInterestRegistration");
- server2.invoke(HAInterestBaseTest.class, "verifyInterestRegistration");
- server3.invoke(HAInterestBaseTest.class, "verifyInterestRegistration");
+ server1.invoke(HAInterestTestCase.class, "verifyInterestRegistration");
+ server2.invoke(HAInterestTestCase.class, "verifyInterestRegistration");
+ server3.invoke(HAInterestTestCase.class, "verifyInterestRegistration");
}
/**
@@ -112,12 +112,12 @@ public class HAInterestPart2DUnitTest extends HAInterestBaseTest {
createClientPoolCache(this.getName(), getServerHostName(server1.getHost()));
createEntriesK1andK2();
- server1.invoke(HAInterestBaseTest.class, "createEntriesK1andK2");
- server2.invoke(HAInterestBaseTest.class, "createEntriesK1andK2");
- server3.invoke(HAInterestBaseTest.class, "createEntriesK1andK2");
+ server1.invoke(HAInterestTestCase.class, "createEntriesK1andK2");
+ server2.invoke(HAInterestTestCase.class, "createEntriesK1andK2");
+ server3.invoke(HAInterestTestCase.class, "createEntriesK1andK2");
VM backup = getBackupVM();
- backup.invoke(HAInterestBaseTest.class, "stopServer");
+ backup.invoke(HAInterestTestCase.class, "stopServer");
verifyDeadAndLiveServers(1, 2);
setClientServerObserverForAfterRegistration(backup);
@@ -128,9 +128,9 @@ public class HAInterestPart2DUnitTest extends HAInterestBaseTest {
unSetClientServerObserverForRegistrationCallback();
}
- server1.invoke(HAInterestBaseTest.class, "verifyInterestRegistration");
- server2.invoke(HAInterestBaseTest.class, "verifyInterestRegistration");
- server3.invoke(HAInterestBaseTest.class, "verifyInterestRegistration");
+ server1.invoke(HAInterestTestCase.class, "verifyInterestRegistration");
+ server2.invoke(HAInterestTestCase.class, "verifyInterestRegistration");
+ server3.invoke(HAInterestTestCase.class, "verifyInterestRegistration");
}
/**
@@ -143,16 +143,16 @@ public class HAInterestPart2DUnitTest extends HAInterestBaseTest {
public void testRefreshEntriesFromPrimaryWhenDSMDetectsServerLive() throws Exception {
addExpectedException(ServerConnectivityException.class.getName());
- PORT1 = ((Integer) server1.invoke(HAInterestBaseTest.class, "createServerCache")).intValue();
- server1.invoke(HAInterestBaseTest.class, "createEntriesK1andK2");
+ PORT1 = ((Integer) server1.invoke(HAInterestTestCase.class, "createServerCache")).intValue();
+ server1.invoke(HAInterestTestCase.class, "createEntriesK1andK2");
createClientPoolCacheConnectionToSingleServer(this.getName(), getServerHostName(server1.getHost()));
registerK1AndK2();
verifyRefreshedEntriesFromServer();
- server1.invoke(HAInterestBaseTest.class, "stopServer");
+ server1.invoke(HAInterestTestCase.class, "stopServer");
verifyDeadAndLiveServers(1, 0);
- server1.invoke(HAInterestBaseTest.class, "putK1andK2");
- server1.invoke(HAInterestBaseTest.class, "startServer");
+ server1.invoke(HAInterestTestCase.class, "putK1andK2");
+ server1.invoke(HAInterestTestCase.class, "startServer");
verifyDeadAndLiveServers(0, 1);
final Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME);
assertNotNull(r1);
@@ -211,29 +211,29 @@ public class HAInterestPart2DUnitTest extends HAInterestBaseTest {
* refreshes registered entries from the server, because it is secondary
*/
public void testGIIFromSecondaryWhenDSMDetectsServerLive() throws Exception {
- server1.invoke(HAInterestBaseTest.class, "closeCache");
- server2.invoke(HAInterestBaseTest.class, "closeCache");
- server3.invoke(HAInterestBaseTest.class, "closeCache");
+ server1.invoke(HAInterestTestCase.class, "closeCache");
+ server2.invoke(HAInterestTestCase.class, "closeCache");
+ server3.invoke(HAInterestTestCase.class, "closeCache");
- PORT1 = ((Integer) server1.invoke(HAInterestBaseTest.class, "createServerCacheWithLocalRegion")).intValue();
- PORT2 = ((Integer) server2.invoke(HAInterestBaseTest.class, "createServerCacheWithLocalRegion")).intValue();
- PORT3 = ((Integer) server3.invoke(HAInterestBaseTest.class, "createServerCacheWithLocalRegion")).intValue();
+ PORT1 = ((Integer) server1.invoke(HAInterestTestCase.class, "createServerCacheWithLocalRegion")).intValue();
+ PORT2 = ((Integer) server2.invoke(HAInterestTestCase.class, "createServerCacheWithLocalRegion")).intValue();
+ PORT3 = ((Integer) server3.invoke(HAInterestTestCase.class, "createServerCacheWithLocalRegion")).intValue();
- server1.invoke(HAInterestBaseTest.class, "createEntriesK1andK2");
- server2.invoke(HAInterestBaseTest.class, "createEntriesK1andK2");
- server3.invoke(HAInterestBaseTest.class, "createEntriesK1andK2");
+ server1.invoke(HAInterestTestCase.class, "createEntriesK1andK2");
+ server2.invoke(HAInterestTestCase.class, "createEntriesK1andK2");
+ server3.invoke(HAInterestTestCase.class, "createEntriesK1andK2");
createClientPoolCache(this.getName(), getServerHostName(server1.getHost()));
VM backup1 = getBackupVM();
VM backup2 = getBackupVM(backup1);
- backup1.invoke(HAInterestBaseTest.class, "stopServer");
- backup2.invoke(HAInterestBaseTest.class, "stopServer");
+ backup1.invoke(HAInterestTestCase.class, "stopServer");
+ backup2.invoke(HAInterestTestCase.class, "stopServer");
verifyDeadAndLiveServers(2, 1);
registerK1AndK2();
verifyRefreshedEntriesFromServer();
- backup1.invoke(HAInterestBaseTest.class, "putK1andK2");
- backup1.invoke(HAInterestBaseTest.class, "startServer");
+ backup1.invoke(HAInterestTestCase.class, "putK1andK2");
+ backup1.invoke(HAInterestTestCase.class, "startServer");
verifyDeadAndLiveServers(1, 2);
verifyRefreshedEntriesFromServer();
}
@@ -246,19 +246,19 @@ public class HAInterestPart2DUnitTest extends HAInterestBaseTest {
* @throws Exception
*/
public void testBug35945() throws Exception {
- PORT1 = ((Integer) server1.invoke(HAInterestBaseTest.class, "createServerCache")).intValue();
- server1.invoke(HAInterestBaseTest.class, "createEntriesK1andK2");
+ PORT1 = ((Integer) server1.invoke(HAInterestTestCase.class, "createServerCache")).intValue();
+ server1.invoke(HAInterestTestCase.class, "createEntriesK1andK2");
createClientPoolCacheConnectionToSingleServer(this.getName(), getServerHostName(server1.getHost()));
registerK1AndK2();
verifyRefreshedEntriesFromServer();
- server1.invoke(HAInterestBaseTest.class, "stopServer");
+ server1.invoke(HAInterestTestCase.class, "stopServer");
verifyDeadAndLiveServers(1, 0);
// put on stopped server
- server1.invoke(HAInterestBaseTest.class, "putK1andK2");
+ server1.invoke(HAInterestTestCase.class, "putK1andK2");
// spawn a thread to put on server , which will acquire a lock on entry
setClientServerObserverForBeforeInterestRecovery();
- server1.invoke(HAInterestBaseTest.class, "startServer");
+ server1.invoke(HAInterestTestCase.class, "startServer");
verifyDeadAndLiveServers(0, 1);
waitForBeforeInterestRecoveryCallBack();
// verify updated value of k1 as a refreshEntriesFromServer
@@ -314,23 +314,23 @@ public class HAInterestPart2DUnitTest extends HAInterestBaseTest {
public void testInterestRecoveryFailure() throws Exception {
addExpectedException("Server unreachable");
- PORT1 = ((Integer) server1.invoke(HAInterestBaseTest.class, "createServerCache")).intValue();
- server1.invoke(HAInterestBaseTest.class, "createEntriesK1andK2");
- PORT2 = ((Integer) server2.invoke(HAInterestBaseTest.class, "createServerCache")).intValue();
- server2.invoke(HAInterestBaseTest.class, "createEntriesK1andK2");
+ PORT1 = ((Integer) server1.invoke(HAInterestTestCase.class, "createServerCache")).intValue();
+ server1.invoke(HAInterestTestCase.class, "createEntriesK1andK2");
+ PORT2 = ((Integer) server2.invoke(HAInterestTestCase.class, "createServerCache")).intValue();
+ server2.invoke(HAInterestTestCase.class, "createEntriesK1andK2");
createClientPoolCacheWithSmallRetryInterval(this.getName(), getServerHostName(server1.getHost()));
registerK1AndK2();
verifyRefreshedEntriesFromServer();
VM backup = getBackupVM();
VM primary = getPrimaryVM();
- backup.invoke(HAInterestBaseTest.class, "stopServer");
- primary.invoke(HAInterestBaseTest.class, "stopServer");
+ backup.invoke(HAInterestTestCase.class, "stopServer");
+ primary.invoke(HAInterestTestCase.class, "stopServer");
verifyDeadAndLiveServers(2, 0);
- primary.invoke(HAInterestBaseTest.class, "putK1andK2");
+ primary.invoke(HAInterestTestCase.class, "putK1andK2");
setClientServerObserverForBeforeInterestRecoveryFailure();
- primary.invoke(HAInterestBaseTest.class, "startServer");
+ primary.invoke(HAInterestTestCase.class, "startServer");
waitForBeforeInterestRecoveryCallBack();
if (exceptionOccured) {
fail("The DSM could not ensure that server 1 is started & serevr 2 is stopped");