You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by je...@apache.org on 2015/12/21 19:04:40 UTC

[12/50] [abbrv] incubator-geode git commit: Revert "Removing TCPConduit's Stub ID class"

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/507f2f3a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/TCPConduit.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/TCPConduit.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/TCPConduit.java
index 41e5837..a2801c1 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/TCPConduit.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/TCPConduit.java
@@ -45,7 +45,6 @@ import org.apache.logging.log4j.Logger;
 import com.gemstone.gemfire.CancelCriterion;
 import com.gemstone.gemfire.CancelException;
 import com.gemstone.gemfire.SystemFailure;
-import com.gemstone.gemfire.distributed.DistributedMember;
 import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException;
 import com.gemstone.gemfire.distributed.internal.DM;
 import com.gemstone.gemfire.distributed.internal.DMStats;
@@ -64,7 +63,7 @@ import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
 import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
 
 /** <p>TCPConduit manages a server socket and a collection of connections to
-    other systems.  Connections are identified by DistributedMember IDs.
+    other systems.  Connections are identified by host/port Stubs.
     These types of messages are currently supported:</p><pre>
 
        DistributionMessage - message is delivered to the server's
@@ -176,8 +175,9 @@ public class TCPConduit implements Runnable {
   
   ////////////////// runtime state that is re-initialized on a restart
 
-  /** server socket address */
-  private InetSocketAddress id;
+  /** id is an endpoint Stub representing this server.  It holds the
+      actual port the server is listening on */
+  private Stub id;
 
   protected volatile boolean stopped;
 
@@ -351,7 +351,7 @@ public class TCPConduit implements Runnable {
     try {
       localPort = socket.getLocalPort();
 
-      id = new InetSocketAddress(socket.getInetAddress(), localPort);
+      id = new Stub(socket.getInetAddress(), localPort, 0);
       stopped = false;
       ThreadGroup group =
         LoggingThreadGroup.createThreadGroup("P2P Listener Threads", logger);
@@ -371,12 +371,23 @@ public class TCPConduit implements Runnable {
       }
     }
     catch (IOException io) {
-      String s = "While creating ServerSocket on port " + p;
+      String s = "While creating ServerSocket and Stub on port " + p;
       throw new ConnectionException(s, io);
     }
     this.port = localPort;
   }
   
