You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ja...@apache.org on 2015/12/04 19:43:10 UTC
incubator-geode git commit: Adding additional unit tests for
GMSHealthMonitor and GMSJoinLeave Moved GMSHealthMonitorJUnitTest to package
level of GMSHealthMonitor Minor refactoring to allow testing
Repository: incubator-geode
Updated Branches:
refs/heads/develop e0bf6858a -> 3259c0238
Adding additional unit tests for GMSHealthMonitor and GMSJoinLeave
Moved GMSHealthMonitorJUnitTest to package level of GMSHealthMonitor
Minor refactoring to allow testing
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/3259c023
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/3259c023
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/3259c023
Branch: refs/heads/develop
Commit: 3259c0238a97413268297b663219d72bf8b130c9
Parents: e0bf685
Author: Jason Huynh <hu...@gmail.com>
Authored: Wed Dec 2 15:11:40 2015 -0800
Committer: Jason Huynh <hu...@gmail.com>
Committed: Fri Dec 4 10:40:52 2015 -0800
----------------------------------------------------------------------
.../membership/InternalDistributedMember.java | 16 +-
.../membership/gms/fd/GMSHealthMonitor.java | 60 +-
.../membership/gms/membership/GMSJoinLeave.java | 8 +
.../gms/fd/GMSHealthMonitorJUnitTest.java | 576 +++++++++++++++++++
.../gms/membership/GMSJoinLeaveJUnitTest.java | 31 +-
.../fd/GMSHealthMonitorJUnitTest.java | 461 ---------------
6 files changed, 666 insertions(+), 486 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3259c023/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/InternalDistributedMember.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/InternalDistributedMember.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/InternalDistributedMember.java
index 10478b7..b112b92 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/InternalDistributedMember.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/InternalDistributedMember.java
@@ -292,7 +292,21 @@ public final class InternalDistributedMember
* @throws UnknownHostException if the given hostname cannot be resolved
*/
public InternalDistributedMember(String i, int p, Version version) throws UnknownHostException {
- netMbr = MemberFactory.newNetMember(i, p);
+ this (i, p, version, MemberFactory.newNetMember(i, p));
+ }
+
+ /**
+ * Create a InternalDistributedMember referring to the current host (as defined by the given
+ * string).<p>
+ *
+ * <b>
+ * THIS METHOD IS FOR TESTING ONLY. DO NOT USE IT TO CREATE IDs FOR
+ * USE IN THE PRODUCT. IT DOES NOT PROPERLY INITIALIZE ATTRIBUTES NEEDED
+ * FOR P2P FUNCTIONALITY.
+ * </b>
+ **/
+ public InternalDistributedMember(String i, int p, Version version, NetMember netMember) throws UnknownHostException {
+ netMbr = netMember;
defaultToCurrentHost();
this.vmKind = DistributionManager.NORMAL_DM_TYPE;
this.versionObj = version;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3259c023/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
index fcda1a0..cc64f9b 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
@@ -56,6 +56,7 @@ import org.jgroups.util.UUID;
import com.gemstone.gemfire.CancelException;
import com.gemstone.gemfire.GemFireConfigException;
+import com.gemstone.gemfire.SystemConnectException;
import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException;
import com.gemstone.gemfire.distributed.internal.DistributionMessage;
@@ -173,9 +174,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
// For TCP check
private ExecutorService serverSocketExecutor;
- private static final int OK = 0x7B;
- private static final int ERROR = 0x00;
- private InetAddress socketAddress;
+ static final int OK = 0x7B;
+ static final int ERROR = 0x00;
private volatile int socketPort;
private volatile ServerSocket serverSocket;
@@ -491,11 +491,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
InputStream in = clientSocket.getInputStream();
DataOutputStream out = new DataOutputStream(clientSocket.getOutputStream());
GMSMember gmbr = (GMSMember) suspectMember.getNetMember();
- out.writeShort(Version.CURRENT_ORDINAL);
- out.writeInt(gmbr.getVmViewId());
- out.writeLong(gmbr.getUuidLSBs());
- out.writeLong(gmbr.getUuidMSBs());
- out.flush();
+ writeMemberToStream(gmbr, out);
clientSocket.shutdownOutput();
logger.debug("Connected - reading response", suspectMember);
int b = in.read();
@@ -531,6 +527,14 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
return false;
}
+ void writeMemberToStream(GMSMember gmbr, DataOutputStream out) throws IOException {
+ out.writeShort(Version.CURRENT_ORDINAL);
+ out.writeInt(gmbr.getVmViewId());
+ out.writeLong(gmbr.getUuidLSBs());
+ out.writeLong(gmbr.getUuidMSBs());
+ out.flush();
+ }
+
/*
* (non-Javadoc)
*
@@ -610,32 +614,37 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
}
- /**
- * start the thread that listens for tcp/ip connections and responds
- * to connection attempts
- */
- private void startTcpServer() {
- // allocate a socket here so there are no race conditions between knowing the FD
- // socket port and joining the system
- socketAddress = localAddress.getInetAddress();
- int[] portRange = services.getConfig().getMembershipPortRange();
+ ServerSocket createServerSocket(InetAddress socketAddress, int[] portRange) {
+ ServerSocket serverSocket = null;
try {
serverSocket = SocketCreator.getDefaultInstance().createServerSocketUsingPortRange(socketAddress, 50/*backlog*/, true/*isBindAddress*/, false/*useNIO*/, 65536/*tcpBufferSize*/, portRange);
socketPort = serverSocket.getLocalPort();
} catch (IOException e) {
throw new GemFireConfigException("Unable to allocate a failure detection port in the membership-port range", e);
+ } catch (SystemConnectException e) {
+ throw new GemFireConfigException("Unable to allocate a failure detection port in the membership-port range", e);
}
+ return serverSocket;
+ }
+
+ /**
+ * start the thread that listens for tcp/ip connections and responds
+ * to connection attempts
+ */
+ private void startTcpServer(ServerSocket ssocket) {
+ // allocate a socket here so there are no race conditions between knowing the FD
+ // socket port and joining the system
serverSocketExecutor.execute(new Runnable() {
@Override
public void run() {
- logger.info("Started failure detection server thread on {}:{}.", socketAddress, socketPort);
+ logger.info("Started failure detection server thread on {}:{}.", ssocket.getInetAddress(), socketPort);
Socket socket = null;
try {
while (!services.getCancelCriterion().isCancelInProgress()
&& !GMSHealthMonitor.this.isStopping) {
try {
- socket = serverSocket.accept();
+ socket = ssocket.accept();
if (GMSHealthMonitor.this.playingDead) {
continue;
}
@@ -658,9 +667,9 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
logger.info("GMSHealthMonitor server thread exiting");
} finally {
// close the server socket
- if (serverSocket != null && !serverSocket.isClosed()) {
+ if (ssocket != null && !ssocket.isClosed()) {
try {
- serverSocket.close();
+ ssocket.close();
serverSocket = null;
logger.info("GMSHealthMonitor server socket closed.");
} catch (IOException e) {
@@ -841,8 +850,9 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
@Override
public void started() {
- this.localAddress = services.getMessenger().getMemberID();
- startTcpServer();
+ setLocalAddress( services.getMessenger().getMemberID());
+ serverSocket = createServerSocket(localAddress.getInetAddress(), services.getConfig().getMembershipPortRange());
+ startTcpServer(serverSocket);
startHeartbeatThread();
}
@@ -941,6 +951,10 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
public void emergencyClose() {
stopServices();
}
+
+ void setLocalAddress(InternalDistributedMember idm) {
+ this.localAddress = idm;
+ }
@Override
public void processMessage(DistributionMessage m) {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3259c023/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
index 2986238..84a0bd7 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
@@ -1058,6 +1058,14 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
joinResponse.notify();
}
}
+
+ /**
+ * for testing, do not use in any other case as it is not thread safe
+ * @param req
+ */
+ JoinResponseMessage[] getJoinResponseMessage() {
+ return joinResponse;
+ }
private void processFindCoordinatorRequest(FindCoordinatorRequest req) {
FindCoordinatorResponse resp;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3259c023/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
new file mode 100644
index 0000000..86205b9
--- /dev/null
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
@@ -0,0 +1,576 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.distributed.internal.membership.gms.fd;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+
+import org.jgroups.util.UUID;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.distributed.internal.DistributionManager;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.distributed.internal.membership.NetView;
+import com.gemstone.gemfire.distributed.internal.membership.gms.GMSMember;
+import com.gemstone.gemfire.distributed.internal.membership.gms.ServiceConfig;
+import com.gemstone.gemfire.distributed.internal.membership.gms.Services;
+import com.gemstone.gemfire.distributed.internal.membership.gms.Services.Stopper;
+import com.gemstone.gemfire.distributed.internal.membership.gms.fd.GMSHealthMonitor.ClientSocketHandler;
+import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.JoinLeave;
+import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.Manager;
+import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.Messenger;
+import com.gemstone.gemfire.distributed.internal.membership.gms.messages.HeartbeatMessage;
+import com.gemstone.gemfire.distributed.internal.membership.gms.messages.HeartbeatRequestMessage;
+import com.gemstone.gemfire.distributed.internal.membership.gms.messages.SuspectMembersMessage;
+import com.gemstone.gemfire.distributed.internal.membership.gms.messages.SuspectRequest;
+import com.gemstone.gemfire.internal.SocketCreator;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class GMSHealthMonitorJUnitTest {
+
+ private Services services;
+ private ServiceConfig mockConfig;
+ private DistributionConfig mockDistConfig;
+ private List<InternalDistributedMember> mockMembers;
+ private Messenger messenger;
+ private JoinLeave joinLeave;
+ private GMSHealthMonitor gmsHealthMonitor;
+ private Manager manager;
+ final long memberTimeout = 1000l;
+ private int[] portRange= new int[]{0, 65535};
+
+ @Before
+ public void initMocks() throws UnknownHostException {
+ System.setProperty("gemfire.bind-address", "localhost");
+ mockDistConfig = mock(DistributionConfig.class);
+ mockConfig = mock(ServiceConfig.class);
+ messenger = mock(Messenger.class);
+ joinLeave = mock(JoinLeave.class);
+ manager = mock(Manager.class);
+ services = mock(Services.class);
+ Stopper stopper = mock(Stopper.class);
+
+ when(mockConfig.getDistributionConfig()).thenReturn(mockDistConfig);
+ when(mockConfig.getMemberTimeout()).thenReturn(memberTimeout);
+ when(mockConfig.getMembershipPortRange()).thenReturn(portRange);
+ when(services.getConfig()).thenReturn(mockConfig);
+ when(services.getMessenger()).thenReturn(messenger);
+ when(services.getJoinLeave()).thenReturn(joinLeave);
+ when(services.getCancelCriterion()).thenReturn(stopper);
+ when(services.getManager()).thenReturn(manager);
+ when(stopper.isCancelInProgress()).thenReturn(false);
+
+
+ if (mockMembers == null) {
+ mockMembers = new ArrayList<InternalDistributedMember>();
+ for (int i = 0; i < 7; i++) {
+ InternalDistributedMember mbr = new InternalDistributedMember("localhost", 8888 + i);
+
+ if (i == 0 || i == 1) {
+ mbr.setVmKind(DistributionManager.LOCATOR_DM_TYPE);
+ mbr.getNetMember().setPreferredForCoordinator(true);
+ }
+ mockMembers.add(mbr);
+ }
+ }
+ when(joinLeave.getMemberID()).thenReturn(mockMembers.get(3));
+ when(messenger.getMemberID()).thenReturn(mockMembers.get(3));
+ gmsHealthMonitor = new GMSHealthMonitor();
+ gmsHealthMonitor.init(services);
+ gmsHealthMonitor.start();
+ }
+
+ @After
+ public void tearDown() {
+ gmsHealthMonitor.stop();
+ }
+
+ @Test
+ public void testHMServiceStarted() throws IOException {
+
+ InternalDistributedMember mbr = new InternalDistributedMember(SocketCreator.getLocalHost(), 12345);
+ mbr.setVmViewId(1);
+ when(messenger.getMemberID()).thenReturn(mbr);
+ gmsHealthMonitor.started();
+
+ NetView v = new NetView(mbr, 1, mockMembers, new HashSet<InternalDistributedMember>(), new HashSet<InternalDistributedMember>());
+
+ gmsHealthMonitor.processMessage(new HeartbeatRequestMessage(mbr, 1));
+ verify(messenger, atLeastOnce()).send(any(HeartbeatMessage.class));
+ }
+
+ /**
+ * checks who is next neighbor
+ */
+ @Test
+ public void testHMNextNeighborVerify() throws IOException {
+
+ NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet<InternalDistributedMember>(), new HashSet<InternalDistributedMember>());
+
+ when(messenger.getMemberID()).thenReturn(mockMembers.get(3));
+ gmsHealthMonitor.started();
+
+ gmsHealthMonitor.installView(v);
+
+ Assert.assertEquals(mockMembers.get(4), gmsHealthMonitor.getNextNeighbor());
+
+ }
+
+ @Test
+ public void testHMNextNeighborAfterTimeout() throws Exception {
+ System.out.println("testHMNextNeighborAfterTimeout starting");
+ NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet<InternalDistributedMember>(), new HashSet<InternalDistributedMember>());
+
+// System.out.printf("memberID is %s view is %s\n", mockMembers.get(3), v);
+
+ // 3rd is current member
+ when(messenger.getMemberID()).thenReturn(mockMembers.get(3));
+ gmsHealthMonitor.started();
+
+ gmsHealthMonitor.installView(v);
+
+ // allow the monitor to give up on the initial "next neighbor" and
+ // move on to the one after it
+ long giveup = System.currentTimeMillis() + memberTimeout + 500;
+ InternalDistributedMember expected = mockMembers.get(5);
+ InternalDistributedMember neighbor = gmsHealthMonitor.getNextNeighbor();
+ while (System.currentTimeMillis() < giveup && neighbor != expected) {
+ Thread.sleep(5);
+ neighbor = gmsHealthMonitor.getNextNeighbor();
+ }
+
+ // neighbor should change to 5th
+ System.out.println("testHMNextNeighborAfterTimeout ending");
+ Assert.assertEquals("expected " + expected + " but found " + neighbor
+ + ". view="+v, expected, neighbor);
+ }
+
+ /**
+ * it checks neighbor before member-timeout, it should be same
+ */
+
+ @Test
+ public void testHMNextNeighborBeforeTimeout() throws IOException {
+ NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet<InternalDistributedMember>(), new HashSet<InternalDistributedMember>());
+
+ // 3rd is current member
+ when(messenger.getMemberID()).thenReturn(mockMembers.get(3));
+ gmsHealthMonitor.started();
+
+ gmsHealthMonitor.installView(v);
+
+ //Should we remove these sleeps and force the checkmember directly instead of waiting?
+ try {
+ // member-timeout is 1000 ms. We initiate a check and choose
+ // a new neighbor at 500 ms
+ Thread.sleep(memberTimeout/GMSHealthMonitor.LOGICAL_INTERVAL - 100);
+ } catch (InterruptedException e) {
+ }
+ // neighbor should be same
+ System.out.println("next neighbor is " + gmsHealthMonitor.getNextNeighbor() +
+ "\nmy address is " + mockMembers.get(3) +
+ "\nview is " + v);
+
+ Assert.assertEquals(mockMembers.get(4), gmsHealthMonitor.getNextNeighbor());
+ }
+
+ /***
+ * checks whether member-check thread sends suspectMembers message
+ */
+ @Test
+ public void testSuspectMembersCalledThroughMemberCheckThread() throws Exception {
+ System.out.println("testSuspectMembersCalledThroughMemberCheckThread starting");
+ NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet<InternalDistributedMember>(), new HashSet<InternalDistributedMember>());
+
+ // 3rd is current member
+ when(messenger.getMemberID()).thenReturn(mockMembers.get(3));
+ gmsHealthMonitor.started();
+
+ gmsHealthMonitor.installView(v);
+
+ // when the view is installed we start a heartbeat timeout. After
+ // that expires we request a heartbeat
+ Thread.sleep(3*memberTimeout + 100);
+
+ System.out.println("testSuspectMembersCalledThroughMemberCheckThread ending");
+ assertTrue(gmsHealthMonitor.isSuspectMember(mockMembers.get(4)));
+ }
+
+ /***
+ * checks ping thread didn't sends suspectMembers message before timeout
+ */
+ @Test
+ public void testSuspectMembersNotCalledThroughPingThreadBeforeTimeout() {
+ NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet<InternalDistributedMember>(), new HashSet<InternalDistributedMember>());
+
+ // 3rd is current member
+ when(messenger.getMemberID()).thenReturn(mockMembers.get(3));
+ gmsHealthMonitor.started();
+
+ gmsHealthMonitor.installView(v);
+ InternalDistributedMember neighbor = gmsHealthMonitor.getNextNeighbor();
+
+ try {
+ // member-timeout is 1000 ms
+ // plus 100 ms for ack
+ Thread.sleep(memberTimeout - 200);
+ } catch (InterruptedException e) {
+ }
+
+ assertFalse(gmsHealthMonitor.isSuspectMember(neighbor));
+ }
+
+ /***
+ * Checks whether suspect thread sends suspectMembers message
+ */
+ @Test
+ public void testSuspectMembersCalledThroughSuspectThread() throws Exception {
+ NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet<InternalDistributedMember>(), new HashSet<InternalDistributedMember>());
+
+ // 3rd is current member
+ when(messenger.getMemberID()).thenReturn(mockMembers.get(3));
+
+ gmsHealthMonitor.installView(v);
+
+ gmsHealthMonitor.suspect(mockMembers.get(1), "Not responding");
+
+ Thread.sleep(GMSHealthMonitor.MEMBER_SUSPECT_COLLECTION_INTERVAL + 1000);
+
+ verify(messenger, atLeastOnce()).send(any(SuspectMembersMessage.class));
+ }
+
+ /***
+ * Checks suspect thread doesn't sends suspectMembers message before timeout
+ */
+ @Test
+ public void testSuspectMembersNotCalledThroughSuspectThreadBeforeTimeout() {
+
+ NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet<InternalDistributedMember>(), new HashSet<InternalDistributedMember>());
+
+ MethodExecuted messageSent = new MethodExecuted();
+ // 3rd is current member
+ when(messenger.getMemberID()).thenReturn(mockMembers.get(3));
+
+ gmsHealthMonitor.installView(v);
+
+ gmsHealthMonitor.suspect(mockMembers.get(1), "Not responding");
+
+ when(messenger.send(isA(SuspectMembersMessage.class))).thenAnswer(messageSent);
+
+ try {
+ // suspect thread timeout is 200 ms
+ Thread.sleep(100l);
+ } catch (InterruptedException e) {
+ }
+
+ assertTrue("SuspectMembersMessage shouldn't have sent", !messageSent.isMethodExecuted());
+ }
+
+ /***
+ * Send remove member message after doing final check, ping Timeout
+ */
+ @Test
+ public void testRemoveMemberCalled() throws Exception {
+ System.out.println("testRemoveMemberCalled starting");
+ NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet<InternalDistributedMember>(), new HashSet<InternalDistributedMember>());
+
+ // 3rd is current member
+ when(messenger.getMemberID()).thenReturn(mockMembers.get(0)); // coordinator and local member
+ gmsHealthMonitor.started();
+
+ gmsHealthMonitor.installView(v);
+
+ Thread.sleep(memberTimeout/GMSHealthMonitor.LOGICAL_INTERVAL);
+
+ ArrayList<InternalDistributedMember> recipient = new ArrayList<InternalDistributedMember>();
+ recipient.add(mockMembers.get(0));
+ ArrayList<SuspectRequest> as = new ArrayList<SuspectRequest>();
+ SuspectRequest sr = new SuspectRequest(mockMembers.get(1), "Not Responding");// removing member 1
+ as.add(sr);
+ SuspectMembersMessage sm = new SuspectMembersMessage(recipient, as);
+ sm.setSender(mockMembers.get(0));
+
+ gmsHealthMonitor.processMessage(sm);
+
+ Thread.sleep(2*memberTimeout + 200);
+
+ System.out.println("testRemoveMemberCalled ending");
+ verify(joinLeave, atLeastOnce()).remove(any(InternalDistributedMember.class), any(String.class));
+ }
+
+ /***
+ * Shouldn't send remove member message before doing final check, or before ping Timeout
+ */
+ @Test
+ public void testRemoveMemberNotCalledBeforeTimeout() {
+ System.out.println("testRemoveMemberNotCalledBeforeTimeout starting");
+ NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet<InternalDistributedMember>(), new HashSet<InternalDistributedMember>());
+
+ // 3rd is current member
+ when(messenger.getMemberID()).thenReturn(mockMembers.get(0)); // coordinator and local member
+ when(joinLeave.getMemberID()).thenReturn(mockMembers.get(0)); // coordinator and local member
+ gmsHealthMonitor.started();
+
+ gmsHealthMonitor.installView(v);
+
+ ArrayList<InternalDistributedMember> recipient = new ArrayList<InternalDistributedMember>();
+ recipient.add(mockMembers.get(0));
+ ArrayList<SuspectRequest> as = new ArrayList<SuspectRequest>();
+ SuspectRequest sr = new SuspectRequest(mockMembers.get(1), "Not Responding");// removing member 1
+ as.add(sr);
+ SuspectMembersMessage sm = new SuspectMembersMessage(recipient, as);
+ sm.setSender(mockMembers.get(0));
+
+ gmsHealthMonitor.processMessage(sm);
+
+ try {
+ // this happens after final check, ping timeout
+ Thread.sleep(memberTimeout-100);
+ } catch (InterruptedException e) {
+ }
+
+ System.out.println("testRemoveMemberNotCalledBeforeTimeout ending");
+ verify(joinLeave, never()).remove(any(InternalDistributedMember.class), any(String.class));
+ }
+
+ /***
+ * Send remove member message after doing final check for coordinator, ping timeout
+ * This test trying to remove coordinator
+ */
+ @Test
+ public void testRemoveMemberCalledAfterDoingFinalCheckOnCoordinator() throws Exception {
+
+ NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet<InternalDistributedMember>(), new HashSet<InternalDistributedMember>());
+
+ // preferred coordinators are 0 and 1
+ when(messenger.getMemberID()).thenReturn(mockMembers.get(1));// next preferred coordinator
+ gmsHealthMonitor.started();
+
+ gmsHealthMonitor.installView(v);
+
+ Thread.sleep(memberTimeout/GMSHealthMonitor.LOGICAL_INTERVAL);
+
+ ArrayList<InternalDistributedMember> recipient = new ArrayList<InternalDistributedMember>();
+ recipient.add(mockMembers.get(0));
+ recipient.add(mockMembers.get(1));
+ ArrayList<SuspectRequest> as = new ArrayList<SuspectRequest>();
+ SuspectRequest sr = new SuspectRequest(mockMembers.get(0), "Not Responding");// removing coordinator
+ as.add(sr);
+ SuspectMembersMessage sm = new SuspectMembersMessage(recipient, as);
+ sm.setSender(mockMembers.get(4));// member 4 sends suspect message
+
+ gmsHealthMonitor.processMessage(sm);
+
+ // this happens after final check, ping timeout = 1000 ms
+ Thread.sleep(memberTimeout + 200);
+
+ verify(joinLeave, atLeastOnce()).remove(any(InternalDistributedMember.class), any(String.class));
+ }
+
+ /***
+ * validates HealthMonitor.CheckIfAvailable api
+ */
+ @Test
+ public void testCheckIfAvailable() {
+
+ NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet<InternalDistributedMember>(), new HashSet<InternalDistributedMember>());
+
+ // 3rd is current member
+ when(messenger.getMemberID()).thenReturn(mockMembers.get(3));
+
+ gmsHealthMonitor.installView(v);
+
+ long startTime = System.currentTimeMillis();
+
+ boolean retVal = gmsHealthMonitor.checkIfAvailable(mockMembers.get(1), "Not responding", false);
+
+ long timeTaken = System.currentTimeMillis() - startTime;
+
+ assertTrue("This should have taken member ping timeout 100ms ", timeTaken > 90);
+ assertTrue("CheckIfAvailable should have return false", !retVal);
+ }
+
+ @Test
+ public void testShutdown() {
+
+ NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet<InternalDistributedMember>(), new HashSet<InternalDistributedMember>());
+
+ // 3rd is current member
+ when(messenger.getMemberID()).thenReturn(mockMembers.get(3));
+
+ gmsHealthMonitor.installView(v);
+
+ gmsHealthMonitor.stop();
+
+ try {
+ // this happens after final check, membertimeout = 1000
+ Thread.sleep(100l);
+ } catch (InterruptedException e) {
+ }
+
+ assertTrue("HeathMonitor should have shutdown", gmsHealthMonitor.isShutdown());
+
+ }
+
+ @Test
+ public void testCreateServerSocket() throws Exception {
+ try (ServerSocket socket = gmsHealthMonitor.createServerSocket(InetAddress.getLocalHost(), portRange)) {
+ Assert.assertTrue( portRange[0] <= socket.getLocalPort() && socket.getLocalPort() <= portRange[1]);
+ }
+ }
+
+ @Test
+ public void testCreateServerSocketPortRangeInvalid() throws Exception {
+ try (ServerSocket socket = gmsHealthMonitor.createServerSocket(InetAddress.getLocalHost(), new int[]{-1, -1})) {
+ Assert.fail("socket was created with invalid port range");
+ }
+ catch (IllegalArgumentException e) {
+
+ }
+ }
+
+ @Test
+ public void testClientSocketHandler() throws Exception {
+ int viewId = 2;
+ long msb = 3;
+ long lsb = 4;
+ GMSMember otherMember = createGMSMember(Version.CURRENT_ORDINAL, viewId, msb, lsb);
+ GMSMember gmsMember = createGMSMember(Version.CURRENT_ORDINAL, viewId, msb, lsb);
+ executeTestClientSocketHandler(gmsMember, otherMember, GMSHealthMonitor.OK);
+ }
+
+ @Test
+ public void testClientSocketHandlerWhenMsbDoNotMatch() throws Exception {
+ int viewId = 2;
+ long msb = 3;
+ long lsb = 4;
+ GMSMember otherMember = createGMSMember(Version.CURRENT_ORDINAL, viewId, msb + 1, lsb);
+ GMSMember gmsMember = createGMSMember(Version.CURRENT_ORDINAL, viewId, msb, lsb);
+ executeTestClientSocketHandler(gmsMember, otherMember, GMSHealthMonitor.ERROR);
+ }
+
+ @Test
+ public void testClientSocketHandlerWhenLsbDoNotMatch() throws Exception {
+ int viewId = 2;
+ long msb = 3;
+ long lsb = 4;
+ GMSMember otherMember = createGMSMember(Version.CURRENT_ORDINAL, viewId, msb, lsb + 1);
+ GMSMember gmsMember = createGMSMember(Version.CURRENT_ORDINAL, viewId, msb, lsb);
+ executeTestClientSocketHandler(gmsMember, otherMember, GMSHealthMonitor.ERROR);
+ }
+
+ @Test
+ public void testClientSocketHandlerWhenViewIdDoNotMatch() throws Exception {
+ int viewId = 2;
+ long msb = 3;
+ long lsb = 4;
+ GMSMember otherMember = createGMSMember(Version.CURRENT_ORDINAL, viewId + 1, msb, lsb);
+ GMSMember gmsMember = createGMSMember(Version.CURRENT_ORDINAL, viewId, msb, lsb);
+ executeTestClientSocketHandler(gmsMember, otherMember, GMSHealthMonitor.ERROR);
+ }
+
+ public void executeTestClientSocketHandler(GMSMember gmsMember, GMSMember otherMember, int expectedResult) throws Exception {
+ //We have already set the view id in the member but when creating the IDM it resets it to -1 for some reason
+ int viewId = gmsMember.getVmViewId();
+
+ InternalDistributedMember testMember = new InternalDistributedMember("localhost", 9000, Version.CURRENT, gmsMember);
+ //We set to our expected test viewId in the IDM as well as reseting the gms member
+ testMember.setVmViewId(viewId);
+ gmsMember.setBirthViewId(viewId);
+
+
+ //Set up the incoming/received bytes. We just wrap output streams and write out the gms member information
+ byte[] receivedBytes = writeMemberToBytes(otherMember);
+ InputStream mockInputStream = new ByteArrayInputStream(receivedBytes);
+
+ //configure the mock to return the mocked incoming bytes and provide an outputstream that we will check
+ Socket fakeSocket = mock(Socket.class);
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ when(fakeSocket.getInputStream()).thenReturn(mockInputStream);
+ when(fakeSocket.getOutputStream()).thenReturn(outputStream);
+
+ //run the socket handler
+ gmsHealthMonitor.setLocalAddress(testMember);
+ ClientSocketHandler handler = gmsHealthMonitor.new ClientSocketHandler(fakeSocket);
+ handler.run();
+
+ //verify the written bytes are as expected
+ DataInputStream dis = new DataInputStream(new ByteArrayInputStream(outputStream.toByteArray()));
+ int byteReply = dis.read();
+ Assert.assertEquals(expectedResult, byteReply);
+ }
+
+ private GMSMember createGMSMember(short version, int viewId, long msb, long lsb) {
+ GMSMember gmsMember = new GMSMember();
+ gmsMember.setVersionOrdinal(version);
+ gmsMember.setBirthViewId(viewId);
+ gmsMember.setUUID(new UUID(msb, lsb));
+ return gmsMember;
+ }
+
+ private byte[] writeMemberToBytes(GMSMember gmsMember) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream dataReceive = new DataOutputStream(baos);
+ gmsHealthMonitor.writeMemberToStream(gmsMember, dataReceive);
+ return baos.toByteArray();
+ }
+
+
+ private class MethodExecuted implements Answer {
+ private boolean methodExecuted = false;
+
+ public boolean isMethodExecuted() {
+ return methodExecuted;
+ }
+
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ methodExecuted = true;
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3259c023/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
index abc7a2f..9895f68 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
@@ -19,7 +19,7 @@ package com.gemstone.gemfire.distributed.internal.membership.gms.membership;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.isA;
+import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
@@ -244,6 +244,21 @@ public class GMSJoinLeaveJUnitTest {
verify(messenger).send(any(JoinResponseMessage.class));
}
+ //This test does not test the actual join process but rather that the join response gets loggedß
+ @Test
+ public void testProcessJoinResponseIsRecorded() throws IOException {
+ initMocks();
+ when(services.getAuthenticator()).thenReturn(authenticator);
+ when(authenticator.authenticate(mockMembers[0], null)).thenThrow(new AuthenticationFailedException("we want to fail auth here"));
+ when(services.getMessenger()).thenReturn(messenger);
+
+ JoinResponseMessage[] joinResponse = gmsJoinLeave.getJoinResponseMessage();
+
+ JoinResponseMessage jrm = new JoinResponseMessage();
+ gmsJoinLeave.processMessage(jrm);
+ Assert.assertEquals(jrm, joinResponse[0]);
+ }
+
/**
* prepares and install a view
* @throws IOException
@@ -635,6 +650,20 @@ public class GMSJoinLeaveJUnitTest {
verify(manager).quorumLost(crashes, newView);
}
+ //Possibly modify test to check for network partition message in the force disconnect
+ @Test
+ public void testNetworkPartitionMessageReceived() throws Exception {
+ initMocks();
+ gmsJoinLeave.becomeCoordinatorForTest();
+ List<InternalDistributedMember> members = Arrays.asList(mockMembers);
+ Set<InternalDistributedMember> empty = Collections.<InternalDistributedMember>emptySet();
+ NetView v = new NetView(mockMembers[0], 2, members, empty, empty);
+ NetworkPartitionMessage message = new NetworkPartitionMessage();
+ gmsJoinLeave.processMessage(message);
+ verify(manager).forceDisconnect(any(String.class));
+ }
+
+
@Test
public void testQuorumLossNotificationWithNetworkPartitionDetectionDisabled() throws IOException {
initMocks(false);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3259c023/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/fd/GMSHealthMonitorJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/fd/GMSHealthMonitorJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/fd/GMSHealthMonitorJUnitTest.java
deleted file mode 100644
index 41a99a6..0000000
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/fd/GMSHealthMonitorJUnitTest.java
+++ /dev/null
@@ -1,461 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.distributed.internal.membership.gms.membership.fd;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.isA;
-import static org.mockito.Mockito.atLeastOnce;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import com.gemstone.gemfire.distributed.internal.DistributionConfig;
-import com.gemstone.gemfire.distributed.internal.DistributionManager;
-import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
-import com.gemstone.gemfire.distributed.internal.membership.NetView;
-import com.gemstone.gemfire.distributed.internal.membership.gms.ServiceConfig;
-import com.gemstone.gemfire.distributed.internal.membership.gms.Services;
-import com.gemstone.gemfire.distributed.internal.membership.gms.Services.Stopper;
-import com.gemstone.gemfire.distributed.internal.membership.gms.fd.GMSHealthMonitor;
-import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.JoinLeave;
-import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.Manager;
-import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.Messenger;
-import com.gemstone.gemfire.distributed.internal.membership.gms.messages.HeartbeatMessage;
-import com.gemstone.gemfire.distributed.internal.membership.gms.messages.HeartbeatRequestMessage;
-import com.gemstone.gemfire.distributed.internal.membership.gms.messages.SuspectMembersMessage;
-import com.gemstone.gemfire.distributed.internal.membership.gms.messages.SuspectRequest;
-import com.gemstone.gemfire.internal.SocketCreator;
-import com.gemstone.gemfire.test.junit.categories.UnitTest;
-
-@Category(UnitTest.class)
-public class GMSHealthMonitorJUnitTest {
-
- private Services services;
- private ServiceConfig mockConfig;
- private DistributionConfig mockDistConfig;
- private List<InternalDistributedMember> mockMembers;
- private Messenger messenger;
- private JoinLeave joinLeave;
- private GMSHealthMonitor gmsHealthMonitor;
- private Manager manager;
- final long memberTimeout = 1000l;
- private int[] portRange= new int[]{0, 65535};
-
- @Before
- public void initMocks() throws UnknownHostException {
- System.setProperty("gemfire.bind-address", "localhost");
- mockDistConfig = mock(DistributionConfig.class);
- mockConfig = mock(ServiceConfig.class);
- messenger = mock(Messenger.class);
- joinLeave = mock(JoinLeave.class);
- manager = mock(Manager.class);
- services = mock(Services.class);
- Stopper stopper = mock(Stopper.class);
-
- when(mockConfig.getDistributionConfig()).thenReturn(mockDistConfig);
- when(mockConfig.getMemberTimeout()).thenReturn(memberTimeout);
- when(mockConfig.getMembershipPortRange()).thenReturn(portRange);
- when(services.getConfig()).thenReturn(mockConfig);
- when(services.getMessenger()).thenReturn(messenger);
- when(services.getJoinLeave()).thenReturn(joinLeave);
- when(services.getCancelCriterion()).thenReturn(stopper);
- when(services.getManager()).thenReturn(manager);
- when(stopper.isCancelInProgress()).thenReturn(false);
-
-
- if (mockMembers == null) {
- mockMembers = new ArrayList<InternalDistributedMember>();
- for (int i = 0; i < 7; i++) {
- InternalDistributedMember mbr = new InternalDistributedMember("localhost", 8888 + i);
-
- if (i == 0 || i == 1) {
- mbr.setVmKind(DistributionManager.LOCATOR_DM_TYPE);
- mbr.getNetMember().setPreferredForCoordinator(true);
- }
- mockMembers.add(mbr);
- }
- }
- when(joinLeave.getMemberID()).thenReturn(mockMembers.get(3));
- when(messenger.getMemberID()).thenReturn(mockMembers.get(3));
- gmsHealthMonitor = new GMSHealthMonitor();
- gmsHealthMonitor.init(services);
- gmsHealthMonitor.start();
- }
-
- @After
- public void tearDown() {
- gmsHealthMonitor.stop();
- System.getProperties().remove("gemfire.bind-address");
- }
-
- @Test
- public void testHMServiceStarted() throws IOException {
-
- InternalDistributedMember mbr = new InternalDistributedMember(SocketCreator.getLocalHost(), 12345);
- mbr.setVmViewId(1);
- when(messenger.getMemberID()).thenReturn(mbr);
- gmsHealthMonitor.started();
-
- NetView v = new NetView(mbr, 1, mockMembers, new HashSet<InternalDistributedMember>(), new HashSet<InternalDistributedMember>());
-
- gmsHealthMonitor.processMessage(new HeartbeatRequestMessage(mbr, 1));
- verify(messenger, atLeastOnce()).send(any(HeartbeatMessage.class));
- }
-
- /**
- * checks who is next neighbor
- */
- @Test
- public void testHMNextNeighborVerify() throws IOException {
-
- NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet<InternalDistributedMember>(), new HashSet<InternalDistributedMember>());
-
- when(messenger.getMemberID()).thenReturn(mockMembers.get(3));
- gmsHealthMonitor.started();
-
- gmsHealthMonitor.installView(v);
-
- Assert.assertEquals(mockMembers.get(4), gmsHealthMonitor.getNextNeighbor());
-
- }
-
- @Test
- public void testHMNextNeighborAfterTimeout() throws Exception {
- System.out.println("testHMNextNeighborAfterTimeout starting");
- NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet<InternalDistributedMember>(), new HashSet<InternalDistributedMember>());
-
-// System.out.printf("memberID is %s view is %s\n", mockMembers.get(3), v);
-
- // 3rd is current member
- when(messenger.getMemberID()).thenReturn(mockMembers.get(3));
- gmsHealthMonitor.started();
-
- gmsHealthMonitor.installView(v);
-
- // allow the monitor to give up on the initial "next neighbor" and
- // move on to the one after it
- long giveup = System.currentTimeMillis() + memberTimeout + 600;
- InternalDistributedMember expected = mockMembers.get(5);
- InternalDistributedMember neighbor = gmsHealthMonitor.getNextNeighbor();
- while (System.currentTimeMillis() < giveup && neighbor != expected) {
- Thread.sleep(5);
- neighbor = gmsHealthMonitor.getNextNeighbor();
- }
-
- // neighbor should change to 5th
- System.out.println("testHMNextNeighborAfterTimeout ending");
- Assert.assertEquals("expected " + expected + " but found " + neighbor
- + ". view="+v, expected, neighbor);
- }
-
- /**
- * it checks neighbor before member-timeout, it should be same
- */
-
- @Test
- public void testHMNextNeighborBeforeTimeout() throws IOException {
- NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet<InternalDistributedMember>(), new HashSet<InternalDistributedMember>());
-
- // 3rd is current member
- when(messenger.getMemberID()).thenReturn(mockMembers.get(3));
- gmsHealthMonitor.started();
-
- gmsHealthMonitor.installView(v);
-
- try {
- // member-timeout is 1000 ms. We initiate a check and choose
- // a new neighbor at 500 ms
- Thread.sleep(memberTimeout/GMSHealthMonitor.LOGICAL_INTERVAL - 100);
- } catch (InterruptedException e) {
- }
- // neighbor should be same
- System.out.println("next neighbor is " + gmsHealthMonitor.getNextNeighbor() +
- "\nmy address is " + mockMembers.get(3) +
- "\nview is " + v);
-
- Assert.assertEquals(mockMembers.get(4), gmsHealthMonitor.getNextNeighbor());
- }
-
- /***
- * checks whether member-check thread sends suspectMembers message
- */
- @Test
- public void testSuspectMembersCalledThroughMemberCheckThread() throws Exception {
- System.out.println("testSuspectMembersCalledThroughMemberCheckThread starting");
- NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet<InternalDistributedMember>(), new HashSet<InternalDistributedMember>());
-
- // 3rd is current member
- when(messenger.getMemberID()).thenReturn(mockMembers.get(3));
- gmsHealthMonitor.started();
-
- gmsHealthMonitor.installView(v);
-
- // when the view is installed we start a heartbeat timeout. After
- // that expires we request a heartbeat
- Thread.sleep(3*memberTimeout + 100);
-
- System.out.println("testSuspectMembersCalledThroughMemberCheckThread ending");
- assertTrue(gmsHealthMonitor.isSuspectMember(mockMembers.get(4)));
- }
-
- /***
- * checks ping thread didn't sends suspectMembers message before timeout
- */
- @Test
- public void testSuspectMembersNotCalledThroughPingThreadBeforeTimeout() {
- NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet<InternalDistributedMember>(), new HashSet<InternalDistributedMember>());
-
- // 3rd is current member
- when(messenger.getMemberID()).thenReturn(mockMembers.get(3));
- gmsHealthMonitor.started();
-
- gmsHealthMonitor.installView(v);
- InternalDistributedMember neighbor = gmsHealthMonitor.getNextNeighbor();
-
- try {
- // member-timeout is 1000 ms
- // plus 100 ms for ack
- Thread.sleep(memberTimeout - 200);
- } catch (InterruptedException e) {
- }
-
- assertFalse(gmsHealthMonitor.isSuspectMember(neighbor));
- }
-
- /***
- * Checks whether suspect thread sends suspectMembers message
- */
- @Test
- public void testSuspectMembersCalledThroughSuspectThread() throws Exception {
- NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet<InternalDistributedMember>(), new HashSet<InternalDistributedMember>());
-
- // 3rd is current member
- when(messenger.getMemberID()).thenReturn(mockMembers.get(3));
-
- gmsHealthMonitor.installView(v);
-
- gmsHealthMonitor.suspect(mockMembers.get(1), "Not responding");
-
- Thread.sleep(GMSHealthMonitor.MEMBER_SUSPECT_COLLECTION_INTERVAL + 1000);
-
- verify(messenger, atLeastOnce()).send(any(SuspectMembersMessage.class));
- }
-
- /***
- * Checks suspect thread doesn't sends suspectMembers message before timeout
- */
- @Test
- public void testSuspectMembersNotCalledThroughSuspectThreadBeforeTimeout() {
-
- NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet<InternalDistributedMember>(), new HashSet<InternalDistributedMember>());
-
- MethodExecuted messageSent = new MethodExecuted();
- // 3rd is current member
- when(messenger.getMemberID()).thenReturn(mockMembers.get(3));
-
- gmsHealthMonitor.installView(v);
-
- gmsHealthMonitor.suspect(mockMembers.get(1), "Not responding");
-
- when(messenger.send(isA(SuspectMembersMessage.class))).thenAnswer(messageSent);
-
- try {
- // suspect thread timeout is 200 ms
- Thread.sleep(100l);
- } catch (InterruptedException e) {
- }
-
- assertTrue("SuspectMembersMessage shouldn't have sent", !messageSent.isMethodExecuted());
- }
-
- /***
- * Send remove member message after doing final check, ping Timeout
- */
- @Test
- public void testRemoveMemberCalled() throws Exception {
- System.out.println("testRemoveMemberCalled starting");
- NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet<InternalDistributedMember>(), new HashSet<InternalDistributedMember>());
-
- // 3rd is current member
- when(messenger.getMemberID()).thenReturn(mockMembers.get(0)); // coordinator and local member
- gmsHealthMonitor.started();
-
- gmsHealthMonitor.installView(v);
-
- Thread.sleep(memberTimeout/GMSHealthMonitor.LOGICAL_INTERVAL);
-
- ArrayList<InternalDistributedMember> recipient = new ArrayList<InternalDistributedMember>();
- recipient.add(mockMembers.get(0));
- ArrayList<SuspectRequest> as = new ArrayList<SuspectRequest>();
- SuspectRequest sr = new SuspectRequest(mockMembers.get(1), "Not Responding");// removing member 1
- as.add(sr);
- SuspectMembersMessage sm = new SuspectMembersMessage(recipient, as);
- sm.setSender(mockMembers.get(0));
-
- gmsHealthMonitor.processMessage(sm);
-
- Thread.sleep(2*memberTimeout + 200);
-
- System.out.println("testRemoveMemberCalled ending");
- verify(joinLeave, atLeastOnce()).remove(any(InternalDistributedMember.class), any(String.class));
- }
-
- /***
- * Shouldn't send remove member message before doing final check, or before ping Timeout
- */
- @Test
- public void testRemoveMemberNotCalledBeforeTimeout() {
- System.out.println("testRemoveMemberNotCalledBeforeTimeout starting");
- NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet<InternalDistributedMember>(), new HashSet<InternalDistributedMember>());
-
- // 3rd is current member
- when(messenger.getMemberID()).thenReturn(mockMembers.get(0)); // coordinator and local member
- when(joinLeave.getMemberID()).thenReturn(mockMembers.get(0)); // coordinator and local member
- gmsHealthMonitor.started();
-
- gmsHealthMonitor.installView(v);
-
- ArrayList<InternalDistributedMember> recipient = new ArrayList<InternalDistributedMember>();
- recipient.add(mockMembers.get(0));
- ArrayList<SuspectRequest> as = new ArrayList<SuspectRequest>();
- SuspectRequest sr = new SuspectRequest(mockMembers.get(1), "Not Responding");// removing member 1
- as.add(sr);
- SuspectMembersMessage sm = new SuspectMembersMessage(recipient, as);
- sm.setSender(mockMembers.get(0));
-
- gmsHealthMonitor.processMessage(sm);
-
- try {
- // this happens after final check, ping timeout
- Thread.sleep(memberTimeout-100);
- } catch (InterruptedException e) {
- }
-
- System.out.println("testRemoveMemberNotCalledBeforeTimeout ending");
- verify(joinLeave, never()).remove(any(InternalDistributedMember.class), any(String.class));
- }
-
- /***
- * Send remove member message after doing final check for coordinator, ping timeout
- * This test trying to remove coordinator
- */
- @Test
- public void testRemoveMemberCalledAfterDoingFinalCheckOnCoordinator() throws Exception {
-
- NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet<InternalDistributedMember>(), new HashSet<InternalDistributedMember>());
-
- // preferred coordinators are 0 and 1
- when(messenger.getMemberID()).thenReturn(mockMembers.get(1));// next preferred coordinator
- gmsHealthMonitor.started();
-
- gmsHealthMonitor.installView(v);
-
- Thread.sleep(memberTimeout/GMSHealthMonitor.LOGICAL_INTERVAL);
-
- ArrayList<InternalDistributedMember> recipient = new ArrayList<InternalDistributedMember>();
- recipient.add(mockMembers.get(0));
- recipient.add(mockMembers.get(1));
- ArrayList<SuspectRequest> as = new ArrayList<SuspectRequest>();
- SuspectRequest sr = new SuspectRequest(mockMembers.get(0), "Not Responding");// removing coordinator
- as.add(sr);
- SuspectMembersMessage sm = new SuspectMembersMessage(recipient, as);
- sm.setSender(mockMembers.get(4));// member 4 sends suspect message
-
- gmsHealthMonitor.processMessage(sm);
-
- // this happens after final check, ping timeout = 1000 ms
- Thread.sleep(memberTimeout + 200);
-
- verify(joinLeave, atLeastOnce()).remove(any(InternalDistributedMember.class), any(String.class));
- }
-
- /***
- * validates HealthMonitor.CheckIfAvailable api
- */
- @Test
- public void testCheckIfAvailable() {
-
- NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet<InternalDistributedMember>(), new HashSet<InternalDistributedMember>());
-
- // 3rd is current member
- when(messenger.getMemberID()).thenReturn(mockMembers.get(3));
-
- gmsHealthMonitor.installView(v);
-
- long startTime = System.currentTimeMillis();
-
- boolean retVal = gmsHealthMonitor.checkIfAvailable(mockMembers.get(1), "Not responding", false);
-
- long timeTaken = System.currentTimeMillis() - startTime;
-
- assertTrue("This should have taken member ping timeout 100ms ", timeTaken > 90);
- assertTrue("CheckIfAvailable should have return false", !retVal);
- }
-
- @Test
- public void testShutdown() {
-
- NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet<InternalDistributedMember>(), new HashSet<InternalDistributedMember>());
-
- // 3rd is current member
- when(messenger.getMemberID()).thenReturn(mockMembers.get(3));
-
- gmsHealthMonitor.installView(v);
-
- gmsHealthMonitor.stop();
-
- try {
- // this happens after final check, membertimeout = 1000
- Thread.sleep(100l);
- } catch (InterruptedException e) {
- }
-
- assertTrue("HeathMonitor should have shutdown", gmsHealthMonitor.isShutdown());
-
- }
-
- private class MethodExecuted implements Answer {
- private boolean methodExecuted = false;
-
- public boolean isMethodExecuted() {
- return methodExecuted;
- }
-
- @Override
- public Object answer(InvocationOnMock invocation) throws Throwable {
- methodExecuted = true;
- return null;
- }
- }
-}