You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this plain-text rendering).
Posted to commits@geode.apache.org by zh...@apache.org on 2020/10/14 22:25:25 UTC

[geode] branch support/1.13 updated: GEODE-8608: StateFlush could hang when the target member is shutdown (#5624)

This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch support/1.13
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/support/1.13 by this push:
     new b6ae64f  GEODE-8608: StateFlush could hang when the target member is shutdown (#5624)
b6ae64f is described below

commit b6ae64f949d0056487d7d96e0f50a21f3441fa43
Author: Xiaojian Zhou <ge...@users.noreply.github.com>
AuthorDate: Wed Oct 14 14:40:06 2020 -0700

    GEODE-8608: StateFlush could hang when the target member is shutdown (#5624)
    
    
        Co-authored-by: Darrel Schneider <da...@vmware.com>
        Co-authored-by: Anil <ag...@pivotal.io>
        Co-authored-by: Bill Burcham <bi...@gmail.com>
    
    (cherry picked from commit 4d5ed9ddb25a39677f4077c8f2acd964d8480986)
---
 .../ClusterDistributionManagerDUnitTest.java       | 30 ++++++++++++++++++++++
 .../geode/internal/cache/StateFlushOperation.java  | 11 +++++++-
 2 files changed, 40 insertions(+), 1 deletion(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/ClusterDistributionManagerDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/ClusterDistributionManagerDUnitTest.java
index a85d82f..46029b9 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/ClusterDistributionManagerDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/ClusterDistributionManagerDUnitTest.java
@@ -32,6 +32,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.io.File;
 import java.net.InetAddress;
+import java.util.HashSet;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -59,12 +60,14 @@ import org.apache.geode.cache.RegionFactory;
 import org.apache.geode.cache.Scope;
 import org.apache.geode.cache.util.CacheListenerAdapter;
 import org.apache.geode.distributed.ConfigurationProperties;
+import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.Locator;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.distributed.internal.membership.api.MemberDisconnectedException;
 import org.apache.geode.distributed.internal.membership.api.MembershipManagerHelper;
 import org.apache.geode.distributed.internal.membership.api.MembershipView;
 import org.apache.geode.distributed.internal.membership.gms.GMSMembership;
+import org.apache.geode.internal.cache.StateFlushOperation;
 import org.apache.geode.logging.internal.log4j.api.LogService;
 import org.apache.geode.test.dunit.Invoke;
 import org.apache.geode.test.dunit.SerializableRunnable;
@@ -231,6 +234,33 @@ public class ClusterDistributionManagerDUnitTest extends CacheTestCase {
     await().until(() -> listenerInvoked.get());
   }
 
+  @Test
+  public void shutdownMessageCausesTargetMemberToLeaveStateFlushReplyProcessor() {
+    vm1.invoke("join the cluster", () -> getSystem().getDistributedMember()); // lead member
+    system = getSystem(); // non-lead member
+    DistributedMember targetId = locatorvm.invoke(() -> {
+      return Locator.getLocator().getDistributedSystem().getDistributedMember();
+    });
+
+    StateFlushOperation.StateFlushReplyProcessor stateFlushReplyProcessor =
+        new StateFlushOperation.StateFlushReplyProcessor(getSystem().getDistributionManager(),
+            new HashSet(), targetId);
+    system.getDistributionManager().addMembershipListener(stateFlushReplyProcessor);
+    final InternalDistributedMember memberID = system.getDistributedMember();
+
+    locatorvm.invoke("send a shutdown message", () -> {
+      final DistributionManager distributionManager =
+          ((InternalDistributedSystem) Locator.getLocator().getDistributedSystem())
+              .getDistributionManager();
+      final ShutdownMessage shutdownMessage = new ShutdownMessage();
+      shutdownMessage.setRecipient(memberID);
+      shutdownMessage.setDistributionManagerId(distributionManager.getDistributionManagerId());
+      distributionManager.putOutgoing(shutdownMessage);
+    });
+
+    await().untilAsserted(
+        () -> assertThat(stateFlushReplyProcessor.getTargetMemberHasLeft()).isTrue());
+  }
 
   /**
    * Tests that a severe-level alert is generated if a member does not respond with an ack quickly
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java
index 5f09010..303d6a1 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java
@@ -30,6 +30,7 @@ import org.apache.logging.log4j.Logger;
 import org.apache.geode.CancelException;
 import org.apache.geode.DataSerializer;
 import org.apache.geode.SystemFailure;
+import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionDestroyedException;
 import org.apache.geode.distributed.DistributedMember;
@@ -750,7 +751,7 @@ public class StateFlushOperation {
     int originalCount;
 
     /** whether the target member has left the distributed system */
-    boolean targetMemberHasLeft;
+    volatile boolean targetMemberHasLeft;
 
     public StateFlushReplyProcessor(DistributionManager manager, Set initMembers,
         DistributedMember target) {
@@ -772,6 +773,9 @@ public class StateFlushOperation {
     @Override
     public void memberDeparted(DistributionManager distributionManager,
         final InternalDistributedMember id, final boolean crashed) {
+      if (id.equals(targetMember)) {
+        targetMemberHasLeft = true;
+      }
       super.memberDeparted(distributionManager, id, crashed);
     }
 
@@ -783,6 +787,11 @@ public class StateFlushOperation {
       }
     }
 
+    @VisibleForTesting
+    public boolean getTargetMemberHasLeft() {
+      return targetMemberHasLeft;
+    }
+
     @Override
     protected boolean stillWaiting() {
       targetMemberHasLeft =