You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2017/04/07 18:39:55 UTC
geode git commit: GEODE-2757: Do not process netsearch reply from a
departed node that membership listener already detected.
Repository: geode
Updated Branches:
refs/heads/develop 799548ee4 -> d497d63af
GEODE-2757: Do not process netsearch reply from a departed node that membership listener already detected.
Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/d497d63a
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/d497d63a
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/d497d63a
Branch: refs/heads/develop
Commit: d497d63af422b3b98c480698a9470812539f8a83
Parents: 799548e
Author: eshu <es...@pivotal.io>
Authored: Fri Apr 7 11:37:35 2017 -0700
Committer: eshu <es...@pivotal.io>
Committed: Fri Apr 7 11:37:35 2017 -0700
----------------------------------------------------------------------
.../geode/internal/cache/DistributedRegion.java | 2 +-
.../cache/SearchLoadAndWriteProcessor.java | 61 +++++++----
.../cache/SearchLoadAndWriteProcessorTest.java | 102 ++++++++++++++++++-
3 files changed, 143 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/geode/blob/d497d63a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
index fa02574..c12a652 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
@@ -2126,7 +2126,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
return this.distAdvisor;
}
- public final CacheDistributionAdvisor getCacheDistributionAdvisor() {
+ public CacheDistributionAdvisor getCacheDistributionAdvisor() {
return this.distAdvisor;
}
http://git-wip-us.apache.org/repos/asf/geode/blob/d497d63a/geode-core/src/main/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessor.java
index 3d969f9..2a10792 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessor.java
@@ -67,7 +67,7 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "search-retry-interval", 2000).longValue();
- private InternalDistributedMember selectedNode;
+ private volatile InternalDistributedMember selectedNode;
private boolean selectedNodeDead = false;
private int timeout;
private boolean netSearchDone = false;
@@ -108,6 +108,8 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
private final Object membersLock = new Object();
+ private ArrayList<InternalDistributedMember> departedMembers;
+
private Lock lock = null; // if non-null, then needs to be unlocked in release
static final int NETSEARCH = 0;
@@ -221,6 +223,10 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
}
synchronized (this) {
if (id.equals(selectedNode) && (this.requestInProgress) && (this.remoteGetInProgress)) {
+ if (departedMembers == null) {
+ departedMembers = new ArrayList<InternalDistributedMember>();
+ }
+ departedMembers.add(id);
selectedNode = null;
selectedNodeDead = true;
computeRemainingTimeout();
@@ -231,8 +237,9 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
notifyAll(); // signal the waiter; we are not done; but we need the waiter to call
// sendNetSearchRequest
}
- if (responseQueue != null)
+ if (responseQueue != null) {
responseQueue.remove(id);
+ }
checkIfDone();
}
}
@@ -378,6 +385,10 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
/************** Package Methods **********************/
+ InternalDistributedMember getSelectedNode() {
+ return this.selectedNode;
+ }
+
/************** Private Methods **********************/
/**
* Even though SearchLoadAndWriteProcessor may be in invoked in the context of a local region,
@@ -495,25 +506,28 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
synchronized (this.pendingResponders) {
this.pendingResponders.clear();
}
- this.requestInProgress = true;
- this.remoteGetInProgress = true;
+
synchronized (this) {
+ this.requestInProgress = true;
+ this.remoteGetInProgress = true;
setSelectedNode(replicate);
this.lastNotifySpot = 0;
- }
- sendValueRequest(replicate);
- waitForObject2(this.remainingTimeout);
- if (this.authorative) {
- if (this.result != null) {
- this.netSearch = true;
+
+ sendValueRequest(replicate);
+ waitForObject2(this.remainingTimeout);
+
+ if (this.authorative) {
+ if (this.result != null) {
+ this.netSearch = true;
+ }
+ return;
+ } else {
+ // clear anything that might have been set by our query.
+ this.selectedNode = null;
+ this.selectedNodeDead = false;
+ this.lastNotifySpot = 0;
+ this.result = null;
}
- return;
- } else {
- // clear anything that might have been set by our query.
- this.selectedNode = null;
- this.selectedNodeDead = false;
- this.lastNotifySpot = 0;
- this.result = null;
}
}
synchronized (membersLock) {
@@ -1055,8 +1069,15 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
@SuppressWarnings("hiding")
protected synchronized void incomingNetSearchReply(byte[] value, long lastModifiedTime,
- boolean serialized, boolean requestorTimedOut, boolean authorative, VersionTag versionTag) {
+ boolean serialized, boolean requestorTimedOut, boolean authorative, VersionTag versionTag,
+ InternalDistributedMember responder) {
final boolean isDebugEnabled = logger.isDebugEnabled();
+ if (departedMembers != null && departedMembers.contains(responder)) {
+ if (isDebugEnabled) {
+ logger.debug("ignore the reply received from a departed member");
+ }
+ return;
+ }
if (this.requestInProgress) {
if (requestorTimedOut) {
@@ -1163,7 +1184,7 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
private synchronized void waitForObject2(final int timeoutMs) throws TimeoutException {
if (this.requestInProgress) {
try {
- final DM dm = this.region.cache.getDistributedSystem().getDistributionManager();
+ final DM dm = this.region.getCache().getDistributedSystem().getDistributionManager();
long waitTimeMs = timeoutMs;
final long endTime = System.currentTimeMillis() + waitTimeMs;
for (;;) {
@@ -2018,7 +2039,7 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
this.versionTag.replaceNullIDs(getSender());
}
processor.incomingNetSearchReply(this.value, lastModifiedSystemTime, this.isSerialized,
- this.requestorTimedOut, this.authoritative, this.versionTag);
+ this.requestorTimedOut, this.authoritative, this.versionTag, getSender());
}
public int getDSFID() {
http://git-wip-us.apache.org/repos/asf/geode/blob/d497d63a/geode-core/src/test/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessorTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessorTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessorTest.java
index bfe78b0..91ac16b 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessorTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessorTest.java
@@ -14,15 +14,29 @@
*/
package org.apache.geode.internal.cache;
+import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
import org.junit.Test;
import org.junit.experimental.categories.Category;
-
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.ExpirationAttributes;
import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.Scope;
+import org.apache.geode.distributed.internal.DM;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.versions.VersionTag;
import org.apache.geode.internal.offheap.StoredObject;
import org.apache.geode.test.junit.categories.UnitTest;
+import org.awaitility.Awaitility;
@Category(UnitTest.class)
public class SearchLoadAndWriteProcessorTest {
@@ -62,4 +76,90 @@ public class SearchLoadAndWriteProcessorTest {
}
}
+ InternalDistributedMember departedMember;
+
+ @Test
+ public void verifyNoProcessingReplyFromADepartedMember() {
+ SearchLoadAndWriteProcessor processor = SearchLoadAndWriteProcessor.getProcessor();
+ DistributedRegion lr = mock(DistributedRegion.class);
+ RegionAttributes attrs = mock(RegionAttributes.class);
+ GemFireCacheImpl cache = mock(GemFireCacheImpl.class);
+ InternalDistributedSystem ds = mock(InternalDistributedSystem.class);
+ DM dm = mock(DM.class);
+ CacheDistributionAdvisor advisor = mock(CacheDistributionAdvisor.class);
+ CachePerfStats stats = mock(CachePerfStats.class);
+ ExpirationAttributes expirationAttrs = mock(ExpirationAttributes.class);
+ InternalDistributedMember m1 = mock(InternalDistributedMember.class);
+ InternalDistributedMember m2 = mock(InternalDistributedMember.class);
+ Set<InternalDistributedMember> replicates = new HashSet<InternalDistributedMember>();;
+ replicates.add(m1);
+ replicates.add(m2);
+
+ when(lr.getAttributes()).thenReturn(attrs);
+ when(lr.getSystem()).thenReturn(ds);
+ when(lr.getCache()).thenReturn(cache);
+ when(lr.getCacheDistributionAdvisor()).thenReturn(advisor);
+ when(lr.getDistributionManager()).thenReturn(dm);
+ when(lr.getCachePerfStats()).thenReturn(stats);
+ when(lr.getScope()).thenReturn(Scope.DISTRIBUTED_ACK);
+ when(lr.getCancelCriterion()).thenReturn(mock(CancelCriterion.class));
+ when(cache.getDistributedSystem()).thenReturn(ds);
+ when(cache.getSearchTimeout()).thenReturn(30);
+ when(attrs.getScope()).thenReturn(Scope.DISTRIBUTED_ACK);
+ when(attrs.getDataPolicy()).thenReturn(DataPolicy.EMPTY);
+ when(attrs.getEntryTimeToLive()).thenReturn(expirationAttrs);
+ when(attrs.getEntryIdleTimeout()).thenReturn(expirationAttrs);
+ when(advisor.adviseInitializedReplicates()).thenReturn(replicates);
+
+ Object key = "k1";
+ byte[] v1 = "v1".getBytes();
+ byte[] v2 = "v2".getBytes();
+ EntryEventImpl event = EntryEventImpl.create(lr, Operation.GET, key, null, null, false, null);
+
+
+ Thread t1 = new Thread(new Runnable() {
+ public void run() {
+ Awaitility.await().pollInterval(10, TimeUnit.MILLISECONDS)
+ .pollDelay(10, TimeUnit.MILLISECONDS).atMost(30, TimeUnit.SECONDS)
+ .until(() -> processor.getSelectedNode() != null);
+ departedMember = processor.getSelectedNode();
+ // Simulate member departed event
+ processor.memberDeparted(departedMember, true);
+ }
+ });
+ t1.start();
+
+ Thread t2 = new Thread(new Runnable() {
+ public void run() {
+ Awaitility.await().pollInterval(10, TimeUnit.MILLISECONDS)
+ .pollDelay(10, TimeUnit.MILLISECONDS).atMost(30, TimeUnit.SECONDS)
+ .until(() -> departedMember != null && processor.getSelectedNode() != null
+ && departedMember != processor.getSelectedNode());
+
+ // Handle search result from the departed member
+ processor.incomingNetSearchReply(v1, System.currentTimeMillis(), false, false, true,
+ mock(VersionTag.class), departedMember);
+ }
+ });
+ t2.start();
+
+ Thread t3 = new Thread(new Runnable() {
+ public void run() {
+ Awaitility.await().pollInterval(10, TimeUnit.MILLISECONDS)
+ .pollDelay(10, TimeUnit.MILLISECONDS).atMost(30, TimeUnit.SECONDS)
+ .until(() -> departedMember != null && processor.getSelectedNode() != null
+ && departedMember != processor.getSelectedNode());
+ // Handle search result from a new member
+ processor.incomingNetSearchReply(v2, System.currentTimeMillis(), false, false, true,
+ mock(VersionTag.class), processor.getSelectedNode());
+ }
+ });
+ t3.start();
+
+ processor.initialize(lr, key, null);
+ processor.doSearchAndLoad(event, null, null);
+
+ assertTrue(Arrays.equals((byte[]) event.getNewValue(), v2));
+ }
+
}