+  /**
+   * After startup we install the view ID into the conduit stub to avoid
+   * confusion during overlapping shutdown/startup from the same member.
+   * 
+   * @param viewID
+   */
+  public void setVmViewID(int viewID) {
+    this.id.setViewID(viewID);
+  }
+
+
   /** creates the server sockets.  This can be used to recreate the
    *  socket using this.port and this.bindAddress, which must be set
    *  before invoking this method.
@@ -585,7 +596,7 @@ public class TCPConduit implements Runnable {
   public void run() {
     ConnectionTable.threadWantsSharedResources();
     if (logger.isTraceEnabled(LogMarker.DM)) {
-      logger.trace(LogMarker.DM, "Starting P2P Listener on  {}", id);
+      logger.trace(LogMarker.DM, "Starting P2P Listener on  {}", this.getId());
     }
     for(;;) {
       SystemFailure.checkFailure();
@@ -720,7 +731,7 @@ public class TCPConduit implements Runnable {
     } // for
 
     if (logger.isTraceEnabled(LogMarker.DM)) {
-      logger.debug("Stopped P2P Listener on  {}", id);
+      logger.debug("Stopped P2P Listener on  {}", this.getId());
     }
   }
 
@@ -796,7 +807,7 @@ public class TCPConduit implements Runnable {
    * @since 5.1
    */
   public void getThreadOwnedOrderedConnectionState(
-    DistributedMember member,
+    Stub member,
     Map result)
   {
     getConTable().getThreadOwnedOrderedConnectionState(member, result);
@@ -808,7 +819,7 @@ public class TCPConduit implements Runnable {
    * with the key
    * @since 5.1
    */
-  public void waitForThreadOwnedOrderedConnectionState(DistributedMember member, Map channelState)
+  public void waitForThreadOwnedOrderedConnectionState(Stub member, Map channelState)
     throws InterruptedException
   {
     // if (Thread.interrupted()) throw new InterruptedException(); not necessary done in waitForThreadOwnedOrderedConnectionState
@@ -831,12 +842,13 @@ public class TCPConduit implements Runnable {
       msg.setBytesRead(bytesRead);
       msg.setSender(receiver.getRemoteAddress());
       msg.setSharedReceiver(receiver.isSharedResource());
-      directChannel.receive(msg, bytesRead);
+      directChannel.receive(msg, bytesRead, receiver.getRemoteId());
     }
   }
 
-  /** gets the address of this conduit's ServerSocket endpoint */
-  public InetSocketAddress getId() {
+  /** gets the Stub representing this conduit's ServerSocket endpoint.  This
+      is used to generate other stubs containing endpoint information. */
+  public Stub getId() {
     return id;
   }
 
@@ -858,16 +870,21 @@ public class TCPConduit implements Runnable {
   }
 
   
-  /** gets the channel that is used to process non-DistributedMember messages */
+  /** gets the channel that is used to process non-Stub messages */
   public DirectChannel getDirectChannel() {
     return directChannel;
   }
 
+  public InternalDistributedMember getMemberForStub(Stub s, boolean validate) {
+    return membershipManager.getMemberForStub(s, validate);
+  }
+  
   public void setLocalAddr(InternalDistributedMember addr) {
     localAddr = addr;
+    this.id.setViewID(addr.getVmViewId());
   }
   
-  public InternalDistributedMember getLocalAddr() {
+  public InternalDistributedMember getLocalId() {
     return localAddr;
   }
 
@@ -877,6 +894,7 @@ public class TCPConduit implements Runnable {
    * member is in the membership view and the system is not shutting down.
    * 
    * @param memberAddress the IDS associated with the remoteId
+   * @param remoteId the TCPConduit stub for this member
    * @param preserveOrder whether this is an ordered or unordered connection
    * @param retry false if this is the first attempt
    * @param startTime the time this operation started
@@ -884,8 +902,8 @@ public class TCPConduit implements Runnable {
    * @param ackSATimeout the ack-severe-alert-threshold * 1000 for the operation to be transmitted (or zero)
    * @return the connection
    */
-  public Connection getConnection(InternalDistributedMember memberAddress, final boolean preserveOrder, boolean retry, long startTime,
-      long ackTimeout, long ackSATimeout)
+  public Connection getConnection(InternalDistributedMember memberAddress, Stub remoteId, final boolean preserveOrder, boolean retry,
+      long startTime, long ackTimeout, long ackSATimeout)
     throws java.io.IOException, DistributedSystemDisconnectedException
   {
     //final boolean preserveOrder = (processorType == DistributionManager.SERIAL_EXECUTOR )|| (processorType == DistributionManager.PARTITIONED_REGION_EXECUTOR);
@@ -904,7 +922,11 @@ public class TCPConduit implements Runnable {
       // problems.  Tear down the connection so that it gets
       // rebuilt.
       if (retry || conn != null) { // not first time in loop
-        if (!membershipManager.memberExists(memberAddress) || membershipManager.isShunned(memberAddress) || membershipManager.shutdownInProgress()) {
+        // Consult with the membership manager; if member has gone away,
+        // there will not be an entry for this stub.
+        InternalDistributedMember m = this.membershipManager.getMemberForStub(remoteId, true);
+        if (m == null) {
+          // OK, the member left.  Just register an error.
           throw new IOException(LocalizedStrings.TCPConduit_TCPIP_CONNECTION_LOST_AND_MEMBER_IS_NOT_IN_VIEW.toLocalizedString());
         }
         // bug35953: Member is still in view; we MUST NOT give up!
@@ -919,14 +941,15 @@ public class TCPConduit implements Runnable {
         }
         
         // try again after sleep
-        if (!membershipManager.memberExists(memberAddress) || membershipManager.isShunned(memberAddress)) {
+        m = this.membershipManager.getMemberForStub(remoteId, true);
+        if (m == null) {
           // OK, the member left.  Just register an error.
           throw new IOException(LocalizedStrings.TCPConduit_TCPIP_CONNECTION_LOST_AND_MEMBER_IS_NOT_IN_VIEW.toLocalizedString());
         }
         
         // Print a warning (once)
         if (memberInTrouble == null) {
-          memberInTrouble = memberAddress;
+          memberInTrouble = m;
           logger.warn(LocalizedMessage.create(LocalizedStrings.TCPConduit_ATTEMPTING_TCPIP_RECONNECT_TO__0, memberInTrouble));
         }
         else {
@@ -940,8 +963,8 @@ public class TCPConduit implements Runnable {
         if (conn != null) {
           try { 
             if (logger.isDebugEnabled()) {
-              logger.debug("Closing old connection.  conn={} before retrying. memberInTrouble={}",
-                  conn, memberInTrouble);
+              logger.debug("Closing old connection.  conn={} before retrying. remoteID={} memberInTrouble={}",
+                  conn, remoteId,  memberInTrouble);
             }
             conn.closeForReconnect("closing before retrying"); 
           } 
@@ -962,10 +985,10 @@ public class TCPConduit implements Runnable {
         boolean debugRetry = false;
         do {
           retryForOldConnection = false;
-          conn = getConTable().get(memberAddress, preserveOrder, startTime, ackTimeout, ackSATimeout);
+          conn = getConTable().get(remoteId, preserveOrder, startTime, ackTimeout, ackSATimeout);
           if (conn == null) {
             // conduit may be closed - otherwise an ioexception would be thrown
-            problem = new IOException(LocalizedStrings.TCPConduit_UNABLE_TO_RECONNECT_TO_SERVER_POSSIBLE_SHUTDOWN_0.toLocalizedString(memberAddress));
+            problem = new IOException(LocalizedStrings.TCPConduit_UNABLE_TO_RECONNECT_TO_SERVER_POSSIBLE_SHUTDOWN_0.toLocalizedString(remoteId));
           } else if (conn.isClosing() || !conn.getRemoteAddress().equals(memberAddress)) {
             if (logger.isDebugEnabled()) {
               logger.debug("Got an old connection for {}: {}@{}", memberAddress, conn, conn.hashCode());
@@ -1004,14 +1027,15 @@ public class TCPConduit implements Runnable {
 
       if (problem != null) {
         // Some problems are not recoverable; check and error out early.
-        if (!membershipManager.memberExists(memberAddress) || membershipManager.isShunned(memberAddress)) { // left the view
+        InternalDistributedMember m = this.membershipManager.getMemberForStub(remoteId, true);
+        if (m == null) { // left the view
           // Bracket our original warning
           if (memberInTrouble != null) {
             // make this msg info to bracket warning
             logger.info(LocalizedMessage.create(
                 LocalizedStrings.TCPConduit_ENDING_RECONNECT_ATTEMPT_BECAUSE_0_HAS_DISAPPEARED, memberInTrouble));
           }
-          throw new IOException(LocalizedStrings.TCPConduit_PEER_HAS_DISAPPEARED_FROM_VIEW.toLocalizedString(memberAddress));
+          throw new IOException(LocalizedStrings.TCPConduit_PEER_HAS_DISAPPEARED_FROM_VIEW.toLocalizedString(remoteId));
         } // left the view
 
         if (membershipManager.shutdownInProgress()) { // shutdown in progress
@@ -1030,12 +1054,12 @@ public class TCPConduit implements Runnable {
         if (memberInTrouble == null) {
           logger.warn(LocalizedMessage.create(
           LocalizedStrings.TCPConduit_ERROR_SENDING_MESSAGE_TO_0_WILL_REATTEMPT_1,
-          new Object[] {memberAddress, problem}));
-          memberInTrouble = memberAddress;
+          new Object[] {m, problem}));
+          memberInTrouble = m;
         }
         else {
           if (logger.isDebugEnabled()) {
-            logger.debug("Error sending message to {}", memberAddress, problem);
+            logger.debug("Error sending message to {}", m, problem);
           }
         }
 
@@ -1049,7 +1073,7 @@ public class TCPConduit implements Runnable {
             throw (IOException)problem;
           }
           else {
-            IOException ioe = new IOException( LocalizedStrings.TCPConduit_PROBLEM_CONNECTING_TO_0.toLocalizedString(memberAddress));
+            IOException ioe = new IOException( LocalizedStrings.TCPConduit_PROBLEM_CONNECTING_TO_0.toLocalizedString(remoteId));
             ioe.initCause(problem);
             throw ioe;
           }
@@ -1065,8 +1089,8 @@ public class TCPConduit implements Runnable {
             LocalizedStrings.TCPConduit_SUCCESSFULLY_RECONNECTED_TO_MEMBER_0,
             memberInTrouble));
         if (logger.isTraceEnabled()) {
-          logger.trace("new connection is {} memberAddress={}", conn, memberAddress);
-        }
+          logger.trace("new connection is {} remoteId={} memberAddress={}", conn, remoteId, memberAddress);
+      }
       }
       return conn;
       }
@@ -1078,6 +1102,180 @@ public class TCPConduit implements Runnable {
     } // for(;;)
   }
 
+//   /**
+//    * Send a message.
+//    * @return the connection used to send the message
+//    * @throws IOException if peer departed view or shutdown in progress
+//    */
+//   private Connection send(Stub remoteId, ByteBuffer bb, boolean preserveOrder, DistributionMessage msg)
+//     throws java.io.IOException
+//   {
+//     if (stopped) {
+//       throw new ConduitStoppedException("The conduit is stopped");
+//     }
+
+//     if (!QUIET) {
+//       LogWriterI18n l = getLogger();
+//       if (l.finerEnabled()) {
+//         l.finer(id.toString() + " sending " + bb
+//                 + " to " + remoteId);
+//       }
+//     }
+
+//     Connection conn = null;
+//     InternalDistributedMember memberInTrouble = null;
+//     for (;;) {
+//       // If this is the second time through this loop, we had
+//       // problems.  Tear down the connection so that it gets
+//       // rebuilt.
+//       if (conn != null) { // not first time in loop
+//         // Consult with the membership manager; if member has gone away,
+//         // there will not be an entry for this stub.
+//         InternalDistributedMember m = membershipManager.getMemberForStub(remoteId);
+//         if (m == null) {
+//           // OK, the member left.  Just register an error.
+//           throw new IOException("TCP/IP connection lost and member no longer in view");
+//         }
+//         // bug35953: Member is still in view; we MUST NOT give up!
+        
+//         // Pause just a tiny bit...
+//         try {
+//           Thread.sleep(5000);
+//         }
+//         catch (InterruptedException e) {
+//           Thread.currentThread().interrupt();
+//           if (membershipManager.shutdownInProgress()) { // shutdown in progress
+//             // Bracket our original warning
+//             if (memberInTrouble != null) {
+//               logger.info("Ending retry attempt because shutdown has started.");
+//             }
+//             throw new IOException("Abandoned because shutdown is in progress");
+//           } // shutdown in progress
+          
+//           // Strange random interrupt intercepted?
+//           logger.warning("Thread has been interrupted but no shutdown in progress", e);
+//           throw new DistributedSystemDisconnectedException(e);
+//         }
+        
+//         // Print a warning (once)
+//         if (memberInTrouble == null) {
+//           memberInTrouble = m;
+//           getLogger().warning("Attempting TCP/IP reconnect to " + memberInTrouble); 
+//         }
+//         else {
+//           getLogger().fine("Attempting TCP/IP reconnect to " + memberInTrouble);
+//         }
+        
+//         // Close the connection (it will get rebuilt later).
+//         this.stats.incReconnectAttempts();
+//         try { 
+//           conn.closeForReconnect("closing before retrying"); 
+//           } 
+//         catch (CancelException ex) {
+//           // In general we ignore close problems, but if the system
+//           // is shutting down, we should just quit.
+//           throw ex;
+//         }
+//         catch (Exception ex) {
+//           }
+//       } // not first time in loop
+      
+//       // Do the send
+//       Exception problem = null;
+//       try {
+//         // Get (or regenerate) the connection
+//         // bug36202: this could generate a ConnectionException, so it
+//         // must be caught and retried
+//         conn = getConTable().get(remoteId, preserveOrder);
+//         //      getLogger().info ("connections returned " + conn);
+//         if (conn == null) {
+//           // conduit may be closed - otherwise an ioexception would be thrown
+//           throw new IOException("Unable to reconnect to server; possible shutdown: " 
+//               + remoteId);
+//         }
+
+//         conn.sendPreserialized(bb, msg);
+//       }
+//       catch (ConnectionException e) {
+//         // Race condition between acquiring the connection and attempting
+//         // to use it: another thread closed it.
+//         problem = e;
+//       }
+//       catch (IOException e) {
+//         problem = e;
+//       }
+
+//       if (problem != null) {
+//         // Some problems are not recoverable; check an error out early.
+//         InternalDistributedMember m = membershipManager.getMemberForStub(remoteId);
+//         if (m == null) { // left the view
+//           // Bracket our original warning
+//           if (memberInTrouble != null) {
+//             logger.info("Ending retry attempt because " + memberInTrouble 
+//                 + " has disappeared.");
+//           }
+//           throw new IOException("Peer has disappeared from view");
+//         } // left the view
+        
+//         if (membershipManager.shutdownInProgress()) { // shutdown in progress
+//           // Bracket our original warning
+//           if (memberInTrouble != null) {
+//             logger.info("Ending retry attempt because shutdown has started.");
+//           }
+//           throw new IOException("Abandoned because shutdown is in progress");
+//         } // shutdown in progress
+        
+//         if (endpointRemoved(remoteId)) { // endpoint removed
+//           // TODO what does this mean?
+//           // Bracket our original warning
+//           if (memberInTrouble != null) {
+//             logger.info("Ending retry attempt because " + memberInTrouble 
+//                 + " has lost its endpoint.");
+//           }
+//           throw new IOException("Endpoint was removed");
+//         } // endpoint removed
+        
+//         // Log the warning.  We wait until now, because we want
+//         // to have m defined for a nice message...
+//         if (memberInTrouble == null) {
+//           logger.warning(
+//               "Error sending message to " + m + " (will reattempt): " 
+//                   + problem.toString(), 
+//               logger.finerEnabled() ? problem : null);
+//           memberInTrouble = m;
+//         }
+//         else {
+//           logger.fine("Error sending message to " + m, problem);
+//         }
+        
+//         // Retry the operation (indefinitely)
+//         continue;
+//       } // problem != null
+//       // Success!
+      
+//       // Make sure our logging is bracketed if there was a problem
+//       if (memberInTrouble != null) {
+//         logger.info("Successfully reestablished connection to server " 
+//             + memberInTrouble);
+//       }
+//       return conn;
+//     } // while retry
+//   }
+
+//   /**
+//    * Sends an already serialized message in a byte buffer
+//    * to the given endpoint. Waits for the send to complete
+//    * before returning.
+//    * @return the connection used to send the message
+//    */
+//   public Connection sendSync(Stub remoteId, ByteBuffer bb, int processorType, DistributionMessage msg)
+//     throws java.io.IOException
+//   {
+//     return send(remoteId, bb,
+//                 processorType == DistributionManager.SERIAL_EXECUTOR,
+//                 msg);
+//   }
+
   @Override
   public String toString() {
     return "" + id;
@@ -1103,22 +1301,22 @@ public class TCPConduit implements Runnable {
     return directChannel.getDM();
   }
   /**
-   * Closes any connections used to communicate with the given member
+   * Closes any connections used to communicate with the given stub
    */
-  public void removeEndpoint(DistributedMember mbr, String reason) {
-    removeEndpoint(mbr, reason, true);
+  public void removeEndpoint(Stub stub, String reason) {
+    removeEndpoint(stub, reason, true);
   }
   
-  public void removeEndpoint(DistributedMember mbr, String reason, boolean notifyDisconnect) {
+  public void removeEndpoint(Stub stub, String reason, boolean notifyDisconnect) {
     ConnectionTable ct = this.conTable;
     if (ct == null) {
       return;
     }
-    ct.removeEndpoint(mbr, reason, notifyDisconnect);
+    ct.removeEndpoint(stub, reason, notifyDisconnect);
   }
   
   /** check to see if there are still any receiver threads for the given end-point */
-  public boolean hasReceiversFor(DistributedMember endPoint) {
+  public boolean hasReceiversFor(Stub endPoint) {
     ConnectionTable ct = this.conTable;
     return (ct != null) && ct.hasReceiversFor(endPoint);
   }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/507f2f3a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/DistributionManagerDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/DistributionManagerDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/DistributionManagerDUnitTest.java
index 773ef38..1f411bb 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/DistributionManagerDUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/DistributionManagerDUnitTest.java
@@ -38,6 +38,7 @@ import com.gemstone.gemfire.cache.RegionEvent;
 import com.gemstone.gemfire.cache.RegionFactory;
 import com.gemstone.gemfire.cache.Scope;
 import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
+import com.gemstone.gemfire.distributed.DistributedMember;
 import com.gemstone.gemfire.distributed.DistributedSystem;
 import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
 import com.gemstone.gemfire.distributed.internal.membership.MembershipManager;
@@ -45,10 +46,13 @@ import com.gemstone.gemfire.distributed.internal.membership.NetView;
 import com.gemstone.gemfire.distributed.internal.membership.gms.MembershipManagerHelper;
 import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.Manager;
 import com.gemstone.gemfire.distributed.internal.membership.gms.mgr.GMSMembershipManager;
+import com.gemstone.gemfire.internal.AvailablePort;
 import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.tcp.Stub;
 
 import dunit.DistributedTestCase;
 import dunit.Host;
+import dunit.SerializableCallable;
 import dunit.SerializableRunnable;
 import dunit.VM;
 
@@ -188,7 +192,7 @@ public class DistributionManagerDUnitTest extends DistributedTestCase {
       sys.getLogWriter().info("<ExpectedException action=add>attempt to add old member</ExpectedException>");
       sys.getLogWriter().info("<ExpectedException action=add>Removing shunned GemFire node</ExpectedException>");
       try {
-        boolean accepted = mgr.addSurpriseMember(mbr);
+        boolean accepted = mgr.addSurpriseMember(mbr, new Stub());
         Assert.assertTrue("member with old ID was not rejected (bug #44566)", !accepted);
       } finally {
         sys.getLogWriter().info("<ExpectedException action=remove>attempt to add old member</ExpectedException>");

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/507f2f3a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManagerJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManagerJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManagerJUnitTest.java
index ddbda0b..44e1b46 100755
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManagerJUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManagerJUnitTest.java
@@ -16,18 +16,8 @@
  */
 package com.gemstone.gemfire.distributed.internal.membership.gms.mgr;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.isA;
-import static org.mockito.Mockito.atLeastOnce;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.reset;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
+import static org.junit.Assert.*;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -46,6 +36,7 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException;
+import com.gemstone.gemfire.distributed.internal.AdminMessageType;
 import com.gemstone.gemfire.distributed.internal.DM;
 import com.gemstone.gemfire.distributed.internal.DMStats;
 import com.gemstone.gemfire.distributed.internal.DistributionConfig;
@@ -53,6 +44,7 @@ import com.gemstone.gemfire.distributed.internal.DistributionConfigImpl;
 import com.gemstone.gemfire.distributed.internal.DistributionManager;
 import com.gemstone.gemfire.distributed.internal.DistributionMessage;
 import com.gemstone.gemfire.distributed.internal.HighPriorityAckedMessage;
+import com.gemstone.gemfire.distributed.internal.HighPriorityDistributionMessage;
 import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
 import com.gemstone.gemfire.distributed.internal.MembershipListener;
 import com.gemstone.gemfire.distributed.internal.ReplyProcessor21;
@@ -68,11 +60,19 @@ import com.gemstone.gemfire.distributed.internal.membership.gms.SuspectMember;
 import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.Authenticator;
 import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.HealthMonitor;
 import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.JoinLeave;
+import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.Manager;
 import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.Messenger;
+import com.gemstone.gemfire.distributed.internal.membership.gms.membership.GMSJoinLeave;
+import com.gemstone.gemfire.distributed.internal.membership.gms.mgr.GMSMembershipManager;
 import com.gemstone.gemfire.distributed.internal.membership.gms.mgr.GMSMembershipManager.StartupEvent;
+import com.gemstone.gemfire.internal.AvailablePortHelper;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.admin.remote.AdminRequest;
+import com.gemstone.gemfire.internal.admin.remote.AdminResponse;
 import com.gemstone.gemfire.internal.admin.remote.AlertListenerMessage;
 import com.gemstone.gemfire.internal.admin.remote.RemoteTransportConfig;
 import com.gemstone.gemfire.internal.tcp.ConnectExceptions;
+import com.gemstone.gemfire.internal.tcp.Stub;
 import com.gemstone.gemfire.test.junit.categories.UnitTest;
 
 @Category(UnitTest.class)
@@ -295,6 +295,13 @@ public class GMSMembershipManagerJUnitTest {
     suspectMember = mockMembers[1];
     manager.handleOrDeferSuspect(new SuspectMember(mockMembers[0], suspectMember, "testing"));
     verify(listener).memberSuspect(suspectMember, mockMembers[0], "testing");
+    
+    InternalDistributedMember mbr = manager.getMemberForStub(new Stub(myMemberId.getInetAddress(), 2033, 20), false);
+    assertTrue(mbr == null);
+    myMemberId.setDirectChannelPort(2033);
+    mbr = manager.getMemberForStub(new Stub(myMemberId.getInetAddress(), 2033, 20), false);
+    assertTrue(mbr != null);
+    assertEquals(mbr, myMemberId);
   }
   
   /**

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/507f2f3a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/tcp/ConnectionJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/tcp/ConnectionJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/tcp/ConnectionJUnitTest.java
index ffd5092..78c462f 100755
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/tcp/ConnectionJUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/tcp/ConnectionJUnitTest.java
@@ -22,7 +22,6 @@ import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.verify;
 
 import java.io.InputStream;
-import java.net.InetSocketAddress;
 import java.net.Socket;
 
 import org.junit.Test;
@@ -62,7 +61,7 @@ public class ConnectionJUnitTest {
     when(stopper.cancelInProgress()).thenReturn(null);
     when(conduit.getCancelCriterion()).thenReturn(stopper);
 
-    when(conduit.getId()).thenReturn(new InetSocketAddress(SocketCreator.getLocalHost(), 10337));
+    when(conduit.getId()).thenReturn(new Stub(SocketCreator.getLocalHost(), 10337, 1));
     
     // NIO can't be mocked because SocketChannel has a final method that
     // is used by Connection - configureBlocking