You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/03/01 01:09:33 UTC
[24/34] incubator-geode git commit: GEODE-870: Handling multiple
concurrent locator restarts. Elder locator nomination
GEODE-870: Handling multiple concurrent locator restarts. Elder locator nomination
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/edc1c4c6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/edc1c4c6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/edc1c4c6
Branch: refs/heads/feature/GEODE-949-2
Commit: edc1c4c6f573056324a9a345f05d25ee3a2d19ff
Parents: 34beda8
Author: Udo Kohlmeyer <uk...@pivotal.io>
Authored: Wed Feb 10 09:20:09 2016 +1100
Committer: Udo Kohlmeyer <uk...@pivotal.io>
Committed: Mon Feb 29 16:36:08 2016 +1100
----------------------------------------------------------------------
.../gms/messages/ViewRejectMessage.java | 96 ------
.../gms/membership/GMSJoinLeaveHelper.java | 60 ++++
.../internal/membership/NetView.java | 5 +
.../membership/gms/membership/GMSJoinLeave.java | 319 ++++++++++---------
.../gms/messages/InstallViewMessage.java | 18 +-
.../gemstone/gemfire/internal/DSFIDFactory.java | 11 +-
.../internal/DataSerializableFixedID.java | 2 -
.../gemfire/distributed/LocatorDUnitTest.java | 209 ++++++------
gradle/rat.gradle | 2 +-
9 files changed, 352 insertions(+), 370 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/edc1c4c6/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/ViewRejectMessage.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/ViewRejectMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/ViewRejectMessage.java
deleted file mode 100755
index e5bf9e2..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/ViewRejectMessage.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.distributed.internal.membership.gms.messages;
-
-import com.gemstone.gemfire.DataSerializer;
-import com.gemstone.gemfire.distributed.internal.DistributionManager;
-import com.gemstone.gemfire.distributed.internal.HighPriorityDistributionMessage;
-import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
-import com.gemstone.gemfire.distributed.internal.membership.NetView;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-public class ViewRejectMessage extends HighPriorityDistributionMessage {
-
- private int viewId;
- private NetView rejectedView;
- private String reason;
-
- public ViewRejectMessage(InternalDistributedMember recipient, int viewId, NetView rejectedView, String reason) {
- super();
- setRecipient(recipient);
- this.viewId = viewId;
- this.rejectedView = rejectedView;
- this.reason = reason;
- }
-
- public ViewRejectMessage() {
- // no-arg constructor for serialization
- }
-
- public int getViewId() {
- return viewId;
- }
-
- public NetView getRejectedView() {
- return this.rejectedView;
- }
-
-
- @Override
- public int getDSFID() {
- // TODO Auto-generated method stub
- return VIEW_REJECT_MESSAGE;
- }
-
- public String getReason() {
- return reason;
- }
-
- @Override
- public int getProcessorType() {
- return 0;
- }
-
- @Override
- public void process(DistributionManager dm) {
- throw new IllegalStateException("this message is not intended to execute in a thread pool");
- }
-
- @Override
- public void toData(DataOutput out) throws IOException {
- super.toData(out);
- out.writeInt(this.viewId);
- DataSerializer.writeObject(this.rejectedView, out);
- }
-
- @Override
- public void fromData(DataInput in) throws IOException, ClassNotFoundException {
- super.fromData(in);
- this.viewId = in.readInt();
- this.rejectedView = DataSerializer.readObject(in);
- }
-
- @Override
- public String toString() {
- String s = getSender() == null? getRecipientsDescription() : ""+getSender();
- return "ViewRejectMessage("+s+"; "+this.viewId+"; rejectedView="+this.rejectedView +")";
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/edc1c4c6/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveHelper.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveHelper.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveHelper.java
new file mode 100644
index 0000000..b8311bc
--- /dev/null
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveHelper.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.distributed.internal.membership.gms.membership;
+
+import com.gemstone.gemfire.distributed.Locator;
+import com.gemstone.gemfire.distributed.internal.DM;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.distributed.internal.membership.gms.Services;
+import com.gemstone.gemfire.distributed.internal.membership.gms.mgr.GMSMembershipManager;
+
+public class GMSJoinLeaveHelper {
+ public static boolean isViewCreator() {
+ GMSJoinLeave gmsJoinLeave = getGmsJoinLeave();
+ if (gmsJoinLeave != null) {
+ GMSJoinLeave.ViewCreator viewCreator = gmsJoinLeave.getViewCreator();
+ if (viewCreator != null && !viewCreator.isShutdown()) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+ throw new RuntimeException("This should not have happened. There should be a JoinLeave for every DS");
+ }
+
+ private static GMSJoinLeave getGmsJoinLeave() {
+ InternalDistributedSystem distributedSystem = getInternalDistributedSystem();
+ DM dm = distributedSystem.getDM();
+ GMSMembershipManager membershipManager = (GMSMembershipManager) dm.getMembershipManager();
+ Services services = membershipManager.getServices();
+ return (GMSJoinLeave) services.getJoinLeave();
+ }
+
+ public static Integer getViewId() {
+ return getGmsJoinLeave().getView().getViewId();
+ }
+
+ private static InternalDistributedSystem getInternalDistributedSystem() {
+ InternalDistributedSystem distributedSystem = InternalDistributedSystem.getAnyInstance();
+ if (distributedSystem == null) {
+ Locator locator = Locator.getLocator();
+ return (InternalDistributedSystem) locator.getDistributedSystem();
+ } else {
+ return distributedSystem;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/edc1c4c6/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java
index 40f5f71..af05f82 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java
@@ -29,6 +29,7 @@ import java.util.List;
import java.util.Random;
import java.util.Set;
+import com.gemstone.gemfire.internal.logging.LogService;
import org.apache.logging.log4j.Logger;
import com.gemstone.gemfire.DataSerializer;
@@ -47,6 +48,9 @@ import com.gemstone.gemfire.internal.Version;
*/
public class NetView implements DataSerializableFixedID {
+ private static final Logger logger = LogService.getLogger();
+
+
private int viewId;
private List<InternalDistributedMember> members;
private int[] failureDetectionPorts = new int[10];
@@ -86,6 +90,7 @@ public class NetView implements DataSerializableFixedID {
crashedMembers = Collections.emptySet();
this.creator = creator;
Arrays.fill(failureDetectionPorts, -1);
+
}
// legacy method for JGMM
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/edc1c4c6/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
index c7eacfa..b246344 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
@@ -259,14 +259,14 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
// unable to contact any of the locators
if (!this.isJoined && state.hasContactedAJoinedLocator) {
throw new SystemConnectException("Unable to join the distributed system in "
- + (System.currentTimeMillis()-startTime) + "ms");
+ + (System.currentTimeMillis() - startTime) + "ms");
}
return this.isJoined;
} finally {
// notify anyone waiting on the address to be completed
if (this.isJoined) {
- synchronized(this.localAddress) {
+ synchronized (this.localAddress) {
this.localAddress.notifyAll();
}
}
@@ -277,10 +277,10 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
/**
* send a join request and wait for a reply. Process the reply.
* This may throw a SystemConnectException or an AuthenticationFailedException
- *
+ *
* @return true if the attempt succeeded, false if it timed out
*/
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="WA_NOT_IN_LOOP")
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "WA_NOT_IN_LOOP")
boolean attemptToJoin() {
SearchState state = searchState;
@@ -302,7 +302,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
Thread.currentThread().interrupt();
return false;
}
-
+
if (response == null) {
if (!isJoined) {
logger.debug("received no join response");
@@ -314,13 +314,13 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
joinResponse[0] = null;
String failReason = response.getRejectionMessage();
if (failReason != null) {
- if (failReason.contains("Rejecting the attempt of a member using an older version")
+ if (failReason.contains("Rejecting the attempt of a member using an older version")
|| failReason.contains("15806")) {
throw new SystemConnectException(failReason);
}
throw new AuthenticationFailedException(failReason);
}
-
+
if (response.getCurrentView() == null) {
logger.info("received join response with no membership view: {}", response);
return isJoined;
@@ -328,19 +328,18 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
if (response.getBecomeCoordinator()) {
logger.info("I am being told to become the membership coordinator by {}", coord);
- synchronized(viewInstallationLock) {
+ synchronized (viewInstallationLock) {
this.currentView = response.getCurrentView();
becomeCoordinator(null);
}
return true;
}
-
+
this.birthViewId = response.getMemberID().getVmViewId();
this.localAddress.setVmViewId(this.birthViewId);
GMSMember me = (GMSMember) this.localAddress.getNetMember();
me.setBirthViewId(birthViewId);
installView(response.getCurrentView());
-
return true;
}
@@ -384,7 +383,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
* this method will enqueue the request for processing in another thread.
* If this is not the coordinator but the coordinator is known, the message
* is forwarded to the coordinator.
- *
+ *
* @param incomingRequest
*/
private void processJoinRequest(JoinRequestMessage incomingRequest) {
@@ -413,12 +412,13 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
return;
}
- if (!this.localAddress.getNetMember().preferredForCoordinator() &&
- incomingRequest.getMemberID().getNetMember().preferredForCoordinator()) {
- JoinResponseMessage m = new JoinResponseMessage(incomingRequest.getMemberID(), currentView, true);
- services.getMessenger().send(m);
- return;
- }
+// Remove JoinResponseMessage to fix GEODE-870
+// if (!this.localAddress.getNetMember().preferredForCoordinator() &&
+// incomingRequest.getMemberID().getNetMember().preferredForCoordinator()){
+// JoinResponseMessage joinResponseMessage = new JoinResponseMessage(incomingRequest.getMemberID(), currentView, true);
+// services.getMessenger().send(joinResponseMessage);
+// return;
+// }
recordViewRequest(incomingRequest);
}
@@ -426,7 +426,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
* Process a Leave request from another member. This may cause this member
* to become the new membership coordinator. If this is the coordinator
* a new view will be triggered.
- *
+ *
* @param incomingRequest
*/
private void processLeaveRequest(LeaveRequestMessage incomingRequest) {
@@ -442,7 +442,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
InternalDistributedMember mbr = incomingRequest.getMemberID();
if (logger.isDebugEnabled()) {
- logger.debug("JoinLeave.processLeaveRequest invoked. isCoordinator="+isCoordinator+ "; isStopping="+isStopping
+ logger.debug("JoinLeave.processLeaveRequest invoked. isCoordinator="+isCoordinator+ "; isStopping="+isStopping
+"; cancelInProgress="+ services.getCancelCriterion().isCancelInProgress());
}
@@ -487,7 +487,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
* Process a Remove request from another member. This may cause this member
* to become the new membership coordinator. If this is the coordinator
* a new view will be triggered.
- *
+ *
* @param incomingRequest
*/
private void processRemoveRequest(RemoveMemberMessage incomingRequest) {
@@ -501,7 +501,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
logger.info("Membership ignoring removal request for " + mbr + " from non-member " + incomingRequest.getSender());
return;
}
-
+
if (v == null) {
// not yet a member
return;
@@ -532,11 +532,11 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
check.addCrashedMembers(removedMembers);
check.removeAll(removedMembers);
}
- synchronized(leftMembers) {
+ synchronized (leftMembers) {
check.removeAll(leftMembers);
}
if (check.getCoordinator().equals(localAddress)) {
- synchronized(viewInstallationLock) {
+ synchronized (viewInstallationLock) {
becomeCoordinator(mbr);
}
}
@@ -580,17 +580,18 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
void becomeCoordinator() { // package access for unit testing
becomeCoordinator(null);
}
-
+
public void becomeCoordinatorForTest() {
- synchronized(viewInstallationLock) {
+ synchronized (viewInstallationLock) {
becomeCoordinator();
}
}
-
+
/**
* Test hook for delaying the creation of new views.
* This should be invoked before this member becomes coordinator
* and creates its ViewCreator thread.
+ *
* @param millis
*/
public void delayViewCreationForTest(int millis) {
@@ -602,17 +603,17 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
* be invoked under a synch on viewInstallationLock that was held
* at the time the decision was made to become coordinator so that
* the decision is atomic with actually becoming coordinator.
+ *
* @param oldCoordinator may be null
*/
private void becomeCoordinator(InternalDistributedMember oldCoordinator) {
- boolean testing = unitTesting.contains("noRandomViewChange");
assert Thread.holdsLock(viewInstallationLock);
-
+
if (isCoordinator) {
return;
}
-
+
logger.info("This member is becoming the membership coordinator with address {}", localAddress);
isCoordinator = true;
if (currentView == null) {
@@ -622,55 +623,59 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
this.localAddress.setVmViewId(0);
installView(newView);
isJoined = true;
- if (viewCreator == null || viewCreator.isShutdown()) {
- createViewCreator();
- viewCreator.setDaemon(true);
- viewCreator.start();
- startViewBroadcaster();
- }
+ createAndStartViewCreator(newView);
+ startViewBroadcaster();
} else {
// create and send out a new view
- NetView newView;
- Set<InternalDistributedMember> leaving = new HashSet<>();
- Set<InternalDistributedMember> removals;
- synchronized(viewInstallationLock) {
- int rand = testing? 0 : NetView.RANDOM.nextInt(10);
- int viewNumber = currentView.getViewId() + 5 + rand;
- if (this.localAddress.getVmViewId() < 0) {
- this.localAddress.setVmViewId(viewNumber);
- }
- List<InternalDistributedMember> mbrs = new ArrayList<>(currentView.getMembers());
- if (!mbrs.contains(localAddress)) {
- mbrs.add(localAddress);
- }
- synchronized(this.removedMembers) {
- removals = new HashSet<>(this.removedMembers);
- }
- synchronized(this.leftMembers) {
- leaving.addAll(leftMembers);
- }
- if (oldCoordinator != null && !removals.contains(oldCoordinator)) {
- leaving.add(oldCoordinator);
- }
- mbrs.removeAll(removals);
- mbrs.removeAll(leaving);
- newView = new NetView(this.localAddress, viewNumber, mbrs, leaving,
- removals);
- newView.setFailureDetectionPorts(currentView);
- newView.setFailureDetectionPort(this.localAddress, services.getHealthMonitor().getFailureDetectionPort());
- }
- if (viewCreator == null || viewCreator.isShutdown()) {
- createViewCreator();
- viewCreator.setInitialView(newView, newView.getNewMembers(), leaving, removals);
- viewCreator.setDaemon(true);
- viewCreator.start();
- startViewBroadcaster();
+ NetView newView = addMemberToNetView(this.currentView, oldCoordinator);
+ createAndStartViewCreator(newView);
+ startViewBroadcaster();
+ }
+ }
+
+ private void createAndStartViewCreator(NetView newView) {
+ if (viewCreator == null || viewCreator.isShutdown()) {
+ viewCreator = new ViewCreator("Geode Membership View Creator", Services.getThreadGroup());
+ if (newView != null) {
+ viewCreator.setInitialView(newView, newView.getNewMembers(), newView.getShutdownMembers(), newView.getCrashedMembers());
}
+ viewCreator.setDaemon(true);
+ viewCreator.start();
}
}
- protected void createViewCreator() {
- viewCreator = new ViewCreator("Geode Membership View Creator", Services.getThreadGroup());
+ private NetView addMemberToNetView(NetView netView, InternalDistributedMember oldCoordinator) {
+ boolean testing = unitTesting.contains("noRandomViewChange");
+ NetView newView = null;
+ Set<InternalDistributedMember> leaving = new HashSet<>();
+ Set<InternalDistributedMember> removals;
+ synchronized (viewInstallationLock) {
+ int rand = testing ? 0 : NetView.RANDOM.nextInt(10);
+ int viewNumber = currentView.getViewId() + 5 + rand;
+ if (this.localAddress.getVmViewId() < 0) {
+ this.localAddress.setVmViewId(viewNumber);
+ }
+ List<InternalDistributedMember> mbrs = new ArrayList<>(currentView.getMembers());
+ if (!mbrs.contains(localAddress)) {
+ mbrs.add(localAddress);
+ }
+ synchronized (this.removedMembers) {
+ removals = new HashSet<>(this.removedMembers);
+ }
+ synchronized (this.leftMembers) {
+ leaving.addAll(leftMembers);
+ }
+ if (oldCoordinator != null && !removals.contains(oldCoordinator)) {
+ leaving.add(oldCoordinator);
+ }
+ mbrs.removeAll(removals);
+ mbrs.removeAll(leaving);
+ newView = new NetView(this.localAddress, viewNumber, mbrs, leaving,
+ removals);
+ newView.setFailureDetectionPorts(currentView);
+ newView.setFailureDetectionPort(this.localAddress, services.getHealthMonitor().getFailureDetectionPort());
+ }
+ return newView;
}
private void sendRemoveMessages(List<InternalDistributedMember> removals, List<String> reasons, NetView newView) {
@@ -689,7 +694,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
sendView(view, newMembers, false, this.viewProcessor);
}
- boolean sendView(NetView view, List<InternalDistributedMember> newMembers, boolean preparing, ViewReplyProcessor rp) {
+ private boolean sendView(NetView view, List<InternalDistributedMember> newMembers, boolean preparing, ViewReplyProcessor viewReplyProcessor) {
int id = view.getViewId();
InstallViewMessage msg = new InstallViewMessage(view, services.getAuthenticator().getCredentials(this.localAddress), preparing);
@@ -708,6 +713,10 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
if (preparing) {
this.preparedView = view;
} else {
+ if(!localAddress.equals(view.getCoordinator()) && getViewCreator() != null)
+ {
+ stopCoordinatorServices();
+ }
installView(view);
}
@@ -716,25 +725,25 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
return true;
}
- StringBuilder s = new StringBuilder();
+ StringBuilder stringBuilder = new StringBuilder();
int[] ports = view.getFailureDetectionPorts();
int numMembers = view.size();
- for (int i=0; i<numMembers; i++) {
+ for (int i = 0; i < numMembers; i++) {
if (i > 0) {
- s.append(' ');
+ stringBuilder.append(' ');
}
- s.append(ports[i]);
+ stringBuilder.append(ports[i]);
}
logger.info((preparing ? "preparing" : "sending") + " new view " + view
- + "\nfailure detection ports: " + s.toString());
+ + "\nfailure detection ports: " + stringBuilder.toString());
msg.setRecipients(recips);
Set<InternalDistributedMember> pendingLeaves = getPendingRequestIDs(LEAVE_REQUEST_MESSAGE);
Set<InternalDistributedMember> pendingRemovals = getPendingRequestIDs(REMOVE_MEMBER_REQUEST);
pendingRemovals.removeAll(view.getCrashedMembers());
- rp.initialize(id, responders);
- rp.processPendingRequests(pendingLeaves, pendingRemovals);
+ viewReplyProcessor.initialize(id, responders);
+ viewReplyProcessor.processPendingRequests(pendingLeaves, pendingRemovals);
services.getMessenger().send(msg);
// only wait for responses during preparation
@@ -745,10 +754,10 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
logger.info("finished waiting for responses to view preparation");
- InternalDistributedMember conflictingViewSender = rp.getConflictingViewSender();
- NetView conflictingView = rp.getConflictingView();
+ InternalDistributedMember conflictingViewSender = viewReplyProcessor.getConflictingViewSender();
+ NetView conflictingView = viewReplyProcessor.getConflictingView();
if (conflictingView != null) {
- logger.warn("received a conflicting membership view from " + conflictingViewSender
+ logger.warn("received a conflicting membership view from " + conflictingViewSender
+ " during preparation: " + conflictingView);
return false;
}
@@ -762,7 +771,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
return true;
}
- private void processViewMessage(InstallViewMessage m) {
+ private void processViewMessage(final InstallViewMessage installViewMessage) {
NetView view = m.getView();
@@ -829,9 +838,10 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
}
private TcpClientWrapper tcpClientWrapper = new TcpClientWrapper();
-
+
/***
* testing purpose
+ *
* @param tcpClientWrapper
*/
void setTcpClientWrapper(TcpClientWrapper tcpClientWrapper) {
@@ -847,21 +857,21 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
SearchState state = searchState;
assert this.localAddress != null;
-
+
// If we've already tried to bootstrap from locators that
// haven't joined the system (e.g., a collocated locator)
// then jump to using the membership view to try to find
// the coordinator
- if ( !state.hasContactedAJoinedLocator && state.view != null) {
+ if (!state.hasContactedAJoinedLocator && state.view != null) {
return findCoordinatorFromView();
}
FindCoordinatorRequest request = new FindCoordinatorRequest(this.localAddress, state.alreadyTried, state.viewId);
Set<InternalDistributedMember> coordinators = new HashSet<InternalDistributedMember>();
-
- long giveUpTime = System.currentTimeMillis() + ((long)services.getConfig().getLocatorWaitTime() * 1000L);
-
- int connectTimeout = (int)services.getConfig().getMemberTimeout() * 2;
+
+ long giveUpTime = System.currentTimeMillis() + ((long) services.getConfig().getLocatorWaitTime() * 1000L);
+
+ int connectTimeout = (int) services.getConfig().getMemberTimeout() * 2;
boolean anyResponses = false;
boolean flagsSet = false;
@@ -869,12 +879,12 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
state.hasContactedAJoinedLocator = false;
state.locatorsContacted = 0;
-
+
do {
for (InetSocketAddress addr : locators) {
try {
Object o = tcpClientWrapper.sendCoordinatorFindRequest(addr, request, connectTimeout);
- FindCoordinatorResponse response = (o instanceof FindCoordinatorResponse) ? (FindCoordinatorResponse)o : null;
+ FindCoordinatorResponse response = (o instanceof FindCoordinatorResponse) ? (FindCoordinatorResponse) o : null;
if (response != null) {
state.locatorsContacted++;
if (!state.hasContactedAJoinedLocator &&
@@ -886,7 +896,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
if (response.getCoordinator() != null) {
anyResponses = true;
NetView v = response.getView();
- int viewId = v == null? -1 : v.getViewId();
+ int viewId = v == null ? -1 : v.getViewId();
if (viewId > state.viewId) {
state.viewId = viewId;
state.view = v;
@@ -946,17 +956,17 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
}
return true;
}
-
+
protected class TcpClientWrapper {
- protected Object sendCoordinatorFindRequest(InetSocketAddress addr, FindCoordinatorRequest request, int connectTimeout)
- throws ClassNotFoundException, IOException{
+ protected Object sendCoordinatorFindRequest(InetSocketAddress addr, FindCoordinatorRequest request, int connectTimeout)
+ throws ClassNotFoundException, IOException {
return TcpClient.requestToServer(
- addr.getAddress(), addr.getPort(), request, connectTimeout,
+ addr.getAddress(), addr.getPort(), request, connectTimeout,
true);
}
- }
+ }
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="WA_NOT_IN_LOOP")
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "WA_NOT_IN_LOOP")
boolean findCoordinatorFromView() {
ArrayList<FindCoordinatorResponse> result;
SearchState state = searchState;
@@ -1028,8 +1038,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
private void inheritSettingsFromLocator(InetSocketAddress addr, FindCoordinatorResponse response) {
boolean enabled = response.isNetworkPartitionDetectionEnabled();
if (!enabled && services.getConfig().isNetworkPartitionDetectionEnabled()) {
- throw new GemFireConfigException("locator at "+addr
- +" does not have network-partition-detection enabled but my configuration has it enabled");
+ throw new GemFireConfigException("locator at " + addr
+ + " does not have network-partition-detection enabled but my configuration has it enabled");
}
GMSMember mbr = (GMSMember) this.localAddress.getNetMember();
@@ -1040,8 +1050,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
if (response.isUsePreferredCoordinators()) {
this.quorumRequired = true;
logger.debug("The locator indicates that all locators should be preferred as coordinators");
- if (services.getLocator() != null
- || Locator.hasLocator()
+ if (services.getLocator() != null
+ || Locator.hasLocator()
|| !services.getConfig().getDistributionConfig().getStartLocator().isEmpty()
|| localAddress.getVmKind() == DistributionManager.LOCATOR_DM_TYPE) {
((GMSMember) localAddress.getNetMember()).setPreferredForCoordinator(true);
@@ -1053,7 +1063,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
/**
* receives a JoinResponse holding a membership view or rejection message
- *
+ *
* @param rsp
*/
private void processJoinResponse(JoinResponseMessage rsp) {
@@ -1062,7 +1072,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
joinResponse.notifyAll();
}
}
-
+
/**
* for testing, do not use in any other case as it is not thread safe
*/
@@ -1142,7 +1152,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
return;
}
}
-
+
if (isJoined && isNetworkPartition(newView, true)) {
if (quorumRequired) {
Set<InternalDistributedMember> crashes = newView.getActualCrashedMembers(currentView);
@@ -1150,7 +1160,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
return;
}
}
-
+
previousView = currentView;
currentView = newView;
preparedView = null;
@@ -1160,7 +1170,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
if (!isJoined) {
logger.debug("notifying join thread");
isJoined = true;
- synchronized(joinResponse) {
+ synchronized (joinResponse) {
joinResponse.notifyAll();
}
}
@@ -1179,7 +1189,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
// newer than the view just processed - the senders will have to
// resend these
synchronized (viewRequests) {
- for (Iterator<DistributionMessage> it = viewRequests.iterator(); it.hasNext();) {
+ for (Iterator<DistributionMessage> it = viewRequests.iterator(); it.hasNext(); ) {
DistributionMessage m = it.next();
if (m instanceof JoinRequestMessage) {
it.remove();
@@ -1199,7 +1209,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
synchronized (removedMembers) {
removeMembersFromCollectionIfNotInView(removedMembers, currentView);
}
- synchronized(leftMembers) {
+ synchronized (leftMembers) {
removeMembersFromCollectionIfNotInView(leftMembers, currentView);
}
}
@@ -1208,7 +1218,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
Iterator<InternalDistributedMember> iterator = members.iterator();
while (iterator.hasNext()) {
if (!currentView.contains(iterator.next())) {
- iterator.remove();
+ iterator.remove();
}
}
}
@@ -1216,7 +1226,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
/**
* Sends a message declaring a network partition to the
* members of the given view via Messenger
- *
+ *
* @param view
*/
void sendNetworkPartitionMessage(NetView view) {
@@ -1279,10 +1289,11 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
private void stopCoordinatorServices() {
if (viewCreator != null && !viewCreator.isShutdown()) {
+ logger.debug("Shutting down ViewCreator");
viewCreator.shutdown();
}
}
-
+
private void startViewBroadcaster() {
services.getTimer().schedule(new ViewBroadcaster(), VIEW_BROADCAST_INTERVAL, VIEW_BROADCAST_INTERVAL);
}
@@ -1342,7 +1353,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
stopCoordinatorServices();
if (view != null) {
if (view.size() > 1) {
- List<InternalDistributedMember> coords = view.getPreferredCoordinators(Collections.<InternalDistributedMember> emptySet(), localAddress, 5);
+ List<InternalDistributedMember> coords = view.getPreferredCoordinators(Collections.<InternalDistributedMember>emptySet(), localAddress, 5);
logger.debug("JoinLeave sending a leave request to {}", coords);
LeaveRequestMessage m = new LeaveRequestMessage(coords, this.localAddress, "this member is shutting down");
services.getMessenger().send(m);
@@ -1365,7 +1376,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
processRemoveRequest(msg);
if (!this.isCoordinator) {
msg.resetRecipients();
- msg.setRecipients(v.getPreferredCoordinators(Collections.<InternalDistributedMember> emptySet(), localAddress, 10));
+ msg.setRecipients(v.getPreferredCoordinators(Collections.<InternalDistributedMember>emptySet(), localAddress, 10));
services.getMessenger().send(msg);
}
}
@@ -1373,8 +1384,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
@Override
public void memberShutdown(DistributedMember mbr, String reason) {
- LeaveRequestMessage msg = new LeaveRequestMessage(Collections.singleton(this.localAddress), (InternalDistributedMember)mbr, reason);
- msg.setSender((InternalDistributedMember)mbr);
+ LeaveRequestMessage msg = new LeaveRequestMessage(Collections.singleton(this.localAddress), (InternalDistributedMember) mbr, reason);
+ msg.setSender((InternalDistributedMember) mbr);
processLeaveRequest(msg);
}
@@ -1494,11 +1505,11 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
protected ViewReplyProcessor getPrepareViewReplyProcessor() {
return prepareProcessor;
}
-
- protected boolean testPrepareProcessorWaiting(){
+
+ protected boolean testPrepareProcessorWaiting() {
return prepareProcessor.isWaiting();
}
-
+
class ViewReplyProcessor {
volatile int viewId = -1;
final Set<InternalDistributedMember> notRepliedYet = new HashSet<>();
@@ -1521,7 +1532,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
pendingRemovals.clear();
}
- boolean isWaiting(){
+ boolean isWaiting() {
return waiting;
}
@@ -1620,7 +1631,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
}
}
} finally {
- synchronized(this) {
+ synchronized (this) {
if (!this.waiting) {
// if we've set waiting to false due to incoming messages then
// we've discounted receiving any other responses from the
@@ -1663,7 +1674,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
sendCurrentView();
}
}
-
+
void sendCurrentView() {
NetView v = currentView;
if (v != null) {
@@ -1726,9 +1737,9 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
* All views should be sent by the ViewCreator thread, so
* if this member becomes coordinator it may have an initial
* view to transmit that announces the removal of the former coordinator to
- *
+ *
* @param newView
- * @param leaving - members leaving in this view
+ * @param leaving - members leaving in this view
* @param removals - members crashed in this view
*/
synchronized void setInitialView(NetView newView, List<InternalDistributedMember> newMembers,
@@ -1752,7 +1763,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
List<InternalDistributedMember> iJoins;
Set<InternalDistributedMember> iLeaves;
Set<InternalDistributedMember> iRemoves;
- synchronized(this) {
+ synchronized (this) {
iView = initialView;
iJoins = initialJoins;
iLeaves = initialLeaving;
@@ -1770,7 +1781,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
* During initial view processing a prepared view was discovered.
* This method will extract its new members and create a new
* initial view containing them.
- *
+ *
* @param v The prepared view
*/
private void processPreparedView(NetView v) {
@@ -1778,7 +1789,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
if (currentView == null || currentView.getViewId() < v.getViewId()) {
// we have a prepared view that is newer than the current view
// form a new View ID
- int viewId = Math.max(initialView.getViewId(),v.getViewId());
+ int viewId = Math.max(initialView.getViewId(), v.getViewId());
viewId += 1;
NetView newView = new NetView(initialView, viewId);
@@ -1790,13 +1801,13 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
} else {
newMembers = v.getMembers();
}
- for (InternalDistributedMember newMember: newMembers) {
+ for (InternalDistributedMember newMember : newMembers) {
newView.add(newMember);
newView.setFailureDetectionPort(newMember, v.getFailureDetectionPort(newMember));
}
// use the new view as the initial view
- synchronized(this) {
+ synchronized (this) {
setInitialView(newView, newMembers, initialLeaving, initialRemovals);
}
}
@@ -1899,7 +1910,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
InternalDistributedMember mbr = null;
switch (msg.getDSFID()) {
case JOIN_REQUEST:
- JoinRequestMessage jmsg = (JoinRequestMessage)msg;
+ JoinRequestMessage jmsg = (JoinRequestMessage) msg;
mbr = jmsg.getMemberID();
int port = jmsg.getFailureDetectionPort();
// see if an old member ID is being reused. If
@@ -1940,8 +1951,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
removalReqs.add(mbr);
removalReasons.add(((RemoveMemberMessage) msg).getReason());
} else {
- sendRemoveMessages(Collections.<InternalDistributedMember> singletonList(mbr),
- Collections.<String> singletonList(((RemoveMemberMessage) msg).getReason()), currentView);
+ sendRemoveMessages(Collections.<InternalDistributedMember>singletonList(mbr),
+ Collections.<String>singletonList(((RemoveMemberMessage) msg).getReason()), currentView);
}
}
break;
@@ -1977,7 +1988,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
// be reused in an auto-reconnect and get a new vmViewID
mbrs.addAll(joinReqs);
newView = new NetView(localAddress, viewNumber, mbrs, leaveReqs, new HashSet<InternalDistributedMember>(removalReqs));
- for (InternalDistributedMember mbr: joinReqs) {
+ for (InternalDistributedMember mbr : joinReqs) {
if (mbrs.contains(mbr)) {
newView.setFailureDetectionPort(mbr, joinPorts.get(mbr));
}
@@ -1998,14 +2009,14 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
mbr.setVmViewId(newView.getViewId());
mbr.getNetMember().setSplitBrainEnabled(services.getConfig().isNetworkPartitionDetectionEnabled());
}
-
+
if (isShutdown()) {
return;
}
// send removal messages before installing the view so we stop
// getting messages from members that have been kicked out
sendRemoveMessages(removalReqs, removalReasons, newView);
-
+
prepareAndSendView(newView, joinReqs, leaveReqs, newView.getCrashedMembers());
return;
@@ -2078,7 +2089,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
List<InternalDistributedMember> newMembers = conflictingView.getNewMembers();
if (!newMembers.isEmpty()) {
logger.info("adding these new members from a conflicting view to the new view: {}", newMembers);
- for (InternalDistributedMember mbr: newMembers) {
+ for (InternalDistributedMember mbr : newMembers) {
int port = conflictingView.getFailureDetectionPort(mbr);
newView.add(mbr);
newView.setFailureDetectionPort(mbr, port);
@@ -2087,7 +2098,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
}
// trump the view ID of the conflicting view so mine will be accepted
if (conflictingView.getViewId() >= newView.getViewId()) {
- newView = new NetView(newView, conflictingView.getViewId()+1);
+ newView = new NetView(newView, conflictingView.getViewId() + 1);
}
}
@@ -2105,7 +2116,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
List<InternalDistributedMember> newMembers = new ArrayList<>(newView.getMembers());
newMembers.removeAll(removalReqs);
NetView tempView = new NetView(localAddress, newView.getViewId() + 1, newMembers, leaveReqs, removalReqs);
- for (InternalDistributedMember mbr: newView.getMembers()) {
+ for (InternalDistributedMember mbr : newView.getMembers()) {
if (tempView.contains(mbr)) {
tempView.setFailureDetectionPort(mbr, newView.getFailureDetectionPort(mbr));
}
@@ -2113,12 +2124,12 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
newView = tempView;
int size = failures.size();
List<String> reasons = new ArrayList<>(size);
- for (int i=0; i<size; i++) {
+ for (int i = 0; i < size; i++) {
reasons.add("Failed to acknowledge a new membership view and then failed tcp/ip connection attempt");
}
sendRemoveMessages(failures, reasons, newView);
}
-
+
// if there is no conflicting view then we can count
// the current state as being prepared. All members
// who are going to ack have already done so or passed
@@ -2126,13 +2137,13 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
if (conflictingView == null) {
prepared = true;
}
-
+
} while (!prepared);
lastConflictingView = null;
sendView(newView, joinReqs);
-
+
// after sending a final view we need to stop this thread if
// the GMS is shutting down
if (isStopping()) {
@@ -2143,11 +2154,11 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
/**
* performs health checks on the collection of members, removing any that
* are found to be healthy
- *
- * @param suspects
+ *
+ * @param mbrs
*/
- private void removeHealthyMembers(final Set<InternalDistributedMember> suspects) throws InterruptedException {
- List<Callable<InternalDistributedMember>> checkers = new ArrayList<Callable<InternalDistributedMember>>(suspects.size());
+ private void removeHealthyMembers(final Collection<InternalDistributedMember> mbrs) throws InterruptedException {
+ List<Callable<InternalDistributedMember>> checkers = new ArrayList<Callable<InternalDistributedMember>>(mbrs.size());
Set<InternalDistributedMember> newRemovals = new HashSet<>();
Set<InternalDistributedMember> newLeaves = new HashSet<>();
@@ -2166,7 +2177,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
@Override
public InternalDistributedMember call() throws Exception {
boolean available = GMSJoinLeave.this.checkIfAvailable(mbr);
-
+
synchronized (viewRequests) {
if (available) {
suspects.remove(mbr);
@@ -2218,7 +2229,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
if(suspects.isEmpty() || newRemovals.containsAll(suspects)) {
break;
}
-
+
viewRequests.wait(waitTime);
waitTime = giveUpTime - System.currentTimeMillis();
}
@@ -2249,22 +2260,22 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
}
}
}
-
+
private <T> List<Future<T>> submitAll ( ExecutorService executor, Collection<? extends Callable<T> > tasks ) {
List<Future<T>> result = new ArrayList<Future<T>>( tasks.size() );
- for ( Callable<T> task : tasks ) {
+ for (Callable<T> task : tasks) {
result.add(executor.submit(task));
}
return result;
}
-
+
boolean getTestFlageForRemovalRequest() {
return testFlagForRemovalRequest;
}
}
-
+
boolean checkIfAvailable(InternalDistributedMember fmbr) {
// return the member id if it fails health checks
logger.info("checking state of member " + fmbr);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/edc1c4c6/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/InstallViewMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/InstallViewMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/InstallViewMessage.java
index 91f6918..9e42f1f 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/InstallViewMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/InstallViewMessage.java
@@ -30,18 +30,19 @@ import com.gemstone.gemfire.distributed.internal.membership.NetView;
import com.gemstone.gemfire.internal.InternalDataSerializer;
public class InstallViewMessage extends HighPriorityDistributionMessage {
-
enum messageType {
INSTALL, PREPARE, SYNC
}
private NetView view;
private Object credentials;
private messageType kind;
+ private int previousViewId;
public InstallViewMessage(NetView view, Object credentials) {
this.view = view;
this.kind = messageType.INSTALL;
this.credentials = credentials;
+ this.previousViewId = view.getViewId();
}
public InstallViewMessage(NetView view, Object credentials, boolean preparing) {
@@ -49,6 +50,13 @@ public class InstallViewMessage extends HighPriorityDistributionMessage {
this.kind = preparing? messageType.PREPARE : messageType.INSTALL;
this.credentials = credentials;
}
+
+ public InstallViewMessage(NetView view, Object credentials, int previousViewId, boolean preparing) {
+ this.view = view;
+ this.kind = preparing? messageType.PREPARE : messageType.INSTALL;
+ this.credentials = credentials;
+ this.previousViewId = previousViewId;
+ }
public InstallViewMessage() {
// no-arg constructor for serialization
@@ -62,6 +70,10 @@ public class InstallViewMessage extends HighPriorityDistributionMessage {
return view;
}
+ public int getPreviousViewId() {
+ return previousViewId;
+ }
+
public Object getCredentials() {
return credentials;
}
@@ -83,6 +95,7 @@ public class InstallViewMessage extends HighPriorityDistributionMessage {
@Override
public void toData(DataOutput out) throws IOException {
super.toData(out);
+ out.writeInt(previousViewId);
out.writeInt(kind.ordinal());
DataSerializer.writeObject(this.view, out);
DataSerializer.writeObject(this.credentials, out);
@@ -91,6 +104,7 @@ public class InstallViewMessage extends HighPriorityDistributionMessage {
@Override
public void fromData(DataInput in) throws IOException, ClassNotFoundException {
super.fromData(in);
+ this.previousViewId = in.readInt();
this.kind = messageType.values()[in.readInt()];
this.view = DataSerializer.readObject(in);
this.credentials = DataSerializer.readObject(in);
@@ -98,7 +112,7 @@ public class InstallViewMessage extends HighPriorityDistributionMessage {
@Override
public String toString() {
- return "InstallViewMessage(type="+this.kind+"; "+this.view
+ return "InstallViewMessage(type="+this.kind+"; Current ViewID="+view.getViewId()+"; Previous View ID="+previousViewId+"; "+this.view
+"; cred="+(credentials==null?"null": "not null")
+")";
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/edc1c4c6/geode-core/src/main/java/com/gemstone/gemfire/internal/DSFIDFactory.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/DSFIDFactory.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/DSFIDFactory.java
index b77dfdb..67e7c8d 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/DSFIDFactory.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/DSFIDFactory.java
@@ -19,6 +19,7 @@ package com.gemstone.gemfire.internal;
+import com.gemstone.gemfire.distributed.internal.membership.gms.messages.*;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import java.io.DataInput;
@@ -104,16 +105,6 @@ import com.gemstone.gemfire.distributed.internal.membership.gms.locator.FindCoor
import com.gemstone.gemfire.distributed.internal.membership.gms.locator.FindCoordinatorResponse;
import com.gemstone.gemfire.distributed.internal.membership.gms.locator.GetViewRequest;
import com.gemstone.gemfire.distributed.internal.membership.gms.locator.GetViewResponse;
-import com.gemstone.gemfire.distributed.internal.membership.gms.messages.InstallViewMessage;
-import com.gemstone.gemfire.distributed.internal.membership.gms.messages.JoinRequestMessage;
-import com.gemstone.gemfire.distributed.internal.membership.gms.messages.JoinResponseMessage;
-import com.gemstone.gemfire.distributed.internal.membership.gms.messages.LeaveRequestMessage;
-import com.gemstone.gemfire.distributed.internal.membership.gms.messages.HeartbeatRequestMessage;
-import com.gemstone.gemfire.distributed.internal.membership.gms.messages.HeartbeatMessage;
-import com.gemstone.gemfire.distributed.internal.membership.gms.messages.NetworkPartitionMessage;
-import com.gemstone.gemfire.distributed.internal.membership.gms.messages.RemoveMemberMessage;
-import com.gemstone.gemfire.distributed.internal.membership.gms.messages.SuspectMembersMessage;
-import com.gemstone.gemfire.distributed.internal.membership.gms.messages.ViewAckMessage;
import com.gemstone.gemfire.distributed.internal.streaming.StreamingOperation.StreamingReplyMessage;
import com.gemstone.gemfire.internal.admin.ClientMembershipMessage;
import com.gemstone.gemfire.internal.admin.remote.AddHealthListenerRequest;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/edc1c4c6/geode-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java
index 22ac457..035ba56 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java
@@ -82,8 +82,6 @@ public interface DataSerializableFixedID extends SerializationVersions {
case FOO:
return new FOO(in);
*/
- public static final short VIEW_REJECT_MESSAGE = -158;
-
public static final short NETWORK_PARTITION_MESSAGE = -157;
public static final short SUSPECT_MEMBERS_MESSAGE = -156;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/edc1c4c6/geode-core/src/test/java/com/gemstone/gemfire/distributed/LocatorDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/distributed/LocatorDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/distributed/LocatorDUnitTest.java
index 1917692..c800acf 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/distributed/LocatorDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/distributed/LocatorDUnitTest.java
@@ -39,12 +39,21 @@ import com.gemstone.gemfire.distributed.internal.membership.MembershipManager;
import com.gemstone.gemfire.distributed.internal.membership.MembershipTestHook;
import com.gemstone.gemfire.distributed.internal.membership.NetView;
import com.gemstone.gemfire.distributed.internal.membership.gms.MembershipManagerHelper;
+import com.gemstone.gemfire.distributed.internal.membership.gms.membership.GMSJoinLeaveHelper;
import com.gemstone.gemfire.internal.Assert;
import com.gemstone.gemfire.internal.AvailablePort;
import com.gemstone.gemfire.internal.AvailablePortHelper;
import com.gemstone.gemfire.internal.logging.InternalLogWriter;
import com.gemstone.gemfire.internal.logging.LocalLogWriter;
import com.gemstone.gemfire.internal.tcp.Connection;
+import com.gemstone.gemfire.test.dunit.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
import com.gemstone.gemfire.test.dunit.AsyncInvocation;
import com.gemstone.gemfire.test.dunit.DistributedTestCase;
import com.gemstone.gemfire.test.dunit.DistributedTestUtils;
@@ -68,7 +77,7 @@ import com.gemstone.gemfire.test.dunit.WaitCriterion;
public class LocatorDUnitTest extends DistributedTestCase {
static TestHook hook;
-
+
/**
* Creates a new <code>LocatorDUnitTest</code>
*/
@@ -78,12 +87,12 @@ public class LocatorDUnitTest extends DistributedTestCase {
private static final String WAIT2_MS_NAME = "LocatorDUnitTest.WAIT2_MS";
private static final int WAIT2_MS_DEFAULT = 5000; // 2000 -- see bug 36470
- private static final int WAIT2_MS
+ private static final int WAIT2_MS
= Integer.getInteger(WAIT2_MS_NAME, WAIT2_MS_DEFAULT).intValue();
-
+
private int port1;
private int port2;
-
+
@Override
public void setUp() throws Exception {
super.setUp();
@@ -91,7 +100,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
port2 = -1;
IgnoredException.addIgnoredException("Removing shunned member");
}
-
+
@Override
protected final void preTearDown() throws Exception {
if (Locator.hasLocator()) {
@@ -106,7 +115,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
DistributedTestUtils.deleteLocatorStateFile(port2);
}
}
-
+
//////// Test Methods
/**
@@ -123,7 +132,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
VM vm3 = host.getVM(3);
-
+
port1 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
DistributedTestUtils.deleteLocatorStateFile(port1);
@@ -140,7 +149,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
InternalDistributedMember mbr = system.getDistributedMember();
assertEquals("expected the VM to have NORMAL vmKind",
DistributionManager.NORMAL_DM_TYPE, system.getDistributedMember().getVmKind());
-
+
properties.remove("start-locator");
properties.put("log-level", LogWriterUtils.getDUnitLogLevel());
properties.put("locators", locators);
@@ -156,12 +165,12 @@ public class LocatorDUnitTest extends DistributedTestCase {
Cache cache = CacheFactory.create(system);
Region r = cache.createRegionFactory(RegionShortcut.REPLICATE).create("test region");
assertNotNull("expected to create a region", r);
-
+
// create a lock service and have every vm get a lock
DistributedLockService service = DistributedLockService.create("test service", system);
service.becomeLockGrantor();
service.lock("foo0", 0, 0);
-
+
vm1.invoke(new SerializableRunnable("get the lock service and lock something") {
public void run() {
final DistributedLockService service = DistributedLockService.create("test service", system);
@@ -178,7 +187,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
// cause elder failover. vm1 will become the lock grantor
system.disconnect();
-
+
try {
vm1.invoke(new SerializableRunnable("ensure grantor failover") {
public void run() {
@@ -194,12 +203,12 @@ public class LocatorDUnitTest extends DistributedTestCase {
public String description() {
return "waiting to become lock grantor after shutting down locator/grantor";
}
-
+
}, DistributionConfig.DEFAULT_MEMBER_TIMEOUT * 2, 1000, true);
assertTrue(service.isLockGrantor());
}
});
-
+
properties.put("start-locator", locators);
properties.put("log-level", LogWriterUtils.getDUnitLogLevel());
system = (InternalDistributedSystem)DistributedSystem.connect(properties);
@@ -222,7 +231,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
});
assertFalse("should not have become lock grantor", service.isLockGrantor());
-
+
// Now demonstrate that a new member can join and use the lock service
properties.remove("start-locator");
vm3.invoke(startSystem);
@@ -232,7 +241,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
service.lock("foo5", 0, 0);
}
});
-
+
} finally {
disconnectAllFromDS();
}
@@ -243,7 +252,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
* split-brain configuration. To work around this we have always told customers that they
* need to stagger the starting of locators. This test configures two locators to start up
* simultaneously and shows that they find each other and form a single system.
- *
+ *
* @throws Exception
*/
public void testStartTwoLocators() throws Exception {
@@ -251,7 +260,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
Host host = Host.getHost(0);
VM loc1 = host.getVM(1);
VM loc2 = host.getVM(2);
-
+
int ports[] = AvailablePortHelper.getRandomAvailableTCPPorts(2);
final int port1 = ports[0];
this.port1 = port1;
@@ -259,7 +268,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
this.port2 = port2; // for cleanup in tearDown2
DistributedTestUtils.deleteLocatorStateFile(port1);
DistributedTestUtils.deleteLocatorStateFile(port2);
- final String host0 = NetworkUtils.getServerHostName(host);
+ final String host0 = NetworkUtils.getServerHostName(host);
final String locators = host0 + "[" + port1 + "]," +
host0 + "[" + port2 + "]";
final Properties properties = new Properties();
@@ -385,16 +394,6 @@ public class LocatorDUnitTest extends DistributedTestCase {
Object[] connectArgs = new Object[] { properties };
- SerializableRunnable disconnect =
- new SerializableRunnable("Disconnect from " + locators) {
- public void run() {
- DistributedSystem sys = InternalDistributedSystem.getAnyInstance();
- if (sys != null && sys.isConnected()) {
- sys.disconnect();
- }
- }
- };
-
assertTrue(MembershipManagerHelper.getLeadMember(sys) == null);
// connect three vms and then watch the lead member selection as they
@@ -417,7 +416,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
assertLeadMember(mem1, sys, 5000);
// after disconnecting the first vm, the second one should become the leader
- vm1.invoke(disconnect);
+ vm1.invoke(getDisconnectRunnable(locators));
MembershipManagerHelper.getMembershipManager(sys).waitForDeparture(mem1);
assertLeadMember(mem2, sys, 5000);
@@ -426,15 +425,15 @@ public class LocatorDUnitTest extends DistributedTestCase {
"getDistributedMember", connectArgs);
assertLeadMember(mem2, sys, 5000);
- vm2.invoke(disconnect);
+ vm2.invoke(getDisconnectRunnable(locators));
MembershipManagerHelper.getMembershipManager(sys).waitForDeparture(mem2);
assertLeadMember(mem3, sys, 5000);
- vm1.invoke(disconnect);
+ vm1.invoke(getDisconnectRunnable(locators));
MembershipManagerHelper.getMembershipManager(sys).waitForDeparture(mem1);
assertLeadMember(mem3, sys, 5000);
- vm3.invoke(disconnect);
+ vm3.invoke(getDisconnectRunnable(locators));
MembershipManagerHelper.getMembershipManager(sys).waitForDeparture(mem3);
assertLeadMember(null, sys, 5000);
@@ -486,13 +485,13 @@ public class LocatorDUnitTest extends DistributedTestCase {
VM vm2 = host.getVM(2);
VM locvm = host.getVM(3);
Locator locator = null;
-
+
int ports[] = AvailablePortHelper.getRandomAvailableTCPPorts(2);
final int port1 = ports[0];
this.port1 = port1;
final int port2 = ports[1];
DistributedTestUtils.deleteLocatorStateFile(port1, port2);
- final String host0 = NetworkUtils.getServerHostName(host);
+ final String host0 = NetworkUtils.getServerHostName(host);
final String locators = host0 + "[" + port1 + "]," +
host0 + "[" + port2 + "]";
final Properties properties = new Properties();
@@ -504,7 +503,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
properties.put("log-level", LogWriterUtils.getDUnitLogLevel());
// properties.put("log-level", "fine");
properties.put(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
-
+
try {
final String uname = getUniqueName();
File logFile = new File("");
@@ -554,7 +553,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
assertTrue("Distributed system should not have disconnected",
vm1.invoke(() -> LocatorDUnitTest.isSystemConnected()));
-
+
// ensure quorumLost is properly invoked
DistributionManager dm = (DistributionManager) ((InternalDistributedSystem) sys).getDistributionManager();
MyMembershipListener listener = new MyMembershipListener();
@@ -619,7 +618,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
VM vm2 = host.getVM(2);
VM locvm = host.getVM(3);
Locator locator = null;
-
+
final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(2);
final int port1 = ports[0];
this.port1 = port1;
@@ -638,7 +637,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
properties.put(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
SerializableRunnable stopLocator = getStopLocatorRunnable();
-
+
try {
final String uname = getUniqueName();
File logFile = new File("");
@@ -693,7 +692,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
assertTrue("Distributed system should not have disconnected",
vm2.invoke(() -> LocatorDUnitTest.isSystemConnected()));
-
+
assertTrue("Distributed system should not have disconnected",
locvm.invoke(() -> LocatorDUnitTest.isSystemConnected()));
@@ -708,7 +707,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
assertTrue("Distributed system should not have disconnected",
vm2.invoke(() -> LocatorDUnitTest.isSystemConnected()));
-
+
assertTrue("Distributed system should not have disconnected",
locvm.invoke(() -> LocatorDUnitTest.isSystemConnected()));
@@ -716,7 +715,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
assertEquals("This test sometimes fails. If the log contains " +
"'failed to collect all ACKs' it is a false failure.",
mem2, vm2.invoke(() -> LocatorDUnitTest.getLeadMember()));
-
+
SerializableRunnable disconnect =
new SerializableRunnable("Disconnect from " + locators) {
public void run() {
@@ -729,7 +728,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
// disconnect the first vm and demonstrate that the third vm and the
// locator notice the failure and exit
- vm2.invoke(disconnect);
+ vm2.invoke(getDisconnectRunnable(locators));
locvm.invoke(stopLocator);
} finally {
MembershipManagerHelper.inhibitForcedDisconnectLogging(false);
@@ -766,7 +765,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
VM vm2 = host.getVM(2);
VM locvm = host.getVM(3);
Locator locator = null;
-
+
int ports[] = AvailablePortHelper.getRandomAvailableTCPPorts(2);
final int port1 = ports[0];
this.port1 = port1;
@@ -842,7 +841,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
assertTrue("Distributed system should not have disconnected",
vm2.invoke(() -> LocatorDUnitTest.isSystemConnected()));
-
+
assertTrue("Distributed system should not have disconnected",
locvm.invoke(() -> LocatorDUnitTest.isSystemConnected()));
@@ -903,7 +902,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
VM vm2 = host.getVM(2);
VM locvm = host.getVM(3);
Locator locator = null;
-
+
int ports[] = AvailablePortHelper.getRandomAvailableTCPPorts(2);
final int port1 = ports[0];
this.port1 = port1;
@@ -975,7 +974,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
vm1.invoke(expectedException);
DistributedMember mem2 = (DistributedMember) vm2.invoke(this.getClass(),
"getDistributedMember", connectArgs);
-
+
DistributedMember loc1Mbr = (DistributedMember)locvm.invoke(() -> this.getLocatorDistributedMember());
assertLeadMember(mem1, sys, 5000);
@@ -991,7 +990,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
assertTrue("Distributed system should not have disconnected",
vm1.invoke(() -> LocatorDUnitTest.isSystemConnected()));
-
+
assertTrue("Distributed system should not have disconnected",
vm2.invoke(() -> LocatorDUnitTest.isSystemConnected()));
@@ -1002,7 +1001,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
assertTrue("Distributed system should not have disconnected",
vm2.invoke(() -> LocatorDUnitTest.isSystemConnected()));
-
+
assertEquals(sys.getDistributedMember(),
MembershipManagerHelper.getCoordinator(sys));
assertEquals(mem2, MembershipManagerHelper.getLeadMember(sys));
@@ -1146,7 +1145,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
}
};
Wait.waitForCriterion(ev, 15 * 1000, 200, false);
- DistributedMember newCoord = MembershipManagerHelper.getCoordinator(system);
+ DistributedMember newCoord = MembershipManagerHelper.getCoordinator(system);
com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter().info("coordinator after shutdown of locator was " +
newCoord);
if (coord.equals(newCoord)) {
@@ -1156,17 +1155,8 @@ public class LocatorDUnitTest extends DistributedTestCase {
system.disconnect();
- SerializableRunnable disconnect =
- new SerializableRunnable("Disconnect from " + locators) {
- public void run() {
- DistributedSystem sys = InternalDistributedSystem.getAnyInstance();
- if (sys != null && sys.isConnected()) {
- sys.disconnect();
- }
- }
- };
- vm1.invoke(disconnect);
- vm2.invoke(disconnect);
+ vm1.invoke(getDisconnectRunnable(locators));
+ vm2.invoke(getDisconnectRunnable(locators));
} finally {
vm0.invoke(getStopLocatorRunnable());
@@ -1404,17 +1394,8 @@ public class LocatorDUnitTest extends DistributedTestCase {
system.disconnect();
- SerializableRunnable disconnect =
- new SerializableRunnable("Disconnect from " + locators) {
- public void run() {
- DistributedSystem sys = InternalDistributedSystem.getAnyInstance();
- if (sys != null && sys.isConnected()) {
- sys.disconnect();
- }
- }
- };
- vm1.invoke(disconnect);
- vm2.invoke(disconnect);
+ vm1.invoke(getDisconnectRunnable(locators));
+ vm2.invoke(getDisconnectRunnable(locators));
} finally {
vm3.invoke(getStopLocatorRunnable());
@@ -1423,7 +1404,18 @@ public class LocatorDUnitTest extends DistributedTestCase {
vm0.invoke(getStopLocatorRunnable());
}
}
-
+
+ private SerializableRunnable getDisconnectRunnable(final String locators) {
+ return new SerializableRunnable("Disconnect from " + locators) {
+ public void run() {
+ DistributedSystem sys = InternalDistributedSystem.getAnyInstance();
+ if (sys != null && sys.isConnected()) {
+ sys.disconnect();
+ }
+ }
+ };
+ }
+
/**
* Tests starting multiple locators at the same time and ensuring that the locators
* end up only have 1 master.
@@ -1454,7 +1446,6 @@ public class LocatorDUnitTest extends DistributedTestCase {
dsProps.setProperty("log-level", "FINE");
dsProps.setProperty("enable-network-partition-detection", "true");
dsProps.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
- final String uniqueName = getUniqueName();
startLocatorSync(vm0, new Object[] { port1, dsProps });
startLocatorSync(vm1, new Object[] { port2, dsProps });
@@ -1485,7 +1476,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
WaitCriterion waitCriterion = new WaitCriterion() {
public boolean done() {
try {
- return system.getDM().getViewMembers().size() >= 3;
+ return system.getDM().getViewMembers().size() == 6;
} catch (Exception e) {
e.printStackTrace();
fail("unexpected exception");
@@ -1506,10 +1497,31 @@ public class LocatorDUnitTest extends DistributedTestCase {
vm1.invoke(getStopLocatorRunnable());
vm2.invoke(getStopLocatorRunnable());
+ waitCriterion = new WaitCriterion() {
+ public boolean done() {
+ try {
+ return system.getDM().getAllHostedLocators().size() == 0;
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail("unexpected exception");
+ }
+ return false; // NOTREACHED
+ }
+
+ public String description() {
+ return null;
+ }
+ };
+ DistributedTestCase.waitForCriterion(waitCriterion, 10 * 1000, 200, true);
+
final String newLocators = host0 + "[" + port2 + "]," +
host0 + "[" + port3 + "]";
dsProps.setProperty("locators", newLocators);
+ assertTrue(vm3.invoke(() -> GMSJoinLeaveHelper.isViewCreator()));
+ //Given the start up order of servers, this server is the elder server
+ assertTrue(vm3.invoke(() -> GMSJoinLeaveHelper.isViewCreator()));
+
startLocatorAsync(vm1, new Object[] { port2, dsProps });
startLocatorAsync(vm2, new Object[] { port3, dsProps });
@@ -1530,19 +1542,23 @@ public class LocatorDUnitTest extends DistributedTestCase {
};
DistributedTestCase.waitForCriterion(waitCriterion, 10 * 1000, 200, true);
+ int netviewId = vm1.invoke(() -> GMSJoinLeaveHelper.getViewId());
+ assertEquals(netviewId, (int) vm2.invoke(() -> GMSJoinLeaveHelper.getViewId()));
+ assertEquals(netviewId, (int) vm3.invoke(() -> GMSJoinLeaveHelper.getViewId()));
+ assertEquals(netviewId, (int) vm4.invoke(() -> GMSJoinLeaveHelper.getViewId()));
+ assertFalse(vm4.invoke(() -> GMSJoinLeaveHelper.isViewCreator()));
+ //Given the start up order of servers, this server is the elder server
+ assertFalse(vm3.invoke(() -> GMSJoinLeaveHelper.isViewCreator()));
+ if (vm1.invoke(() -> GMSJoinLeaveHelper.isViewCreator())) {
+ assertFalse(vm2.invoke(() -> GMSJoinLeaveHelper.isViewCreator()));
+ } else {
+ assertTrue(vm2.invoke(() -> GMSJoinLeaveHelper.isViewCreator()));
+ }
+
} finally {
system.disconnect();
- SerializableRunnable disconnect =
- new SerializableRunnable("Disconnect from " + locators) {
- public void run() {
- DistributedSystem sys = InternalDistributedSystem.getAnyInstance();
- if (sys != null && sys.isConnected()) {
- sys.disconnect();
- }
- }
- };
- vm3.invoke(disconnect);
- vm4.invoke(disconnect);
+ vm3.invoke(getDisconnectRunnable(locators));
+ vm4.invoke(getDisconnectRunnable(locators));
vm2.invoke(getStopLocatorRunnable());
vm1.invoke(getStopLocatorRunnable());
}
@@ -1680,17 +1696,8 @@ public class LocatorDUnitTest extends DistributedTestCase {
Wait.waitForCriterion(ev, WAIT2_MS, 200, true);
system.disconnect();
- SerializableRunnable disconnect =
- new SerializableRunnable("Disconnect from " + locators) {
- public void run() {
- DistributedSystem sys = InternalDistributedSystem.getAnyInstance();
- if (sys != null && sys.isConnected()) {
- sys.disconnect();
- }
- }
- };
- vm1.invoke(disconnect);
- vm2.invoke(disconnect);
+ vm1.invoke(getDisconnectRunnable(locators));
+ vm2.invoke(getDisconnectRunnable(locators));
} finally {
SerializableRunnable stop = getStopLocatorRunnable();
vm0.invoke(stop);
@@ -1849,15 +1856,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
connect.run();
//vm1.invoke(connect);
- SerializableRunnable disconnect =
- new SerializableRunnable("Disconnect from " + locators) {
- public void run() {
- DistributedSystem sys = InternalDistributedSystem.getAnyInstance();
- if (sys != null && sys.isConnected()) {
- sys.disconnect();
- }
- }
- };
+ SerializableRunnable disconnect = getDisconnectRunnable(locators);
disconnect.run();
//vm1.invoke(disconnect);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/edc1c4c6/gradle/rat.gradle
----------------------------------------------------------------------
diff --git a/gradle/rat.gradle b/gradle/rat.gradle
index 496b20a..68e04c4 100644
--- a/gradle/rat.gradle
+++ b/gradle/rat.gradle
@@ -61,7 +61,7 @@ rat {
'**/*.log',
'**/*.patch',
'**/*.diff',
- '**/*.MF',
+ '**/*.MF',
// binary files
'**/*.cer',