You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2017/04/05 22:31:43 UTC

[1/2] geode git commit: GEODE-2684 Connection & ConnectionTable cleanup

Repository: geode
Updated Branches:
  refs/heads/develop 39c72b204 -> 6b2b7b2f7


GEODE-2684 Connection & ConnectionTable cleanup

removed dead code and indirect access of TcpConduit through the
connection table.


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/6b2b7b2f
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/6b2b7b2f
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/6b2b7b2f

Branch: refs/heads/develop
Commit: 6b2b7b2f7f3f63b8ae638e9afffa5edc0f763783
Parents: 391502a
Author: Bruce Schuchardt <bs...@pivotal.io>
Authored: Wed Apr 5 15:13:01 2017 -0700
Committer: Bruce Schuchardt <bs...@pivotal.io>
Committed: Wed Apr 5 15:31:33 2017 -0700

----------------------------------------------------------------------
 .../apache/geode/internal/tcp/Connection.java   | 188 ++++++-------------
 .../geode/internal/tcp/ConnectionTable.java     |  68 ++-----
 .../geode/internal/tcp/DirectReplySender.java   |   2 +-
 .../apache/geode/internal/tcp/MsgReader.java    |   2 +-
 .../apache/geode/internal/tcp/NIOMsgReader.java |   2 +-
 5 files changed, 82 insertions(+), 180 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/6b2b7b2f/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index a0af245..c57a0ba 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -72,8 +72,6 @@ public class Connection implements Runnable {
   public final static int CHUNKED_MSG_TYPE = 0x4d; // a chunk of one logical msg
   public final static int END_CHUNKED_MSG_TYPE = 0x4e; // last in a series of chunks
   public final static int DIRECT_ACK_BIT = 0x20;
-  // We no longer support early ack
-  // public final static int EARLY_ACK_BIT = 0x10;
 
   public static final int MSG_HEADER_SIZE_OFFSET = 0;
   public static final int MSG_HEADER_TYPE_OFFSET = 4;
@@ -95,7 +93,9 @@ public class Connection implements Runnable {
       "member unexpectedly shut down shared, unordered connection";
 
   /** the table holding this connection */
-  final ConnectionTable owner;
+  private final ConnectionTable owner;
+
+  private final TCPConduit conduit;
 
   /**
    * Set to false once run() is terminating. Using this instead of Thread.isAlive as the reader
@@ -113,15 +113,6 @@ public class Connection implements Runnable {
   /** The idle timeout timer task for this connection */
   private SystemTimerTask idleTask;
 
-  /**
-   * Returns the depth of unshared reader threads from this thread to the original
-   * non-reader-thread. E.g., ServerConnection -> reader(domino=1) -> reader(domino=2) ->
-   * reader(domino=3)
-   */
-  public static int getDominoCount() {
-    return dominoCount.get().intValue();
-  }
-
   private final static ThreadLocal isReaderThread = new ThreadLocal();
 
   public final static void makeReaderThread() {
@@ -129,7 +120,7 @@ public class Connection implements Runnable {
     makeReaderThread(true);
   }
 
-  private final static void makeReaderThread(boolean v) {
+  private static void makeReaderThread(boolean v) {
     isReaderThread.set(v);
   }
 
@@ -150,7 +141,7 @@ public class Connection implements Runnable {
     if (connectTimeoutStr != null) {
       P2P_CONNECT_TIMEOUT = Integer.parseInt(connectTimeoutStr);
     } else {
-      P2P_CONNECT_TIMEOUT = 6 * this.owner.owner.getDM().getConfig().getMemberTimeout();
+      P2P_CONNECT_TIMEOUT = 6 * this.conduit.getDM().getConfig().getMemberTimeout();
     }
     IS_P2P_CONNECT_TIMEOUT_INITIALIZED = true;
     return P2P_CONNECT_TIMEOUT;
@@ -367,20 +358,18 @@ public class Connection implements Runnable {
   /** the buffer used for NIO message receipt */
   ByteBuffer nioInputBuffer;
 
-  /** the position of the next message's content */
-  // int nioMessageStart;
-
   /** the length of the next message to be dispatched */
   int nioMessageLength;
-  // byte nioMessageVersion;
 
   /** the type of message being received */
   byte nioMessageType;
 
   /** used to lock access to destreamer data */
   private final Object destreamerLock = new Object();
+
   /** caches a msg destreamer that is currently not being used */
   MsgDestreamer idleMsgDestreamer;
+
   /**
    * used to map a msgId to a MsgDestreamer which are used for destreaming chunked messages using
    * nio
@@ -409,8 +398,6 @@ public class Connection implements Runnable {
   private int sendBufferSize = -1;
   private int recvBufferSize = -1;
 
-  private ReplySender replySender;
-
   private void setSendBufferSize(Socket sock) {
     setSendBufferSize(sock, this.owner.getConduit().tcpBufferSize);
   }
@@ -541,6 +528,7 @@ public class Connection implements Runnable {
       throw new IllegalArgumentException(
           LocalizedStrings.Connection_NULL_CONNECTIONTABLE.toLocalizedString());
     }
+    this.conduit = t.getConduit();
     this.isReceiver = true;
     this.owner = t;
     this.socket = socket;
@@ -628,7 +616,7 @@ public class Connection implements Runnable {
     bytes[MSG_HEADER_SIZE_OFFSET + 2] = (byte) ((msglen / 0x100) & 0xff);
     bytes[MSG_HEADER_SIZE_OFFSET + 3] = (byte) (msglen & 0xff);
     bytes[MSG_HEADER_TYPE_OFFSET] = (byte) NORMAL_MSG_TYPE; // message type
-    bytes[MSG_HEADER_ID_OFFSET] = (byte) ((MsgIdGenerator.NO_MSG_ID / 0x100) & 0xff);
+    bytes[MSG_HEADER_ID_OFFSET] = (byte) ((MsgIdGenerator.NO_MSG_ID >> 8) & 0xff);
     bytes[MSG_HEADER_ID_OFFSET + 1] = (byte) (MsgIdGenerator.NO_MSG_ID & 0xff);
     bytes[MSG_HEADER_BYTES] = REPLY_CODE_OK;
     int allocSize = bytes.length;
@@ -707,19 +695,16 @@ public class Connection implements Runnable {
       my_okHandshakeBytes = okHandshakeBytes;
     }
     if (useNIO()) {
+      assert my_okHandshakeBuf != null;
       synchronized (my_okHandshakeBuf) {
         my_okHandshakeBuf.position(0);
         nioWriteFully(getSocket().getChannel(), my_okHandshakeBuf, false, null);
       }
     } else {
       synchronized (outLock) {
-        try {
-          // this.writerThread = Thread.currentThread();
-          this.output.write(my_okHandshakeBytes, 0, my_okHandshakeBytes.length);
-          this.output.flush();
-        } finally {
-          // this.writerThread = null;
-        }
+        assert my_okHandshakeBytes != null;
+        this.output.write(my_okHandshakeBytes, 0, my_okHandshakeBytes.length);
+        this.output.flush();
       }
     }
   }
@@ -832,7 +817,7 @@ public class Connection implements Runnable {
   /**
    * asynchronously close this connection
    * 
-   * @param beingSick
+   * @param beingSick test hook to simulate sickness in communications & membership
    */
   private void asyncClose(boolean beingSick) {
     // note: remoteAddr may be null if this is a receiver that hasn't finished its handshake
@@ -890,8 +875,7 @@ public class Connection implements Runnable {
 
     InternalDistributedMember myAddr = this.owner.getConduit().getMemberId();
     final MsgOutputStream connectHandshake = new MsgOutputStream(CONNECT_HANDSHAKE_SIZE);
-    // connectHandshake.reset();
-    /**
+    /*
      * Note a byte of zero is always written because old products serialized a member id with always
      * sends an ip address. My reading of the ip-address specs indicated that the first byte of a
      * valid address would never be 0.
@@ -925,8 +909,6 @@ public class Connection implements Runnable {
   private void handshakeStream() throws IOException {
     waitForAddressCompletion();
 
-    // this.output = new BufferedOutputStream(getSocket().getOutputStream(),
-    // owner.getConduit().bufferSize);
     this.output = getSocket().getOutputStream();
     ByteArrayOutputStream baos = new ByteArrayOutputStream(CONNECT_HANDSHAKE_SIZE);
     DataOutputStream os = new DataOutputStream(baos);
@@ -961,17 +943,12 @@ public class Connection implements Runnable {
     lenbytes[MSG_HEADER_SIZE_OFFSET + 2] = (byte) ((len / 0x100) & 0xff);
     lenbytes[MSG_HEADER_SIZE_OFFSET + 3] = (byte) (len & 0xff);
     lenbytes[MSG_HEADER_TYPE_OFFSET] = (byte) NORMAL_MSG_TYPE;
-    lenbytes[MSG_HEADER_ID_OFFSET] = (byte) ((MsgIdGenerator.NO_MSG_ID / 0x100) & 0xff);
+    lenbytes[MSG_HEADER_ID_OFFSET] = (byte) ((MsgIdGenerator.NO_MSG_ID >> 8) & 0xff);
     lenbytes[MSG_HEADER_ID_OFFSET + 1] = (byte) (MsgIdGenerator.NO_MSG_ID & 0xff);
     synchronized (outLock) {
-      try {
-        // this.writerThread = Thread.currentThread();
-        this.output.write(lenbytes, 0, lenbytes.length);
-        this.output.write(msg, 0, msg.length);
-        this.output.flush();
-      } finally {
-        // this.writerThread = null;
-      }
+      this.output.write(lenbytes, 0, lenbytes.length);
+      this.output.write(msg, 0, msg.length);
+      this.output.flush();
     }
   }
 
@@ -1091,7 +1068,7 @@ public class Connection implements Runnable {
         // create connection
         try {
           conn = null;
-          conn = new Connection(mgr, t, preserveOrder, remoteAddr, sharedResource);
+          conn = new Connection(t, preserveOrder, remoteAddr, sharedResource);
         } catch (javax.net.ssl.SSLHandshakeException se) {
           // no need to retry if certificates were rejected
           throw se;
@@ -1206,7 +1183,7 @@ public class Connection implements Runnable {
 
   private void setRemoteAddr(DistributedMember m) {
     this.remoteAddr = this.owner.getDM().getCanonicalId(m);
-    MembershipManager mgr = this.owner.owner.getMembershipManager();
+    MembershipManager mgr = this.conduit.getMembershipManager();
     mgr.addSurpriseMember(m);
   }
 
@@ -1214,9 +1191,8 @@ public class Connection implements Runnable {
    * creates a new connection to a remote server. We are initiating this connection; the other side
    * must accept us We will almost always send messages; small acks are received.
    */
-  private Connection(MembershipManager mgr, ConnectionTable t, boolean preserveOrder,
-      DistributedMember remoteID, boolean sharedResource)
-      throws IOException, DistributedSystemDisconnectedException {
+  private Connection(ConnectionTable t, boolean preserveOrder, DistributedMember remoteID,
+      boolean sharedResource) throws IOException, DistributedSystemDisconnectedException {
 
     // initialize a socket upfront. So that the
     InternalDistributedMember remoteAddr = (InternalDistributedMember) remoteID;
@@ -1224,6 +1200,7 @@ public class Connection implements Runnable {
       throw new IllegalArgumentException(
           LocalizedStrings.Connection_CONNECTIONTABLE_IS_NULL.toLocalizedString());
     }
+    this.conduit = t.getConduit();
     this.isReceiver = false;
     this.owner = t;
     this.sharedResource = sharedResource;
@@ -1248,7 +1225,7 @@ public class Connection implements Runnable {
 
         channel.socket().setKeepAlive(SocketCreator.ENABLE_TCP_KEEP_ALIVE);
 
-        /**
+        /*
          * If conserve-sockets is false, the socket can be used for receiving responses, so set the
          * receive buffer accordingly.
          */
@@ -1261,7 +1238,7 @@ public class Connection implements Runnable {
         setSendBufferSize(channel.socket());
         channel.configureBlocking(true);
 
-        int connectTime = getP2PConnectTimeout();;
+        int connectTime = getP2PConnectTimeout();
 
         try {
           channel.socket().connect(addr, connectTime);
@@ -1276,7 +1253,7 @@ public class Connection implements Runnable {
             Thread.currentThread().interrupt();
           }
           throw c;
-        } catch (CancelledKeyException e) {
+        } catch (CancelledKeyException | ClosedSelectorException e) {
           // bug #44469: for some reason NIO throws this runtime exception
           // instead of an IOException on timeouts
           ConnectException c = new ConnectException(
@@ -1284,14 +1261,6 @@ public class Connection implements Runnable {
                   .toLocalizedString(new Object[] {connectTime}));
           c.initCause(e);
           throw c;
-        } catch (ClosedSelectorException e) {
-          // bug #44808: for some reason JRockit NIO thorws this runtime exception
-          // instead of an IOException on timeouts
-          ConnectException c = new ConnectException(
-              LocalizedStrings.Connection_ATTEMPT_TO_CONNECT_TIMED_OUT_AFTER_0_MILLISECONDS
-                  .toLocalizedString(new Object[] {connectTime}));
-          c.initCause(e);
-          throw c;
         }
       } finally {
         this.owner.removeConnectingSocket(channel.socket());
@@ -1309,7 +1278,6 @@ public class Connection implements Runnable {
         setSocketBufferSize(this.socket, false, socketBufferSize, true);
         setSendBufferSize(this.socket);
       } else {
-        // socket = new Socket(remoteAddr.getInetAddress(), remoteAddr.getPort());
         Socket s = new Socket();
         this.socket = s;
         s.setTcpNoDelay(true);
@@ -1335,13 +1303,12 @@ public class Connection implements Runnable {
    * must not be doing it correctly.
    */
   private static final boolean BATCH_SENDS = Boolean.getBoolean("p2p.batchSends");
-  protected static final int BATCH_BUFFER_SIZE =
+  private static final int BATCH_BUFFER_SIZE =
       Integer.getInteger("p2p.batchBufferSize", 1024 * 1024).intValue();
-  protected static final int BATCH_FLUSH_MS =
-      Integer.getInteger("p2p.batchFlushTime", 50).intValue();
-  protected Object batchLock;
-  protected ByteBuffer fillBatchBuffer;
-  protected ByteBuffer sendBatchBuffer;
+  private static final int BATCH_FLUSH_MS = Integer.getInteger("p2p.batchFlushTime", 50).intValue();
+  private Object batchLock;
+  private ByteBuffer fillBatchBuffer;
+  private ByteBuffer sendBatchBuffer;
   private BatchBufferFlusher batchFlusher;
 
   private void createBatchSendBuffer() {
@@ -1446,13 +1413,7 @@ public class Connection implements Runnable {
                   SocketChannel channel = getSocket().getChannel();
                   nioWriteFully(channel, sendBatchBuffer, false, null);
                   sendBatchBuffer.clear();
-                } catch (IOException ex) {
-                  logger.fatal(LocalizedMessage.create(
-                      LocalizedStrings.Connection_EXCEPTION_FLUSHING_BATCH_SEND_BUFFER_0, ex));
-                  readerShuttingDown = true;
-                  requestClose(LocalizedStrings.Connection_EXCEPTION_FLUSHING_BATCH_SEND_BUFFER_0
-                      .toLocalizedString(ex));
-                } catch (ConnectionException ex) {
+                } catch (IOException | ConnectionException ex) {
                   logger.fatal(LocalizedMessage.create(
                       LocalizedStrings.Connection_EXCEPTION_FLUSHING_BATCH_SEND_BUFFER_0, ex));
                   readerShuttingDown = true;
@@ -1526,13 +1487,6 @@ public class Connection implements Runnable {
     return this.closing.get();
   }
 
-  /**
-   * Used to close a connection that has not yet been registered with the distribution manager.
-   */
-  void closePartialConnect(String reason) {
-    close(reason, false, false, false, false);
-  }
-
   void closePartialConnect(String reason, boolean beingSick) {
     close(reason, false, false, beingSick, false);
   }
@@ -1619,9 +1573,9 @@ public class Connection implements Runnable {
       // if network partition detection is enabled or this is an admin vm
       // we can't wait for the reader thread when running in an IBM JRE. See
       // bug 41889
-      if (this.owner.owner.config.getEnableNetworkPartitionDetection()
-          || this.owner.owner.getMemberId().getVmKind() == DistributionManager.ADMIN_ONLY_DM_TYPE
-          || this.owner.owner.getMemberId().getVmKind() == DistributionManager.LOCATOR_DM_TYPE) {
+      if (this.conduit.config.getEnableNetworkPartitionDetection()
+          || this.conduit.getMemberId().getVmKind() == DistributionManager.ADMIN_ONLY_DM_TYPE
+          || this.conduit.getMemberId().getVmKind() == DistributionManager.LOCATOR_DM_TYPE) {
         isIBM = "IBM Corporation".equals(System.getProperty("java.vm.vendor"));
       }
       {
@@ -1673,6 +1627,7 @@ public class Connection implements Runnable {
               }
             }
           } else {
+            // noinspection ConstantConditions
             this.owner.removeSharedConnection(reason, this.remoteAddr, this.preserveOrder, this);
           }
         } else if (!this.isReceiver) {
@@ -1735,7 +1690,7 @@ public class Connection implements Runnable {
       initiateSuspicionIfSharedUnordered();
       if (this.isReceiver) {
         if (!this.sharedResource) {
-          this.owner.owner.stats.incThreadOwnedReceivers(-1L, dominoCount.get());
+          this.conduit.stats.incThreadOwnedReceivers(-1L, dominoCount.get());
         }
         asyncClose(false);
         this.owner.removeAndCloseThreadOwnedSockets();
@@ -1759,7 +1714,7 @@ public class Connection implements Runnable {
   }
 
   private String p2pReaderName() {
-    StringBuffer sb = new StringBuffer(64);
+    StringBuilder sb = new StringBuilder(64);
     if (this.isReceiver) {
       sb.append("P2P message reader@");
     } else {
@@ -1973,8 +1928,8 @@ public class Connection implements Runnable {
     }
 
     msg = msg.toLowerCase();
-    return (msg.indexOf("forcibly closed") >= 0) || (msg.indexOf("reset by peer") >= 0)
-        || (msg.indexOf("connection reset") >= 0);
+    return (msg.contains("forcibly closed")) || (msg.contains("reset by peer"))
+        || (msg.contains("connection reset"));
   }
 
   private static boolean validMsgType(int msgType) {
@@ -2012,7 +1967,7 @@ public class Connection implements Runnable {
           this.idleMsgDestreamer = null;
         } else {
           result = new MsgDestreamer(this.owner.getConduit().stats,
-              this.owner.owner.getCancelCriterion(), v);
+              this.conduit.getCancelCriterion(), v);
         }
         result.setName(p2pReaderName() + " msgId=" + msgId);
         this.destreamerMap.put(key, result);
@@ -2103,7 +2058,7 @@ public class Connection implements Runnable {
         /* byte msgHdrVersion = */ calcHdrVersion(len);
         len = calcMsgByteSize(len);
         int msgType = lenbytes[MSG_HEADER_TYPE_OFFSET];
-        short msgId = (short) ((lenbytes[MSG_HEADER_ID_OFFSET] & 0xff * 0x100)
+        short msgId = (short) (((lenbytes[MSG_HEADER_ID_OFFSET] & 0xff) << 8)
             + (lenbytes[MSG_HEADER_ID_OFFSET + 1] & 0xff));
         boolean myDirectAck = (msgType & DIRECT_ACK_BIT) != 0;
         if (myDirectAck) {
@@ -2384,7 +2339,7 @@ public class Connection implements Runnable {
                   // logger.fine("thread-owned receiver with domino count of " + dominoNumber + "
                   // will prefer shared sockets");
                 }
-                this.owner.owner.stats.incThreadOwnedReceivers(1L, dominoNumber);
+                this.conduit.stats.incThreadOwnedReceivers(1L, dominoNumber);
               }
 
               if (logger.isDebugEnabled()) {
@@ -2541,11 +2496,6 @@ public class Connection implements Runnable {
         }
         bytesSoFar += bytesThisTime;
       } catch (InterruptedIOException io) {
-        // try { Thread.sleep(10); }
-        // catch (InterruptedException ie) {
-        // Thread.currentThread().interrupt();
-        // }
-
         // Current thread has been interrupted. Regard it similar to an EOF
         this.readerShuttingDown = true;
         try {
@@ -2582,7 +2532,7 @@ public class Connection implements Runnable {
     final boolean origSocketInUse = this.socketInUse;
     byte originalState = -1;
     synchronized (stateLock) {
-      originalState = this.connectionState;;
+      originalState = this.connectionState;
       this.connectionState = STATE_SENDING;
     }
     this.socketInUse = true;
@@ -2597,13 +2547,8 @@ public class Connection implements Runnable {
         } else {
           byte[] bytesToWrite = getBytesToWrite(buffer);
           synchronized (outLock) {
-            try {
-              // this.writerThread = Thread.currentThread();
-              this.output.write(bytesToWrite);
-              this.output.flush();
-            } finally {
-              // this.writerThread = null;
-            }
+            this.output.write(bytesToWrite);
+            this.output.flush();
           }
         }
       }
@@ -2763,15 +2708,7 @@ public class Connection implements Runnable {
     return bytesToWrite;
   }
 
-  // private String socketInfo() {
-  // return (" socket: " + getSocket().getLocalAddress() + ":" + getSocket().getLocalPort() + " -> "
-  // +
-  // getSocket().getInetAddress() + ":" + getSocket().getPort() + " connection = " +
-  // System.identityHashCode(this));
-  //
-  // }
-
-  private final boolean addToQueue(ByteBuffer buffer, DistributionMessage msg, boolean force)
+  private boolean addToQueue(ByteBuffer buffer, DistributionMessage msg, boolean force)
       throws ConnectionException {
     final DMStats stats = this.owner.getConduit().stats;
     long start = DistributionStats.getStatTime();
@@ -2891,7 +2828,7 @@ public class Connection implements Runnable {
    * 
    * @throws ConnectionException if the conduit has stopped
    */
-  private final boolean handleBlockedWrite(ByteBuffer buffer, DistributionMessage msg)
+  private boolean handleBlockedWrite(ByteBuffer buffer, DistributionMessage msg)
       throws ConnectionException {
     if (!addToQueue(buffer, msg, true)) {
       return false;
@@ -2931,7 +2868,7 @@ public class Connection implements Runnable {
     this.pusherThread.start();
   }
 
-  private final ByteBuffer takeFromOutgoingQueue() throws InterruptedException {
+  private ByteBuffer takeFromOutgoingQueue() throws InterruptedException {
     ByteBuffer result = null;
     final DMStats stats = this.owner.getConduit().stats;
     long start = DistributionStats.getStatTime();
@@ -3152,7 +3089,7 @@ public class Connection implements Runnable {
    * Return false if socket writes to be done async/nonblocking Return true if socket writes to be
    * done sync/blocking
    */
-  private final boolean useSyncWrites(boolean forceAsync) {
+  private boolean useSyncWrites(boolean forceAsync) {
     if (forceAsync) {
       return false;
     }
@@ -3185,7 +3122,7 @@ public class Connection implements Runnable {
 
   static private final int MAX_WAIT_TIME = (1 << 5); // ms (must be a power of 2)
 
-  private final void writeAsync(SocketChannel channel, ByteBuffer buffer, boolean forceAsync,
+  private void writeAsync(SocketChannel channel, ByteBuffer buffer, boolean forceAsync,
       DistributionMessage p_msg, final DMStats stats) throws IOException {
     DistributionMessage msg = p_msg;
     // async/non-blocking
@@ -3301,7 +3238,7 @@ public class Connection implements Runnable {
                 if (msToWait <= 0) {
                   Thread.yield();
                 } else {
-                  boolean interrupted = Thread.interrupted();;
+                  boolean interrupted = Thread.interrupted();
                   try {
                     Thread.sleep(msToWait);
                   } catch (InterruptedException ex) {
@@ -3401,10 +3338,10 @@ public class Connection implements Runnable {
   /**
    * stateLock is used to synchronize state changes.
    */
-  protected Object stateLock = new Object();
+  private final Object stateLock = new Object();
 
   /** for timeout processing, this is the current state of the connection */
-  protected byte connectionState = STATE_IDLE;
+  private byte connectionState = STATE_IDLE;
 
   /* ~~~~~~~~~~~~~ connection states ~~~~~~~~~~~~~~~ */
   /** the connection is idle, but may be in use */
@@ -3420,16 +3357,11 @@ public class Connection implements Runnable {
   /** the connection is in use and is reading a message */
   protected static final byte STATE_READING = 5;
 
-  protected static final String[] STATE_NAMES =
-      new String[] {"idle", "sending", "post_sending", "reading_ack", "received_ack", "reading"};
   /* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ */
 
   /** set to true if we exceeded the ack-wait-threshold waiting for a response */
   protected volatile boolean ackTimedOut;
 
-  private static int ACK_SIZE = 1;
-  private static byte ACK_BYTE = 37;
-
   /**
    * @param msToWait number of milliseconds to wait for an ack. If 0 then wait forever.
    * @param msInterval interval between checks
@@ -3899,7 +3831,7 @@ public class Connection implements Runnable {
                     // } else {
                     // ConnectionTable.threadWantsSharedResources();
                   }
-                  this.owner.owner.stats.incThreadOwnedReceivers(1L, dominoNumber);
+                  this.conduit.stats.incThreadOwnedReceivers(1L, dominoNumber);
                   // Because this thread is not shared resource, it will be used for direct
                   // ack. Direct ack messages can be large. This call will resize the send
                   // buffer.
@@ -4039,6 +3971,10 @@ public class Connection implements Runnable {
     }
   }
 
+  protected TCPConduit getConduit() {
+    return this.conduit;
+  }
+
   protected Socket getSocket() throws SocketException {
     // fix for bug 37286
     Socket result = this.socket;
@@ -4177,7 +4113,7 @@ public class Connection implements Runnable {
   boolean nioChecked;
   boolean useNIO;
 
-  private final boolean useNIO() {
+  private boolean useNIO() {
     if (TCPConduit.useSSL) {
       return false;
     }
@@ -4193,7 +4129,7 @@ public class Connection implements Runnable {
     if (this.socket != null && (this.socket.getInetAddress() instanceof Inet6Address)) {
       String os = System.getProperty("os.name");
       if (os != null) {
-        if (os.indexOf("Windows") != -1) {
+        if (os.contains("Windows")) {
           this.useNIO = false;
         }
       }

http://git-wip-us.apache.org/repos/asf/geode/blob/6b2b7b2f/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
index 08a9009..c55af82 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
@@ -64,17 +64,9 @@ import org.apache.geode.internal.logging.log4j.LocalizedMessage;
  * 
  * @since GemFire 2.1
  */
-/*
- * Note: We no longer use InputMultiplexer If InputMux is reinstated then the manager needs to be
- * initialized and all lines that have a NOMUX preface should be uncommented
- * 
- */
 public class ConnectionTable {
   private static final Logger logger = LogService.getLogger();
 
-  /** a random number generator for secondary connection selection */
-  // static java.util.Random random = new java.util.Random();
-
   /** warning when descriptor limit reached */
   private static boolean ulimitWarningIssued;
 
@@ -82,6 +74,7 @@ public class ConnectionTable {
    * true if the current thread wants non-shared resources
    */
   private static ThreadLocal threadWantsOwnResources = new ThreadLocal();
+
   /**
    * Used for messages whose order must be preserved Only connections used for sending messages, and
    * receiving acks, will be put in this map.
@@ -132,9 +125,7 @@ public class ConnectionTable {
   /**
    * the conduit for this table
    */
-  protected final TCPConduit owner;
-  // ARB: temp making this protected to provide access to Connection.
-  // private final TCPConduit owner;
+  private final TCPConduit owner;
 
   /**
    * true if this table is no longer in use
@@ -205,17 +196,10 @@ public class ConnectionTable {
     return (Boolean) threadWantsOwnResources.get();
   }
 
-  // public static void setThreadOwnsResourcesRegistration(
-  // Boolean newValue) {
-  // threadWantsOwnResources.set(newValue);
-  // }
-  // private Map connections = new HashMap();
-  /* NOMUX: private InputMuxManager inputMuxManager; */
-  // private int lowWater;
-  // private int highWater;
+  public TCPConduit getOwner() {
+    return owner;
+  }
 
-  // private static boolean TRACK_SERVER_CONNECTIONS =
-  // System.getProperty("p2p.bidirectional", "true").equals("true");
 
   private ConnectionTable(TCPConduit c) throws IOException {
     this.owner = c;
@@ -226,10 +210,6 @@ public class ConnectionTable {
     this.threadConnectionMap = new ConcurrentHashMap();
     this.p2pReaderThreadPool = createThreadPoolForIO(c.getDM().getSystem().isShareSockets());
     this.socketCloser = new SocketCloser();
-    /*
-     * NOMUX: if (TCPConduit.useNIO) { inputMuxManager = new InputMuxManager(this);
-     * inputMuxManager.start(c.logger); }
-     */
   }
 
   private Executor createThreadPoolForIO(boolean conserveSockets) {
@@ -306,7 +286,6 @@ public class ConnectionTable {
       }
     }
 
-    // Stub id = conn.getRemoteId();
     if (conn != null) {
       synchronized (this.receivers) {
         this.owner.stats.incReceivers();
@@ -322,22 +301,9 @@ public class ConnectionTable {
             conn.remoteAddr);
       }
     }
-    // cleanupHighWater();
   }
 
 
-  // /** returns the connection associated with the given key, or null if
-  // no such connection exists */
-  // protected Connection basicGet(Serializable id) {
-  // synchronized (this.orderedConnectionMap) {
-  // return (Connection) this.orderedConnectionMap.get(id);
-  // }
-  // }
-
-  // protected Connection get(Serializable id) throws java.io.IOException {
-  // return get(id, false);
-  // }
-
 
   /**
    * Process a newly created PendingConnection
@@ -432,10 +398,10 @@ public class ConnectionTable {
   }
 
   /**
-   * unordered or conserve-sockets note that unordered connections are currently always shared
+   * unordered or conserve-sockets=true note that unordered connections are currently always shared
    * 
    * @param id the DistributedMember on which we are creating a connection
-   * @param threadOwnsResources whether unordered conn is owned by the current thread
+   * @param scheduleTimeout whether unordered connection should time out
    * @param preserveOrder whether to preserve order
    * @param startTime the ms clock start time for the operation
    * @param ackTimeout the ms ack-wait-threshold, or zero
@@ -444,9 +410,9 @@ public class ConnectionTable {
    * @throws IOException if unable to create the connection
    * @throws DistributedSystemDisconnectedException
    */
-  private Connection getUnorderedOrConserveSockets(DistributedMember id,
-      boolean threadOwnsResources, boolean preserveOrder, long startTime, long ackTimeout,
-      long ackSATimeout) throws IOException, DistributedSystemDisconnectedException {
+  private Connection getSharedConnection(DistributedMember id, boolean scheduleTimeout,
+      boolean preserveOrder, long startTime, long ackTimeout, long ackSATimeout)
+      throws IOException, DistributedSystemDisconnectedException {
     Connection result = null;
 
     final Map m = preserveOrder ? this.orderedConnectionMap : this.unorderedConnectionMap;
@@ -472,7 +438,7 @@ public class ConnectionTable {
     if (pc != null) {
       result = handleNewPendingConnection(id, true /* fixes bug 43386 */, preserveOrder, m, pc,
           startTime, ackTimeout, ackSATimeout);
-      if (!preserveOrder && threadOwnsResources) {
+      if (!preserveOrder && scheduleTimeout) {
         scheduleIdleTimeout(result);
       }
     } else { // we have existing connection
@@ -487,10 +453,10 @@ public class ConnectionTable {
             startTime, ackTimeout, ackSATimeout);
         if (logger.isDebugEnabled()) {
           if (result != null) {
-            logger.debug("getUnorderedOrConserveSockets {} myAddr={} theirAddr={}", result,
+            logger.debug("getSharedConnection {} myAddr={} theirAddr={}", result,
                 getConduit().getMemberId(), result.remoteAddr);
           } else {
-            logger.debug("getUnorderedOrConserveSockets: Connect failed");
+            logger.debug("getSharedConnection: Connect failed");
           }
         }
       } else {
@@ -512,7 +478,7 @@ public class ConnectionTable {
    * @throws IOException if the connection could not be created
    * @throws DistributedSystemDisconnectedException
    */
-  Connection getOrderedAndOwned(DistributedMember id, long startTime, long ackTimeout,
+  Connection getThreadOwnedConnection(DistributedMember id, long startTime, long ackTimeout,
       long ackSATimeout) throws IOException, DistributedSystemDisconnectedException {
     Connection result = null;
 
@@ -658,10 +624,10 @@ public class ConnectionTable {
     Connection result = null;
     boolean threadOwnsResources = threadOwnsResources();
     if (!preserveOrder || !threadOwnsResources) {
-      result = getUnorderedOrConserveSockets(id, threadOwnsResources, preserveOrder, startTime,
-          ackTimeout, ackSATimeout);
+      result = getSharedConnection(id, threadOwnsResources, preserveOrder, startTime, ackTimeout,
+          ackSATimeout);
     } else {
-      result = getOrderedAndOwned(id, startTime, ackTimeout, ackSATimeout);
+      result = getThreadOwnedConnection(id, startTime, ackTimeout, ackSATimeout);
     }
     if (result != null) {
       Assert.assertTrue(result.preserveOrder == preserveOrder);

http://git-wip-us.apache.org/repos/asf/geode/blob/6b2b7b2f/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java
index 3872ee9..bf06953 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java
@@ -56,7 +56,7 @@ class DirectReplySender implements ReplySender {
     // mutates the list when it has exceptions.
 
     // fix for bug #42199 - cancellation check
-    this.conn.owner.getDM().getCancelCriterion().checkCancelInProgress(null);
+    this.conn.getConduit().getDM().getCancelCriterion().checkCancelInProgress(null);
 
     if (logger.isTraceEnabled(LogMarker.DM)) {
       logger.trace(LogMarker.DM, "Sending a direct reply {} to {}", msg, conn.getRemoteAddress());

http://git-wip-us.apache.org/repos/asf/geode/blob/6b2b7b2f/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
index fc56271..be1f533 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
@@ -94,7 +94,7 @@ public abstract class MsgReader {
   public abstract ByteBuffer readAtLeast(int bytes) throws IOException;
 
   protected DMStats getStats() {
-    return conn.owner.getConduit().stats;
+    return conn.getConduit().stats;
   }
 
   public static class Header {

http://git-wip-us.apache.org/repos/asf/geode/blob/6b2b7b2f/geode-core/src/main/java/org/apache/geode/internal/tcp/NIOMsgReader.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/NIOMsgReader.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/NIOMsgReader.java
index 50f5fae..a4e35a4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/NIOMsgReader.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/NIOMsgReader.java
@@ -79,7 +79,7 @@ public class NIOMsgReader extends MsgReader {
     if (nioInputBuffer == null) {
       int allocSize = conn.getReceiveBufferSize();
       if (allocSize == -1) {
-        allocSize = conn.owner.getConduit().tcpBufferSize;
+        allocSize = conn.getConduit().tcpBufferSize;
       }
       if (allocSize > bufferSize) {
         bufferSize = allocSize;


[2/2] geode git commit: GEODE-2732 after auto-reconnect a server is restarted on the default port

Posted by bs...@apache.org.
GEODE-2732 after auto-reconnect a server is restarted on the default port

Gfsh command line parameters were put into ThreadLocals to make them
available to the XML parser.  These are now held in non-thread-local
variables so that all threads, including the auto-reconnect thread,
can see them when building the cache.


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/391502a2
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/391502a2
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/391502a2

Branch: refs/heads/develop
Commit: 391502a2615dbc80cde0b9a111fa967f4d76c39a
Parents: 39c72b2
Author: Bruce Schuchardt <bs...@pivotal.io>
Authored: Wed Apr 5 15:11:04 2017 -0700
Committer: Bruce Schuchardt <bs...@pivotal.io>
Committed: Wed Apr 5 15:31:33 2017 -0700

----------------------------------------------------------------------
 .../geode/distributed/ServerLauncher.java       |  10 +-
 .../internal/cache/CacheServerLauncher.java     |  43 +++++---
 .../internal/cache/xmlcache/CacheCreation.java  |   2 +-
 .../cache30/ReconnectWithCacheXMLDUnitTest.java | 107 +++++++++++++++++++
 .../ReconnectWithUDPSecurityDUnitTest.java      |   6 ++
 .../ReconnectedCacheServerDUnitTest.java        |   4 +-
 .../cache30/ReconnectWithCacheXMLDUnitTest.xml  |  25 +++++
 7 files changed, 176 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/391502a2/geode-core/src/main/java/org/apache/geode/distributed/ServerLauncher.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/ServerLauncher.java b/geode-core/src/main/java/org/apache/geode/distributed/ServerLauncher.java
index 9435bd8..c96732c 100755
--- a/geode-core/src/main/java/org/apache/geode/distributed/ServerLauncher.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/ServerLauncher.java
@@ -274,7 +274,7 @@ public class ServerLauncher extends AbstractLauncher<String> {
     this.assignBuckets = Boolean.TRUE.equals(builder.getAssignBuckets());
     setDebug(Boolean.TRUE.equals(builder.getDebug()));
     this.disableDefaultServer = Boolean.TRUE.equals(builder.getDisableDefaultServer());
-    CacheServerLauncher.disableDefaultServer.set(this.disableDefaultServer);
+    CacheServerLauncher.setDisableDefaultServer(this.disableDefaultServer);
     this.distributedSystemProperties = builder.getDistributedSystemProperties();
     this.force = Boolean.TRUE.equals(builder.getForce());
     this.help = Boolean.TRUE.equals(builder.getHelp());
@@ -286,11 +286,11 @@ public class ServerLauncher extends AbstractLauncher<String> {
     this.redirectOutput = Boolean.TRUE.equals(builder.getRedirectOutput());
     this.serverBindAddress = builder.getServerBindAddress();
     if (builder.isServerBindAddressSetByUser() && this.serverBindAddress != null) {
-      CacheServerLauncher.serverBindAddress.set(this.serverBindAddress.getHostAddress());
+      CacheServerLauncher.setServerBindAddress(this.serverBindAddress.getHostAddress());
     }
     this.serverPort = builder.getServerPort();
     if (builder.isServerPortSetByUser() && this.serverPort != null) {
-      CacheServerLauncher.serverPort.set(this.serverPort);
+      CacheServerLauncher.setServerPort(this.serverPort);
     }
     this.springXmlLocation = builder.getSpringXmlLocation();
     this.workingDirectory = builder.getWorkingDirectory();
@@ -954,8 +954,8 @@ public class ServerLauncher extends AbstractLauncher<String> {
       final String serverBindAddress =
           (getServerBindAddress() == null ? null : getServerBindAddress().getHostAddress());
       final Integer serverPort = getServerPort();
-      CacheServerLauncher.serverBindAddress.set(serverBindAddress);
-      CacheServerLauncher.serverPort.set(serverPort);
+      CacheServerLauncher.setServerBindAddress(serverBindAddress);
+      CacheServerLauncher.setServerPort(serverPort);
       final CacheServer cacheServer = cache.addCacheServer();
       cacheServer.setBindAddress(serverBindAddress);
       cacheServer.setPort(serverPort);

http://git-wip-us.apache.org/repos/asf/geode/blob/391502a2/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerLauncher.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerLauncher.java b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerLauncher.java
index 760abd3..9a544d2 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerLauncher.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerLauncher.java
@@ -565,28 +565,43 @@ public class CacheServerLauncher {
     }
   }
 
-  public static ThreadLocal<Integer> serverPort = new ThreadLocal<Integer>();
+  private static Integer serverPort;
+
+  private static String serverBindAddress;
+
+  public static void setServerPort(Integer serverPort) {
+    CacheServerLauncher.serverPort = serverPort;
+  }
+
+  public static void setServerBindAddress(String serverBindAddress) {
+    CacheServerLauncher.serverBindAddress = serverBindAddress;
+  }
+
+  public static void setDisableDefaultServer(Boolean disableDefaultServer) {
+    CacheServerLauncher.disableDefaultServer = disableDefaultServer;
+  }
+
+  public static Boolean disableDefaultServer;
+
 
-  public static ThreadLocal<String> serverBindAddress = new ThreadLocal<String>();
 
   public static Integer getServerPort() {
-    return serverPort.get();
+    return serverPort;
   }
 
   public static String getServerBindAddress() {
-    return serverBindAddress.get();
+    return serverBindAddress;
   }
 
-  public static ThreadLocal<Boolean> disableDefaultServer = new ThreadLocal<Boolean>();
-
   public static Boolean getDisableDefaultServer() {
-    return disableDefaultServer.get();
+    return disableDefaultServer;
   }
 
+
   public static void clearStatics() {
-    disableDefaultServer.set(null);
-    serverPort.set(null);
-    serverBindAddress.set(null);
+    disableDefaultServer = null;
+    serverPort = null;
+    serverBindAddress = null;
   }
 
 
@@ -616,11 +631,11 @@ public class CacheServerLauncher {
     final String serverPortString = (String) options.get(SERVER_PORT);
 
     if (serverPortString != null) {
-      serverPort.set(Integer.parseInt(serverPortString));
+      serverPort = Integer.parseInt(serverPortString);
     }
 
-    serverBindAddress.set((String) options.get(SERVER_BIND_ADDRESS_NAME));
-    disableDefaultServer.set((Boolean) options.get(DISABLE_DEFAULT_SERVER));
+    serverBindAddress = (String) options.get(SERVER_BIND_ADDRESS_NAME);
+    disableDefaultServer = (Boolean) options.get(DISABLE_DEFAULT_SERVER);
     workingDir = new File(System.getProperty("user.dir"));
 
     // Say that we're starting...
@@ -835,7 +850,7 @@ public class CacheServerLauncher {
     // Create and start a default cache server
     // If (disableDefaultServer is not set or it is set but false) AND (the number of cacheservers
     // is 0)
-    Boolean disable = disableDefaultServer.get();
+    Boolean disable = disableDefaultServer;
     if ((disable == null || !disable) && cache.getCacheServers().size() == 0) {
       // Create and add a cache server
       CacheServer server = cache.addCacheServer();

http://git-wip-us.apache.org/repos/asf/geode/blob/391502a2/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
index 1c3c933..a0810d9 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
@@ -503,7 +503,7 @@ public class CacheCreation implements InternalCache {
 
     Integer serverPort = CacheServerLauncher.getServerPort();
     String serverBindAdd = CacheServerLauncher.getServerBindAddress();
-    Boolean disableDefaultServer = CacheServerLauncher.disableDefaultServer.get();
+    Boolean disableDefaultServer = CacheServerLauncher.getDisableDefaultServer();
     startCacheServers(this.getCacheServers(), cache, serverPort, serverBindAdd,
         disableDefaultServer);
 

http://git-wip-us.apache.org/repos/asf/geode/blob/391502a2/geode-core/src/test/java/org/apache/geode/cache30/ReconnectWithCacheXMLDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/cache30/ReconnectWithCacheXMLDUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache30/ReconnectWithCacheXMLDUnitTest.java
new file mode 100755
index 0000000..4f2fac1
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/cache30/ReconnectWithCacheXMLDUnitTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache30;
+
+import static org.apache.geode.internal.Assert.assertTrue;
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.distributed.ConfigurationProperties;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.distributed.internal.membership.MembershipTestHook;
+import org.apache.geode.distributed.internal.membership.gms.MembershipManagerHelper;
+import org.apache.geode.internal.AvailablePortHelper;
+import org.apache.geode.internal.cache.CacheServerLauncher;
+import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
+import org.apache.geode.test.junit.categories.ClientServerTest;
+import org.apache.geode.test.junit.categories.DistributedTest;
+import org.apache.geode.test.junit.categories.MembershipTest;
+import org.apache.geode.util.test.TestUtil;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+@Category({DistributedTest.class, MembershipTest.class, ClientServerTest.class})
+public class ReconnectWithCacheXMLDUnitTest extends JUnit4CacheTestCase {
+
+
+  public ReconnectWithCacheXMLDUnitTest() {
+    super();
+  }
+
+  private static final long serialVersionUID = 1L;
+
+
+  private String xmlProperty = DistributionConfig.GEMFIRE_PREFIX + "autoReconnect-useCacheXMLFile";
+  private String oldPropertySetting;
+
+  @Override
+  public final void postSetUp() {
+    oldPropertySetting = System.setProperty(xmlProperty, "true");
+  }
+
+  @Override
+  public final void preTearDownCacheTestCase() throws Exception {
+    if (oldPropertySetting == null) {
+      System.getProperties().remove(xmlProperty);
+    } else {
+      System.setProperty(xmlProperty, oldPropertySetting);
+    }
+  }
+
+  @Override
+  public Properties getDistributedSystemProperties() {
+    Properties result = super.getDistributedSystemProperties();
+    String fileName = TestUtil.getResourcePath(getClass(), "ReconnectWithCacheXMLDUnitTest.xml");
+    result.put(ConfigurationProperties.CACHE_XML_FILE, fileName);
+    result.put(ConfigurationProperties.ENABLE_NETWORK_PARTITION_DETECTION, "true");
+    result.put(ConfigurationProperties.DISABLE_AUTO_RECONNECT, "false");
+    result.put(ConfigurationProperties.MAX_WAIT_TIME_RECONNECT, "2000");
+    return result;
+  }
+
+  @Test
+  public void testCacheServerLauncherPortRetained() throws Exception {
+    CacheServerLauncher.setDisableDefaultServer(true);
+    CacheServerLauncher.setServerPort(AvailablePortHelper.getRandomAvailableTCPPort());
+    Cache cache = getCache();
+
+    final AtomicBoolean membershipFailed = new AtomicBoolean();
+    MembershipManagerHelper.addTestHook(cache.getDistributedSystem(), new MembershipTestHook() {
+      @Override
+      public void beforeMembershipFailure(String reason, Throwable cause) {
+        membershipFailed.set(true);
+      }
+
+      @Override
+      public void afterMembershipFailure(String reason, Throwable cause) {}
+    });
+    MembershipManagerHelper.crashDistributedSystem(cache.getDistributedSystem());
+    assertTrue(membershipFailed.get());
+
+    await().atMost(60, TimeUnit.SECONDS).until(() -> cache.getReconnectedCache() != null);
+
+    Cache newCache = cache.getReconnectedCache();
+    CacheServer server = newCache.getCacheServers().iterator().next();
+    assertEquals(CacheServerLauncher.getServerPort().intValue(), server.getPort());
+    assertEquals(20, server.getMaxConnections()); // this setting is in the XML file
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/391502a2/geode-core/src/test/java/org/apache/geode/cache30/ReconnectWithUDPSecurityDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/cache30/ReconnectWithUDPSecurityDUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache30/ReconnectWithUDPSecurityDUnitTest.java
index a52d8bf..55d0a3c 100755
--- a/geode-core/src/test/java/org/apache/geode/cache30/ReconnectWithUDPSecurityDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache30/ReconnectWithUDPSecurityDUnitTest.java
@@ -17,8 +17,14 @@ package org.apache.geode.cache30;
 import java.util.Properties;
 
 import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.test.junit.categories.ClientServerTest;
+import org.apache.geode.test.junit.categories.DistributedTest;
+import org.apache.geode.test.junit.categories.MembershipTest;
+import org.junit.experimental.categories.Category;
+
 import static org.apache.geode.distributed.ConfigurationProperties.*;
 
+@Category({DistributedTest.class, MembershipTest.class, ClientServerTest.class})
 public class ReconnectWithUDPSecurityDUnitTest extends ReconnectDUnitTest {
 
   public ReconnectWithUDPSecurityDUnitTest() {

http://git-wip-us.apache.org/repos/asf/geode/blob/391502a2/geode-core/src/test/java/org/apache/geode/cache30/ReconnectedCacheServerDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/cache30/ReconnectedCacheServerDUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache30/ReconnectedCacheServerDUnitTest.java
index 3224257..2a2fe73 100755
--- a/geode-core/src/test/java/org/apache/geode/cache30/ReconnectedCacheServerDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache30/ReconnectedCacheServerDUnitTest.java
@@ -14,6 +14,8 @@
  */
 package org.apache.geode.cache30;
 
+import org.apache.geode.test.junit.categories.ClientServerTest;
+import org.apache.geode.test.junit.categories.MembershipTest;
 import org.junit.experimental.categories.Category;
 import org.junit.Test;
 
@@ -32,7 +34,7 @@ import org.apache.geode.distributed.internal.membership.gms.mgr.GMSMembershipMan
 import org.apache.geode.internal.cache.GemFireCacheImpl;
 
 
-@Category(DistributedTest.class)
+@Category({DistributedTest.class, MembershipTest.class, ClientServerTest.class})
 public class ReconnectedCacheServerDUnitTest extends JUnit4CacheTestCase {
 
   public ReconnectedCacheServerDUnitTest() {

http://git-wip-us.apache.org/repos/asf/geode/blob/391502a2/geode-core/src/test/resources/org/apache/geode/cache30/ReconnectWithCacheXMLDUnitTest.xml
----------------------------------------------------------------------
diff --git a/geode-core/src/test/resources/org/apache/geode/cache30/ReconnectWithCacheXMLDUnitTest.xml b/geode-core/src/test/resources/org/apache/geode/cache30/ReconnectWithCacheXMLDUnitTest.xml
new file mode 100644
index 0000000..f7e338b
--- /dev/null
+++ b/geode-core/src/test/resources/org/apache/geode/cache30/ReconnectWithCacheXMLDUnitTest.xml
@@ -0,0 +1,25 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<cache
+    xmlns="http://geode.apache.org/schema/cache"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://geode.apache.org/schema/cache http://geode.apache.org/schema/cache/cache-1.0.xsd"
+    version="1.0">
+  <cache-server max-connections="20"/>
+</cache>
+