You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2016/01/29 02:16:41 UTC
[24/52] [abbrv] incubator-geode git commit: GEODE-836: multicast
state flush is broken
GEODE-836: multicast state flush is broken
The Digest from the multicast NAKACK2 protocol is immutable and needs
to be fetched each time it is checked in order to get an up-to-date
digest. I also added warning and timeout for the method similar to
what's in DistributionAdvisor.waitForCurrentOperations(long) that
guarantee that the wait will terminate.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/ed17d4cd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/ed17d4cd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/ed17d4cd
Branch: refs/heads/feature/GEODE-831
Commit: ed17d4cd50595b5a611b4a4068a1e2b895a06ff6
Parents: 2609946
Author: Bruce Schuchardt <bs...@pivotal.io>
Authored: Tue Jan 26 08:53:40 2016 -0800
Committer: Bruce Schuchardt <bs...@pivotal.io>
Committed: Tue Jan 26 08:56:47 2016 -0800
----------------------------------------------------------------------
.../gms/messenger/JGroupsMessenger.java | 35 ++++++++++++++++----
.../gms/membership/GMSJoinLeaveJUnitTest.java | 2 +-
.../messenger/JGroupsMessengerJUnitTest.java | 26 +++++++++++----
3 files changed, 48 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ed17d4cd/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
index ccff687..be2c405 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
@@ -533,16 +533,37 @@ public class JGroupsMessenger implements Messenger {
*/
protected void waitForMessageState(NAKACK2 nakack, InternalDistributedMember sender, Long seqno)
throws InterruptedException {
+ long timeout = services.getConfig().getDistributionConfig().getAckWaitThreshold() * 1000L;
+ long startTime = System.currentTimeMillis();
+ long warnTime = startTime + timeout;
+ long quitTime = warnTime + timeout - 1000L;
+ boolean warned = false;
+
JGAddress jgSender = new JGAddress(sender);
- Digest digest = nakack.getDigest(jgSender);
- if (digest != null) {
- for (;;) {
- long[] senderSeqnos = digest.get(jgSender);
- if (senderSeqnos == null || senderSeqnos[0] >= seqno.longValue()) {
- break;
+
+ for (;;) {
+ Digest digest = nakack.getDigest(jgSender);
+ if (digest == null) {
+ return;
+ }
+ String received = "none";
+ long[] senderSeqnos = digest.get(jgSender);
+ if (senderSeqnos == null || senderSeqnos[0] >= seqno.longValue()) {
+ break;
+ }
+ long now = System.currentTimeMillis();
+ if (!warned && now >= warnTime) {
+ warned = true;
+ if (senderSeqnos != null) {
+ received = String.valueOf(senderSeqnos[0]);
}
- Thread.sleep(50);
+ logger.warn("{} seconds have elapsed while waiting for multicast messages from {}. Received {} but expecting at least {}.",
+ Long.toString((warnTime-startTime)/1000L), sender, received, seqno);
+ }
+ if (now >= quitTime) {
+ throw new GemFireIOException("Multicast operations from " + sender + " did not distribute within " + (now - startTime) + " milliseconds");
}
+ Thread.sleep(50);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ed17d4cd/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
index ca699b5..5b53290 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
@@ -630,7 +630,7 @@ public class GMSJoinLeaveJUnitTest {
installViewMessage = new InstallViewMessage(partitionView, credentials, false);
gmsJoinLeave.processMessage(installViewMessage);
- verify(manager).forceDisconnect(any(String.class));
+ verify(manager).forceDisconnect(isA(String.class));
verify(manager).quorumLost(crashes, newView);
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ed17d4cd/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
index ae55f4f..805dd88 100755
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
@@ -104,6 +104,7 @@ public class JGroupsMessengerJUnitTest {
nonDefault.put(DistributionConfig.LOG_FILE_NAME, "");
nonDefault.put(DistributionConfig.LOG_LEVEL_NAME, "fine");
nonDefault.put(DistributionConfig.LOCATORS_NAME, "localhost[10344]");
+ nonDefault.put(DistributionConfig.ACK_WAIT_THRESHOLD_NAME, "1");
DistributionConfigImpl config = new DistributionConfigImpl(nonDefault);
RemoteTransportConfig tconfig = new RemoteTransportConfig(config,
DistributionManager.NORMAL_DM_TYPE);
@@ -819,7 +820,7 @@ public class JGroupsMessengerJUnitTest {
}
@Test
- public void testWaitForMessageState() throws Exception {
+ public void testWaitForMessageStateSucceeds() throws Exception {
initMocks(true/*multicast*/);
NAKACK2 nakack = mock(NAKACK2.class);
Digest digest = mock(Digest.class);
@@ -834,14 +835,25 @@ public class JGroupsMessengerJUnitTest {
new long[] {0,0}, new long[] {2, 50}, null);
messenger.waitForMessageState(nakack, createAddress(1234), Long.valueOf(50));
verify(digest, times(3)).get(isA(Address.class));
-
- // for code coverage let's invoke the other waitForMessageState method
- Map state = new HashMap();
- state.put("JGroups.mcastState", Long.valueOf(10L));
- messenger.waitForMessageState(createAddress(1234), state);
}
-
+ @Test
+ public void testWaitForMessageStateThrowsExceptionIfMessagesMissing() throws Exception {
+ initMocks(true/*multicast*/);
+ NAKACK2 nakack = mock(NAKACK2.class);
+ Digest digest = mock(Digest.class);
+ when(nakack.getDigest(any(Address.class))).thenReturn(digest);
+ when(digest.get(any(Address.class))).thenReturn(
+ new long[] {0,0}, new long[] {2, 50}, new long[] {49, 50});
+ try {
+ // message 50 will never arrive
+ messenger.waitForMessageState(nakack, createAddress(1234), Long.valueOf(50));
+ fail("expected a GemFireIOException to be thrown");
+ } catch (GemFireIOException e) {
+ // pass
+ }
+ }
+
@Test
public void testMulticastTest() throws Exception {
initMocks(true);