You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2017/04/07 18:39:55 UTC

geode git commit: GEODE-2757: Do not process a netsearch reply from a departed node whose departure the membership listener has already detected.

Repository: geode
Updated Branches:
  refs/heads/develop 799548ee4 -> d497d63af


GEODE-2757: Do not process a netsearch reply from a departed node whose departure the membership listener has already detected.


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/d497d63a
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/d497d63a
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/d497d63a

Branch: refs/heads/develop
Commit: d497d63af422b3b98c480698a9470812539f8a83
Parents: 799548e
Author: eshu <es...@pivotal.io>
Authored: Fri Apr 7 11:37:35 2017 -0700
Committer: eshu <es...@pivotal.io>
Committed: Fri Apr 7 11:37:35 2017 -0700

----------------------------------------------------------------------
 .../geode/internal/cache/DistributedRegion.java |   2 +-
 .../cache/SearchLoadAndWriteProcessor.java      |  61 +++++++----
 .../cache/SearchLoadAndWriteProcessorTest.java  | 102 ++++++++++++++++++-
 3 files changed, 143 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/d497d63a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
index fa02574..c12a652 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
@@ -2126,7 +2126,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
     return this.distAdvisor;
   }
 
-  public final CacheDistributionAdvisor getCacheDistributionAdvisor() {
+  public CacheDistributionAdvisor getCacheDistributionAdvisor() {
     return this.distAdvisor;
   }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/d497d63a/geode-core/src/main/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessor.java
index 3d969f9..2a10792 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessor.java
@@ -67,7 +67,7 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
       Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "search-retry-interval", 2000).longValue();
 
 
-  private InternalDistributedMember selectedNode;
+  private volatile InternalDistributedMember selectedNode;
   private boolean selectedNodeDead = false;
   private int timeout;
   private boolean netSearchDone = false;
@@ -108,6 +108,8 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
 
   private final Object membersLock = new Object();
 
+  private ArrayList<InternalDistributedMember> departedMembers;
+
   private Lock lock = null; // if non-null, then needs to be unlocked in release
 
   static final int NETSEARCH = 0;
@@ -221,6 +223,10 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
     }
     synchronized (this) {
       if (id.equals(selectedNode) && (this.requestInProgress) && (this.remoteGetInProgress)) {
+        if (departedMembers == null) {
+          departedMembers = new ArrayList<InternalDistributedMember>();
+        }
+        departedMembers.add(id);
         selectedNode = null;
         selectedNodeDead = true;
         computeRemainingTimeout();
@@ -231,8 +237,9 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
         notifyAll(); // signal the waiter; we are not done; but we need the waiter to call
                      // sendNetSearchRequest
       }
-      if (responseQueue != null)
+      if (responseQueue != null) {
         responseQueue.remove(id);
+      }
       checkIfDone();
     }
   }
@@ -378,6 +385,10 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
 
   /************** Package Methods **********************/
 
+  InternalDistributedMember getSelectedNode() {
+    return this.selectedNode;
+  }
+
   /************** Private Methods **********************/
   /**
    * Even though SearchLoadAndWriteProcessor may be in invoked in the context of a local region,
@@ -495,25 +506,28 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
         synchronized (this.pendingResponders) {
           this.pendingResponders.clear();
         }
-        this.requestInProgress = true;
-        this.remoteGetInProgress = true;
+
         synchronized (this) {
+          this.requestInProgress = true;
+          this.remoteGetInProgress = true;
           setSelectedNode(replicate);
           this.lastNotifySpot = 0;
-        }
-        sendValueRequest(replicate);
-        waitForObject2(this.remainingTimeout);
-        if (this.authorative) {
-          if (this.result != null) {
-            this.netSearch = true;
+
+          sendValueRequest(replicate);
+          waitForObject2(this.remainingTimeout);
+
+          if (this.authorative) {
+            if (this.result != null) {
+              this.netSearch = true;
+            }
+            return;
+          } else {
+            // clear anything that might have been set by our query.
+            this.selectedNode = null;
+            this.selectedNodeDead = false;
+            this.lastNotifySpot = 0;
+            this.result = null;
           }
-          return;
-        } else {
-          // clear anything that might have been set by our query.
-          this.selectedNode = null;
-          this.selectedNodeDead = false;
-          this.lastNotifySpot = 0;
-          this.result = null;
         }
       }
       synchronized (membersLock) {
@@ -1055,8 +1069,15 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
 
   @SuppressWarnings("hiding")
   protected synchronized void incomingNetSearchReply(byte[] value, long lastModifiedTime,
-      boolean serialized, boolean requestorTimedOut, boolean authorative, VersionTag versionTag) {
+      boolean serialized, boolean requestorTimedOut, boolean authorative, VersionTag versionTag,
+      InternalDistributedMember responder) {
     final boolean isDebugEnabled = logger.isDebugEnabled();
+    if (departedMembers != null && departedMembers.contains(responder)) {
+      if (isDebugEnabled) {
+        logger.debug("ignore the reply received from a departed member");
+      }
+      return;
+    }
 
     if (this.requestInProgress) {
       if (requestorTimedOut) {
@@ -1163,7 +1184,7 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
   private synchronized void waitForObject2(final int timeoutMs) throws TimeoutException {
     if (this.requestInProgress) {
       try {
-        final DM dm = this.region.cache.getDistributedSystem().getDistributionManager();
+        final DM dm = this.region.getCache().getDistributedSystem().getDistributionManager();
         long waitTimeMs = timeoutMs;
         final long endTime = System.currentTimeMillis() + waitTimeMs;
         for (;;) {
@@ -2018,7 +2039,7 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
         this.versionTag.replaceNullIDs(getSender());
       }
       processor.incomingNetSearchReply(this.value, lastModifiedSystemTime, this.isSerialized,
-          this.requestorTimedOut, this.authoritative, this.versionTag);
+          this.requestorTimedOut, this.authoritative, this.versionTag, getSender());
     }
 
     public int getDSFID() {

http://git-wip-us.apache.org/repos/asf/geode/blob/d497d63a/geode-core/src/test/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessorTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessorTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessorTest.java
index bfe78b0..91ac16b 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessorTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessorTest.java
@@ -14,15 +14,29 @@
  */
 package org.apache.geode.internal.cache;
 
