You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2015/11/20 22:02:08 UTC
[14/50] [abbrv] incubator-geode git commit: GEODE-77: avoids creating
a split-brain if joining times out and network partition detection is not
enabled
GEODE-77: avoids creating a split-brain if joining times out and network partition detection is not enabled
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/5feab824
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/5feab824
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/5feab824
Branch: refs/heads/develop
Commit: 5feab8241deb507c11152236b152b91d983eeace
Parents: cfba7ae
Author: Bruce Schuchardt <bs...@pivotal.io>
Authored: Fri Oct 23 13:05:46 2015 -0700
Committer: Bruce Schuchardt <bs...@pivotal.io>
Committed: Fri Oct 23 13:06:11 2015 -0700
----------------------------------------------------------------------
.../membership/gms/fd/GMSHealthMonitor.java | 4 -
.../gms/locator/FindCoordinatorResponse.java | 6 ++
.../membership/gms/membership/GMSJoinLeave.java | 95 ++++++++++++--------
.../membership/gms/messenger/Transport.java | 15 ++++
.../gemfire/internal/tcp/Connection.java | 15 +++-
.../gms/membership/GMSJoinLeaveJUnitTest.java | 2 +-
6 files changed, 92 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5feab824/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
index ed9f214..1ca206f 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
@@ -266,10 +266,6 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
}
private void sendSuspectMessage(InternalDistributedMember mbr, String reason) {
- if (beingSick || playingDead) {
- logger.debug("sick member is not sending suspect message concerning {}", mbr);
- return;
- }
logger.info("Sending suspect request {} reason=\"{}\"", mbr, reason);
SuspectRequest sr = new SuspectRequest(mbr, reason);
List<SuspectRequest> sl = new ArrayList<SuspectRequest>();
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5feab824/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorResponse.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorResponse.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorResponse.java
index 5f9576b..0d2ba68 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorResponse.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorResponse.java
@@ -65,6 +65,12 @@ public class FindCoordinatorResponse extends HighPriorityDistributionMessage
return coordinator;
}
+ /**
+ * When the response comes from a locator via TcpClient this
+ * will return the locator's member ID. If the locator hasn't
+ * yet joined this may be null.
+ * @return the sender's member ID, possibly null
+ */
public InternalDistributedMember getSenderId() {
return senderId;
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5feab824/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
index 57611e6..5a792eb 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
@@ -163,7 +163,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
Set<InternalDistributedMember> registrants = new HashSet<>();
InternalDistributedMember possibleCoordinator;
int viewId = -1;
- boolean hasContactedALocator;
+ int locatorsContacted = 0;
+ boolean hasContactedAJoinedLocator;
NetView view;
Set<FindCoordinatorResponse> responses = new HashSet<>();
@@ -201,10 +202,12 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
SearchState state = searchState;
+ long locatorWaitTime = services.getConfig().getLocatorWaitTime() * 1000;
long timeout = services.getConfig().getJoinTimeout();
logger.debug("join timeout is set to {}", timeout);
long retrySleep = JOIN_RETRY_SLEEP;
long startTime = System.currentTimeMillis();
+ long locatorGiveUpTime = startTime + locatorWaitTime;
long giveupTime = startTime + timeout;
for (int tries=0; !this.isJoined; tries++) {
@@ -232,7 +235,16 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
}
}
} else {
- if (System.currentTimeMillis() > giveupTime) {
+ long now = System.currentTimeMillis();
+ if (state.locatorsContacted <= 0) {
+ if (now > locatorGiveUpTime) {
+ // break out of the loop and return false
+ break;
+ }
+ // reset the tries count and timer since we haven't actually tried to join yet
+ tries = 0;
+ giveupTime = now + timeout;
+ } else if (System.currentTimeMillis() > giveupTime) {
break;
}
}
@@ -251,7 +263,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
// to preserve old behavior we need to throw a SystemConnectException if
// unable to contact any of the locators
- if (!this.isJoined && state.hasContactedALocator) {
+ if (!this.isJoined && state.hasContactedAJoinedLocator) {
throw new SystemConnectException("Unable to join the distributed system in "
+ (System.currentTimeMillis()-startTime) + "ms");
}
@@ -750,24 +762,27 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
assert this.localAddress != null;
- // TODO - should we try more than one preferred coordinator
- // before jumping to asking view-members who the coordinator is?
- if ( !state.alreadyTried.isEmpty() && state.view != null) {
+ // If we've already tried to bootstrap from locators that
+ // haven't joined the system (e.g., a collocated locator)
+ // then jump to using the membership view to try to find
+ // the coordinator
+ if ( !state.hasContactedAJoinedLocator && state.view != null) {
return findCoordinatorFromView();
}
FindCoordinatorRequest request = new FindCoordinatorRequest(this.localAddress, state.alreadyTried, state.viewId);
Set<InternalDistributedMember> coordinators = new HashSet<InternalDistributedMember>();
- long waitTime = services.getConfig().getLocatorWaitTime() * 1000;
- if (waitTime <= 0) {
- waitTime = services.getConfig().getMemberTimeout() * 2;
- }
- long giveUpTime = System.currentTimeMillis() + waitTime;
+
+ long giveUpTime = System.currentTimeMillis() + services.getConfig().getLocatorWaitTime() * 1000;
+
int connectTimeout = (int)services.getConfig().getMemberTimeout();
boolean anyResponses = false;
boolean flagsSet = false;
logger.debug("sending {} to {}", request, locators);
+
+ state.hasContactedAJoinedLocator = false;
+ state.locatorsContacted = 0;
do {
for (InetSocketAddress addr: locators) {
@@ -776,42 +791,46 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
addr.getAddress(), addr.getPort(), request, connectTimeout,
true);
FindCoordinatorResponse response = (o instanceof FindCoordinatorResponse) ? (FindCoordinatorResponse)o : null;
- if (response != null && response.getCoordinator() != null) {
- anyResponses = true;
- NetView v = response.getView();
- int viewId = v == null? -1 : v.getViewId();
- if (viewId > state.viewId) {
- // if the view has changed it is possible that a member
- // that we already tried to join with will become coordinator
- state.alreadyTried.clear();
- state.viewId = viewId;
- state.view = v;
- state.registrants.clear();
- if (response.getRegistrants() != null) {
- state.registrants.addAll(response.getRegistrants());
- }
+ // TODO we don't want to give up on the locators if we receive
+ // a response from a locator that's joined the system. Otherwise
+ // we'll give up and cause a split-brain
+ if (response != null) {
+ state.locatorsContacted++;
+ if (response.getSenderId() != null && response.getSenderId().getVmViewId() >= 0) {
+ state.hasContactedAJoinedLocator = true;
}
- coordinators.add(response.getCoordinator());
- if (!flagsSet) {
- flagsSet = true;
- inheritSettingsFromLocator(addr, response);
+ if (response.getCoordinator() != null) {
+ anyResponses = true;
+ NetView v = response.getView();
+ int viewId = v == null? -1 : v.getViewId();
+ if (viewId > state.viewId) {
+ // if the view has changed it is possible that a member
+ // that we already tried to join with will become coordinator
+ state.alreadyTried.clear();
+ state.viewId = viewId;
+ state.view = v;
+ state.registrants.clear();
+ if (response.getRegistrants() != null) {
+ state.registrants.addAll(response.getRegistrants());
+ }
+ }
+ coordinators.add(response.getCoordinator());
+ if (!flagsSet) {
+ flagsSet = true;
+ inheritSettingsFromLocator(addr, response);
+ }
}
}
} catch (IOException | ClassNotFoundException problem) {
}
}
- if (coordinators.isEmpty()) {
- return false;
- }
- if (!anyResponses) {
- try { Thread.sleep(1000); } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- return false;
- }
- }
} while (!anyResponses && System.currentTimeMillis() < giveUpTime);
+ if (coordinators.isEmpty()) {
+ return false;
+ }
+
Iterator<InternalDistributedMember> it = coordinators.iterator();
if (coordinators.size() == 1) {
state.possibleCoordinator = it.next();
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5feab824/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/Transport.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/Transport.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/Transport.java
index adb49b9..9f91a74 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/Transport.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/Transport.java
@@ -61,4 +61,19 @@ public class Transport extends UDP {
super.init();
}
+ /*
+ * (non-Javadoc)
+ * @see org.jgroups.protocols.UDP#stop()
+ * JGroups is not terminating its timer. I contacted the jgroups-users
+ * email list about this.
+ */
+ @Override
+ public void stop() {
+ super.stop();
+ if (!getTimer().isShutdown()) {
+ getTimer().stop();
+ }
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5feab824/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
index 2e903f7..9f079fa 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
@@ -1884,12 +1884,18 @@ public class Connection implements Runnable {
}
catch (ClosedChannelException e) {
this.readerShuttingDown = true;
+ if (this.owner.getConduit().getCancelCriterion().cancelInProgress() != null) {
+ initiateSuspicionIfShared();
+ }
try {
requestClose(LocalizedStrings.Connection_CLOSEDCHANNELEXCEPTION_IN_CHANNEL_READ_0.toLocalizedString(e));
} catch (Exception ex) {}
return;
}
catch (IOException e) {
+ if (this.owner.getConduit().getCancelCriterion().cancelInProgress() != null) {
+ initiateSuspicionIfShared();
+ }
if (! isSocketClosed()
&& !"Socket closed".equalsIgnoreCase(e.getMessage()) // needed for Solaris jdk 1.4.2_08
) {
@@ -1902,7 +1908,6 @@ public class Connection implements Runnable {
}
}
}
- initiateSuspicionIfShared();
this.readerShuttingDown = true;
try {
requestClose(LocalizedStrings.Connection_IOEXCEPTION_IN_CHANNEL_READ_0.toLocalizedString(e));
@@ -1914,6 +1919,9 @@ public class Connection implements Runnable {
if (!stopped && ! isSocketClosed() ) {
logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_0_EXCEPTION_IN_CHANNEL_READ, p2pReaderName()), e);
}
+ if (this.owner.getConduit().getCancelCriterion().cancelInProgress() != null) {
+ initiateSuspicionIfShared();
+ }
this.readerShuttingDown = true;
try {
requestClose(LocalizedStrings.Connection_0_EXCEPTION_IN_CHANNEL_READ.toLocalizedString(e));
@@ -2434,6 +2442,9 @@ public class Connection implements Runnable {
this.stopped = true;
}
catch (IOException io) {
+ if (this.owner.getConduit().getCancelCriterion().cancelInProgress() != null) {
+ initiateSuspicionIfShared();
+ }
boolean closed = isSocketClosed()
|| "Socket closed".equalsIgnoreCase(io.getMessage()); // needed for Solaris jdk 1.4.2_08
if (!closed) {
@@ -2441,7 +2452,6 @@ public class Connection implements Runnable {
logger.debug("{} io exception for {}", p2pReaderName(), this, io);
}
}
- initiateSuspicionIfShared();
this.readerShuttingDown = true;
try {
requestClose(LocalizedStrings.Connection_IOEXCEPTION_RECEIVED_0.toLocalizedString(io));
@@ -2468,6 +2478,7 @@ public class Connection implements Runnable {
if (!stopped && !(e instanceof InterruptedException) ) {
logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_0_EXCEPTION_RECEIVED, p2pReaderName()), e);
}
+ initiateSuspicionIfShared();
if (isSocketClosed()) {
stopped = true;
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5feab824/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
index 42594cf..41b0df7 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
@@ -136,7 +136,7 @@ public class GMSJoinLeaveJUnitTest {
mockMembers[1].setVmViewId(viewId-1);
set.add(mockMembers[1]);
state.alreadyTried = set;
- state.hasContactedALocator = true;
+ state.hasContactedAJoinedLocator = true;
// simulate a response being received
InternalDistributedMember sender = mockMembers[2];