You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2016/12/02 17:36:24 UTC

incubator-geode git commit: GEODE-1978: Waiting for the queue to drain in testReplicatedSerialProp...

Repository: incubator-geode
Updated Branches:
  refs/heads/develop 27f491af1 -> 6aced1947


GEODE-1978: Waiting for the queue to drain in testReplicatedSerialProp...

Waiting for the queue to drain in this test. The test had an assertion
that the queue was not empty. But really, in the background the
dispatcher was draining the queue as fast as it could. The actual
expected behavior is that the queue will become empty if the remote side
destroys a region.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/6aced194
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/6aced194
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/6aced194

Branch: refs/heads/develop
Commit: 6aced1947590aa1829b7c9a65dbb11683544065b
Parents: 27f491a
Author: Dan Smith <up...@apache.org>
Authored: Thu Dec 1 17:14:00 2016 -0800
Committer: Dan Smith <up...@apache.org>
Committed: Fri Dec 2 09:30:44 2016 -0800

----------------------------------------------------------------------
 .../geode/internal/cache/wan/WANTestBase.java   | 17 ++++++----------
 .../ConcurrentWANPropagation_1_DUnitTest.java   | 21 ++++++++++----------
 2 files changed, 16 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6aced194/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index 6351933..fd1d4c9 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -3317,23 +3317,18 @@ public class WANTestBase extends JUnit4DistributedTestCase {
     }
   }
 
-  public static void verifyRegionQueueNotEmptyForConcurrentSender(String senderId) {
+  public static void waitForConcurrentSerialSenderQueueToDrain(String senderId) {
     Set<GatewaySender> senders = cache.getGatewaySenders();
-    GatewaySender sender = null;
-    for (GatewaySender s : senders) {
-      if (s.getId().equals(senderId)) {
-        sender = s;
-        break;
-      }
-    }
+    GatewaySender sender =
+        senders.stream().filter(s -> s.getId().equals(senderId)).findFirst().get();
 
-    if (!sender.isParallel()) {
+    Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> {
       Set<RegionQueue> queues =
           ((AbstractGatewaySender) sender).getQueuesForConcurrentSerialGatewaySender();
       for (RegionQueue q : queues) {
-        assertTrue(q.size() > 0);
+        assertEquals(0, q.size());
       }
-    }
+    });
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6aced194/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentWANPropagation_1_DUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentWANPropagation_1_DUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentWANPropagation_1_DUnitTest.java
index fd9a9c9..ba8d349 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentWANPropagation_1_DUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentWANPropagation_1_DUnitTest.java
@@ -337,10 +337,10 @@ public class ConcurrentWANPropagation_1_DUnitTest extends WANTestBase {
         () -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR_1", "ln", isOffHeap()));
 
     // senders are created on local site
-    vm4.invoke(() -> WANTestBase.createConcurrentSender("ln", 2, false, 100, 500, false, false,
-        null, true, 5, OrderPolicy.KEY));
-    vm5.invoke(() -> WANTestBase.createConcurrentSender("ln", 2, false, 100, 500, false, false,
-        null, true, 5, OrderPolicy.KEY));
+    vm4.invoke(() -> WANTestBase.createConcurrentSender("ln", 2, false, 100, 10, false, false, null,
+        true, 5, OrderPolicy.KEY));
+    vm5.invoke(() -> WANTestBase.createConcurrentSender("ln", 2, false, 100, 10, false, false, null,
+        true, 5, OrderPolicy.KEY));
 
     // start the senders on local site
     startSenderInVMs("ln", vm4, vm5);
@@ -350,7 +350,7 @@ public class ConcurrentWANPropagation_1_DUnitTest extends WANTestBase {
 
     // start puts in RR_1 in another thread
     AsyncInvocation inv1 =
-        vm4.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_RR_1", 10000));
+        vm4.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_RR_1", 100));
     // destroy RR_1 in remote site
     vm2.invoke(() -> WANTestBase.destroyRegion(getTestMethodName() + "_RR_1"));
 
@@ -362,12 +362,11 @@ public class ConcurrentWANPropagation_1_DUnitTest extends WANTestBase {
     }
 
     // verify that all is well in local site. All the events should be present in local region
-    vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR_1", 10000));
-    // assuming some events might have been dispatched before the remote region was destroyed,
-    // sender's region queue will have events less than 1000 but the queue will not be empty.
-    // NOTE: this much verification might be sufficient in DUnit. Hydra will take care of
-    // more in depth validations.
-    vm4.invoke(() -> WANTestBase.verifyRegionQueueNotEmptyForConcurrentSender("ln"));
+    vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR_1", 100));
+
+    // Wait for the queue to drain. The queue will drain because when the region is destroyed,
+    // the failed batches will be logged and discarded
+    vm4.invoke(() -> WANTestBase.waitForConcurrentSerialSenderQueueToDrain("ln"));
   }
 
   /**