+import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
 
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.ExpirationAttributes;
 import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.RegionAttributes;
 import org.apache.geode.cache.Scope;
+import org.apache.geode.distributed.internal.DM;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.versions.VersionTag;
 import org.apache.geode.internal.offheap.StoredObject;
 import org.apache.geode.test.junit.categories.UnitTest;
+import org.awaitility.Awaitility;
 
 @Category(UnitTest.class)
 public class SearchLoadAndWriteProcessorTest {
@@ -62,4 +76,90 @@ public class SearchLoadAndWriteProcessorTest {
     }
   }
 
+  InternalDistributedMember departedMember;
+
+  @Test
+  public void verifyNoProcessingReplyFromADepartedMember() {
+    SearchLoadAndWriteProcessor processor = SearchLoadAndWriteProcessor.getProcessor();
+    DistributedRegion lr = mock(DistributedRegion.class);
+    RegionAttributes attrs = mock(RegionAttributes.class);
+    GemFireCacheImpl cache = mock(GemFireCacheImpl.class);
+    InternalDistributedSystem ds = mock(InternalDistributedSystem.class);
+    DM dm = mock(DM.class);
+    CacheDistributionAdvisor advisor = mock(CacheDistributionAdvisor.class);
+    CachePerfStats stats = mock(CachePerfStats.class);
+    ExpirationAttributes expirationAttrs = mock(ExpirationAttributes.class);
+    InternalDistributedMember m1 = mock(InternalDistributedMember.class);
+    InternalDistributedMember m2 = mock(InternalDistributedMember.class);
+    Set<InternalDistributedMember> replicates = new HashSet<InternalDistributedMember>();;
+    replicates.add(m1);
+    replicates.add(m2);
+
+    when(lr.getAttributes()).thenReturn(attrs);
+    when(lr.getSystem()).thenReturn(ds);
+    when(lr.getCache()).thenReturn(cache);
+    when(lr.getCacheDistributionAdvisor()).thenReturn(advisor);
+    when(lr.getDistributionManager()).thenReturn(dm);
+    when(lr.getCachePerfStats()).thenReturn(stats);
+    when(lr.getScope()).thenReturn(Scope.DISTRIBUTED_ACK);
+    when(lr.getCancelCriterion()).thenReturn(mock(CancelCriterion.class));
+    when(cache.getDistributedSystem()).thenReturn(ds);
+    when(cache.getSearchTimeout()).thenReturn(30);
+    when(attrs.getScope()).thenReturn(Scope.DISTRIBUTED_ACK);
+    when(attrs.getDataPolicy()).thenReturn(DataPolicy.EMPTY);
+    when(attrs.getEntryTimeToLive()).thenReturn(expirationAttrs);
+    when(attrs.getEntryIdleTimeout()).thenReturn(expirationAttrs);
+    when(advisor.adviseInitializedReplicates()).thenReturn(replicates);
+
+    Object key = "k1";
+    byte[] v1 = "v1".getBytes();
+    byte[] v2 = "v2".getBytes();
+    EntryEventImpl event = EntryEventImpl.create(lr, Operation.GET, key, null, null, false, null);
+
+
+    Thread t1 = new Thread(new Runnable() {
+      public void run() {
+        Awaitility.await().pollInterval(10, TimeUnit.MILLISECONDS)
+            .pollDelay(10, TimeUnit.MILLISECONDS).atMost(30, TimeUnit.SECONDS)
+            .until(() -> processor.getSelectedNode() != null);
+        departedMember = processor.getSelectedNode();
+        // Simulate member departed event
+        processor.memberDeparted(departedMember, true);
+      }
+    });
+    t1.start();
+
+    Thread t2 = new Thread(new Runnable() {
+      public void run() {
+        Awaitility.await().pollInterval(10, TimeUnit.MILLISECONDS)
+            .pollDelay(10, TimeUnit.MILLISECONDS).atMost(30, TimeUnit.SECONDS)
+            .until(() -> departedMember != null && processor.getSelectedNode() != null
+                && departedMember != processor.getSelectedNode());
+
+        // Handle search result from the departed member
+        processor.incomingNetSearchReply(v1, System.currentTimeMillis(), false, false, true,
+            mock(VersionTag.class), departedMember);
+      }
+    });
+    t2.start();
+
+    Thread t3 = new Thread(new Runnable() {
+      public void run() {
+        Awaitility.await().pollInterval(10, TimeUnit.MILLISECONDS)
+            .pollDelay(10, TimeUnit.MILLISECONDS).atMost(30, TimeUnit.SECONDS)
+            .until(() -> departedMember != null && processor.getSelectedNode() != null
+                && departedMember != processor.getSelectedNode());
+        // Handle search result from a new member
+        processor.incomingNetSearchReply(v2, System.currentTimeMillis(), false, false, true,
+            mock(VersionTag.class), processor.getSelectedNode());
+      }
+    });
+    t3.start();
+
+    processor.initialize(lr, key, null);
+    processor.doSearchAndLoad(event, null, null);
+
+    assertTrue(Arrays.equals((byte[]) event.getNewValue(), v2));
+  }
+
 }