You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ma...@apache.org on 2015/11/23 21:48:35 UTC
[05/50] [abbrv] incubator-geode git commit: GEODE-77: members
unfairly forced out of the distributed system
GEODE-77: members unfairly forced out of the distributed system
In integration testing, members under heavy load were being forced out
of the distributed system in spite of the new tcp/ip final checks.
This change set adds a recent-activity check to the doTCPCheckMember method
of GMSHealthMonitor. It also corrects this method to use SocketCreator with
a background timeout task in case the tcp/ip socket timeout fails to work
(as it sometimes does).
During testing I also saw that GMSJoinLeave was not initiating a forced
disconnect when it received a membership view showing that it was no longer
in the distributed system (it previously did so only when quorum was
required), so I modified it to always do so once the member has joined.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/28d273cb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/28d273cb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/28d273cb
Branch: refs/heads/feature/GEODE-53
Commit: 28d273cb0f19ff2d6b2b506b1d82b91b5ce2ce36
Parents: 026d149
Author: Bruce Schuchardt <bs...@pivotal.io>
Authored: Mon Nov 9 08:44:17 2015 -0800
Committer: Bruce Schuchardt <bs...@pivotal.io>
Committed: Mon Nov 9 08:44:17 2015 -0800
----------------------------------------------------------------------
.../membership/gms/fd/GMSHealthMonitor.java | 107 ++++++++++++++-----
.../membership/gms/membership/GMSJoinLeave.java | 6 +-
.../gms/membership/GMSJoinLeaveJUnitTest.java | 19 +++-
3 files changed, 99 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/28d273cb/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
index 0307f5b..9e410e2 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
@@ -24,6 +24,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
@@ -54,6 +56,8 @@ import com.gemstone.gemfire.distributed.internal.membership.gms.messages.Heartbe
import com.gemstone.gemfire.distributed.internal.membership.gms.messages.SuspectMembersMessage;
import com.gemstone.gemfire.distributed.internal.membership.gms.messages.SuspectRequest;
import com.gemstone.gemfire.internal.AvailablePort;
+import com.gemstone.gemfire.internal.ConnectionWatcher;
+import com.gemstone.gemfire.internal.SocketCreator;
import com.gemstone.gemfire.internal.Version;
/**
@@ -262,6 +266,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
try {
DataInputStream in = new DataInputStream(socket.getInputStream());
OutputStream out = socket.getOutputStream();
+ @SuppressWarnings("unused")
short version = in.readShort();
int vmViewId = in.readInt();
long uuidLSBs = in.readLong();
@@ -309,7 +314,13 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
}
}
} catch (IOException e) {
- logger.trace("Unexpected exception", e);
+ logger.debug("Unexpected exception", e);
+ } catch (RuntimeException e) {
+ logger.debug("Unexpected runtime exception", e);
+ throw e;
+ } catch (Error e) {
+ logger.debug("Unexpected error", e);
+ throw e;
} finally {
if (socket != null) {
try {
@@ -369,7 +380,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
if (!pinged) {
suspectedMemberInView.put(mbr, currentView);
String reason = "Member isn't responding to heartbeat requests";
- GMSHealthMonitor.this.sendSuspectMessage(mbr, reason);
+ GMSHealthMonitor.this.initiateSuspicion(mbr, reason);
} else {
logger.trace("Setting next neighbor as member {} has responded.", mbr);
suspectedMemberInView.remove(mbr);
@@ -381,7 +392,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
}
- private void sendSuspectMessage(InternalDistributedMember mbr, String reason) {
+ private void initiateSuspicion(InternalDistributedMember mbr, String reason) {
SuspectRequest sr = new SuspectRequest(mbr, reason);
List<SuspectRequest> sl = new ArrayList<SuspectRequest>();
sl.add(sr);
@@ -418,16 +429,13 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
TimeStamp ts = memberTimeStamps.get(member);
if (pingResp.getResponseMsg() == null) {
// double check the activity map
- long now = System.currentTimeMillis();
if (isStopping) {
return true;
}
- if (ts != null && (now - ts.getTime()) <= memberTimeout) {
- logger.trace("detected message traffic from member {}ms ago. member-timeout is {}", now - ts.getTime(),
- memberTimeout);
+ if (checkRecentActivity(member)) {
return true;
}
- logger.trace("no heartbeat response received from {}", member);
+ logger.trace("no heartbeat response received from {} and no recent activity", member);
return false;
} else {
logger.trace("received heartbeat from {}", member);
@@ -447,6 +455,16 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
}
/**
+ * Check for recent activity recorded for the given member in memberTimeStamps.
+ * @param suspectMember the member whose last-recorded activity timestamp is examined
+ * @return true if a timestamp is recorded and it is at most memberTimeout ms old; false if none is recorded or it is older
+ */
+ private boolean checkRecentActivity(InternalDistributedMember suspectMember) {
+ TimeStamp ts = memberTimeStamps.get(suspectMember);
+ return (ts != null && (System.currentTimeMillis() - ts.getTime()) <= memberTimeout); // elapsed ms since last recorded activity vs. member-timeout
+ }
+
+ /**
* During final check, establish TCP connection between current member and suspect member.
* And exchange PING/PONG message to see if the suspect member is still alive.
*
@@ -454,15 +472,19 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
* @return true if successfully exchanged PING/PONG with TCP connection, otherwise false.
*/
private boolean doTCPCheckMember(InternalDistributedMember suspectMember, int port) {
- Socket clientSocket = new Socket();
+ Socket clientSocket = null;
+ // first check for a recent timestamp
+ if (checkRecentActivity(suspectMember)) {
+ return true;
+ }
try {
// establish TCP connection
// for (Map.Entry<InternalDistributedMember, InetSocketAddress> entry : socketInfo.entrySet()) {
// logger.info("socketInfo member:" + entry.getKey() + " port:" + entry.getValue().getPort());
// }
logger.debug("Checking member {} with TCP socket connection {}:{}.", suspectMember, suspectMember.getInetAddress(), port);
- InetSocketAddress addr = new InetSocketAddress(suspectMember.getInetAddress(), port);
- clientSocket.connect(addr, (int) services.getConfig().getMemberTimeout());
+ clientSocket = SocketCreator.getDefaultInstance().connect(suspectMember.getInetAddress(), port,
+ (int)memberTimeout, new ConnectTimeoutTask(services.getTimer(), memberTimeout), false, -1, false);
if (clientSocket.isConnected()) {
clientSocket.setSoTimeout((int) services.getConfig().getMemberTimeout());
InputStream in = clientSocket.getInputStream();
@@ -485,15 +507,16 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
return true;
} else {
//received ERROR
- return false;
+ return checkRecentActivity(suspectMember);
}
} else {// cannot establish TCP connection with suspect member
- return false;
+ return checkRecentActivity(suspectMember);
}
} catch (SocketTimeoutException e) {
+ logger.debug("tcp/ip connection timed out");
return false;
} catch (IOException e) {
- logger.trace("Unexpected exception", e);
+ logger.debug("Unexpected exception", e);
} finally {
try {
if (clientSocket != null) {
@@ -515,7 +538,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
*/
@Override
public void suspect(InternalDistributedMember mbr, String reason) {
- sendSuspectMessage(mbr, reason);
+ initiateSuspicion(mbr, reason);
// Background suspect-collecting thread is currently disabled - it takes too long
// synchronized (suspectRequests) {
// SuspectRequest sr = new SuspectRequest((InternalDistributedMember) mbr, reason);
@@ -898,13 +921,13 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
@Override
public void beSick() {
this.beingSick = true;
- sendSuspectMessage(localAddress, "beSick invoked on GMSHealthMonitor");
+ initiateSuspicion(localAddress, "beSick invoked on GMSHealthMonitor");
}
@Override
public void playDead() {
this.playingDead = true;
- sendSuspectMessage(localAddress, "playDead invoked on GMSHealthMonitor");
+ initiateSuspicion(localAddress, "playDead invoked on GMSHealthMonitor");
}
@Override
@@ -1163,11 +1186,11 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
}
if (!failed) {
logger.info("Final check passed");
+ contactedBy(mbr);
}
// whether it's alive or not, at this point we allow it to
// be watched again
suspectedMemberInView.remove(mbr);
- contactedBy(mbr);
} catch (DistributedSystemDisconnectedException e) {
return;
} catch (Exception e) {
@@ -1183,6 +1206,14 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
membersInFinalCheck.removeAll(membersChecked);
}
}
+ @Override
+ public void memberShutdown(DistributedMember mbr, String reason) {
+ } // empty: this health monitor takes no action here when notified of a member shutdown
+
+ @Override
+ public int getFailureDetectionPort() {
+ return this.socketPort; // presumably the port of this member's tcp final-check server socket - TODO confirm
+ }
interface Callback<T> {
public void process(List<T> requests);
@@ -1318,13 +1349,37 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
}
}
- @Override
- public void memberShutdown(DistributedMember mbr, String reason) {
- }
-
- @Override
- public int getFailureDetectionPort() {
- return this.socketPort;
+ private static class ConnectTimeoutTask extends TimerTask implements ConnectionWatcher { // watchdog: closes the watched socket unless cancelled within timeout ms
+ Timer scheduler; // timer on which this one-shot task is scheduled
+ Socket socket; // captured in beforeConnect(); closed by run() if the timeout expires
+ long timeout; // delay in milliseconds passed to Timer.schedule
+
+ ConnectTimeoutTask(Timer scheduler, long timeout) {
+ this.scheduler = scheduler;
+ this.timeout = timeout;
+ }
+
+ @Override
+ public void beforeConnect(Socket socket) { // presumably invoked by SocketCreator just before connect() - TODO confirm
+ this.socket = socket;
+ scheduler.schedule(this, timeout); // run() fires after timeout ms unless cancel() is called first
+ }
+
+ @Override
+ public void afterConnect(Socket socket) { // presumably invoked once connect() returns - TODO confirm
+ cancel(); // stop the pending close so a completed connection is left open
+ }
+
+ @Override
+ public void run() { // runs on the timer thread only if the task was not cancelled in time
+ try {
+ if (socket != null) {
+ socket.close(); // closing is expected to abort a connect() that is still blocked
+ }
+ } catch (IOException e) {
+ // ignored - nothing useful to do here
+ }
+ }
+
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/28d273cb/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
index 4b1993f..a6121dd 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
@@ -782,10 +782,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
}
}
} else { // !preparing
- if (currentView != null && !view.contains(this.localAddress)) {
- if (quorumRequired) {
- forceDisconnect("This node is no longer in the membership view");
- }
+ if (isJoined && currentView != null && !view.contains(this.localAddress)) {
+ forceDisconnect("This node is no longer in the membership view");
} else {
if (!m.isRebroadcast()) { // no need to ack a rebroadcast view
ackView(m);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/28d273cb/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
index 89282c0..562fd70 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
@@ -1,5 +1,6 @@
package com.gemstone.gemfire.distributed.internal.membership.gms.membership;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
@@ -7,12 +8,12 @@ import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertFalse;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
@@ -170,7 +171,19 @@ public class GMSJoinLeaveJUnitTest {
assertTrue("JoinRequest should not have been added to view request", gmsJoinLeave.getViewRequests().size() == 0);
verify(messenger).send(any(JoinResponseMessage.class));
}
-
+ @Test
+ public void testViewWithoutMemberInitiatesForcedDisconnect() throws Exception { // a non-preparing view that omits this member must force a disconnect
+ initMocks();
+ gmsJoinLeave.becomeCoordinatorForTest(); // NOTE(review): presumably also sets isJoined, which GMSJoinLeave now requires before forceDisconnect - confirm
+ List<InternalDistributedMember> members = Arrays.asList(mockMembers);
+ Set<InternalDistributedMember> empty = Collections.<InternalDistributedMember>emptySet();
+ NetView v = new NetView(mockMembers[0], 2, members, empty, empty); // assumes the local address is not among mockMembers - TODO confirm
+ InstallViewMessage message = new InstallViewMessage(v, null);
+ gmsJoinLeave.processMessage(message);
+ verify(manager).forceDisconnect(any(String.class)); // expect exactly one forced-disconnect call, with any reason text
+ }
+
+
@Test
public void testProcessJoinMessageWithBadAuthentication() throws IOException {
initMocks();