You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2020/10/14 22:16:20 UTC
[geode] branch support/1.12 updated: GEODE-8608: StateFlush could hang when the target member is shutdown (#5624)
This is an automated email from the ASF dual-hosted git repository.
zhouxj pushed a commit to branch support/1.12
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/support/1.12 by this push:
new 21d77a8 GEODE-8608: StateFlush could hang when the target member is shutdown (#5624)
21d77a8 is described below
commit 21d77a84452f23ef3f2b6c5e4fe7e67f337c3820
Author: Xiaojian Zhou <ge...@users.noreply.github.com>
AuthorDate: Wed Oct 14 14:40:06 2020 -0700
GEODE-8608: StateFlush could hang when the target member is shutdown (#5624)
Co-authored-by: Darrel Schneider <da...@vmware.com>
Co-authored-by: Anil <ag...@pivotal.io>
Co-authored-by: Bill Burcham <bi...@gmail.com>
(cherry picked from commit 4d5ed9ddb25a39677f4077c8f2acd964d8480986)
---
.../ClusterDistributionManagerDUnitTest.java | 30 ++++++++++++++++++++++
.../geode/internal/cache/StateFlushOperation.java | 11 +++++++-
2 files changed, 40 insertions(+), 1 deletion(-)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/ClusterDistributionManagerDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/ClusterDistributionManagerDUnitTest.java
index a85d82f..46029b9 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/ClusterDistributionManagerDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/ClusterDistributionManagerDUnitTest.java
@@ -32,6 +32,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
import java.io.File;
import java.net.InetAddress;
+import java.util.HashSet;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -59,12 +60,14 @@ import org.apache.geode.cache.RegionFactory;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.util.CacheListenerAdapter;
import org.apache.geode.distributed.ConfigurationProperties;
+import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.Locator;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.distributed.internal.membership.api.MemberDisconnectedException;
import org.apache.geode.distributed.internal.membership.api.MembershipManagerHelper;
import org.apache.geode.distributed.internal.membership.api.MembershipView;
import org.apache.geode.distributed.internal.membership.gms.GMSMembership;
+import org.apache.geode.internal.cache.StateFlushOperation;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.test.dunit.Invoke;
import org.apache.geode.test.dunit.SerializableRunnable;
@@ -231,6 +234,33 @@ public class ClusterDistributionManagerDUnitTest extends CacheTestCase {
await().until(() -> listenerInvoked.get());
}
+ @Test
+ public void shutdownMessageCausesTargetMemberToLeaveStateFlushReplyProcessor() {
+ vm1.invoke("join the cluster", () -> getSystem().getDistributedMember()); // lead member
+ system = getSystem(); // non-lead member
+ DistributedMember targetId = locatorvm.invoke(() -> {
+ return Locator.getLocator().getDistributedSystem().getDistributedMember();
+ });
+
+ StateFlushOperation.StateFlushReplyProcessor stateFlushReplyProcessor =
+ new StateFlushOperation.StateFlushReplyProcessor(getSystem().getDistributionManager(),
+ new HashSet(), targetId);
+ system.getDistributionManager().addMembershipListener(stateFlushReplyProcessor);
+ final InternalDistributedMember memberID = system.getDistributedMember();
+
+ locatorvm.invoke("send a shutdown message", () -> {
+ final DistributionManager distributionManager =
+ ((InternalDistributedSystem) Locator.getLocator().getDistributedSystem())
+ .getDistributionManager();
+ final ShutdownMessage shutdownMessage = new ShutdownMessage();
+ shutdownMessage.setRecipient(memberID);
+ shutdownMessage.setDistributionManagerId(distributionManager.getDistributionManagerId());
+ distributionManager.putOutgoing(shutdownMessage);
+ });
+
+ await().untilAsserted(
+ () -> assertThat(stateFlushReplyProcessor.getTargetMemberHasLeft()).isTrue());
+ }
/**
* Tests that a severe-level alert is generated if a member does not respond with an ack quickly
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java
index 5f09010..303d6a1 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java
@@ -30,6 +30,7 @@ import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelException;
import org.apache.geode.DataSerializer;
import org.apache.geode.SystemFailure;
+import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.distributed.DistributedMember;
@@ -750,7 +751,7 @@ public class StateFlushOperation {
int originalCount;
/** whether the target member has left the distributed system */
- boolean targetMemberHasLeft;
+ volatile boolean targetMemberHasLeft;
public StateFlushReplyProcessor(DistributionManager manager, Set initMembers,
DistributedMember target) {
@@ -772,6 +773,9 @@ public class StateFlushOperation {
@Override
public void memberDeparted(DistributionManager distributionManager,
final InternalDistributedMember id, final boolean crashed) {
+ if (id.equals(targetMember)) {
+ targetMemberHasLeft = true;
+ }
super.memberDeparted(distributionManager, id, crashed);
}
@@ -783,6 +787,11 @@ public class StateFlushOperation {
}
}
+ @VisibleForTesting
+ public boolean getTargetMemberHasLeft() {
+ return targetMemberHasLeft;
+ }
+
@Override
protected boolean stillWaiting() {
targetMemberHasLeft =