You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ja...@apache.org on 2015/12/16 01:07:10 UTC
incubator-geode git commit: GEODE-653: Add unit test for
GMSHealthMonitor checkIfAvailable Removed unused code Minor javadoc
corrections
Repository: incubator-geode
Updated Branches:
refs/heads/develop d40d8a70d -> 8c9af2ab4
GEODE-653: Add unit test for GMSHealthMonitor checkIfAvailable
Removed unused code
Minor javadoc corrections
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/8c9af2ab
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/8c9af2ab
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/8c9af2ab
Branch: refs/heads/develop
Commit: 8c9af2ab4f4a03eda607fe3d6c95511e7d181117
Parents: d40d8a7
Author: Jason Huynh <hu...@gmail.com>
Authored: Fri Dec 11 10:17:39 2015 -0800
Committer: Jason Huynh <hu...@gmail.com>
Committed: Tue Dec 15 16:07:33 2015 -0800
----------------------------------------------------------------------
.../membership/gms/fd/GMSHealthMonitor.java | 114 +------------------
.../gms/fd/GMSHealthMonitorJUnitTest.java | 42 ++++---
2 files changed, 31 insertions(+), 125 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8c9af2ab/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
index 2aac935..33c7e76 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
@@ -86,12 +86,12 @@ import com.gemstone.gemfire.internal.Version;
* to remove that member from view.
*
* It has {@link #suspect(InternalDistributedMember, String)} api, which can be used
- * to initiate suspect processing for any member. First is checks whether member is
- * responding or not. Then it informs to probable coordinators to remove that member from
+ * to initiate suspect processing for any member. First it checks whether the member is
+ * responding or not. Then it informs probable coordinators to remove that member from
* view.
*
* It has {@link #checkIfAvailable(DistributedMember, String, boolean)} api to see
- * if that member is alive. Then based on removal flag it initiate the suspect processing
+ * if that member is alive. Then based on removal flag it initiates the suspect processing
* for that member.
*
* */
@@ -158,9 +158,6 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
private ExecutorService checkExecutor;
-// List<SuspectRequest> suspectRequests = new ArrayList<SuspectRequest>();
-// private RequestCollector<SuspectRequest> suspectRequestCollectorThread;
-
/**
* to stop check scheduler
*/
@@ -464,17 +461,6 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
}
/**
- * Check for recent messaging activity from the given member
- * @param suspectMember
- * @return whether there has been activity within memberTimeout ms
- */
- private boolean checkRecentActivity(InternalDistributedMember suspectMember) {
- TimeStamp ts = memberTimeStamps.get(suspectMember);
- return (ts != null && (System.currentTimeMillis() - ts.getTime()) <= memberTimeout);
- }
-
-
- /**
* During final check, establish TCP connection between current member and suspect member.
* And exchange PING/PONG message to see if the suspect member is still alive.
*
@@ -1238,100 +1224,6 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
return this.socketPort;
}
- interface Callback<T> {
- public void process(List<T> requests);
- }
-
- /***
- * this thread will collect suspect message for some time interval
- * then it send message to current coordinator first if its not in
- * suspected list. if its in then it will send message to next probable
- * coordinator. NOTE: this thread will not check-server for verification
- * assuming many servers are going down and lets coordinator deals with it.
- *
- * Should we wait for ack from coordinator/probable coordinator that I got
- * request to suspect these members.
- *
- */
- class RequestCollector<T> extends Thread {
- volatile boolean shutdown = false;
- final List<T> listToTrack;
- final Callback<T> callback;
- final long timeout;
-
- public RequestCollector(String name, ThreadGroup tg, List<T> l, Callback<T> c, long t) {
- super(tg, name);
- listToTrack = l;
- callback = c;
- timeout = t;
- }
-
- void shutdown() {
- shutdown = true;
- synchronized (listToTrack) {
- listToTrack.notify();
- interrupt();
- }
- }
-
- boolean isShutdown() {
- return shutdown;
- }
-
- @Override
- public void run() {
- List<T> requests = null;
- logger.debug("Suspect thread is starting");
- long okayToSendSuspectRequest = System.currentTimeMillis() + timeout;
- try {
- for (;;) {
- synchronized (listToTrack) {
- if (shutdown || services.getCancelCriterion().isCancelInProgress()) {
- return;
- }
- if (listToTrack.isEmpty()) {
- try {
- logger.trace("Result collector is waiting");
- listToTrack.wait();
- } catch (InterruptedException e) {
- return;
- }
- } else {
- long now = System.currentTimeMillis();
- if (now < okayToSendSuspectRequest) {
- // sleep to let more suspect requests arrive
- try {
- sleep(okayToSendSuspectRequest - now);
- continue;
- } catch (InterruptedException e) {
- return;
- }
- } else {
- if (requests == null) {
- requests = new ArrayList<T>(listToTrack);
- } else {
- requests.addAll(listToTrack);
- }
- listToTrack.clear();
- okayToSendSuspectRequest = System.currentTimeMillis() + timeout;
- }
- }
- } // synchronized
- if (requests != null && !requests.isEmpty()) {
- if (logger != null && logger.isDebugEnabled()) {
- logger.info("Health Monitor is sending {} member suspect requests to coordinator", requests.size());
- }
- callback.process(requests);
- requests = null;
- }
- }
- } finally {
- shutdown = true;
- logger.debug("Suspect thread is stopped");
- }
- }
- }
-
private void sendSuspectRequest(final List<SuspectRequest> requests) {
// the background suspect-collector thread is currently disabled
// synchronized (suspectRequests) {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8c9af2ab/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
index eb17ca8..d539374 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
@@ -20,7 +20,11 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.isA;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -42,6 +46,8 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import com.gemstone.gemfire.distributed.internal.DistributionConfig;
import com.gemstone.gemfire.distributed.internal.DistributionManager;
@@ -407,26 +413,34 @@ public class GMSHealthMonitorJUnitTest {
* validates HealthMonitor.CheckIfAvailable api
*/
@Test
- public void testCheckIfAvailable() {
-
- NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet<InternalDistributedMember>(), new HashSet<InternalDistributedMember>());
-
- // 3rd is current member
- when(messenger.getMemberID()).thenReturn(mockMembers.get(3));
-
- gmsHealthMonitor.installView(v);
-
+ public void testCheckIfAvailableNoHeartBeatDontRemoveMember() {
long startTime = System.currentTimeMillis();
-
boolean retVal = gmsHealthMonitor.checkIfAvailable(mockMembers.get(1), "Not responding", false);
-
long timeTaken = System.currentTimeMillis() - startTime;
- assertTrue("This should have taken member ping timeout 100ms ", timeTaken > 90);
- assertTrue("CheckIfAvailable should have return false", !retVal);
+ assertTrue("This should have taken member ping timeout 100ms ", timeTaken >= gmsHealthMonitor.memberTimeout);
+ assertFalse("CheckIfAvailable should have return false", retVal);
}
@Test
+ public void testCheckIfAvailableWithSimulatedHeartBeat() {
+ InternalDistributedMember memberToCheck = mockMembers.get(1);
+ HeartbeatMessage fakeHeartbeat = new HeartbeatMessage();
+ fakeHeartbeat.setSender(memberToCheck);
+ when(messenger.send(any(HeartbeatRequestMessage.class))).then(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ gmsHealthMonitor.processMessage(fakeHeartbeat);
+ return null;
+ }
+ });
+
+ boolean retVal = gmsHealthMonitor.checkIfAvailable(memberToCheck, "Not responding", true);
+ assertTrue("CheckIfAvailable should have return true", retVal);
+ }
+
+
+ @Test
public void testShutdown() {
NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet<InternalDistributedMember>(), new HashSet<InternalDistributedMember>());