You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by je...@apache.org on 2015/12/21 19:04:40 UTC
[12/50] [abbrv] incubator-geode git commit: Revert "Removing TCPConduit's Stub ID class"
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/507f2f3a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/TCPConduit.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/TCPConduit.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/TCPConduit.java
index 41e5837..a2801c1 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/TCPConduit.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/TCPConduit.java
@@ -45,7 +45,6 @@ import org.apache.logging.log4j.Logger;
import com.gemstone.gemfire.CancelCriterion;
import com.gemstone.gemfire.CancelException;
import com.gemstone.gemfire.SystemFailure;
-import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException;
import com.gemstone.gemfire.distributed.internal.DM;
import com.gemstone.gemfire.distributed.internal.DMStats;
@@ -64,7 +63,7 @@ import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
/** <p>TCPConduit manages a server socket and a collection of connections to
- other systems. Connections are identified by DistributedMember IDs.
+ other systems. Connections are identified by host/port Stubs.
These types of messages are currently supported:</p><pre>
DistributionMessage - message is delivered to the server's
@@ -176,8 +175,9 @@ public class TCPConduit implements Runnable {
////////////////// runtime state that is re-initialized on a restart
- /** server socket address */
- private InetSocketAddress id;
+ /** id is an endpoint Stub representing this server. It holds the
+ actual port the server is listening on */
+ private Stub id;
protected volatile boolean stopped;
@@ -351,7 +351,7 @@ public class TCPConduit implements Runnable {
try {
localPort = socket.getLocalPort();
- id = new InetSocketAddress(socket.getInetAddress(), localPort);
+ id = new Stub(socket.getInetAddress(), localPort, 0);
stopped = false;
ThreadGroup group =
LoggingThreadGroup.createThreadGroup("P2P Listener Threads", logger);
@@ -371,12 +371,23 @@ public class TCPConduit implements Runnable {
}
}
catch (IOException io) {
- String s = "While creating ServerSocket on port " + p;
+ String s = "While creating ServerSocket and Stub on port " + p;
throw new ConnectionException(s, io);
}
this.port = localPort;
}
+ /**
+ * After startup we install the view ID into the conduit stub to avoid
+ * confusion during overlapping shutdown/startup from the same member.
+ *
+ * @param viewID the view ID to install into this conduit's stub
+ */
+ public void setVmViewID(int viewID) {
+ this.id.setViewID(viewID);
+ }
+
+
/** creates the server sockets. This can be used to recreate the
* socket using this.port and this.bindAddress, which must be set
* before invoking this method.
@@ -585,7 +596,7 @@ public class TCPConduit implements Runnable {
public void run() {
ConnectionTable.threadWantsSharedResources();
if (logger.isTraceEnabled(LogMarker.DM)) {
- logger.trace(LogMarker.DM, "Starting P2P Listener on {}", id);
+ logger.trace(LogMarker.DM, "Starting P2P Listener on {}", this.getId());
}
for(;;) {
SystemFailure.checkFailure();
@@ -720,7 +731,7 @@ public class TCPConduit implements Runnable {
} // for
if (logger.isTraceEnabled(LogMarker.DM)) {
- logger.debug("Stopped P2P Listener on {}", id);
+ logger.debug("Stopped P2P Listener on {}", this.getId());
}
}
@@ -796,7 +807,7 @@ public class TCPConduit implements Runnable {
* @since 5.1
*/
public void getThreadOwnedOrderedConnectionState(
- DistributedMember member,
+ Stub member,
Map result)
{
getConTable().getThreadOwnedOrderedConnectionState(member, result);
@@ -808,7 +819,7 @@ public class TCPConduit implements Runnable {
* with the key
* @since 5.1
*/
- public void waitForThreadOwnedOrderedConnectionState(DistributedMember member, Map channelState)
+ public void waitForThreadOwnedOrderedConnectionState(Stub member, Map channelState)
throws InterruptedException
{
// if (Thread.interrupted()) throw new InterruptedException(); not necessary done in waitForThreadOwnedOrderedConnectionState
@@ -831,12 +842,13 @@ public class TCPConduit implements Runnable {
msg.setBytesRead(bytesRead);
msg.setSender(receiver.getRemoteAddress());
msg.setSharedReceiver(receiver.isSharedResource());
- directChannel.receive(msg, bytesRead);
+ directChannel.receive(msg, bytesRead, receiver.getRemoteId());
}
}
- /** gets the address of this conduit's ServerSocket endpoint */
- public InetSocketAddress getId() {
+ /** gets the Stub representing this conduit's ServerSocket endpoint. This
+ is used to generate other stubs containing endpoint information. */
+ public Stub getId() {
return id;
}
@@ -858,16 +870,21 @@ public class TCPConduit implements Runnable {
}
- /** gets the channel that is used to process non-DistributedMember messages */
+ /** gets the channel that is used to process non-Stub messages */
public DirectChannel getDirectChannel() {
return directChannel;
}
+ public InternalDistributedMember getMemberForStub(Stub s, boolean validate) {
+ return membershipManager.getMemberForStub(s, validate);
+ }
+
public void setLocalAddr(InternalDistributedMember addr) {
localAddr = addr;
+ this.id.setViewID(addr.getVmViewId());
}
- public InternalDistributedMember getLocalAddr() {
+ public InternalDistributedMember getLocalId() {
return localAddr;
}
@@ -877,6 +894,7 @@ public class TCPConduit implements Runnable {
* member is in the membership view and the system is not shutting down.
*
* @param memberAddress the IDS associated with the remoteId
+ * @param remoteId the TCPConduit stub for this member
* @param preserveOrder whether this is an ordered or unordered connection
* @param retry false if this is the first attempt
* @param startTime the time this operation started
@@ -884,8 +902,8 @@ public class TCPConduit implements Runnable {
* @param ackSATimeout the ack-severe-alert-threshold * 1000 for the operation to be transmitted (or zero)
* @return the connection
*/
- public Connection getConnection(InternalDistributedMember memberAddress, final boolean preserveOrder, boolean retry, long startTime,
- long ackTimeout, long ackSATimeout)
+ public Connection getConnection(InternalDistributedMember memberAddress, Stub remoteId, final boolean preserveOrder, boolean retry,
+ long startTime, long ackTimeout, long ackSATimeout)
throws java.io.IOException, DistributedSystemDisconnectedException
{
//final boolean preserveOrder = (processorType == DistributionManager.SERIAL_EXECUTOR )|| (processorType == DistributionManager.PARTITIONED_REGION_EXECUTOR);
@@ -904,7 +922,11 @@ public class TCPConduit implements Runnable {
// problems. Tear down the connection so that it gets
// rebuilt.
if (retry || conn != null) { // not first time in loop
- if (!membershipManager.memberExists(memberAddress) || membershipManager.isShunned(memberAddress) || membershipManager.shutdownInProgress()) {
+ // Consult with the membership manager; if member has gone away,
+ // there will not be an entry for this stub.
+ InternalDistributedMember m = this.membershipManager.getMemberForStub(remoteId, true);
+ if (m == null) {
+ // OK, the member left. Just register an error.
throw new IOException(LocalizedStrings.TCPConduit_TCPIP_CONNECTION_LOST_AND_MEMBER_IS_NOT_IN_VIEW.toLocalizedString());
}
// bug35953: Member is still in view; we MUST NOT give up!
@@ -919,14 +941,15 @@ public class TCPConduit implements Runnable {
}
// try again after sleep
- if (!membershipManager.memberExists(memberAddress) || membershipManager.isShunned(memberAddress)) {
+ m = this.membershipManager.getMemberForStub(remoteId, true);
+ if (m == null) {
// OK, the member left. Just register an error.
throw new IOException(LocalizedStrings.TCPConduit_TCPIP_CONNECTION_LOST_AND_MEMBER_IS_NOT_IN_VIEW.toLocalizedString());
}
// Print a warning (once)
if (memberInTrouble == null) {
- memberInTrouble = memberAddress;
+ memberInTrouble = m;
logger.warn(LocalizedMessage.create(LocalizedStrings.TCPConduit_ATTEMPTING_TCPIP_RECONNECT_TO__0, memberInTrouble));
}
else {
@@ -940,8 +963,8 @@ public class TCPConduit implements Runnable {
if (conn != null) {
try {
if (logger.isDebugEnabled()) {
- logger.debug("Closing old connection. conn={} before retrying. memberInTrouble={}",
- conn, memberInTrouble);
+ logger.debug("Closing old connection. conn={} before retrying. remoteID={} memberInTrouble={}",
+ conn, remoteId, memberInTrouble);
}
conn.closeForReconnect("closing before retrying");
}
@@ -962,10 +985,10 @@ public class TCPConduit implements Runnable {
boolean debugRetry = false;
do {
retryForOldConnection = false;
- conn = getConTable().get(memberAddress, preserveOrder, startTime, ackTimeout, ackSATimeout);
+ conn = getConTable().get(remoteId, preserveOrder, startTime, ackTimeout, ackSATimeout);
if (conn == null) {
// conduit may be closed - otherwise an ioexception would be thrown
- problem = new IOException(LocalizedStrings.TCPConduit_UNABLE_TO_RECONNECT_TO_SERVER_POSSIBLE_SHUTDOWN_0.toLocalizedString(memberAddress));
+ problem = new IOException(LocalizedStrings.TCPConduit_UNABLE_TO_RECONNECT_TO_SERVER_POSSIBLE_SHUTDOWN_0.toLocalizedString(remoteId));
} else if (conn.isClosing() || !conn.getRemoteAddress().equals(memberAddress)) {
if (logger.isDebugEnabled()) {
logger.debug("Got an old connection for {}: {}@{}", memberAddress, conn, conn.hashCode());
@@ -1004,14 +1027,15 @@ public class TCPConduit implements Runnable {
if (problem != null) {
// Some problems are not recoverable; check and error out early.
- if (!membershipManager.memberExists(memberAddress) || membershipManager.isShunned(memberAddress)) { // left the view
+ InternalDistributedMember m = this.membershipManager.getMemberForStub(remoteId, true);
+ if (m == null) { // left the view
// Bracket our original warning
if (memberInTrouble != null) {
// make this msg info to bracket warning
logger.info(LocalizedMessage.create(
LocalizedStrings.TCPConduit_ENDING_RECONNECT_ATTEMPT_BECAUSE_0_HAS_DISAPPEARED, memberInTrouble));
}
- throw new IOException(LocalizedStrings.TCPConduit_PEER_HAS_DISAPPEARED_FROM_VIEW.toLocalizedString(memberAddress));
+ throw new IOException(LocalizedStrings.TCPConduit_PEER_HAS_DISAPPEARED_FROM_VIEW.toLocalizedString(remoteId));
} // left the view
if (membershipManager.shutdownInProgress()) { // shutdown in progress
@@ -1030,12 +1054,12 @@ public class TCPConduit implements Runnable {
if (memberInTrouble == null) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.TCPConduit_ERROR_SENDING_MESSAGE_TO_0_WILL_REATTEMPT_1,
- new Object[] {memberAddress, problem}));
- memberInTrouble = memberAddress;
+ new Object[] {m, problem}));
+ memberInTrouble = m;
}
else {
if (logger.isDebugEnabled()) {
- logger.debug("Error sending message to {}", memberAddress, problem);
+ logger.debug("Error sending message to {}", m, problem);
}
}
@@ -1049,7 +1073,7 @@ public class TCPConduit implements Runnable {
throw (IOException)problem;
}
else {
- IOException ioe = new IOException( LocalizedStrings.TCPConduit_PROBLEM_CONNECTING_TO_0.toLocalizedString(memberAddress));
+ IOException ioe = new IOException( LocalizedStrings.TCPConduit_PROBLEM_CONNECTING_TO_0.toLocalizedString(remoteId));
ioe.initCause(problem);
throw ioe;
}
@@ -1065,8 +1089,8 @@ public class TCPConduit implements Runnable {
LocalizedStrings.TCPConduit_SUCCESSFULLY_RECONNECTED_TO_MEMBER_0,
memberInTrouble));
if (logger.isTraceEnabled()) {
- logger.trace("new connection is {} memberAddress={}", conn, memberAddress);
- }
+ logger.trace("new connection is {} remoteId={} memberAddress={}", conn, remoteId, memberAddress);
+ }
}
return conn;
}
@@ -1078,6 +1102,180 @@ public class TCPConduit implements Runnable {
} // for(;;)
}
+// /**
+// * Send a message.
+// * @return the connection used to send the message
+// * @throws IOException if peer departed view or shutdown in progress
+// */
+// private Connection send(Stub remoteId, ByteBuffer bb, boolean preserveOrder, DistributionMessage msg)
+// throws java.io.IOException
+// {
+// if (stopped) {
+// throw new ConduitStoppedException("The conduit is stopped");
+// }
+
+// if (!QUIET) {
+// LogWriterI18n l = getLogger();
+// if (l.finerEnabled()) {
+// l.finer(id.toString() + " sending " + bb
+// + " to " + remoteId);
+// }
+// }
+
+// Connection conn = null;
+// InternalDistributedMember memberInTrouble = null;
+// for (;;) {
+// // If this is the second time through this loop, we had
+// // problems. Tear down the connection so that it gets
+// // rebuilt.
+// if (conn != null) { // not first time in loop
+// // Consult with the membership manager; if member has gone away,
+// // there will not be an entry for this stub.
+// InternalDistributedMember m = membershipManager.getMemberForStub(remoteId);
+// if (m == null) {
+// // OK, the member left. Just register an error.
+// throw new IOException("TCP/IP connection lost and member no longer in view");
+// }
+// // bug35953: Member is still in view; we MUST NOT give up!
+
+// // Pause just a tiny bit...
+// try {
+// Thread.sleep(5000);
+// }
+// catch (InterruptedException e) {
+// Thread.currentThread().interrupt();
+// if (membershipManager.shutdownInProgress()) { // shutdown in progress
+// // Bracket our original warning
+// if (memberInTrouble != null) {
+// logger.info("Ending retry attempt because shutdown has started.");
+// }
+// throw new IOException("Abandoned because shutdown is in progress");
+// } // shutdown in progress
+
+// // Strange random interrupt intercepted?
+// logger.warning("Thread has been interrupted but no shutdown in progress", e);
+// throw new DistributedSystemDisconnectedException(e);
+// }
+
+// // Print a warning (once)
+// if (memberInTrouble == null) {
+// memberInTrouble = m;
+// getLogger().warning("Attempting TCP/IP reconnect to " + memberInTrouble);
+// }
+// else {
+// getLogger().fine("Attempting TCP/IP reconnect to " + memberInTrouble);
+// }
+
+// // Close the connection (it will get rebuilt later).
+// this.stats.incReconnectAttempts();
+// try {
+// conn.closeForReconnect("closing before retrying");
+// }
+// catch (CancelException ex) {
+// // In general we ignore close problems, but if the system
+// // is shutting down, we should just quit.
+// throw ex;
+// }
+// catch (Exception ex) {
+// }
+// } // not first time in loop
+
+// // Do the send
+// Exception problem = null;
+// try {
+// // Get (or regenerate) the connection
+// // bug36202: this could generate a ConnectionException, so it
+// // must be caught and retried
+// conn = getConTable().get(remoteId, preserveOrder);
+// // getLogger().info ("connections returned " + conn);
+// if (conn == null) {
+// // conduit may be closed - otherwise an ioexception would be thrown
+// throw new IOException("Unable to reconnect to server; possible shutdown: "
+// + remoteId);
+// }
+
+// conn.sendPreserialized(bb, msg);
+// }
+// catch (ConnectionException e) {
+// // Race condition between acquiring the connection and attempting
+// // to use it: another thread closed it.
+// problem = e;
+// }
+// catch (IOException e) {
+// problem = e;
+// }
+
+// if (problem != null) {
+// // Some problems are not recoverable; check and error out early.
+// InternalDistributedMember m = membershipManager.getMemberForStub(remoteId);
+// if (m == null) { // left the view
+// // Bracket our original warning
+// if (memberInTrouble != null) {
+// logger.info("Ending retry attempt because " + memberInTrouble
+// + " has disappeared.");
+// }
+// throw new IOException("Peer has disappeared from view");
+// } // left the view
+
+// if (membershipManager.shutdownInProgress()) { // shutdown in progress
+// // Bracket our original warning
+// if (memberInTrouble != null) {
+// logger.info("Ending retry attempt because shutdown has started.");
+// }
+// throw new IOException("Abandoned because shutdown is in progress");
+// } // shutdown in progress
+
+// if (endpointRemoved(remoteId)) { // endpoint removed
+// // TODO what does this mean?
+// // Bracket our original warning
+// if (memberInTrouble != null) {
+// logger.info("Ending retry attempt because " + memberInTrouble
+// + " has lost its endpoint.");
+// }
+// throw new IOException("Endpoint was removed");
+// } // endpoint removed
+
+// // Log the warning. We wait until now, because we want
+// // to have m defined for a nice message...
+// if (memberInTrouble == null) {
+// logger.warning(
+// "Error sending message to " + m + " (will reattempt): "
+// + problem.toString(),
+// logger.finerEnabled() ? problem : null);
+// memberInTrouble = m;
+// }
+// else {
+// logger.fine("Error sending message to " + m, problem);
+// }
+
+// // Retry the operation (indefinitely)
+// continue;
+// } // problem != null
+// // Success!
+
+// // Make sure our logging is bracketed if there was a problem
+// if (memberInTrouble != null) {
+// logger.info("Successfully reestablished connection to server "
+// + memberInTrouble);
+// }
+// return conn;
+// } // while retry
+// }
+
+// /**
+// * Sends an already serialized message in a byte buffer
+// * to the given endpoint. Waits for the send to complete
+// * before returning.
+// * @return the connection used to send the message
+// */
+// public Connection sendSync(Stub remoteId, ByteBuffer bb, int processorType, DistributionMessage msg)
+// throws java.io.IOException
+// {
+// return send(remoteId, bb,
+// processorType == DistributionManager.SERIAL_EXECUTOR,
+// msg);
+// }
+
@Override
public String toString() {
return "" + id;
@@ -1103,22 +1301,22 @@ public class TCPConduit implements Runnable {
return directChannel.getDM();
}
/**
- * Closes any connections used to communicate with the given member
+ * Closes any connections used to communicate with the given stub
*/
- public void removeEndpoint(DistributedMember mbr, String reason) {
- removeEndpoint(mbr, reason, true);
+ public void removeEndpoint(Stub stub, String reason) {
+ removeEndpoint(stub, reason, true);
}
- public void removeEndpoint(DistributedMember mbr, String reason, boolean notifyDisconnect) {
+ public void removeEndpoint(Stub stub, String reason, boolean notifyDisconnect) {
ConnectionTable ct = this.conTable;
if (ct == null) {
return;
}
- ct.removeEndpoint(mbr, reason, notifyDisconnect);
+ ct.removeEndpoint(stub, reason, notifyDisconnect);
}
/** check to see if there are still any receiver threads for the given end-point */
- public boolean hasReceiversFor(DistributedMember endPoint) {
+ public boolean hasReceiversFor(Stub endPoint) {
ConnectionTable ct = this.conTable;
return (ct != null) && ct.hasReceiversFor(endPoint);
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/507f2f3a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/DistributionManagerDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/DistributionManagerDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/DistributionManagerDUnitTest.java
index 773ef38..1f411bb 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/DistributionManagerDUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/DistributionManagerDUnitTest.java
@@ -38,6 +38,7 @@ import com.gemstone.gemfire.cache.RegionEvent;
import com.gemstone.gemfire.cache.RegionFactory;
import com.gemstone.gemfire.cache.Scope;
import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
+import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.distributed.DistributedSystem;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.distributed.internal.membership.MembershipManager;
@@ -45,10 +46,13 @@ import com.gemstone.gemfire.distributed.internal.membership.NetView;
import com.gemstone.gemfire.distributed.internal.membership.gms.MembershipManagerHelper;
import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.Manager;
import com.gemstone.gemfire.distributed.internal.membership.gms.mgr.GMSMembershipManager;
+import com.gemstone.gemfire.internal.AvailablePort;
import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.tcp.Stub;
import dunit.DistributedTestCase;
import dunit.Host;
+import dunit.SerializableCallable;
import dunit.SerializableRunnable;
import dunit.VM;
@@ -188,7 +192,7 @@ public class DistributionManagerDUnitTest extends DistributedTestCase {
sys.getLogWriter().info("<ExpectedException action=add>attempt to add old member</ExpectedException>");
sys.getLogWriter().info("<ExpectedException action=add>Removing shunned GemFire node</ExpectedException>");
try {
- boolean accepted = mgr.addSurpriseMember(mbr);
+ boolean accepted = mgr.addSurpriseMember(mbr, new Stub());
Assert.assertTrue("member with old ID was not rejected (bug #44566)", !accepted);
} finally {
sys.getLogWriter().info("<ExpectedException action=remove>attempt to add old member</ExpectedException>");
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/507f2f3a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManagerJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManagerJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManagerJUnitTest.java
index ddbda0b..44e1b46 100755
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManagerJUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManagerJUnitTest.java
@@ -16,18 +16,8 @@
*/
package com.gemstone.gemfire.distributed.internal.membership.gms.mgr;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.isA;
-import static org.mockito.Mockito.atLeastOnce;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.reset;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
+import static org.junit.Assert.*;
import java.util.ArrayList;
import java.util.Arrays;
@@ -46,6 +36,7 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException;
+import com.gemstone.gemfire.distributed.internal.AdminMessageType;
import com.gemstone.gemfire.distributed.internal.DM;
import com.gemstone.gemfire.distributed.internal.DMStats;
import com.gemstone.gemfire.distributed.internal.DistributionConfig;
@@ -53,6 +44,7 @@ import com.gemstone.gemfire.distributed.internal.DistributionConfigImpl;
import com.gemstone.gemfire.distributed.internal.DistributionManager;
import com.gemstone.gemfire.distributed.internal.DistributionMessage;
import com.gemstone.gemfire.distributed.internal.HighPriorityAckedMessage;
+import com.gemstone.gemfire.distributed.internal.HighPriorityDistributionMessage;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
import com.gemstone.gemfire.distributed.internal.MembershipListener;
import com.gemstone.gemfire.distributed.internal.ReplyProcessor21;
@@ -68,11 +60,19 @@ import com.gemstone.gemfire.distributed.internal.membership.gms.SuspectMember;
import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.Authenticator;
import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.HealthMonitor;
import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.JoinLeave;
+import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.Manager;
import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.Messenger;
+import com.gemstone.gemfire.distributed.internal.membership.gms.membership.GMSJoinLeave;
+import com.gemstone.gemfire.distributed.internal.membership.gms.mgr.GMSMembershipManager;
import com.gemstone.gemfire.distributed.internal.membership.gms.mgr.GMSMembershipManager.StartupEvent;
+import com.gemstone.gemfire.internal.AvailablePortHelper;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.admin.remote.AdminRequest;
+import com.gemstone.gemfire.internal.admin.remote.AdminResponse;
import com.gemstone.gemfire.internal.admin.remote.AlertListenerMessage;
import com.gemstone.gemfire.internal.admin.remote.RemoteTransportConfig;
import com.gemstone.gemfire.internal.tcp.ConnectExceptions;
+import com.gemstone.gemfire.internal.tcp.Stub;
import com.gemstone.gemfire.test.junit.categories.UnitTest;
@Category(UnitTest.class)
@@ -295,6 +295,13 @@ public class GMSMembershipManagerJUnitTest {
suspectMember = mockMembers[1];
manager.handleOrDeferSuspect(new SuspectMember(mockMembers[0], suspectMember, "testing"));
verify(listener).memberSuspect(suspectMember, mockMembers[0], "testing");
+
+ InternalDistributedMember mbr = manager.getMemberForStub(new Stub(myMemberId.getInetAddress(), 2033, 20), false);
+ assertTrue(mbr == null);
+ myMemberId.setDirectChannelPort(2033);
+ mbr = manager.getMemberForStub(new Stub(myMemberId.getInetAddress(), 2033, 20), false);
+ assertTrue(mbr != null);
+ assertEquals(mbr, myMemberId);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/507f2f3a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/tcp/ConnectionJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/tcp/ConnectionJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/tcp/ConnectionJUnitTest.java
index ffd5092..78c462f 100755
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/tcp/ConnectionJUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/tcp/ConnectionJUnitTest.java
@@ -22,7 +22,6 @@ import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verify;
import java.io.InputStream;
-import java.net.InetSocketAddress;
import java.net.Socket;
import org.junit.Test;
@@ -62,7 +61,7 @@ public class ConnectionJUnitTest {
when(stopper.cancelInProgress()).thenReturn(null);
when(conduit.getCancelCriterion()).thenReturn(stopper);
- when(conduit.getId()).thenReturn(new InetSocketAddress(SocketCreator.getLocalHost(), 10337));
+ when(conduit.getId()).thenReturn(new Stub(SocketCreator.getLocalHost(), 10337, 1));
// NIO can't be mocked because SocketChannel has a final method that
// is used by Connection - configureBlocking