You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2016/01/29 02:16:41 UTC

[24/52] [abbrv] incubator-geode git commit: GEODE-836: multicast state flush is broken

GEODE-836: multicast state flush is broken

The Digest from the multicast NAKACK2 protocol is immutable, so it needs
to be fetched each time it is checked in order to get an up-to-date
digest.  I also added a warning and a timeout to the method, similar to
those in DistributionAdvisor.waitForCurrentOperations(long), which
guarantee that the wait will terminate.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/ed17d4cd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/ed17d4cd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/ed17d4cd

Branch: refs/heads/feature/GEODE-831
Commit: ed17d4cd50595b5a611b4a4068a1e2b895a06ff6
Parents: 2609946
Author: Bruce Schuchardt <bs...@pivotal.io>
Authored: Tue Jan 26 08:53:40 2016 -0800
Committer: Bruce Schuchardt <bs...@pivotal.io>
Committed: Tue Jan 26 08:56:47 2016 -0800

----------------------------------------------------------------------
 .../gms/messenger/JGroupsMessenger.java         | 35 ++++++++++++++++----
 .../gms/membership/GMSJoinLeaveJUnitTest.java   |  2 +-
 .../messenger/JGroupsMessengerJUnitTest.java    | 26 +++++++++++----
 3 files changed, 48 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ed17d4cd/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
index ccff687..be2c405 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
@@ -533,16 +533,37 @@ public class JGroupsMessenger implements Messenger {
    */
   protected void waitForMessageState(NAKACK2 nakack, InternalDistributedMember sender, Long seqno)
     throws InterruptedException {
+    long timeout = services.getConfig().getDistributionConfig().getAckWaitThreshold() * 1000L;
+    long startTime = System.currentTimeMillis();
+    long warnTime = startTime + timeout;
+    long quitTime = warnTime + timeout - 1000L;
+    boolean warned = false;
+    
     JGAddress jgSender = new JGAddress(sender);
-    Digest digest = nakack.getDigest(jgSender);
-    if (digest != null) {
-      for (;;) {
-        long[] senderSeqnos = digest.get(jgSender);
-        if (senderSeqnos == null || senderSeqnos[0] >= seqno.longValue()) {
-          break;
+    
+    for (;;) {
+      Digest digest = nakack.getDigest(jgSender);
+      if (digest == null) {
+        return;
+      }
+      String received = "none";
+      long[] senderSeqnos = digest.get(jgSender);
+      if (senderSeqnos == null || senderSeqnos[0] >= seqno.longValue()) {
+        break;
+      }
+      long now = System.currentTimeMillis();
+      if (!warned && now >= warnTime) {
+        warned = true;
+        if (senderSeqnos != null) {
+          received = String.valueOf(senderSeqnos[0]);
         }
-        Thread.sleep(50);
+        logger.warn("{} seconds have elapsed while waiting for multicast messages from {}.  Received {} but expecting at least {}.",
+            Long.toString((warnTime-startTime)/1000L), sender, received, seqno);
+      }
+      if (now >= quitTime) {
+        throw new GemFireIOException("Multicast operations from " + sender + " did not distribute within " + (now - startTime) + " milliseconds");
       }
+      Thread.sleep(50);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ed17d4cd/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
index ca699b5..5b53290 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
@@ -630,7 +630,7 @@ public class GMSJoinLeaveJUnitTest {
     installViewMessage = new InstallViewMessage(partitionView, credentials, false);
     gmsJoinLeave.processMessage(installViewMessage);
     
-    verify(manager).forceDisconnect(any(String.class));
+    verify(manager).forceDisconnect(isA(String.class));
     verify(manager).quorumLost(crashes, newView);
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ed17d4cd/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
index ae55f4f..805dd88 100755
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
@@ -104,6 +104,7 @@ public class JGroupsMessengerJUnitTest {
     nonDefault.put(DistributionConfig.LOG_FILE_NAME, "");
     nonDefault.put(DistributionConfig.LOG_LEVEL_NAME, "fine");
     nonDefault.put(DistributionConfig.LOCATORS_NAME, "localhost[10344]");
+    nonDefault.put(DistributionConfig.ACK_WAIT_THRESHOLD_NAME, "1");
     DistributionConfigImpl config = new DistributionConfigImpl(nonDefault);
     RemoteTransportConfig tconfig = new RemoteTransportConfig(config,
         DistributionManager.NORMAL_DM_TYPE);
@@ -819,7 +820,7 @@ public class JGroupsMessengerJUnitTest {
   }
   
   @Test
-  public void testWaitForMessageState() throws Exception {
+  public void testWaitForMessageStateSucceeds() throws Exception {
     initMocks(true/*multicast*/);
     NAKACK2 nakack = mock(NAKACK2.class);
     Digest digest = mock(Digest.class);
@@ -834,14 +835,25 @@ public class JGroupsMessengerJUnitTest {
         new long[] {0,0}, new long[] {2, 50}, null);
     messenger.waitForMessageState(nakack, createAddress(1234), Long.valueOf(50));
     verify(digest, times(3)).get(isA(Address.class));
-    
-    // for code coverage let's invoke the other waitForMessageState method
-    Map state = new HashMap();
-    state.put("JGroups.mcastState", Long.valueOf(10L));
-    messenger.waitForMessageState(createAddress(1234), state);
   }
   
-
+  @Test
+  public void testWaitForMessageStateThrowsExceptionIfMessagesMissing() throws Exception {
+    initMocks(true/*multicast*/);
+    NAKACK2 nakack = mock(NAKACK2.class);
+    Digest digest = mock(Digest.class);
+    when(nakack.getDigest(any(Address.class))).thenReturn(digest);
+    when(digest.get(any(Address.class))).thenReturn(
+        new long[] {0,0}, new long[] {2, 50}, new long[] {49, 50});
+    try {
+      // message 50 will never arrive
+      messenger.waitForMessageState(nakack, createAddress(1234), Long.valueOf(50));
+      fail("expected a GemFireIOException to be thrown");
+    } catch (GemFireIOException e) {
+      // pass
+    }
+  }
+  
   @Test
   public void testMulticastTest() throws Exception {
     initMocks(true);