You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2016/02/29 06:56:56 UTC
[04/13] incubator-geode git commit: GEODE-870: test case fixes.
GEODE-870: test case fixes.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/0af59683
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/0af59683
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/0af59683
Branch: refs/heads/develop
Commit: 0af59683b847ea431e4b653867e74c3d714c8ac1
Parents: 8b24a4f
Author: Udo Kohlmeyer <uk...@pivotal.io>
Authored: Wed Feb 17 14:51:19 2016 +1100
Committer: Udo Kohlmeyer <uk...@pivotal.io>
Committed: Mon Feb 29 16:36:08 2016 +1100
----------------------------------------------------------------------
.../membership/gms/membership/GMSJoinLeave.java | 212 ++++++++++++-------
.../cache30/CacheSerializableRunnable.java | 16 +-
.../test/dunit/SerializableRunnable.java | 2 +-
3 files changed, 156 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0af59683/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
index 8ba4952..dcb2721 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
@@ -55,104 +55,166 @@ import static com.gemstone.gemfire.internal.DataSerializableFixedID.*;
* that Geode formerly used for this purpose.
*/
public class GMSJoinLeave implements JoinLeave, MessageHandler {
-
+
public static final String BYPASS_DISCOVERY_PROPERTY = "gemfire.bypass-discovery";
- /** amount of time to wait for responses to FindCoordinatorRequests */
+ /**
+ * amount of time to wait for responses to FindCoordinatorRequests
+ */
private static final int DISCOVERY_TIMEOUT = Integer.getInteger("gemfire.discovery-timeout", 3000);
- /** amount of time to sleep before trying to join after a failed attempt */
+ /**
+ * amount of time to sleep before trying to join after a failed attempt
+ */
private static final int JOIN_RETRY_SLEEP = Integer.getInteger("gemfire.join-retry-sleep", 1000);
- /** time to wait for a broadcast message to be transmitted by jgroups */
+ /**
+ * time to wait for a broadcast message to be transmitted by jgroups
+ */
private static final long BROADCAST_MESSAGE_SLEEP_TIME = Long.getLong("gemfire.broadcast-message-sleep-time", 1000);
- /** if the locators don't know who the coordinator is we send find-coord requests to this many nodes */
+ /**
+ * if the locators don't know who the coordinator is we send find-coord requests to this many nodes
+ */
private static final int MAX_DISCOVERY_NODES = Integer.getInteger("gemfire.max-discovery-nodes", 30);
- /** interval for broadcasting the current view to members in case they didn't get it the first time */
+ /**
+ * interval for broadcasting the current view to members in case they didn't get it the first time
+ */
private static final long VIEW_BROADCAST_INTERVAL = Long.getLong("gemfire.view-broadcast-interval", 60000);
- /** membership logger */
+ /**
+ * membership logger
+ */
private static final Logger logger = Services.getLogger();
- /** the view ID where I entered into membership */
+ /**
+ * the view ID where I entered into membership
+ */
private int birthViewId;
- /** my address */
+ /**
+ * my address
+ */
private InternalDistributedMember localAddress;
private Services services;
- /** have I connected to the distributed system? */
+ /**
+ * have I connected to the distributed system?
+ */
private volatile boolean isJoined;
- /** guarded by viewInstallationLock */
+ /**
+ * guarded by viewInstallationLock
+ */
private boolean isCoordinator;
- /** a synch object that guards view installation */
+ /**
+ * a synch object that guards view installation
+ */
private final Object viewInstallationLock = new Object();
-
- /** the currently installed view. Guarded by viewInstallationLock */
+
+ /**
+ * the currently installed view. Guarded by viewInstallationLock
+ */
private volatile NetView currentView;
- /** the previous view **/
+ /**
+ * the previous view
+ **/
private volatile NetView previousView;
- /** members who we have been declared dead in the current view */
+ /**
+ * members who we have been declared dead in the current view
+ */
private final Set<InternalDistributedMember> removedMembers = new HashSet<>();
- /** members who we've received a leave message from **/
+ /**
+ * members who we've received a leave message from
+ **/
private final Set<InternalDistributedMember> leftMembers = new HashSet<>();
- /** a new view being installed */
+ /**
+ * a new view being installed
+ */
private NetView preparedView;
- /** the last view that conflicted with view preparation */
+ /**
+ * the last view that conflicted with view preparation
+ */
private NetView lastConflictingView;
private List<InetSocketAddress> locators;
- /** a list of join/leave/crashes */
+ /**
+ * a list of join/leave/crashes
+ */
private final List<DistributionMessage> viewRequests = new LinkedList<DistributionMessage>();
- /** the established request collection jitter. This can be overridden for testing with delayViewCreationForTest */
+ /**
+ * the established request collection jitter. This can be overridden for testing with delayViewCreationForTest
+ */
long requestCollectionInterval = MEMBER_REQUEST_COLLECTION_INTERVAL;
- /** collects the response to a join request */
+ /**
+ * collects the response to a join request
+ */
private JoinResponseMessage[] joinResponse = new JoinResponseMessage[1];
- /** collects responses to new views */
+ /**
+ * collects responses to new views
+ */
private ViewReplyProcessor viewProcessor = new ViewReplyProcessor(false);
- /** collects responses to view preparation messages */
+ /**
+ * collects responses to view preparation messages
+ */
private ViewReplyProcessor prepareProcessor = new ViewReplyProcessor(true);
- /** whether quorum checks can cause a forced-disconnect */
+ /**
+ * whether quorum checks can cause a forced-disconnect
+ */
private boolean quorumRequired = false;
- /** timeout in receiving view acknowledgement */
+ /**
+ * timeout in receiving view acknowledgement
+ */
private int viewAckTimeout;
- /** background thread that creates new membership views */
+ /**
+ * background thread that creates new membership views
+ */
private ViewCreator viewCreator;
- /** am I shutting down? */
+ /**
+ * am I shutting down?
+ */
private volatile boolean isStopping;
- /** state of collected artifacts during discovery */
+ /**
+ * state of collected artifacts during discovery
+ */
final SearchState searchState = new SearchState();
- /** a collection used to detect unit testing */
+ /**
+ * a collection used to detect unit testing
+ */
Set<String> unitTesting = new HashSet<>();
-
- /** a test hook to make this member unresponsive */
+
+ /**
+ * a test hook to make this member unresponsive
+ */
private volatile boolean playingDead;
-
- /** the view where quorum was most recently lost */
+
+ /**
+ * the view where quorum was most recently lost
+ */
NetView quorumLostView;
- /** a flag to mark a coordinator's viewCreator for shutdown */
+ /**
+ * a flag to mark a coordinator's viewCreator for shutdown
+ */
private boolean markViewCreatorForShutdown = false;
static class SearchState {
@@ -182,13 +244,13 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
/**
* attempt to join the distributed system
* loop
- * send a join request to a locator & get a response
- *
+ * send a join request to a locator & get a response
+ * <p>
* If the response indicates there's no coordinator it
* will contain a set of members that have recently contacted
* it. The "oldest" member is selected as the coordinator
* based on ID sort order.
- *
+ *
* @return true if successful, false if not
*/
public boolean join() {
@@ -372,12 +434,12 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
|| !currentView.contains(mbr)) {
return true;
}
- synchronized(removedMembers) {
+ synchronized (removedMembers) {
if (removedMembers.contains(mbr)) {
return true;
}
}
- synchronized(leftMembers) {
+ synchronized (leftMembers) {
if (leftMembers.contains(mbr)) {
return true;
}
@@ -419,13 +481,13 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
return;
}
-// Remove JoinResponseMessage to fix GEODE-870
-// if (!this.localAddress.getNetMember().preferredForCoordinator() &&
-// incomingRequest.getMemberID().getNetMember().preferredForCoordinator()){
-// JoinResponseMessage joinResponseMessage = new JoinResponseMessage(incomingRequest.getMemberID(), currentView, true);
-// services.getMessenger().send(joinResponseMessage);
-// return;
-// }
+ // Remove JoinResponseMessage to fix GEODE-870
+// if (!this.localAddress.getNetMember().preferredForCoordinator() &&
+// incomingRequest.getMemberID().getNetMember().preferredForCoordinator()) {
+// JoinResponseMessage joinResponseMessage = new JoinResponseMessage(incomingRequest.getMemberID(), currentView, true);
+// services.getMessenger().send(joinResponseMessage);
+// return;
+// }
recordViewRequest(incomingRequest);
}
@@ -449,8 +511,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
InternalDistributedMember mbr = incomingRequest.getMemberID();
if (logger.isDebugEnabled()) {
- logger.debug("JoinLeave.processLeaveRequest invoked. isCoordinator="+isCoordinator+ "; isStopping="+isStopping
- +"; cancelInProgress="+ services.getCancelCriterion().isCancelInProgress());
+ logger.debug("JoinLeave.processLeaveRequest invoked. isCoordinator=" + isCoordinator + "; isStopping=" + isStopping
+ + "; cancelInProgress=" + services.getCancelCriterion().isCancelInProgress());
}
if (!v.contains(mbr) && mbr.getVmViewId() < v.getViewId()) {
@@ -472,12 +534,12 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
check.removeAll(removedMembers);
check.addCrashedMembers(removedMembers);
}
- synchronized(leftMembers) {
+ synchronized (leftMembers) {
leftMembers.add(mbr);
check.removeAll(leftMembers);
}
if (check.getCoordinator().equals(localAddress)) {
- synchronized(viewInstallationLock) {
+ synchronized (viewInstallationLock) {
becomeCoordinator(incomingRequest.getMemberID());
}
}
@@ -516,8 +578,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
if (!fromMe) {
logger.info("Membership received a request to remove " + mbr
- + " from " + incomingRequest.getSender()
- + " reason="+incomingRequest.getReason());
+ + " from " + incomingRequest.getSender()
+ + " reason=" + incomingRequest.getReason());
}
if (mbr.equals(this.localAddress)) {
@@ -754,7 +816,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
if (preparing) {
logger.debug("waiting for view responses");
- Set<InternalDistributedMember> failedToRespond = rp.waitForResponses();
+ Set<InternalDistributedMember> failedToRespond = viewReplyProcessor.waitForResponses();
logger.info("finished waiting for responses to view preparation");
@@ -775,7 +837,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
return true;
}
- private void processViewMessage(final InstallViewMessage installViewMessage) {
+ private void processViewMessage(final InstallViewMessage m) {
NetView view = m.getView();
@@ -789,7 +851,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
if (!this.isJoined) {
// if we're still waiting for a join response and we're in this view we
// should install the view so join() can finish its work
- for (InternalDistributedMember mbr: view.getMembers()) {
+ for (InternalDistributedMember mbr : view.getMembers()) {
if (localAddress.compareTo(mbr) == 0) {
viewContainsMyUnjoinedAddress = true;
break;
@@ -940,7 +1002,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
}
InternalDistributedMember coord = null;
boolean coordIsNoob = true;
- for (; it.hasNext();) {
+ for (; it.hasNext(); ) {
InternalDistributedMember mbr = it.next();
if (!state.alreadyTried.contains(mbr)) {
boolean mbrIsNoob = (mbr.getVmViewId() < 0);
@@ -1603,13 +1665,17 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
}
}
- /** call with synchronized(this) */
+ /**
+ * call with synchronized(this)
+ */
private void stopWaitingFor(InternalDistributedMember mbr) {
notRepliedYet.remove(mbr);
checkIfDone();
}
- /** call with synchronized(this) */
+ /**
+ * call with synchronized(this)
+ */
private void checkIfDone() {
if (notRepliedYet.isEmpty() || (pendingRemovals != null && pendingRemovals.containsAll(notRepliedYet))) {
logger.debug("All anticipated view responses received - notifying waiting thread");
@@ -1828,7 +1894,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
sendInitialView();
long okayToCreateView = System.currentTimeMillis() + requestCollectionInterval;
try {
- for (;;) {
+ for (; ; ) {
synchronized (viewRequests) {
if (shutdown) {
return;
@@ -2168,8 +2234,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
*
* @param mbrs
*/
- private void removeHealthyMembers(final Collection<InternalDistributedMember> mbrs) throws InterruptedException {
- List<Callable<InternalDistributedMember>> checkers = new ArrayList<Callable<InternalDistributedMember>>(mbrs.size());
+ private void removeHealthyMembers(final Set<InternalDistributedMember> suspects) throws InterruptedException {
+ List<Callable<InternalDistributedMember>> checkers = new ArrayList<Callable<InternalDistributedMember>>(suspects.size());
Set<InternalDistributedMember> newRemovals = new HashSet<>();
Set<InternalDistributedMember> newLeaves = new HashSet<>();
@@ -2197,6 +2263,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
}
return mbr;
}
+
@Override
public String toString() {
return mbr.toString();
@@ -2228,7 +2295,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
// now wait for the tasks to do their work
long waitTime = giveUpTime - System.currentTimeMillis();
synchronized (viewRequests) {
- while ( waitTime > 0 ) {
+ while (waitTime > 0) {
logger.debug("removeHealthyMembers: mbrs" + suspects.size());
filterMembers(suspects, newRemovals, REMOVE_MEMBER_REQUEST);
@@ -2237,7 +2304,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
suspects.removeAll(newLeaves);
- if(suspects.isEmpty() || newRemovals.containsAll(suspects)) {
+ if (suspects.isEmpty() || newRemovals.containsAll(suspects)) {
break;
}
@@ -2252,19 +2319,20 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
/**
* This gets pending requests and returns the IDs of any that are in the given collection
- * @param mbrs collection of IDs to search for
+ *
+ * @param mbrs collection of IDs to search for
* @param matchingMembers collection to store matching IDs in
- * @param requestType leave/remove/join
+ * @param requestType leave/remove/join
*/
protected void filterMembers(Collection<InternalDistributedMember> mbrs, Set<InternalDistributedMember> matchingMembers, short requestType) {
Set<InternalDistributedMember> requests = getPendingRequestIDs(requestType);
- if(!requests.isEmpty()) {
+ if (!requests.isEmpty()) {
logger.debug("filterMembers: processing " + requests.size() + " requests for type " + requestType);
Iterator<InternalDistributedMember> itr = requests.iterator();
- while(itr.hasNext()) {
+ while (itr.hasNext()) {
InternalDistributedMember memberID = itr.next();
- if(mbrs.contains(memberID)) {
+ if (mbrs.contains(memberID)) {
testFlagForRemovalRequest = true;
matchingMembers.add(memberID);
}
@@ -2272,8 +2340,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
}
}
- private <T> List<Future<T>> submitAll ( ExecutorService executor, Collection<? extends Callable<T> > tasks ) {
- List<Future<T>> result = new ArrayList<Future<T>>( tasks.size() );
+ private <T> List<Future<T>> submitAll(ExecutorService executor, Collection<? extends Callable<T>> tasks) {
+ List<Future<T>> result = new ArrayList<Future<T>>(tasks.size());
for (Callable<T> task : tasks) {
result.add(executor.submit(task));
@@ -2288,7 +2356,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
}
boolean checkIfAvailable(InternalDistributedMember fmbr) {
- // return the member id if it fails health checks
+ // return the member id if it fails health checks
logger.info("checking state of member " + fmbr);
if (services.getHealthMonitor().checkIfAvailable(fmbr, "Member failed to acknowledge a membership view", false)) {
logger.info("member " + fmbr + " passed availability check");
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0af59683/geode-core/src/test/java/com/gemstone/gemfire/cache30/CacheSerializableRunnable.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache30/CacheSerializableRunnable.java b/geode-core/src/test/java/com/gemstone/gemfire/cache30/CacheSerializableRunnable.java
index 8b80def..3745928 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache30/CacheSerializableRunnable.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache30/CacheSerializableRunnable.java
@@ -48,13 +48,25 @@ public abstract class CacheSerializableRunnable
}
/**
+ * Creates a new <code>CacheSerializableRunnable</code> with the
+ * given name
+ */
+ public CacheSerializableRunnable(String name,Object[] args) {
+ super(name);
+ this.args = args;
+ }
+
+ /**
* Invokes the {@link #run2} method and will wrap any {@link
* CacheException} thrown by <code>run2</code> in a {@link
* CacheSerializableRunnableException}.
*/
public final void run() {
try {
- run2();
+ if(args == null){
+ run2();}else{
+ run3();
+ }
} catch (CacheException ex) {
String s = "While invoking \"" + this + "\"";
@@ -96,6 +108,8 @@ public abstract class CacheSerializableRunnable
*/
public abstract void run2() throws CacheException;
+ public void run3() throws CacheException{}
+
///////////////////////// Inner Classes /////////////////////////
/**
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0af59683/geode-core/src/test/java/com/gemstone/gemfire/test/dunit/SerializableRunnable.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/test/dunit/SerializableRunnable.java b/geode-core/src/test/java/com/gemstone/gemfire/test/dunit/SerializableRunnable.java
index 4caf815..2e24cfd 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/test/dunit/SerializableRunnable.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/test/dunit/SerializableRunnable.java
@@ -51,7 +51,7 @@ public abstract class SerializableRunnable implements SerializableRunnableIF {
private static final long serialVersionUID = 7584289978241650456L;
private String name;
- private Object[] args;
+ protected Object[] args;
public SerializableRunnable() {
this.name = null;