Posted to notifications@geode.apache.org by GitBox <gi...@apache.org> on 2022/01/07 19:22:34 UTC

[GitHub] [geode] kirklund commented on a change in pull request #7144: GEODE-9853: get all members hosting bucket

kirklund commented on a change in pull request #7144:
URL: https://github.com/apache/geode/pull/7144#discussion_r780465057



##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
##########
@@ -1185,6 +1188,84 @@ public void testParallelGWSenderUpdateFiltersWhilePuttingOnOneDispatcThread() th
 
   }
 
+  /**
+   * Put entries in region after gateway sender is stopped. Count number of PQRM messages sent.
+   */
+  @Test
+  public void testDroppedEventsSignalizationToSecondaryQueueWhileSenderStopped() {
+    int lnPort = vm0.invoke(() -> createFirstLocatorWithDSId(1));
+    int nyPort = vm1.invoke(() -> createFirstRemoteLocator(2, lnPort));
+
+    createSendersReceiversAndPartitionedRegion(lnPort, nyPort, false, true);
+
+    // make sure all the senders are running before doing any puts
+    waitForSendersRunning();
+
+    // FIRST RUN: now, the senders are started. So, start the puts
+    vm4.invoke(() -> doPuts(getUniqueName() + "_PR", 100));
+
+    vm2.invoke(() -> validateRegionSizeRemainsSame(getUniqueName() + "_PR", 100));
+
+    stopSenders();
+
+    waitForAllSendersNotRunning();
+
+    vm4.invoke(() -> {
+      DistributionMessageObserver.setInstance(new CountSentPQRMObserver());
+    });
+    vm5.invoke(() -> {
+      DistributionMessageObserver.setInstance(new CountSentPQRMObserver());
+    });
+    vm6.invoke(() -> {
+      DistributionMessageObserver.setInstance(new CountSentPQRMObserver());
+    });
+    vm7.invoke(() -> {
+      DistributionMessageObserver.setInstance(new CountSentPQRMObserver());
+    });

Review comment:
       You'll need to unset `DistributionMessageObserver` in an `@After` teardown method, in every VM where the test installed it:
   ```
   @After
   public void tearDown() {
     // vm4..vm7 are the VMs in which the test sets CountSentPQRMObserver
     for (VM vm : asList(vm4, vm5, vm6, vm7)) {
       vm.invoke(() -> {
         DistributionMessageObserver.setInstance(null);
       });
     }
   }
   ```
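
   To illustrate why the reset matters, here is a minimal, self-contained sketch. `MessageObserver`, `CountingObserver` and `ObserverLeakDemo` are hypothetical stand-ins written for this example, not Geode classes; they only mimic a process-wide static observer hook. An observer left installed by one test keeps receiving callbacks from whatever runs next in the same JVM.

   ```java
   import java.util.concurrent.atomic.AtomicInteger;

   // Hypothetical stand-in for a process-wide observer hook (not Geode's class).
   abstract class MessageObserver {
     private static volatile MessageObserver instance;

     // Installs the given observer (or clears it when null) and returns the previous one.
     static MessageObserver setInstance(MessageObserver observer) {
       MessageObserver previous = instance;
       instance = observer;
       return previous;
     }

     static MessageObserver getInstance() {
       return instance;
     }

     abstract void beforeSend(String message);
   }

   // Counts every message it is notified about.
   class CountingObserver extends MessageObserver {
     final AtomicInteger sent = new AtomicInteger();

     @Override
     void beforeSend(String message) {
       sent.incrementAndGet();
     }
   }

   class ObserverLeakDemo {
     // Simulates product code notifying whichever observer is currently installed.
     static void send(String message) {
       MessageObserver observer = MessageObserver.getInstance();
       if (observer != null) {
         observer.beforeSend(message);
       }
     }

     public static void main(String[] args) {
       CountingObserver observer = new CountingObserver();
       MessageObserver.setInstance(observer);
       send("from test A");

       // Without this reset, "from test B" would also be counted by test A's observer.
       MessageObserver.setInstance(null);
       send("from test B");

       System.out.println(observer.sent.get()); // prints 1
     }
   }
   ```

   In a DUnit test the same reasoning applies per VM, because each VM is a separate JVM with its own static state.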

##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
##########
@@ -1185,6 +1188,84 @@ public void testParallelGWSenderUpdateFiltersWhilePuttingOnOneDispatcThread() th
 
   }
 
+  /**
+   * Put entries in region after gateway sender is stopped. Count number of PQRM messages sent.
+   */
+  @Test
+  public void testDroppedEventsSignalizationToSecondaryQueueWhileSenderStopped() {
+    int lnPort = vm0.invoke(() -> createFirstLocatorWithDSId(1));
+    int nyPort = vm1.invoke(() -> createFirstRemoteLocator(2, lnPort));
+
+    createSendersReceiversAndPartitionedRegion(lnPort, nyPort, false, true);
+
+    // make sure all the senders are running before doing any puts
+    waitForSendersRunning();
+
+    // FIRST RUN: now, the senders are started. So, start the puts
+    vm4.invoke(() -> doPuts(getUniqueName() + "_PR", 100));
+
+    vm2.invoke(() -> validateRegionSizeRemainsSame(getUniqueName() + "_PR", 100));
+
+    stopSenders();
+
+    waitForAllSendersNotRunning();
+
+    vm4.invoke(() -> {
+      DistributionMessageObserver.setInstance(new CountSentPQRMObserver());
+    });
+    vm5.invoke(() -> {
+      DistributionMessageObserver.setInstance(new CountSentPQRMObserver());
+    });
+    vm6.invoke(() -> {
+      DistributionMessageObserver.setInstance(new CountSentPQRMObserver());
+    });
+    vm7.invoke(() -> {
+      DistributionMessageObserver.setInstance(new CountSentPQRMObserver());
+    });
+
+    // SECOND RUN: keep one thread doing puts to the region
+    vm4.invoke(() -> doPutsFrom(getUniqueName() + "_PR", 100, 200));
+
+    vm2.invoke(() -> validateRegionSizeRemainsSame(getUniqueName() + "_PR", 100));
+
+    int cntPQRM1 = vm4.invoke(() -> {

Review comment:
       Can you please rename these variables from `cntPQRM1` to something more descriptive and preferably without abbreviations? Maybe `parallelQueueRemovalMessageCountInVm4`.

##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
##########
@@ -1514,10 +1595,34 @@ private void waitForSendersRunning() {
     vm7.invoke(() -> waitForSenderRunningState("ln"));
   }
 
+  private void waitForAllSendersNotRunning() {
+    vm4.invoke(() -> waitForSenderNonRunningState("ln"));
+    vm5.invoke(() -> waitForSenderNonRunningState("ln"));
+    vm6.invoke(() -> waitForSenderNonRunningState("ln"));
+    vm7.invoke(() -> waitForSenderNonRunningState("ln"));
+  }
+
   private void validateParallelSenderQueueAllBucketsDrained() {
     vm4.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln"));
     vm5.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln"));
     vm6.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln"));
     vm7.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln"));
   }
+
+  private static class CountSentPQRMObserver extends DistributionMessageObserver {
+    private int numberOfSentPQRM = 0;

Review comment:
       I'm not sure whether this needs to be thread-safe, but you should use `AtomicInteger` just to be safe:
   ```
   private final AtomicInteger numberOfSentPQRM = new AtomicInteger();
   ```
   ```
   if (message instanceof ParallelQueueRemovalMessage) {
     numberOfSentPQRM.addAndGet(message.getRecipients().size());
   }
   ```
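
   As a rough sketch of what the atomic counter buys you, the example below updates one counter from several threads. `RecipientCounter` and `RecipientCounterDemo` are hypothetical names made up for this illustration; the recipient count is passed in as a plain `int` rather than read from a real message.

   ```java
   import java.util.concurrent.atomic.AtomicInteger;

   // Hypothetical counter mirroring the suggested field; not part of the PR.
   class RecipientCounter {
     private final AtomicInteger numberOfSentMessages = new AtomicInteger();

     // Called once per outgoing message with that message's recipient count.
     void record(int recipientCount) {
       numberOfSentMessages.addAndGet(recipientCount);
     }

     int total() {
       return numberOfSentMessages.get();
     }
   }

   class RecipientCounterDemo {
     // Records messages from several threads at once and returns the final total.
     static int countConcurrently(int threads, int messagesPerThread, int recipientsPerMessage) {
       RecipientCounter counter = new RecipientCounter();
       Thread[] workers = new Thread[threads];
       for (int i = 0; i < threads; i++) {
         workers[i] = new Thread(() -> {
           for (int j = 0; j < messagesPerThread; j++) {
             counter.record(recipientsPerMessage);
           }
         });
         workers[i].start();
       }
       try {
         for (Thread worker : workers) {
           worker.join();
         }
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new IllegalStateException("interrupted while waiting for workers", e);
       }
       return counter.total();
     }

     public static void main(String[] args) {
       // 4 threads * 10,000 messages * 3 recipients each
       System.out.println(countConcurrently(4, 10_000, 3)); // prints 120000
     }
   }
   ```

   With a plain `int` and `+=`, concurrent updates could be lost and the total could come out lower; `addAndGet` makes each update atomic.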

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
##########
@@ -1201,13 +1201,14 @@ protected void addRemovedEvent(PartitionedRegion prQ, int bucketId, Object key)
 
   public void sendQueueRemovalMesssageForDroppedEvent(PartitionedRegion prQ, int bucketId,
       Object key) {
-    final HashMap<String, Map<Integer, List>> temp = new HashMap<String, Map<Integer, List>>();
-    Map bucketIdToDispatchedKeys = new ConcurrentHashMap();
-    temp.put(prQ.getFullPath(), bucketIdToDispatchedKeys);
-    addRemovedEventToMap(bucketIdToDispatchedKeys, bucketId, key);
     Set<InternalDistributedMember> recipients =
-        removalThread.getAllRecipients(sender.getCache(), temp);
+        removalThread.getAllRecipientsForEvent(sender.getCache(), prQ.getFullPath(), bucketId);
+
     if (!recipients.isEmpty()) {
+      final HashMap<String, Map<Integer, List>> temp = new HashMap<String, Map<Integer, List>>();

Review comment:
       Can `temp` be declared as the interface type `Map<String, Map<Integer, List>>` instead of the `HashMap` implementation?

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
##########
@@ -1981,6 +1982,36 @@ public void run() {
       return recipients;
     }
 
+
+    private Set<InternalDistributedMember> getAllRecipientsForEvent(InternalCache cache,
+        String partitionedRegionName, int bucketId) {
+      Set<InternalDistributedMember> recipients = new ObjectOpenHashSet<>();
+      PartitionedRegion partitionedRegion =
+          (PartitionedRegion) cache.getRegion(partitionedRegionName);
+      if (partitionedRegion != null && partitionedRegion.getRegionAdvisor() != null) {
+        final String bucketFullPath =
+            SEPARATOR + PartitionedRegionHelper.PR_ROOT_REGION_NAME + SEPARATOR
+                + partitionedRegion.getBucketName(bucketId);
+        AbstractBucketRegionQueue brq =

Review comment:
       How about `bucketRegionQueue` instead of `brq`?

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
##########
@@ -1201,13 +1201,14 @@ protected void addRemovedEvent(PartitionedRegion prQ, int bucketId, Object key)
 
   public void sendQueueRemovalMesssageForDroppedEvent(PartitionedRegion prQ, int bucketId,
       Object key) {
-    final HashMap<String, Map<Integer, List>> temp = new HashMap<String, Map<Integer, List>>();
-    Map bucketIdToDispatchedKeys = new ConcurrentHashMap();
-    temp.put(prQ.getFullPath(), bucketIdToDispatchedKeys);
-    addRemovedEventToMap(bucketIdToDispatchedKeys, bucketId, key);
     Set<InternalDistributedMember> recipients =
-        removalThread.getAllRecipients(sender.getCache(), temp);
+        removalThread.getAllRecipientsForEvent(sender.getCache(), prQ.getFullPath(), bucketId);
+
     if (!recipients.isEmpty()) {
+      final HashMap<String, Map<Integer, List>> temp = new HashMap<String, Map<Integer, List>>();
+      Map bucketIdToDispatchedKeys = new ConcurrentHashMap();

Review comment:
       Please make sure all collections are fully typed, e.g. `Map<something, something>` rather than a raw `Map`.
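
   For illustration only, a fully parameterized version of the nested map might look like the sketch below. `TypedRemovalMap` and `forSingleEvent` are hypothetical names, and `List<Object>` is an assumption about the element type; the real code may need a different parameter to match what the removal message expects.

   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;

   class TypedRemovalMap {
     // Builds regionPath -> (bucketId -> keys) with every collection parameterized
     // and every variable declared by its interface type.
     static Map<String, Map<Integer, List<Object>>> forSingleEvent(
         String regionPath, int bucketId, Object key) {
       Map<Integer, List<Object>> bucketIdToDispatchedKeys = new ConcurrentHashMap<>();
       bucketIdToDispatchedKeys.computeIfAbsent(bucketId, id -> new ArrayList<>()).add(key);

       Map<String, Map<Integer, List<Object>>> regionToDispatchedKeys = new HashMap<>();
       regionToDispatchedKeys.put(regionPath, bucketIdToDispatchedKeys);
       return regionToDispatchedKeys;
     }

     public static void main(String[] args) {
       System.out.println(forSingleEvent("/region_PR", 7, "key-1"));
     }
   }
   ```

   With type parameters in place, the compiler rejects a wrong key or value type instead of deferring the failure to a `ClassCastException` at runtime.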



