You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/05/05 20:50:25 UTC
[10/11] incubator-geode git commit: GEODE-1327 Now ViewReply
processor returns copy of members.
GEODE-1327 Now ViewReply processor returns copy of members.
Now it also throws interrupt exception which is handled at
caller level.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/5817cdbf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/5817cdbf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/5817cdbf
Branch: refs/heads/feature/GEODE-1276
Commit: 5817cdbfc94ba779617f3b086f8d099cb1f1bc21
Parents: f77739b
Author: Hitesh Khamesra <hk...@pivotal.io>
Authored: Wed May 4 09:29:10 2016 -0700
Committer: Hitesh Khamesra <hk...@pivotal.io>
Committed: Thu May 5 10:07:58 2016 -0700
----------------------------------------------------------------------
.../membership/gms/membership/GMSJoinLeave.java | 94 ++++++++++----------
1 file changed, 46 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5817cdbf/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
index 9f5648b..88e4d49 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
@@ -35,6 +35,7 @@ import com.gemstone.gemfire.distributed.internal.membership.gms.locator.FindCoor
import com.gemstone.gemfire.distributed.internal.membership.gms.locator.FindCoordinatorResponse;
import com.gemstone.gemfire.distributed.internal.membership.gms.messages.*;
import com.gemstone.gemfire.distributed.internal.tcpserver.TcpClient;
+import com.gemstone.gemfire.internal.CopyOnWriteHashSet;
import com.gemstone.gemfire.internal.Version;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.security.AuthenticationFailedException;
@@ -477,7 +478,6 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
rejection = services.getAuthenticator().authenticate(incomingRequest.getMemberID(), creds);
} catch (Exception e) {
rejection = e.getMessage();
- e.printStackTrace();
}
if (rejection != null && rejection.length() > 0) {
JoinResponseMessage m = new JoinResponseMessage(rejection);
@@ -763,15 +763,16 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
}
}
- boolean prepareView(NetView view, List<InternalDistributedMember> newMembers) {
+ boolean prepareView(NetView view, List<InternalDistributedMember> newMembers) throws InterruptedException {
return sendView(view, newMembers, true, this.prepareProcessor);
}
- void sendView(NetView view, List<InternalDistributedMember> newMembers) {
+ void sendView(NetView view, List<InternalDistributedMember> newMembers) throws InterruptedException {
sendView(view, newMembers, false, this.viewProcessor);
}
- private boolean sendView(NetView view, List<InternalDistributedMember> newMembers, boolean preparing, ViewReplyProcessor viewReplyProcessor) {
+ private boolean sendView(NetView view, List<InternalDistributedMember> newMembers, boolean preparing,
+ ViewReplyProcessor viewReplyProcessor) throws InterruptedException {
int id = view.getViewId();
InstallViewMessage msg = new InstallViewMessage(view, services.getAuthenticator().getCredentials(this.localAddress), preparing);
@@ -1695,22 +1696,21 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
}
}
- Set<InternalDistributedMember> waitForResponses() {
- Set<InternalDistributedMember> result = this.notRepliedYet;
+ Set<InternalDistributedMember> waitForResponses() throws InterruptedException {
+ Set<InternalDistributedMember> result = null;
long endOfWait = System.currentTimeMillis() + viewAckTimeout;
try {
while (System.currentTimeMillis() < endOfWait && (services.getCancelCriterion().cancelInProgress() == null)) {
try {
synchronized (this) {
- if (!waiting || result.isEmpty() || this.conflictingView != null) {
+ if (!waiting || this.notRepliedYet.isEmpty() || this.conflictingView != null) {
break;
}
wait(1000);
}
} catch (InterruptedException e) {
logger.debug("Interrupted while waiting for view responses");
- Thread.currentThread().interrupt();
- return result;
+ throw e;
}
}
} finally {
@@ -1719,8 +1719,9 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
// if we've set waiting to false due to incoming messages then
// we've discounted receiving any other responses from the
// remaining members due to leave/crash notification
- result = pendingRemovals;
+ result = new HashSet<>(pendingRemovals);
} else {
+ result = new HashSet<>(this.notRepliedYet);
result.addAll(pendingRemovals);
this.waiting = false;
}
@@ -1737,8 +1738,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
return this.conflictingViewSender;
}
- Set<InternalDistributedMember> getUnresponsiveMembers() {
- return this.notRepliedYet;
+ synchronized Set<InternalDistributedMember> getUnresponsiveMembers() {
+ return new HashSet<>(this.notRepliedYet);
}
}
@@ -1834,29 +1835,33 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
}
private void sendInitialView() {
- if (initialView == null) {
- return;
- }
- NetView v = preparedView;
- if (v != null) {
- processPreparedView(v);
- }
try {
- NetView iView;
- List<InternalDistributedMember> iJoins;
- Set<InternalDistributedMember> iLeaves;
- Set<InternalDistributedMember> iRemoves;
- synchronized (this) {
- iView = initialView;
- iJoins = initialJoins;
- iLeaves = initialLeaving;
- iRemoves = initialRemovals;
+ if (initialView == null) {
+ return;
}
- if (iView != null) {
- prepareAndSendView(iView, iJoins, iLeaves, iRemoves);
+ NetView v = preparedView;
+ if (v != null) {
+ processPreparedView(v);
}
- } finally {
- setInitialView(null, null, null, null);
+ try {
+ NetView iView;
+ List<InternalDistributedMember> iJoins;
+ Set<InternalDistributedMember> iLeaves;
+ Set<InternalDistributedMember> iRemoves;
+ synchronized (this) {
+ iView = initialView;
+ iJoins = initialJoins;
+ iLeaves = initialLeaving;
+ iRemoves = initialRemovals;
+ }
+ if (iView != null) {
+ prepareAndSendView(iView, iJoins, iLeaves, iRemoves);
+ }
+ } finally {
+ setInitialView(null, null, null, null);
+ }
+ } catch (InterruptedException e) {
+ shutdown = true;
}
}
@@ -1958,6 +1963,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
}
} catch (DistributedSystemDisconnectedException e) {
shutdown = true;
+ } catch (InterruptedException e) {
+ shutdown = true;
}
requests = null;
}
@@ -2003,8 +2010,9 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
/**
* Create a new membership view and send it to members (including crashed members).
* Returns false if the view cannot be prepared successfully, true otherwise
+ * @throws InterruptedException
*/
- void createAndSendView(List<DistributionMessage> requests) {
+ void createAndSendView(List<DistributionMessage> requests) throws InterruptedException {
List<InternalDistributedMember> joinReqs = new ArrayList<>(10);
Map<InternalDistributedMember, Integer> joinPorts = new HashMap<>(10);
Set<InternalDistributedMember> leaveReqs = new HashSet<>(10);
@@ -2141,9 +2149,10 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
/**
* This handles the 2-phase installation of the view
+ * @throws InterruptedException
*/
void prepareAndSendView(NetView newView, List<InternalDistributedMember> joinReqs, Set<InternalDistributedMember> leaveReqs,
- Set<InternalDistributedMember> removalReqs) {
+ Set<InternalDistributedMember> removalReqs) throws InterruptedException {
boolean prepared = false;
do {
if (this.shutdown || Thread.currentThread().isInterrupted()) {
@@ -2152,13 +2161,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
if (quorumRequired && isNetworkPartition(newView, true)) {
sendNetworkPartitionMessage(newView);
- try {
- Thread.sleep(BROADCAST_MESSAGE_SLEEP_TIME);
- } catch (InterruptedException e) {
- // signal the run() method to exit
- shutdown = true;
- return;
- }
+ Thread.sleep(BROADCAST_MESSAGE_SLEEP_TIME);
+
Set<InternalDistributedMember> crashes = newView.getActualCrashedMembers(currentView);
forceDisconnect(LocalizedStrings.Network_partition_detected.toLocalizedString(crashes.size(), crashes));
shutdown = true;
@@ -2178,13 +2182,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
unresponsive.removeAll(removalReqs);
unresponsive.removeAll(leaveReqs);
if (!unresponsive.isEmpty()) {
- try {
- removeHealthyMembers(unresponsive);
- } catch (InterruptedException e) {
- // abort the view if interrupted
- shutdown = true;
- return;
- }
+ removeHealthyMembers(unresponsive);
}
logger.debug("unresponsive members that could not be reached: {}", unresponsive);