You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2020/10/14 17:37:19 UTC

[geode] branch feature/GEM-3093 updated: fix to let stillWaiting return false

This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch feature/GEM-3093
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/feature/GEM-3093 by this push:
     new d375be0  fix to let stillWaiting return false
d375be0 is described below

commit d375be0dbdaf02f84415de898b71cf46162379f5
Author: zhouxh <gz...@pivotal.io>
AuthorDate: Wed Oct 14 10:36:13 2020 -0700

    fix to let stillWaiting return false
---
 .../ClusterDistributionManagerDUnitTest.java       | 26 ++++++++++++++++++++++
 .../geode/internal/cache/StateFlushOperation.java  |  3 +++
 2 files changed, 29 insertions(+)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/ClusterDistributionManagerDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/ClusterDistributionManagerDUnitTest.java
index a85d82f..68c5e81 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/ClusterDistributionManagerDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/ClusterDistributionManagerDUnitTest.java
@@ -231,6 +231,32 @@ public class ClusterDistributionManagerDUnitTest extends CacheTestCase {
     await().until(() -> listenerInvoked.get());
   }
 
+  @Test
+  public void shutdownMessageCausesStateFlushReplyProcessorToNoLongerWait() {
+    final AtomicBoolean listenerInvoked = new AtomicBoolean();
+    vm1.invoke("join the cluster", () -> getSystem().getDistributedMember()); // lead member
+    system = getSystem(); // non-lead member
+    // this membership listener will be invoked when the shutdown message is received
+    system.getDistributionManager().addMembershipListener(new MembershipListener() {
+      @Override
+      public void memberDeparted(DistributionManager distributionManager,
+          InternalDistributedMember id, boolean crashed) {
+        assertThat(crashed).isFalse();
+        listenerInvoked.set(Boolean.TRUE);
+      }
+    });
+    final InternalDistributedMember memberID = system.getDistributedMember();
+    locatorvm.invoke("send a shutdown message", () -> {
+      final DistributionManager distributionManager =
+          ((InternalDistributedSystem) Locator.getLocator().getDistributedSystem())
+              .getDistributionManager();
+      final ShutdownMessage shutdownMessage = new ShutdownMessage();
+      shutdownMessage.setRecipient(memberID);
+      shutdownMessage.setDistributionManagerId(distributionManager.getDistributionManagerId());
+      distributionManager.putOutgoing(shutdownMessage);
+    });
+    await().until(() -> listenerInvoked.get());
+  }
 
   /**
    * Tests that a severe-level alert is generated if a member does not respond with an ack quickly
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java
index a66803f..05ac60a 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java
@@ -773,6 +773,9 @@ public class StateFlushOperation {
     public void memberDeparted(DistributionManager distributionManager,
         final InternalDistributedMember id, final boolean crashed) {
       logger.info("GGG:StateFlushReplyProcesssor.memberDeparted:id=" + id);
+      if (id.equals(targetMember)) {
+        targetMemberHasLeft = true;
+      }
       super.memberDeparted(distributionManager, id, crashed);
     }