You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2019/12/07 00:21:13 UTC

[geode] 01/02: GEODE-7256: Cleanup Connection classes and tests

This is an automated email from the ASF dual-hosted git repository.

klund pushed a commit to branch GEODE-7256-AlterRuntimeCommandDUnitTest
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 818d4ffb57bf742b08016f039516fc8076f53af4
Author: Kirk Lund <kl...@apache.org>
AuthorDate: Fri Dec 6 12:52:34 2019 -0800

    GEODE-7256: Cleanup Connection classes and tests
    
    * Minor cleanup of classes and tests
---
 .../internal/tcp/ConnectionIntegrationTest.java    |   61 +-
 .../org/apache/geode/internal/tcp/Connection.java  | 2060 +++++++++-----------
 .../apache/geode/internal/tcp/ConnectionTable.java |  685 +++----
 .../org/apache/geode/internal/tcp/TCPConduit.java  |  375 ++--
 .../geode/internal/tcp/ConnectionJUnitTest.java    |  108 -
 .../geode/internal/tcp/ConnectionTableTest.java    |   22 +-
 .../apache/geode/internal/tcp/ConnectionTest.java  |   74 +-
 7 files changed, 1545 insertions(+), 1840 deletions(-)

diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/tcp/ConnectionIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/tcp/ConnectionIntegrationTest.java
index 1a0db82..3413eec 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/tcp/ConnectionIntegrationTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/tcp/ConnectionIntegrationTest.java
@@ -14,6 +14,10 @@
  */
 package org.apache.geode.internal.tcp;
 
+import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
+import static org.apache.geode.distributed.ConfigurationProperties.LOG_FILE;
+import static org.apache.geode.test.assertj.LogFileAssert.assertThat;
+import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.mockito.quality.Strictness.STRICT_STUBS;
@@ -26,6 +30,8 @@ import java.net.Socket;
 import java.nio.ByteBuffer;
 import java.util.Properties;
 
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -34,20 +40,23 @@ import org.mockito.junit.MockitoJUnit;
 import org.mockito.junit.MockitoRule;
 
 import org.apache.geode.cache.Cache;
-import org.apache.geode.distributed.ConfigurationProperties;
 import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.net.SocketCloser;
-import org.apache.geode.test.assertj.LogFileAssert;
-import org.apache.geode.test.dunit.IgnoredException;
 import org.apache.geode.test.dunit.rules.CacheRule;
 import org.apache.geode.test.junit.categories.MembershipTest;
 
-@Category({MembershipTest.class})
+@Category(MembershipTest.class)
 public class ConnectionIntegrationTest {
 
+  private static final String EXPECTED_EXCEPTION_MESSAGE =
+      "Unknown handshake reply code: 99 messageLength: 0";
+
+  private File logFile;
+  private Cache cache;
+
   @Rule
-  public final MockitoRule mockitoRule = MockitoJUnit.rule().strictness(STRICT_STUBS);
+  public MockitoRule mockitoRule = MockitoJUnit.rule().strictness(STRICT_STUBS);
 
   @Rule
   public TemporaryFolder temporaryFolder = new TemporaryFolder();
@@ -55,36 +64,44 @@ public class ConnectionIntegrationTest {
   @Rule
   public CacheRule cacheRule = new CacheRule();
 
-  @Test
-  public void badHeaderMessageIsCorrectlyLogged() throws Exception {
+  @Before
+  public void setUp() throws Exception {
+    logFile = temporaryFolder.newFile();
+
     Properties properties = new Properties();
-    properties.put(ConfigurationProperties.LOCATORS, ""); // loner system
-    File logFile = temporaryFolder.newFile();
-    properties.put(ConfigurationProperties.LOG_FILE, logFile.getAbsolutePath());
-    cacheRule.createCache(properties);
-    Cache cache = cacheRule.getCache();
+    properties.setProperty(LOCATORS, "");
+    properties.setProperty(LOG_FILE, logFile.getAbsolutePath());
 
-    final String expectedException = "Unknown handshake reply code: 99 messageLength: 0";
+    cache = cacheRule.getOrCreateCache(properties);
 
-    IgnoredException.addIgnoredException(expectedException);
+    addIgnoredException(EXPECTED_EXCEPTION_MESSAGE);
+  }
 
+  @After
+  public void tearDown() {
+    cache.close();
+  }
+
+  @Test
+  public void badHeaderMessageIsCorrectlyLogged() {
     ConnectionTable connectionTable = mock(ConnectionTable.class);
+    DistributionConfig config = mock(DistributionConfig.class);
+    Socket socket = mock(Socket.class);
     TCPConduit tcpConduit = mock(TCPConduit.class);
+
     when(connectionTable.getConduit()).thenReturn(tcpConduit);
     when(tcpConduit.getSocketId()).thenReturn(new InetSocketAddress("localhost", 1234));
-    DistributionConfig config = mock(DistributionConfig.class);
     when(config.getEnableNetworkPartitionDetection()).thenReturn(false);
     when(tcpConduit.getConfig()).thenReturn(config);
     when(tcpConduit.getMemberId()).thenReturn(new InternalDistributedMember("localhost", 2345));
     when(connectionTable.getSocketCloser()).thenReturn(mock(SocketCloser.class));
-    Socket socket = mock(Socket.class);
+
     Connection connection = new Connection(connectionTable, socket);
-    ByteBuffer peerDataBuffer = ByteBuffer.allocate(100);
-    byte[] bytes = new byte[] {99};
-    ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
+    ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(new byte[] {99});
     DataInputStream inputStream = new DataInputStream(byteArrayInputStream);
-    connection.readHandshakeForSender(inputStream, peerDataBuffer);
-    cache.close();
-    LogFileAssert.assertThat(logFile).contains(expectedException);
+
+    connection.readHandshakeForSender(inputStream, ByteBuffer.allocate(100));
+
+    assertThat(logFile).contains(EXPECTED_EXCEPTION_MESSAGE);
   }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index 5c95d0d..d813b69 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -14,8 +14,13 @@
  */
 package org.apache.geode.internal.tcp;
 
+import static java.lang.Boolean.FALSE;
+import static java.lang.ThreadLocal.withInitial;
 import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_PEER_AUTH_INIT;
+import static org.apache.geode.distributed.internal.DistributionConfig.GEMFIRE_PREFIX;
+import static org.apache.geode.distributed.internal.DistributionConfigImpl.SECURITY_SYSTEM_PREFIX;
 
+import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.net.ConnectException;
@@ -30,7 +35,6 @@ import java.nio.channels.ClosedSelectorException;
 import java.nio.channels.SocketChannel;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -40,6 +44,7 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLHandshakeException;
 
 import org.apache.logging.log4j.Logger;
 
@@ -47,6 +52,7 @@ import org.apache.geode.CancelException;
 import org.apache.geode.SerializationException;
 import org.apache.geode.SystemFailure;
 import org.apache.geode.alerting.internal.spi.AlertingAction;
+import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.annotations.internal.MakeNotStatic;
 import org.apache.geode.annotations.internal.MutableForTesting;
 import org.apache.geode.cache.CacheClosedException;
@@ -57,7 +63,6 @@ import org.apache.geode.distributed.internal.ConflationKey;
 import org.apache.geode.distributed.internal.DMStats;
 import org.apache.geode.distributed.internal.DirectReplyProcessor;
 import org.apache.geode.distributed.internal.DistributionConfig;
-import org.apache.geode.distributed.internal.DistributionConfigImpl;
 import org.apache.geode.distributed.internal.DistributionManager;
 import org.apache.geode.distributed.internal.DistributionMessage;
 import org.apache.geode.distributed.internal.DistributionStats;
@@ -69,7 +74,6 @@ import org.apache.geode.distributed.internal.ReplySender;
 import org.apache.geode.distributed.internal.direct.DirectChannel;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.distributed.internal.membership.gms.api.Membership;
-import org.apache.geode.distributed.internal.membership.gms.api.MembershipStatistics;
 import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.DSFIDFactory;
 import org.apache.geode.internal.InternalDataSerializer;
@@ -93,12 +97,13 @@ import org.apache.geode.logging.internal.log4j.api.LogService;
  */
 public class Connection implements Runnable {
   private static final Logger logger = LogService.getLogger();
+
   public static final String THREAD_KIND_IDENTIFIER = "P2P message reader";
 
   @MakeNotStatic
   private static int P2P_CONNECT_TIMEOUT;
   @MakeNotStatic
-  private static boolean IS_P2P_CONNECT_TIMEOUT_INITIALIZED = false;
+  private static boolean IS_P2P_CONNECT_TIMEOUT_INITIALIZED;
 
   static final int NORMAL_MSG_TYPE = 0x4c;
   static final int CHUNKED_MSG_TYPE = 0x4d; // a chunk of one logical msg
@@ -114,18 +119,25 @@ public class Connection implements Runnable {
    * Small buffer used for send socket buffer on receiver connections and receive buffer on sender
    * connections.
    */
-  public static final int SMALL_BUFFER_SIZE =
-      Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "SMALL_BUFFER_SIZE", 4096);
+  static final int SMALL_BUFFER_SIZE =
+      Integer.getInteger(GEMFIRE_PREFIX + "SMALL_BUFFER_SIZE", 4096);
 
-  /** counter to give connections a unique id */
+  /**
+   * counter to give connections a unique id
+   */
   @MakeNotStatic
-  private static final AtomicLong idCounter = new AtomicLong(1);
+  private static final AtomicLong ID_COUNTER = new AtomicLong(1);
 
-  /** string used as the reason for initiating suspect processing */
+  /**
+   * string used as the reason for initiating suspect processing
+   */
+  @VisibleForTesting
   public static final String INITIATING_SUSPECT_PROCESSING =
       "member unexpectedly shut down shared, unordered connection";
 
-  /** the table holding this connection */
+  /**
+   * the table holding this connection
+   */
   private final ConnectionTable owner;
 
   private final TCPConduit conduit;
@@ -135,54 +147,19 @@ public class Connection implements Runnable {
    * Set to false once run() is terminating. Using this instead of Thread.isAlive as the reader
    * thread may be a pooled thread.
    */
-  private volatile boolean isRunning = false;
+  private volatile boolean isRunning;
 
-  /** true if connection is a shared resource that can be used by more than one thread */
+  /**
+   * true if connection is a shared resource that can be used by more than one thread
+   */
   private boolean sharedResource;
 
-  public boolean isSharedResource() {
-    return this.sharedResource;
-  }
-
-  /** The idle timeout timer task for this connection */
+  /**
+   * The idle timeout timer task for this connection
+   */
   private SystemTimerTask idleTask;
 
-  private static final ThreadLocal<Boolean> isReaderThread = new ThreadLocal<Boolean>() {
-    @Override
-    public Boolean initialValue() {
-      return Boolean.FALSE;
-    }
-  };
-
-  public static void makeReaderThread() {
-    // mark this thread as a reader thread
-    makeReaderThread(true);
-  }
-
-  private static void makeReaderThread(boolean v) {
-    isReaderThread.set(v);
-  }
-
-  // return true if this thread is a reader thread
-  private static boolean isReaderThread() {
-    return isReaderThread.get();
-  }
-
-  int getP2PConnectTimeout(DistributionConfig config) {
-    if (AlertingAction.isThreadAlerting()) {
-      return config.getMemberTimeout();
-    }
-    if (IS_P2P_CONNECT_TIMEOUT_INITIALIZED)
-      return P2P_CONNECT_TIMEOUT;
-    String connectTimeoutStr = System.getProperty("p2p.connectTimeout");
-    if (connectTimeoutStr != null) {
-      P2P_CONNECT_TIMEOUT = Integer.parseInt(connectTimeoutStr);
-    } else {
-      P2P_CONNECT_TIMEOUT = 6 * config.getMemberTimeout();
-    }
-    IS_P2P_CONNECT_TIMEOUT_INITIALIZED = true;
-    return P2P_CONNECT_TIMEOUT;
-  }
+  private static final ThreadLocal<Boolean> isReaderThread = withInitial(() -> FALSE);
 
   /**
    * If true then readers for thread owned sockets will send all messages on thread owned senders.
@@ -191,40 +168,24 @@ public class Connection implements Runnable {
   private static final boolean DOMINO_THREAD_OWNED_SOCKETS =
       Boolean.getBoolean("p2p.ENABLE_DOMINO_THREAD_OWNED_SOCKETS");
 
-  private static final ThreadLocal<Boolean> isDominoThread = new ThreadLocal<Boolean>() {
-    @Override
-    public Boolean initialValue() {
-      return Boolean.FALSE;
-    }
-  };
+  private static final ThreadLocal<Boolean> isDominoThread = withInitial(() -> FALSE);
 
-  // return true if this thread is a reader thread
-  private static boolean tipDomino() {
-    if (DOMINO_THREAD_OWNED_SOCKETS) {
-      // mark this thread as one who wants to send ALL on TO sockets
-      ConnectionTable.threadWantsOwnResources();
-      isDominoThread.set(Boolean.TRUE);
-      return true;
-    } else {
-      return false;
-    }
-  }
-
-  public static boolean isDominoThread() {
-    return isDominoThread.get();
-  }
-
-  /** the socket entrusted to this connection */
+  /**
+   * the socket entrusted to this connection
+   */
   private final Socket socket;
 
-  /** output stream/channel lock */
+  /**
+   * output stream/channel lock
+   */
   private final Object outLock = new Object();
 
-  /** the ID string of the conduit (for logging) */
-  private String conduitIdStr;
+  /**
+   * the ID string of the conduit (for logging)
+   */
+  private final String conduitIdStr;
 
-  /** Identifies the java group member on the other side of the connection. */
-  InternalDistributedMember remoteAddr;
+  private InternalDistributedMember remoteAddr;
 
   /**
    * Identifies the version of the member on the other side of the connection.
@@ -242,19 +203,14 @@ public class Connection implements Runnable {
    * instance, server-connection -> owned p2p reader (count 0) -> owned p2p reader (count 1) ->
    * owned p2p reader (count 2). This shows up in thread names as "DOM #x" (domino #x)
    */
-  private static final ThreadLocal<Integer> dominoCount = new ThreadLocal<Integer>() {
-    @Override
-    protected Integer initialValue() {
-      return 0;
-    }
-  };
+  private static final ThreadLocal<Integer> dominoCount = withInitial(() -> 0);
 
   /**
    * How long to wait if receiver will not accept a message before we go into queue mode.
    *
    * @since GemFire 4.2.2
    */
-  private int asyncDistributionTimeout = 0;
+  private int asyncDistributionTimeout;
 
   /**
    * How long to wait, with the receiver not accepting any messages, before kicking the receiver out
@@ -262,7 +218,7 @@ public class Connection implements Runnable {
    *
    * @since GemFire 4.2.2
    */
-  private int asyncQueueTimeout = 0;
+  private int asyncQueueTimeout;
 
   /**
    * How much queued data we can have, with the receiver not accepting any messages, before kicking
@@ -271,12 +227,12 @@ public class Connection implements Runnable {
    *
    * @since GemFire 4.2.2
    */
-  private long asyncMaxQueueSize = 0;
+  private long asyncMaxQueueSize;
 
   /**
    * True if an async queue is already being filled.
    */
-  private volatile boolean asyncQueuingInProgress = false;
+  private volatile boolean asyncQueuingInProgress;
 
   /**
    * Maps ConflatedKey instances to ConflatedKey instance. Note that even though the key and value
@@ -284,9 +240,11 @@ public class Connection implements Runnable {
    */
   private final Map conflatedKeys = new HashMap();
 
-  // NOTE: LinkedBlockingQueue has a bug in which removes from the queue
-  // cause future offer to increase the size without adding anything to the queue.
-  // So I've changed from this backport class to a java.util.LinkedList
+  /**
+   * NOTE: LinkedBlockingQueue has a bug in which removes from the queue
+   * cause future offer to increase the size without adding anything to the queue.
+   * So I've changed from this backport class to a java.util.LinkedList
+   */
   private final LinkedList outgoingQueue = new LinkedList();
 
   /**
@@ -313,8 +271,6 @@ public class Connection implements Runnable {
   private volatile boolean handshakeRead;
   private volatile boolean handshakeCancelled;
 
-  private volatile int replyCode;
-
   private static final byte REPLY_CODE_OK = (byte) 69;
   private static final byte REPLY_CODE_OK_WITH_ASYNC_INFO = (byte) 70;
 
@@ -329,19 +285,19 @@ public class Connection implements Runnable {
   /** set to true once a close begins */
   private final AtomicBoolean closing = new AtomicBoolean(false);
 
-  private volatile boolean readerShuttingDown = false;
+  private volatile boolean readerShuttingDown;
 
   /** whether the socket is connected */
-  volatile boolean connected = false;
+  volatile boolean connected;
 
   /**
    * Set to true once a connection finishes its constructor
    */
-  private volatile boolean finishedConnecting = false;
+  private volatile boolean finishedConnecting;
 
   private volatile boolean accessed = true;
-  private volatile boolean socketInUse = false;
-  volatile boolean timedOut = false;
+  private volatile boolean socketInUse;
+  volatile boolean timedOut;
 
   /**
    * task for detecting ack timeouts and issuing alerts
@@ -385,7 +341,7 @@ public class Connection implements Runnable {
   private short messageId;
 
   /** whether the length of the next message has been established */
-  private boolean lengthSet = false;
+  private boolean lengthSet;
 
   /** used to lock access to destreamer data */
   private final Object destreamerLock = new Object();
@@ -402,7 +358,7 @@ public class Connection implements Runnable {
 
 
   /** is this connection used for serial message delivery? */
-  boolean preserveOrder = false;
+  boolean preserveOrder;
 
   /** number of messages sent on this connection */
   private long messagesSent;
@@ -416,12 +372,201 @@ public class Connection implements Runnable {
   private int sendBufferSize = -1;
   private int recvBufferSize = -1;
 
+  @MakeNotStatic
+  private static final ByteBuffer okHandshakeBuf;
+  static {
+    int msglen = 1; // one byte for reply code
+    byte[] bytes = new byte[MSG_HEADER_BYTES + msglen];
+    msglen = calcHdrSize(msglen);
+    bytes[MSG_HEADER_SIZE_OFFSET] = (byte) (msglen / 0x1000000 & 0xff);
+    bytes[MSG_HEADER_SIZE_OFFSET + 1] = (byte) (msglen / 0x10000 & 0xff);
+    bytes[MSG_HEADER_SIZE_OFFSET + 2] = (byte) (msglen / 0x100 & 0xff);
+    bytes[MSG_HEADER_SIZE_OFFSET + 3] = (byte) (msglen & 0xff);
+    bytes[MSG_HEADER_TYPE_OFFSET] = (byte) NORMAL_MSG_TYPE; // message type
+    bytes[MSG_HEADER_ID_OFFSET] = (byte) (MsgIdGenerator.NO_MSG_ID >> 8 & 0xff);
+    bytes[MSG_HEADER_ID_OFFSET + 1] = (byte) (MsgIdGenerator.NO_MSG_ID & 0xff);
+    bytes[MSG_HEADER_BYTES] = REPLY_CODE_OK;
+    int allocSize = bytes.length;
+    ByteBuffer bb;
+    if (BufferPool.useDirectBuffers) {
+      bb = ByteBuffer.allocateDirect(allocSize);
+    } else {
+      bb = ByteBuffer.allocate(allocSize);
+    }
+    bb.put(bytes);
+    okHandshakeBuf = bb;
+  }
+
+  /**
+   * maximum message buffer size
+   */
+  public static final int MAX_MSG_SIZE = 0x00ffffff;
+
+  private static final int HANDSHAKE_TIMEOUT_MS =
+      Integer.getInteger("p2p.handshakeTimeoutMs", 59000);
+  // private static final byte HANDSHAKE_VERSION = 1; // 501
+  // public static final byte HANDSHAKE_VERSION = 2; // cbb5x_PerfScale
+  // public static final byte HANDSHAKE_VERSION = 3; // durable_client
+  // public static final byte HANDSHAKE_VERSION = 4; // dataSerialMay19
+  // public static final byte HANDSHAKE_VERSION = 5; // split-brain bits
+  // public static final byte HANDSHAKE_VERSION = 6; // direct ack changes
+  // NOTICE: handshake_version should not be changed anymore. Use the gemfire version transmitted
+  // with the handshake bits and handle old handshakes based on that
+  private static final byte HANDSHAKE_VERSION = 7; // product version exchange during handshake
+
+  private final AtomicBoolean asyncCloseCalled = new AtomicBoolean();
+
+  private static final int CONNECT_HANDSHAKE_SIZE = 4096;
+
+  /** time between connection attempts */
+  private static final int RECONNECT_WAIT_TIME =
+      Integer.getInteger(GEMFIRE_PREFIX + "RECONNECT_WAIT_TIME", 2000);
+
+  /**
+   * Batch sends currently should not be turned on because: 1. They will be used for all sends
+   * (instead of just no-ack) and thus will break messages that wait for a response (or kill perf).
+   * 2. The buffer is not properly flushed and closed on shutdown. The code attempts to do this but
+   * must not be doing it correctly.
+   */
+  private static final boolean BATCH_SENDS = Boolean.getBoolean("p2p.batchSends");
+  private static final int BATCH_BUFFER_SIZE =
+      Integer.getInteger("p2p.batchBufferSize", 1024 * 1024);
+  private static final int BATCH_FLUSH_MS = Integer.getInteger("p2p.batchFlushTime", 50);
+  private final Object batchLock = new Object();
+  private ByteBuffer fillBatchBuffer;
+  private ByteBuffer sendBatchBuffer;
+  private BatchBufferFlusher batchFlusher;
+
+  /**
+   * use to test message prep overhead (no socket write). WARNING: turning this on completely
+   * disables distribution of batched sends
+   */
+  private static final boolean SOCKET_WRITE_DISABLED = Boolean.getBoolean("p2p.disableSocketWrite");
+
+  private final Object pusherSync = new Object();
+
+  private boolean disconnectRequested;
+
+  /**
+   * If true then act as if the socket buffer is full and start async queuing
+   */
+  @MutableForTesting
+  public static volatile boolean FORCE_ASYNC_QUEUE;
+
+  private static final int MAX_WAIT_TIME = 32; // ms (must be a power of 2)
+
+  /**
+   * stateLock is used to synchronize state changes.
+   */
+  private final Object stateLock = new Object();
+
+  /**
+   * for timeout processing, this is the current state of the connection
+   */
+  private byte connectionState = STATE_IDLE;
+
+  /* ~~~~~~~~~~~~~ connection states ~~~~~~~~~~~~~~~ */
+  /** the connection is idle, but may be in use */
+  private static final byte STATE_IDLE = 0;
+  /** the connection is in use and is transmitting data */
+  private static final byte STATE_SENDING = 1;
+  /** the connection is in use and is done transmitting */
+  private static final byte STATE_POST_SENDING = 2;
+  /** the connection is in use and is reading a direct-ack */
+  private static final byte STATE_READING_ACK = 3;
+  /** the connection is in use and has finished reading a direct-ack */
+  private static final byte STATE_RECEIVED_ACK = 4;
+  /** the connection is in use and is reading a message */
+  private static final byte STATE_READING = 5;
+  /* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ */
+
+  /** set to true if we exceeded the ack-wait-threshold waiting for a response */
+  private volatile boolean ackTimedOut;
+
+  /**
+   * creates a "reader" connection that we accepted (it was initiated by an explicit connect being
+   * done on the other side).
+   */
+  protected Connection(ConnectionTable connectionTable, Socket socket) throws ConnectionException {
+    if (connectionTable == null) {
+      throw new IllegalArgumentException("Null ConnectionTable");
+    }
+    conduit = connectionTable.getConduit();
+    isReceiver = true;
+    owner = connectionTable;
+    this.socket = socket;
+    InetSocketAddress conduitSocketId = conduit.getSocketId();
+    conduitIdStr = conduitSocketId.toString();
+    handshakeRead = false;
+    handshakeCancelled = false;
+    connected = true;
+
+    try {
+      socket.setTcpNoDelay(true);
+      socket.setKeepAlive(true);
+      setSendBufferSize(socket, SMALL_BUFFER_SIZE);
+      setReceiveBufferSize(socket);
+    } catch (SocketException e) {
+      // unable to get the settings we want. Don't log an error because it will likely happen a lot
+    }
+  }
+
+  public boolean isSharedResource() {
+    return sharedResource;
+  }
+
+  public static void makeReaderThread() {
+    // mark this thread as a reader thread
+    makeReaderThread(true);
+  }
+
+  private static void makeReaderThread(boolean v) {
+    isReaderThread.set(v);
+  }
+
+  // return true if this thread is a reader thread
+  private static boolean isReaderThread() {
+    return isReaderThread.get();
+  }
+
+  @VisibleForTesting
+  int getP2PConnectTimeout(DistributionConfig config) {
+    if (AlertingAction.isThreadAlerting()) {
+      return config.getMemberTimeout();
+    }
+    if (IS_P2P_CONNECT_TIMEOUT_INITIALIZED)
+      return P2P_CONNECT_TIMEOUT;
+    String connectTimeoutStr = System.getProperty("p2p.connectTimeout");
+    if (connectTimeoutStr != null) {
+      P2P_CONNECT_TIMEOUT = Integer.parseInt(connectTimeoutStr);
+    } else {
+      P2P_CONNECT_TIMEOUT = 6 * config.getMemberTimeout();
+    }
+    IS_P2P_CONNECT_TIMEOUT_INITIALIZED = true;
+    return P2P_CONNECT_TIMEOUT;
+  }
+
+  // return true if this thread is a reader thread
+  private static boolean tipDomino() {
+    if (DOMINO_THREAD_OWNED_SOCKETS) {
+      // mark this thread as one who wants to send ALL on TO sockets
+      ConnectionTable.threadWantsOwnResources();
+      isDominoThread.set(Boolean.TRUE);
+      return true;
+    }
+    return false;
+  }
+
+  public static boolean isDominoThread() {
+    return isDominoThread.get();
+  }
+
   private void setSendBufferSize(Socket sock) {
-    setSendBufferSize(sock, this.owner.getConduit().tcpBufferSize);
+    setSendBufferSize(sock, owner.getConduit().tcpBufferSize);
   }
 
   private void setReceiveBufferSize(Socket sock) {
-    setReceiveBufferSize(sock, this.owner.getConduit().tcpBufferSize);
+    setReceiveBufferSize(sock, owner.getConduit().tcpBufferSize);
   }
 
   private void setSendBufferSize(Socket sock, int requestedSize) {
@@ -438,7 +583,7 @@ public class Connection implements Runnable {
         int currentSize = send ? sock.getSendBufferSize() : sock.getReceiveBufferSize();
         if (currentSize == requestedSize) {
           if (send) {
-            this.sendBufferSize = currentSize;
+            sendBufferSize = currentSize;
           }
           return;
         }
@@ -452,32 +597,32 @@ public class Connection implements Runnable {
       try {
         int actualSize = send ? sock.getSendBufferSize() : sock.getReceiveBufferSize();
         if (send) {
-          this.sendBufferSize = actualSize;
+          sendBufferSize = actualSize;
         } else {
-          this.recvBufferSize = actualSize;
+          recvBufferSize = actualSize;
         }
         if (actualSize < requestedSize) {
           logger.info("Socket {} is {} instead of the requested {}.",
-              (send ? "send buffer size" : "receive buffer size"),
+              send ? "send buffer size" : "receive buffer size",
               actualSize, requestedSize);
         } else if (actualSize > requestedSize) {
           if (logger.isTraceEnabled()) {
             logger.trace("Socket {} buffer size is {} instead of the requested {}",
-                (send ? "send" : "receive"), actualSize, requestedSize);
+                send ? "send" : "receive", actualSize, requestedSize);
           }
           // Remember the request size which is smaller.
           // This remembered value is used for allocating direct mem buffers.
           if (send) {
-            this.sendBufferSize = requestedSize;
+            sendBufferSize = requestedSize;
           } else {
-            this.recvBufferSize = requestedSize;
+            recvBufferSize = requestedSize;
           }
         }
       } catch (SocketException ignore) {
         if (send) {
-          this.sendBufferSize = requestedSize;
+          sendBufferSize = requestedSize;
         } else {
-          this.recvBufferSize = requestedSize;
+          recvBufferSize = requestedSize;
         }
       }
     }
@@ -487,7 +632,7 @@ public class Connection implements Runnable {
    * Returns the size of the send buffer on this connection's socket.
    */
   int getSendBufferSize() {
-    int result = this.sendBufferSize;
+    int result = sendBufferSize;
     if (result != -1) {
       return result;
     }
@@ -495,52 +640,20 @@ public class Connection implements Runnable {
       result = getSocket().getSendBufferSize();
     } catch (SocketException ignore) {
       // just return a default
-      result = this.owner.getConduit().tcpBufferSize;
+      result = owner.getConduit().tcpBufferSize;
     }
-    this.sendBufferSize = result;
+    sendBufferSize = result;
     return result;
   }
 
-  /**
-   * creates a "reader" connection that we accepted (it was initiated by an explicit connect being
-   * done on
-   * the other side).
-   */
-  protected Connection(ConnectionTable t, Socket socket)
-      throws ConnectionException {
-    if (t == null) {
-      throw new IllegalArgumentException(
-          "Null ConnectionTable");
-    }
-    this.conduit = t.getConduit();
-    this.isReceiver = true;
-    this.owner = t;
-    this.socket = socket;
-    this.conduitIdStr = conduit.getSocketId().toString();
-    this.handshakeRead = false;
-    this.handshakeCancelled = false;
-    this.connected = true;
-
-    try {
-      socket.setTcpNoDelay(true);
-      socket.setKeepAlive(true);
-      setSendBufferSize(socket, SMALL_BUFFER_SIZE);
-      setReceiveBufferSize(socket);
-    } catch (SocketException e) {
-      // unable to get the settings we want. Don't log an error because it will
-      // likely happen a lot
-    }
-  }
-
   void initReceiver() {
-    this.startReader(owner);
+    startReader(owner);
   }
 
   void setIdleTimeoutTask(SystemTimerTask task) {
-    this.idleTask = task;
+    idleTask = task;
   }
 
-
   /**
    * Returns true if an idle connection was detected.
    */
@@ -548,75 +661,39 @@ public class Connection implements Runnable {
     if (isSocketClosed()) {
       return true;
     }
-    if (isSocketInUse() || (this.sharedResource && !this.preserveOrder)) { // shared/unordered
-                                                                           // connections are used
-                                                                           // for failure-detection
-                                                                           // and are not subject to
-                                                                           // idle-timeout
+    if (isSocketInUse() || sharedResource && !preserveOrder) {
+      // shared/unordered connections are used for failure-detection
+      // and are not subject to idle-timeout
       return false;
     }
-    boolean isIdle = !this.accessed;
-    this.accessed = false;
+    boolean isIdle = !accessed;
+    accessed = false;
     if (isIdle) {
-      this.timedOut = true;
-      this.owner.getConduit().getStats().incLostLease();
+      timedOut = true;
+      owner.getConduit().getStats().incLostLease();
       if (logger.isDebugEnabled()) {
-        logger.debug("Closing idle connection {} shared={} ordered={}", this, this.sharedResource,
-            this.preserveOrder);
+        logger.debug("Closing idle connection {} shared={} ordered={}", this, sharedResource,
+            preserveOrder);
       }
       try {
-        // Instead of calling requestClose
-        // we call closeForReconnect.
-        // We don't want this timeout close to close
-        // any other connections. The problem with
-        // requestClose has removeEndpoint set to true
-        // which will close an receivers we have if this
-        // connection is a shared one.
-        closeForReconnect(
-            "idle connection timed out");
+        // Instead of calling requestClose we call closeForReconnect.
+        // We don't want this timeout close to close any other connections.
+        // The problem with requestClose has removeEndpoint set to true
+        // which will close an receivers we have if this connection is a shared one.
+        closeForReconnect("idle connection timed out");
       } catch (Exception ignore) {
       }
     }
     return isIdle;
   }
 
-  @MakeNotStatic
-  private static final ByteBuffer okHandshakeBuf;
-  static {
-    int msglen = 1; // one byte for reply code
-    byte[] bytes = new byte[MSG_HEADER_BYTES + msglen];
-    msglen = calcHdrSize(msglen);
-    bytes[MSG_HEADER_SIZE_OFFSET] = (byte) ((msglen / 0x1000000) & 0xff);
-    bytes[MSG_HEADER_SIZE_OFFSET + 1] = (byte) ((msglen / 0x10000) & 0xff);
-    bytes[MSG_HEADER_SIZE_OFFSET + 2] = (byte) ((msglen / 0x100) & 0xff);
-    bytes[MSG_HEADER_SIZE_OFFSET + 3] = (byte) (msglen & 0xff);
-    bytes[MSG_HEADER_TYPE_OFFSET] = (byte) NORMAL_MSG_TYPE; // message type
-    bytes[MSG_HEADER_ID_OFFSET] = (byte) ((MsgIdGenerator.NO_MSG_ID >> 8) & 0xff);
-    bytes[MSG_HEADER_ID_OFFSET + 1] = (byte) (MsgIdGenerator.NO_MSG_ID & 0xff);
-    bytes[MSG_HEADER_BYTES] = REPLY_CODE_OK;
-    int allocSize = bytes.length;
-    ByteBuffer bb;
-    if (BufferPool.useDirectBuffers) {
-      bb = ByteBuffer.allocateDirect(allocSize);
-    } else {
-      bb = ByteBuffer.allocate(allocSize);
-    }
-    bb.put(bytes);
-    okHandshakeBuf = bb;
-  }
-
-  /**
-   * maximum message buffer size
-   */
-  public static final int MAX_MSG_SIZE = 0x00ffffff;
-
   static int calcHdrSize(int byteSize) {
     if (byteSize > MAX_MSG_SIZE) {
       throw new IllegalStateException(String.format("tcp message exceeded max size of %s",
           MAX_MSG_SIZE));
     }
     int hdrSize = byteSize;
-    hdrSize |= (HANDSHAKE_VERSION << 24);
+    hdrSize |= HANDSHAKE_VERSION << 24;
     return hdrSize;
   }
 
@@ -624,7 +701,7 @@ public class Connection implements Runnable {
     return hdrSize & MAX_MSG_SIZE;
   }
 
-  static byte calcHdrVersion(int hdrSize) throws IOException {
+  static void calcHdrVersion(int hdrSize) throws IOException {
     byte ver = (byte) (hdrSize >> 24);
     if (ver != HANDSHAKE_VERSION) {
       throw new IOException(
@@ -632,12 +709,11 @@ public class Connection implements Runnable {
               "Detected wrong version of GemFire product during handshake. Expected %s but found %s",
               HANDSHAKE_VERSION, ver));
     }
-    return ver;
   }
 
   private void sendOKHandshakeReply() throws IOException, ConnectionException {
     ByteBuffer my_okHandshakeBuf;
-    if (this.isReceiver) {
+    if (isReceiver) {
       DistributionConfig cfg = owner.getConduit().getConfig();
       ByteBuffer bb;
       if (BufferPool.useDirectBuffers) {
@@ -653,8 +729,7 @@ public class Connection implements Runnable {
       bb.putInt(cfg.getAsyncQueueTimeout());
       bb.putInt(cfg.getAsyncMaxQueueSize());
       // write own product version
-      Version
-          .writeOrdinal(bb, Version.CURRENT.ordinal(), true);
+      Version.writeOrdinal(bb, Version.CURRENT.ordinal(), true);
       // now set the msg length into position 0
       bb.putInt(0, calcHdrSize(bb.position() - MSG_HEADER_BYTES));
       my_okHandshakeBuf = bb;
@@ -666,19 +741,6 @@ public class Connection implements Runnable {
     writeFully(getSocket().getChannel(), my_okHandshakeBuf, false, null);
   }
 
-  private static final int HANDSHAKE_TIMEOUT_MS =
-      Integer.getInteger("p2p.handshakeTimeoutMs", 59000);
-  // private static final byte HANDSHAKE_VERSION = 1; // 501
-  // public static final byte HANDSHAKE_VERSION = 2; // cbb5x_PerfScale
-  // public static final byte HANDSHAKE_VERSION = 3; // durable_client
-  // public static final byte HANDSHAKE_VERSION = 4; // dataSerialMay19
-  // public static final byte HANDSHAKE_VERSION = 5; // split-brain bits
-  // public static final byte HANDSHAKE_VERSION = 6; // direct ack changes
-  // NOTICE: handshake_version should not be changed anymore. Use the gemfire
-  // version transmitted with the handshake bits and handle old handshakes
-  // based on that
-  private static final byte HANDSHAKE_VERSION = 7; // product version exchange during handshake
-
   /**
    * @throws ConnectionException if the conduit has stopped
    */
@@ -686,73 +748,69 @@ public class Connection implements Runnable {
     boolean needToClose = false;
     String reason = null;
     try {
-      synchronized (this.handshakeSync) {
-        if (!this.handshakeRead && !this.handshakeCancelled) {
-          boolean success = false;
+      synchronized (handshakeSync) {
+        if (!handshakeRead && !handshakeCancelled) {
           reason = "unknown";
           boolean interrupted = Thread.interrupted();
+          boolean success = false;
           try {
             final long endTime = System.currentTimeMillis() + HANDSHAKE_TIMEOUT_MS;
             long msToWait = HANDSHAKE_TIMEOUT_MS;
-            while (!this.handshakeRead && !this.handshakeCancelled && msToWait > 0) {
-              this.handshakeSync.wait(msToWait); // spurious wakeup ok
-              if (!this.handshakeRead && !this.handshakeCancelled) {
+            while (!handshakeRead && !handshakeCancelled && msToWait > 0) {
+              handshakeSync.wait(msToWait); // spurious wakeup ok
+              if (!handshakeRead && !handshakeCancelled) {
                 msToWait = endTime - System.currentTimeMillis();
               }
             }
-            if (!this.handshakeRead && !this.handshakeCancelled) {
+            if (!handshakeRead && !handshakeCancelled) {
               reason = "handshake timed out";
               String peerName;
-              if (this.remoteAddr != null) {
-                peerName = this.remoteAddr.toString();
+              if (remoteAddr != null) {
+                peerName = remoteAddr.toString();
                 // late in the life of jdk 1.7 we started seeing connections accepted
                 // when accept() was not even being called. This started causing timeouts
                 // to occur in the handshake threads instead of causing failures in
                 // connection-formation. So, we need to initiate suspect processing here
-                owner.getDM().getDistribution().suspectMember(this.remoteAddr,
+                owner.getDM().getDistribution().suspectMember(remoteAddr,
                     String.format(
                         "Connection handshake with %s timed out after waiting %s milliseconds.",
-
                         peerName, HANDSHAKE_TIMEOUT_MS));
               } else {
-                peerName = "socket " + this.socket.getRemoteSocketAddress().toString() + ":"
-                    + this.socket.getPort();
+                peerName = "socket " + socket.getRemoteSocketAddress() + ":" + socket.getPort();
               }
               throw new ConnectionException(
                   String.format(
                       "Connection handshake with %s timed out after waiting %s milliseconds.",
                       peerName, HANDSHAKE_TIMEOUT_MS));
-            } else {
-              success = this.handshakeRead;
             }
+            success = handshakeRead;
           } catch (InterruptedException ex) {
             interrupted = true;
-            this.owner.getConduit().getCancelCriterion().checkCancelInProgress(ex);
+            owner.getConduit().getCancelCriterion().checkCancelInProgress(ex);
             reason = "interrupted";
           } finally {
             if (interrupted) {
               Thread.currentThread().interrupt();
             }
             if (success) {
-              if (this.isReceiver) {
+              if (isReceiver) {
                 needToClose =
-                    !owner.getConduit().getMembership().addSurpriseMember(this.remoteAddr);
+                    !owner.getConduit().getMembership().addSurpriseMember(remoteAddr);
                 if (needToClose) {
                   reason = "this member is shunned";
                 }
               }
             } else {
-              needToClose = true; // for bug 42159
+              needToClose = true;
             }
           }
-        } // !handshakeRead
-      } // synchronized
+        }
+      }
 
     } finally {
       if (needToClose) {
-        // moved this call outside of the sync for bug 42159
         try {
-          requestClose(reason); // fix for bug 31546
+          requestClose(reason);
         } catch (Exception ignore) {
         }
       }
@@ -765,18 +823,16 @@ public class Connection implements Runnable {
       ByteBuffer buffer = ioFilter.getUnwrappedBuffer(inputBuffer);
       buffer.position(0).limit(0);
     }
-    synchronized (this.handshakeSync) {
+    synchronized (handshakeSync) {
       if (success) {
-        this.handshakeRead = true;
+        handshakeRead = true;
       } else {
-        this.handshakeCancelled = true;
+        handshakeCancelled = true;
       }
-      this.handshakeSync.notify();
+      handshakeSync.notifyAll();
     }
   }
 
-  private final AtomicBoolean asyncCloseCalled = new AtomicBoolean();
-
   /**
    * asynchronously close this connection
    *
@@ -786,18 +842,18 @@ public class Connection implements Runnable {
     // note: remoteAddr may be null if this is a receiver that hasn't finished its handshake
 
     // we do the close in a background thread because the operation may hang if
-    // there is a problem with the network. See bug #46659
+    // there is a problem with the network
 
     // if simulating sickness, sockets must be closed in-line so that tests know
     // that the vm is sick when the beSick operation completes
     if (beingSickForTests) {
       prepareForAsyncClose();
     } else {
-      if (this.asyncCloseCalled.compareAndSet(false, true)) {
-        Socket s = this.socket;
+      if (asyncCloseCalled.compareAndSet(false, true)) {
+        Socket s = socket;
         if (s != null && !s.isClosed()) {
           prepareForAsyncClose();
-          this.owner.getSocketCloser().asyncClose(s, String.valueOf(this.remoteAddr),
+          owner.getSocketCloser().asyncClose(s, String.valueOf(remoteAddr),
               () -> ioFilter.close(s.getChannel()));
         }
       }
@@ -813,31 +869,29 @@ public class Connection implements Runnable {
     }
   }
 
-  private static final int CONNECT_HANDSHAKE_SIZE = 4096;
-
   /**
    * waits until we've joined the distributed system before returning
    */
   private void waitForAddressCompletion() {
-    InternalDistributedMember myAddr = this.owner.getConduit().getMemberId();
+    InternalDistributedMember myAddr = owner.getConduit().getMemberId();
     synchronized (myAddr) {
-      while ((!owner.getConduit().getCancelCriterion().isCancelInProgress())
+      while (!owner.getConduit().getCancelCriterion().isCancelInProgress()
           && myAddr.getInetAddress() == null && myAddr.getVmViewId() < 0) {
         try {
           myAddr.wait(100); // spurious wakeup ok
         } catch (InterruptedException ie) {
           Thread.currentThread().interrupt();
-          this.owner.getConduit().getCancelCriterion().checkCancelInProgress(ie);
+          owner.getConduit().getCancelCriterion().checkCancelInProgress(ie);
         }
       }
-      Assert.assertTrue(myAddr.getDirectChannelPort() == this.owner.getConduit().getPort());
+      Assert.assertTrue(myAddr.getDirectChannelPort() == owner.getConduit().getPort());
     }
   }
 
   private void handshakeFromNewSender() throws IOException {
     waitForAddressCompletion();
 
-    InternalDistributedMember myAddr = this.owner.getConduit().getMemberId();
+    InternalDistributedMember myAddr = owner.getConduit().getMemberId();
     final MsgOutputStream connectHandshake = new MsgOutputStream(CONNECT_HANDSHAKE_SIZE);
     /*
      * Note a byte of zero is always written because old products serialized a member id with always
@@ -848,30 +902,20 @@ public class Connection implements Runnable {
     connectHandshake.writeByte(HANDSHAKE_VERSION);
     // NOTE: if you add or remove code in this section bump HANDSHAKE_VERSION
     InternalDataSerializer.invokeToData(myAddr, connectHandshake);
-    connectHandshake.writeBoolean(this.sharedResource);
-    connectHandshake.writeBoolean(this.preserveOrder);
-    connectHandshake.writeLong(this.uniqueId);
+    connectHandshake.writeBoolean(sharedResource);
+    connectHandshake.writeBoolean(preserveOrder);
+    connectHandshake.writeLong(uniqueId);
     // write the product version ordinal
     Version.CURRENT.writeOrdinal(connectHandshake, true);
     connectHandshake.writeInt(dominoCount.get() + 1);
     // this writes the sending member + thread name that is stored in senderName
     // on the receiver to show the cause of reader thread creation
-    // if (dominoCount.get() > 0) {
-    // connectHandshake.writeUTF(Thread.currentThread().getName());
-    // } else {
-    // String name = owner.getDM().getConfig().getName();
-    // if (name == null) {
-    // name = "pid="+OSProcess.getId();
-    // }
-    // connectHandshake.writeUTF("["+name+"] "+Thread.currentThread().getName());
-    // }
     connectHandshake.setMessageHeader(NORMAL_MSG_TYPE, OperationExecutors.STANDARD_EXECUTOR,
         MsgIdGenerator.NO_MSG_ID);
     writeFully(getSocket().getChannel(), connectHandshake.getContentBuffer(), false, null);
   }
 
   /**
-   *
    * @throws IOException if handshake fails
    */
   private void attemptHandshake(ConnectionTable connTable) throws IOException {
@@ -885,32 +929,28 @@ public class Connection implements Runnable {
     waitForHandshake(); // waiting for reply
   }
 
-  /** time between connection attempts */
-  private static final int RECONNECT_WAIT_TIME = Integer
-      .getInteger(DistributionConfig.GEMFIRE_PREFIX + "RECONNECT_WAIT_TIME", 2000);
-
   /**
    * creates a new connection to a remote server. We are initiating this connection; the other side
    * must accept us We will almost always send messages; small acks are received.
    */
-  protected static Connection createSender(final Membership mgr, final ConnectionTable t,
+  static Connection createSender(final Membership mgr, final ConnectionTable t,
       final boolean preserveOrder, final DistributedMember remoteAddr, final boolean sharedResource,
       final long startTime, final long ackTimeout, final long ackSATimeout)
       throws IOException, DistributedSystemDisconnectedException {
-    boolean warningPrinted = false;
     boolean success = false;
-    boolean firstTime = true;
     Connection conn = null;
     // keep trying. Note that this may be executing during the shutdown window
     // where a cancel criterion has not been established, but threads are being
     // interrupted. In this case we must allow the connection to succeed even
     // though subsequent messaging using the socket may fail
     boolean interrupted = Thread.interrupted();
-    boolean severeAlertIssued = false;
-    boolean suspected = false;
-    long reconnectWaitTime = RECONNECT_WAIT_TIME;
-    boolean connectionErrorLogged = false;
     try {
+      boolean connectionErrorLogged = false;
+      long reconnectWaitTime = RECONNECT_WAIT_TIME;
+      boolean suspected = false;
+      boolean severeAlertIssued = false;
+      boolean firstTime = true;
+      boolean warningPrinted = false;
       while (!success) { // keep trying
         // Quit if DM has stopped distribution
         t.getConduit().getCancelCriterion().checkCancelInProgress(null);
@@ -925,9 +965,9 @@ public class Connection implements Runnable {
           } else if (!suspected) {
             if (remoteAddr != null) {
               logger.warn("Unable to form a TCP/IP connection to {} in over {} seconds",
-                  remoteAddr, (ackTimeout) / 1000);
+                  remoteAddr, ackTimeout / 1000);
             }
-            mgr.suspectMember((InternalDistributedMember) remoteAddr,
+            mgr.suspectMember(remoteAddr,
                 "Unable to form a TCP/IP connection in a reasonable amount of time");
             suspected = true;
           }
@@ -936,9 +976,9 @@ public class Connection implements Runnable {
           if (reconnectWaitTime <= 0) {
             reconnectWaitTime = RECONNECT_WAIT_TIME;
           }
-        } else if (!suspected && (startTime > 0) && (ackTimeout > 0)
-            && (startTime + ackTimeout < now)) {
-          mgr.suspectMember((InternalDistributedMember) remoteAddr,
+        } else if (!suspected && startTime > 0 && ackTimeout > 0
+            && startTime + ackTimeout < now) {
+          mgr.suspectMember(remoteAddr,
               "Unable to form a TCP/IP connection in a reasonable amount of time");
           suspected = true;
         }
@@ -980,7 +1020,7 @@ public class Connection implements Runnable {
         try {
           conn = null;
           conn = new Connection(t, preserveOrder, remoteAddr, sharedResource);
-        } catch (javax.net.ssl.SSLHandshakeException se) {
+        } catch (SSLHandshakeException se) {
           // no need to retry if certificates were rejected
           throw se;
         } catch (IOException ioe) {
@@ -1013,8 +1053,7 @@ public class Connection implements Runnable {
               // and the socket was closed or we were sent
               // ShutdownMessage
               if (giveUpOnMember(mgr, remoteAddr)) {
-                throw new IOException(String.format("Member %s left the group",
-                    remoteAddr));
+                throw new IOException(String.format("Member %s left the group", remoteAddr));
               }
               t.getConduit().getCancelCriterion().checkCancelInProgress(null);
               // no success but no need to log; just retry
@@ -1069,11 +1108,9 @@ public class Connection implements Runnable {
         }
       }
     }
-    // Assert.assertTrue(conn != null);
     if (conn == null) {
       throw new ConnectionException(
-          String.format("Connection: failed construction for peer %s",
-              remoteAddr));
+          String.format("Connection: failed construction for peer %s", remoteAddr));
     }
     if (preserveOrder && BATCH_SENDS) {
       conn.createBatchSendBuffer();
@@ -1082,60 +1119,49 @@ public class Connection implements Runnable {
     return conn;
   }
 
-  private static boolean giveUpOnMember(Membership mgr,
-      DistributedMember remoteAddr) {
+  private static boolean giveUpOnMember(Membership mgr, DistributedMember remoteAddr) {
     return !mgr.memberExists(remoteAddr) || mgr.isShunned(remoteAddr) || mgr.shutdownInProgress();
   }
 
-  private void setRemoteAddr(DistributedMember m) {
-    this.remoteAddr = this.owner.getDM().getCanonicalId(m);
-    Membership mgr = this.conduit.getMembership();
-    mgr.addSurpriseMember(m);
-  }
-
   /**
    * creates a new connection to a remote server. We are initiating this connection; the other side
    * must accept us We will almost always send messages; small acks are received.
    */
   private Connection(ConnectionTable t, boolean preserveOrder, DistributedMember remoteID,
       boolean sharedResource) throws IOException, DistributedSystemDisconnectedException {
-
     // initialize a socket upfront. So that the
     InternalDistributedMember remoteAddr = (InternalDistributedMember) remoteID;
     if (t == null) {
-      throw new IllegalArgumentException(
-          "ConnectionTable is null.");
+      throw new IllegalArgumentException("ConnectionTable is null.");
     }
-    this.conduit = t.getConduit();
-    this.isReceiver = false;
-    this.owner = t;
+    conduit = t.getConduit();
+    isReceiver = false;
+    owner = t;
     this.sharedResource = sharedResource;
     this.preserveOrder = preserveOrder;
-    setRemoteAddr(remoteAddr);
-    this.conduitIdStr = this.owner.getConduit().getSocketId().toString();
-    this.handshakeRead = false;
-    this.handshakeCancelled = false;
-    this.connected = true;
+    this.remoteAddr = remoteAddr;
+    conduitIdStr = owner.getConduit().getSocketId().toString();
+    handshakeRead = false;
+    handshakeCancelled = false;
+    connected = true;
 
-    this.uniqueId = idCounter.getAndIncrement();
+    uniqueId = ID_COUNTER.getAndIncrement();
 
     // connect to listening socket
 
     InetSocketAddress addr =
         new InetSocketAddress(remoteAddr.getInetAddress(), remoteAddr.getDirectChannelPort());
     SocketChannel channel = SocketChannel.open();
-    this.owner.addConnectingSocket(channel.socket(), addr.getAddress());
+    owner.addConnectingSocket(channel.socket(), addr.getAddress());
 
     try {
       channel.socket().setTcpNoDelay(true);
       channel.socket().setKeepAlive(SocketCreator.ENABLE_TCP_KEEP_ALIVE);
 
-      /*
-       * If conserve-sockets is false, the socket can be used for receiving responses, so set the
-       * receive buffer accordingly.
-       */
+      // If conserve-sockets is false, the socket can be used for receiving responses, so set the
+      // receive buffer accordingly.
       if (!sharedResource) {
-        setReceiveBufferSize(channel.socket(), this.owner.getConduit().tcpBufferSize);
+        setReceiveBufferSize(channel.socket(), owner.getConduit().tcpBufferSize);
       } else {
         setReceiveBufferSize(channel.socket(), SMALL_BUFFER_SIZE); // make small since only
         // receive ack messages
@@ -1146,231 +1172,98 @@ public class Connection implements Runnable {
       int connectTime = getP2PConnectTimeout(conduit.getDM().getConfig());
 
       try {
-
-        channel.socket().connect(addr, connectTime);
-
-        createIoFilter(channel, true);
-
-      } catch (NullPointerException e) {
-        // bug #45044 - jdk 1.7 sometimes throws an NPE here
-        ConnectException c = new ConnectException("Encountered bug #45044 - retrying");
-        c.initCause(e);
-        // prevent a hot loop by sleeping a little bit
-        try {
-          Thread.sleep(1000);
-        } catch (InterruptedException ie) {
-          Thread.currentThread().interrupt();
-        }
-        throw c;
-      } catch (SSLException e) {
-        ConnectException c = new ConnectException("Problem connecting to peer " + addr);
-        c.initCause(e);
-        throw c;
-      } catch (CancelledKeyException | ClosedSelectorException e) {
-        // bug #44469: for some reason NIO throws this runtime exception
-        // instead of an IOException on timeouts
-        ConnectException c = new ConnectException(
-            String.format("Attempt timed out after %s milliseconds",
-                connectTime));
-        c.initCause(e);
-        throw c;
-      }
-    } finally {
-      this.owner.removeConnectingSocket(channel.socket());
-    }
-    this.socket = channel.socket();
-
-    if (logger.isDebugEnabled()) {
-      logger.debug("Connection: connected to {} with IP address {}", remoteAddr, addr);
-    }
-    try {
-      getSocket().setTcpNoDelay(true);
-    } catch (SocketException ignored) {
-    }
-  }
-
-  /**
-   * Batch sends currently should not be turned on because: 1. They will be used for all sends
-   * (instead of just no-ack) and thus will break messages that wait for a response (or kill perf).
-   * 2. The buffer is not properly flushed and closed on shutdown. The code attempts to do this but
-   * must not be doing it correctly.
-   */
-  private static final boolean BATCH_SENDS = Boolean.getBoolean("p2p.batchSends");
-  private static final int BATCH_BUFFER_SIZE =
-      Integer.getInteger("p2p.batchBufferSize", 1024 * 1024);
-  private static final int BATCH_FLUSH_MS = Integer.getInteger("p2p.batchFlushTime", 50);
-  private final Object batchLock = new Object();
-  private ByteBuffer fillBatchBuffer;
-  private ByteBuffer sendBatchBuffer;
-  private BatchBufferFlusher batchFlusher;
-
-  private void createBatchSendBuffer() {
-    if (BufferPool.useDirectBuffers) {
-      this.fillBatchBuffer = ByteBuffer.allocateDirect(BATCH_BUFFER_SIZE);
-      this.sendBatchBuffer = ByteBuffer.allocateDirect(BATCH_BUFFER_SIZE);
-    } else {
-      this.fillBatchBuffer = ByteBuffer.allocate(BATCH_BUFFER_SIZE);
-      this.sendBatchBuffer = ByteBuffer.allocate(BATCH_BUFFER_SIZE);
-    }
-    this.batchFlusher = new BatchBufferFlusher();
-    this.batchFlusher.start();
-  }
-
-  void cleanUpOnIdleTaskCancel() {
-    // Make sure receivers are removed from the connection table, this should always be a noop, but
-    // is done here as a failsafe.
-    if (isReceiver) {
-      owner.removeReceiver(this);
-    }
-  }
-
-  public void setInputBuffer(ByteBuffer buffer) {
-    this.inputBuffer = buffer;
-  }
-
-  private class BatchBufferFlusher extends Thread {
-    private volatile boolean flushNeeded = false;
-    private volatile boolean timeToStop = false;
-    private DMStats stats;
-
-
-    BatchBufferFlusher() {
-      setDaemon(true);
-      this.stats = owner.getConduit().getStats();
-    }
-
-    /**
-     * Called when a message writer needs the current fillBatchBuffer flushed
-     */
-    void flushBuffer(ByteBuffer bb) {
-      final long start = DistributionStats.getStatTime();
-      try {
-        synchronized (this) {
-          synchronized (batchLock) {
-            if (bb != fillBatchBuffer) {
-              // it must have already been flushed. So just return
-              // and use the new fillBatchBuffer
-              return;
-            }
-          }
-          this.flushNeeded = true;
-          this.notify();
-        }
-        synchronized (batchLock) {
-          // Wait for the flusher thread
-          while (bb == fillBatchBuffer) {
-            Connection.this.owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
-            boolean interrupted = Thread.interrupted();
-            try {
-              batchLock.wait(); // spurious wakeup ok
-            } catch (InterruptedException ex) {
-              interrupted = true;
-            } finally {
-              if (interrupted) {
-                Thread.currentThread().interrupt();
-              }
-            }
-          } // while
-        }
-      } finally {
-        owner.getConduit().getStats().incBatchWaitTime(start);
-      }
-    }
-
-    public void close() {
-      synchronized (this) {
-        this.timeToStop = true;
-        this.flushNeeded = true;
-        this.notify();
-      }
-    }
-
-    @Override
-    public void run() {
-      try {
-        synchronized (this) {
-          while (!timeToStop) {
-            if (!this.flushNeeded && fillBatchBuffer.position() <= (BATCH_BUFFER_SIZE / 2)) {
-              wait(BATCH_FLUSH_MS); // spurious wakeup ok
-            }
-            if (this.flushNeeded || fillBatchBuffer.position() > (BATCH_BUFFER_SIZE / 2)) {
-              final long start = DistributionStats.getStatTime();
-              synchronized (batchLock) {
-                // This is the only block of code that will swap
-                // the buffer references
-                this.flushNeeded = false;
-                ByteBuffer tmp = fillBatchBuffer;
-                fillBatchBuffer = sendBatchBuffer;
-                sendBatchBuffer = tmp;
-                batchLock.notifyAll();
-              }
-              // We now own the sendBatchBuffer
-              if (sendBatchBuffer.position() > 0) {
-                final boolean origSocketInUse = socketInUse;
-                socketInUse = true;
-                try {
-                  sendBatchBuffer.flip();
-                  SocketChannel channel = getSocket().getChannel();
-                  writeFully(channel, sendBatchBuffer, false, null);
-                  sendBatchBuffer.clear();
-                } catch (IOException | ConnectionException ex) {
-                  logger.fatal("Exception flushing batch send buffer: %s", ex);
-                  readerShuttingDown = true;
-                  requestClose(String.format("Exception flushing batch send buffer: %s",
-                      ex));
-                } finally {
-                  accessed();
-                  socketInUse = origSocketInUse;
-                }
-              }
-              this.stats.incBatchFlushTime(start);
-            }
-          }
+
+        channel.socket().connect(addr, connectTime);
+
+        createIoFilter(channel, true);
+
+      } catch (NullPointerException e) {
+        // jdk 1.7 sometimes throws an NPE here
+        ConnectException c = new ConnectException("Encountered bug #45044 - retrying");
+        c.initCause(e);
+        // prevent a hot loop by sleeping a little bit
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException ie) {
+          Thread.currentThread().interrupt();
         }
-      } catch (InterruptedException ex) {
-        // time for this thread to shutdown
-        // Thread.currentThread().interrupt();
+        throw c;
+      } catch (SSLException e) {
+        ConnectException c = new ConnectException("Problem connecting to peer " + addr);
+        c.initCause(e);
+        throw c;
+      } catch (CancelledKeyException | ClosedSelectorException e) {
+        // for some reason NIO throws this runtime exception instead of an IOException on timeouts
+        ConnectException c = new ConnectException(
+            String.format("Attempt timed out after %s milliseconds", connectTime));
+        c.initCause(e);
+        throw c;
       }
+    } finally {
+      owner.removeConnectingSocket(channel.socket());
+    }
+    socket = channel.socket();
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("Connection: connected to {} with IP address {}", remoteAddr, addr);
+    }
+    try {
+      getSocket().setTcpNoDelay(true);
+    } catch (SocketException ignored) {
     }
   }
 
-  private void closeBatchBuffer() {
-    if (this.batchFlusher != null) {
-      this.batchFlusher.close();
+  private void createBatchSendBuffer() {
+    if (BufferPool.useDirectBuffers) {
+      fillBatchBuffer = ByteBuffer.allocateDirect(BATCH_BUFFER_SIZE);
+      sendBatchBuffer = ByteBuffer.allocateDirect(BATCH_BUFFER_SIZE);
+    } else {
+      fillBatchBuffer = ByteBuffer.allocate(BATCH_BUFFER_SIZE);
+      sendBatchBuffer = ByteBuffer.allocate(BATCH_BUFFER_SIZE);
     }
+    batchFlusher = new BatchBufferFlusher();
+    batchFlusher.start();
   }
 
-  /**
-   * use to test message prep overhead (no socket write). WARNING: turning this on completely
-   * disables distribution of batched sends
-   */
-  private static final boolean SOCKET_WRITE_DISABLED = Boolean.getBoolean("p2p.disableSocketWrite");
+  void cleanUpOnIdleTaskCancel() {
+    // Make sure receivers are removed from the connection table, this should always be a noop, but
+    // is done here as a fail safe.
+    if (isReceiver) {
+      owner.removeReceiver(this);
+    }
+  }
 
-  private void batchSend(ByteBuffer src) throws IOException {
+  private void closeBatchBuffer() {
+    if (batchFlusher != null) {
+      batchFlusher.close();
+    }
+  }
+
+  private void batchSend(ByteBuffer src) {
     if (SOCKET_WRITE_DISABLED) {
       return;
     }
     final long start = DistributionStats.getStatTime();
     try {
-      ByteBuffer dst = null;
       Assert.assertTrue(src.remaining() <= BATCH_BUFFER_SIZE, "Message size(" + src.remaining()
           + ") exceeded BATCH_BUFFER_SIZE(" + BATCH_BUFFER_SIZE + ")");
       do {
-        synchronized (this.batchLock) {
-          dst = this.fillBatchBuffer;
+        ByteBuffer dst;
+        synchronized (batchLock) {
+          dst = fillBatchBuffer;
           if (src.remaining() <= dst.remaining()) {
             final long copyStart = DistributionStats.getStatTime();
             dst.put(src);
-            this.owner.getConduit().getStats().incBatchCopyTime(copyStart);
+            owner.getConduit().getStats().incBatchCopyTime(copyStart);
             return;
           }
         }
         // If we got this far then we do not have room in the current
         // buffer and need the flusher thread to flush before we can fill it
-        this.batchFlusher.flushBuffer(dst);
+        batchFlusher.flushBuffer(dst);
       } while (true);
     } finally {
-      this.owner.getConduit().getStats().incBatchSendTime(start);
+      owner.getConduit().getStats().incBatchSendTime(start);
     }
   }
 
@@ -1383,7 +1276,7 @@ public class Connection implements Runnable {
   }
 
   boolean isClosing() {
-    return this.closing.get();
+    return closing.get();
   }
 
   void closePartialConnect(String reason, boolean beingSick) {
@@ -1403,107 +1296,97 @@ public class Connection implements Runnable {
    *
    * @see #requestClose
    */
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "TLW_TWO_LOCK_WAIT")
+  @SuppressWarnings("TLW_TWO_LOCK_WAIT")
   private void close(String reason, boolean cleanupEndpoint, boolean p_removeEndpoint,
       boolean beingSick, boolean forceRemoval) {
-    boolean removeEndpoint = p_removeEndpoint;
-    // use getAndSet outside sync on this to fix 42330
-    boolean onlyCleanup = this.closing.getAndSet(true);
+    // use getAndSet outside sync on this
+    boolean onlyCleanup = closing.getAndSet(true);
     if (onlyCleanup && !forceRemoval) {
       return;
     }
+    boolean removeEndpoint = p_removeEndpoint;
     if (!onlyCleanup) {
       synchronized (this) {
-        this.stopped = true;
-        if (this.connected) {
-          if (this.asyncQueuingInProgress && this.pusherThread != Thread.currentThread()) {
+        stopped = true;
+        if (connected) {
+          if (asyncQueuingInProgress && pusherThread != Thread.currentThread()) {
             // We don't need to do this if we are the pusher thread
             // and we have determined that we need to close the connection.
-            // See bug 37601.
-            synchronized (this.outgoingQueue) {
+            synchronized (outgoingQueue) {
               // wait for the flusher to complete (it may timeout)
-              while (this.asyncQueuingInProgress) {
-                // Don't do this: causes closes to not get done in the event
-                // of an orderly shutdown:
-                // this.owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
+              while (asyncQueuingInProgress) {
                 boolean interrupted = Thread.interrupted();
                 try {
-                  this.outgoingQueue.wait(); // spurious wakeup ok
+                  outgoingQueue.wait(); // spurious wakeup ok
                 } catch (InterruptedException ie) {
                   interrupted = true;
-                  // this.owner.getConduit().getCancelCriterion().checkCancelInProgress(ie);
                 } finally {
                   if (interrupted)
                     Thread.currentThread().interrupt();
                 }
-              } // while
-            } // synchronized
+              }
+            }
           }
-          this.connected = false;
+          connected = false;
           closeSenderSem();
-          {
-            final DMStats stats = this.owner.getConduit().getStats();
-            if (this.finishedConnecting) {
-              if (this.isReceiver) {
-                stats.decReceivers();
-              } else {
-                stats.decSenders(this.sharedResource, this.preserveOrder);
-              }
+
+          final DMStats stats = owner.getConduit().getStats();
+          if (finishedConnecting) {
+            if (isReceiver) {
+              stats.decReceivers();
+            } else {
+              stats.decSenders(sharedResource, preserveOrder);
             }
           }
+
         } else if (!forceRemoval) {
           removeEndpoint = false;
         }
         // make sure our socket is closed
         asyncClose(false);
-        if (!this.isReceiver) {
+        if (!isReceiver) {
           // receivers release the input buffer when exiting run(). Senders use the
           // inputBuffer for reading direct-reply responses
           releaseInputBuffer();
         }
         lengthSet = false;
-      } // synchronized
+      }
 
-      // moved the call to notifyHandshakeWaiter out of the above
-      // synchronized block to fix bug #42159
       // Make sure anyone waiting for a handshake stops waiting
       notifyHandshakeWaiter(false);
-      // wait a bit for the our reader thread to exit
-      // don't wait if we are the reader thread
+      // wait a bit for the our reader thread to exit don't wait if we are the reader thread
       boolean isIBM = false;
       // if network partition detection is enabled or this is an admin vm
-      // we can't wait for the reader thread when running in an IBM JRE. See
-      // bug 41889
-      if (this.conduit.getConfig().getEnableNetworkPartitionDetection()
-          || this.conduit.getMemberId().getVmKind() == ClusterDistributionManager.ADMIN_ONLY_DM_TYPE
-          || this.conduit.getMemberId().getVmKind() == ClusterDistributionManager.LOCATOR_DM_TYPE) {
+      // we can't wait for the reader thread when running in an IBM JRE
+      if (conduit.getConfig().getEnableNetworkPartitionDetection()
+          || conduit.getMemberId().getVmKind() == ClusterDistributionManager.ADMIN_ONLY_DM_TYPE
+          || conduit.getMemberId().getVmKind() == ClusterDistributionManager.LOCATOR_DM_TYPE) {
         isIBM = "IBM Corporation".equals(System.getProperty("java.vm.vendor"));
       }
-      {
-        // Now that readerThread is returned to a pool after we close
-        // we need to be more careful not to join on a thread that belongs
-        // to someone else.
-        Thread readerThreadSnapshot = this.readerThread;
-        if (!beingSick && readerThreadSnapshot != null && !isIBM && this.isRunning
-            && !this.readerShuttingDown && readerThreadSnapshot != Thread.currentThread()) {
-          try {
-            readerThreadSnapshot.join(500);
-            readerThreadSnapshot = this.readerThread;
-            if (this.isRunning && !this.readerShuttingDown && readerThreadSnapshot != null
-                && owner.getDM().getRootCause() == null) { // don't wait twice if there's a system
-                                                           // failure
-              readerThreadSnapshot.join(1500);
-              if (this.isRunning) {
-                logger.info("Timed out waiting for readerThread on {} to finish.",
-                    this);
-              }
+
+      // Now that readerThread is returned to a pool after we close
+      // we need to be more careful not to join on a thread that belongs
+      // to someone else.
+      Thread readerThreadSnapshot = readerThread;
+      if (!beingSick && readerThreadSnapshot != null && !isIBM && isRunning
+          && !readerShuttingDown && readerThreadSnapshot != Thread.currentThread()) {
+        try {
+          readerThreadSnapshot.join(500);
+          readerThreadSnapshot = readerThread;
+          if (isRunning && !readerShuttingDown && readerThreadSnapshot != null
+              && owner.getDM().getRootCause() == null) {
+            // don't wait twice if there's a system failure
+            readerThreadSnapshot.join(1500);
+            if (isRunning) {
+              logger.info("Timed out waiting for readerThread on {} to finish.",
+                  this);
             }
-          } catch (IllegalThreadStateException ignore) {
-            // ignored - thread already stopped
-          } catch (InterruptedException ignore) {
-            Thread.currentThread().interrupt();
-            // but keep going, we're trying to close.
           }
+        } catch (IllegalThreadStateException ignore) {
+          // ignored - thread already stopped
+        } catch (InterruptedException ignore) {
+          Thread.currentThread().interrupt();
+          // but keep going, we're trying to close.
         }
       }
 
@@ -1511,43 +1394,40 @@ public class Connection implements Runnable {
       closeAllMsgDestreamers();
     }
     if (cleanupEndpoint) {
-      if (this.isReceiver) {
-        this.owner.removeReceiver(this);
+      if (isReceiver) {
+        owner.removeReceiver(this);
       }
       if (removeEndpoint) {
-        if (this.sharedResource) {
-          if (!this.preserveOrder) {
-            // only remove endpoint when shared unordered connection
-            // is closed. This is part of the fix for bug 32980.
-            if (!this.isReceiver) {
+        if (sharedResource) {
+          if (!preserveOrder) {
+            // only remove endpoint when shared unordered connection is closed
+            if (!isReceiver) {
               // Only remove endpoint if sender.
-              if (this.finishedConnecting) {
+              if (finishedConnecting) {
                 // only remove endpoint if our constructor finished
-                this.owner.removeEndpoint(this.remoteAddr, reason);
+                owner.removeEndpoint(remoteAddr, reason);
               }
             }
           } else {
             // noinspection ConstantConditions
-            this.owner.removeSharedConnection(reason, this.remoteAddr, this.preserveOrder, this);
+            owner.removeSharedConnection(reason, remoteAddr, preserveOrder, this);
           }
-        } else if (!this.isReceiver) {
-          this.owner.removeThreadConnection(this.remoteAddr, this);
+        } else if (!isReceiver) {
+          owner.removeThreadConnection(remoteAddr, this);
         }
       } else {
-        // This code is ok to do even if the ConnectionTable
-        // has never added this Connection to its maps since
-        // the calls in this block use our identity to do the removes.
-        if (this.sharedResource) {
-          this.owner.removeSharedConnection(reason, this.remoteAddr, this.preserveOrder, this);
-        } else if (!this.isReceiver) {
-          this.owner.removeThreadConnection(this.remoteAddr, this);
+        // This code is ok to do even if the ConnectionTable has never added this Connection to its
+        // maps since the calls in this block use our identity to do the removes.
+        if (sharedResource) {
+          owner.removeSharedConnection(reason, remoteAddr, preserveOrder, this);
+        } else if (!isReceiver) {
+          owner.removeThreadConnection(remoteAddr, this);
         }
       }
     }
 
-    // This cancels the idle timer task, but it also removes the tasks
-    // reference to this connection, freeing up the connection (and it's buffers
-    // for GC sooner.
+    // This cancels the idle timer task, but it also removes the tasks reference to this connection,
+    // freeing up the connection (and it's buffers for GC sooner.
     if (idleTask != null) {
       idleTask.cancel();
     }
@@ -1555,69 +1435,66 @@ public class Connection implements Runnable {
     if (ackTimeoutTask != null) {
       ackTimeoutTask.cancel();
     }
-
   }
 
-  /** starts a reader thread */
+  /**
+   * starts a reader thread
+   */
   private void startReader(ConnectionTable connTable) {
     if (logger.isDebugEnabled()) {
       logger.debug("Starting thread for " + p2pReaderName());
     }
-    Assert.assertTrue(!this.isRunning);
+    Assert.assertTrue(!isRunning);
     stopped = false;
-    this.isRunning = true;
+    isRunning = true;
     connTable.executeCommand(this);
   }
 
-
   /**
    * in order to read non-NIO socket-based messages we need to have a thread actively trying to grab
    * bytes out of the sockets input queue. This is that thread.
    */
   @Override
   public void run() {
-    this.readerThread = Thread.currentThread();
-    this.readerThread.setName(p2pReaderName());
+    readerThread = Thread.currentThread();
+    readerThread.setName(p2pReaderName());
     ConnectionTable.threadWantsSharedResources();
-    makeReaderThread(this.isReceiver);
+    makeReaderThread(isReceiver);
     try {
       readMessages();
     } finally {
-      // bug36060: do the socket close within a finally block
+      // do the socket close within a finally block
       if (logger.isDebugEnabled()) {
         logger.debug("Stopping {} for {}", p2pReaderName(), remoteAddr);
       }
-      if (this.isReceiver) {
+      if (isReceiver) {
         try {
           initiateSuspicionIfSharedUnordered();
         } catch (CancelException e) {
           // shutting down
         }
-        if (!this.sharedResource) {
-          this.conduit.getStats().incThreadOwnedReceivers(-1L, dominoCount.get());
+        if (!sharedResource) {
+          conduit.getStats().incThreadOwnedReceivers(-1L, dominoCount.get());
         }
         asyncClose(false);
-        this.owner.removeAndCloseThreadOwnedSockets();
+        owner.removeAndCloseThreadOwnedSockets();
       }
       releaseInputBuffer();
 
-      // make sure that if the reader thread exits we notify a thread waiting
-      // for the handshake.
-      // see bug 37524 for an example of listeners hung in waitForHandshake
+      // make sure that if the reader thread exits we notify a thread waiting for the handshake.
       notifyHandshakeWaiter(false);
-      this.readerThread.setName("unused p2p reader");
-      synchronized (this.stateLock) {
-        this.isRunning = false;
-        this.readerThread = null;
+      readerThread.setName("unused p2p reader");
+      synchronized (stateLock) {
+        isRunning = false;
+        readerThread = null;
       }
-    } // finally
+    }
   }
 
   private void releaseInputBuffer() {
-    ByteBuffer tmp = this.inputBuffer;
+    ByteBuffer tmp = inputBuffer;
     if (tmp != null) {
-      this.inputBuffer = null;
-      final MembershipStatistics stats = this.owner.getConduit().getStats();
+      inputBuffer = null;
       getBufferPool().releaseReceiveBuffer(tmp);
     }
   }
@@ -1628,22 +1505,22 @@ public class Connection implements Runnable {
 
   private String p2pReaderName() {
     StringBuilder sb = new StringBuilder(64);
-    if (this.isReceiver) {
+    if (isReceiver) {
       sb.append(THREAD_KIND_IDENTIFIER + "@");
-    } else if (this.handshakeRead) {
+    } else if (handshakeRead) {
       sb.append("P2P message sender@");
     } else {
       sb.append("P2P handshake reader@");
     }
     sb.append(Integer.toHexString(System.identityHashCode(this)));
-    if (!this.isReceiver) {
+    if (!isReceiver) {
       sb.append('-').append(getUniqueId());
     }
     return sb.toString();
   }
 
   private void readMessages() {
-    // take a snapshot of uniqueId to detect reconnect attempts; see bug 37592
+    // take a snapshot of uniqueId to detect reconnect attempts
     SocketChannel channel;
     try {
       channel = getSocket().getChannel();
@@ -1654,66 +1531,58 @@ public class Connection implements Runnable {
       }
       channel.configureBlocking(true);
     } catch (ClosedChannelException e) {
-      // bug 37693: the channel was asynchronously closed. Our work
-      // is done.
+      // the channel was asynchronously closed. Our work is done.
       try {
-        requestClose(
-            "readMessages caught closed channel");
+        requestClose("readMessages caught closed channel");
       } catch (Exception ignore) {
       }
-      return; // exit loop and thread
+      // exit loop and thread
+      return;
     } catch (IOException ex) {
       if (stopped || owner.getConduit().getCancelCriterion().isCancelInProgress()) {
         try {
-          requestClose(
-              "readMessages caught shutdown");
+          requestClose("readMessages caught shutdown");
         } catch (Exception ignore) {
         }
-        return; // bug37520: exit loop (and thread)
+        // exit loop (and thread)
+        return;
       }
       logger.info("Failed initializing socket for message {}: {}",
-          (this.isReceiver ? "receiver" : "sender"), ex.getMessage());
-      this.readerShuttingDown = true;
+          isReceiver ? "receiver" : "sender", ex.getMessage());
+      readerShuttingDown = true;
       try {
-        requestClose(String.format("Failed initializing socket %s",
-            ex));
+        requestClose(String.format("Failed initializing socket %s", ex));
       } catch (Exception ignore) {
       }
       return;
     }
 
     if (!stopped) {
-      // Assert.assertTrue(owner != null, "How did owner become null");
       if (logger.isDebugEnabled()) {
         logger.debug("Starting {} on {}", p2pReaderName(), socket);
       }
     }
     // we should not change the state of the connection if we are a handshake reader thread
     // as there is a race between this thread and the application thread doing direct ack
-    // fix for #40869
     boolean isHandShakeReader = false;
     // if we're using SSL/TLS the input buffer may already have data to process
     boolean skipInitialRead = getInputBuffer().position() > 0;
-    boolean isInitialRead = true;
     try {
-      for (;;) {
+      for (boolean isInitialRead = true;;) {
         if (stopped) {
           break;
         }
         if (SystemFailure.getFailure() != null) {
           // Allocate no objects here!
-          Socket s = this.socket;
-          if (s != null) {
-            try {
-              ioFilter.close(s.getChannel());
-              s.close();
-            } catch (IOException e) {
-              // don't care
-            }
+          try {
+            ioFilter.close(socket.getChannel());
+            socket.close();
+          } catch (IOException e) {
+            // don't care
           }
-          SystemFailure.checkFailure(); // throws
+          SystemFailure.checkFailure();
         }
-        if (this.owner.getConduit().getCancelCriterion().isCancelInProgress()) {
+        if (owner.getConduit().getCancelCriterion().isCancelInProgress()) {
           break;
         }
 
@@ -1740,7 +1609,7 @@ public class Connection implements Runnable {
             continue;
           }
           if (amountRead < 0) {
-            this.readerShuttingDown = true;
+            readerShuttingDown = true;
             try {
               requestClose("SocketChannel.read returned EOF");
             } catch (Exception e) {
@@ -1751,9 +1620,9 @@ public class Connection implements Runnable {
 
           processInputBuffer();
 
-          if (!this.isReceiver && (this.handshakeRead || this.handshakeCancelled)) {
+          if (!isReceiver && (handshakeRead || handshakeCancelled)) {
             if (logger.isDebugEnabled()) {
-              if (this.handshakeRead) {
+              if (handshakeRead) {
                 logger.debug("handshake has been read {}", this);
               } else {
                 logger.debug("handshake has been cancelled {}", this);
@@ -1767,26 +1636,22 @@ public class Connection implements Runnable {
           if (logger.isDebugEnabled()) {
             logger.debug("{} Terminated <{}> due to cancellation", p2pReaderName(), this, e);
           }
-          this.readerShuttingDown = true;
+          readerShuttingDown = true;
           try {
-            requestClose(
-                String.format("CacheClosed in channel read: %s", e));
+            requestClose(String.format("CacheClosed in channel read: %s", e));
           } catch (Exception ignored) {
           }
           return;
         } catch (ClosedChannelException e) {
-          this.readerShuttingDown = true;
+          readerShuttingDown = true;
           try {
-            requestClose(String.format("ClosedChannelException in channel read: %s",
-                e));
+            requestClose(String.format("ClosedChannelException in channel read: %s", e));
           } catch (Exception ignored) {
           }
           return;
         } catch (IOException e) {
-          if (!isSocketClosed() && !"Socket closed".equalsIgnoreCase(e.getMessage()) // needed for
-                                                                                     // Solaris jdk
-                                                                                     // 1.4.2_08
-          ) {
+          // "Socket closed" check needed for Solaris jdk 1.4.2_08
+          if (!isSocketClosed() && !"Socket closed".equalsIgnoreCase(e.getMessage())) {
             if (logger.isDebugEnabled() && !isIgnorableIOException(e)) {
               logger.debug("{} io exception for {}", p2pReaderName(), this, e);
             }
@@ -1798,28 +1663,26 @@ public class Connection implements Runnable {
               }
             }
           }
-          this.readerShuttingDown = true;
+          readerShuttingDown = true;
           try {
-            requestClose(
-                String.format("IOException in channel read: %s", e));
+            requestClose(String.format("IOException in channel read: %s", e));
           } catch (Exception ignored) {
           }
           return;
 
         } catch (Exception e) {
-          this.owner.getConduit().getCancelCriterion().checkCancelInProgress(null); // bug 37101
+          owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
           if (!stopped && !isSocketClosed()) {
             logger.fatal(String.format("%s exception in channel read", p2pReaderName()), e);
           }
-          this.readerShuttingDown = true;
+          readerShuttingDown = true;
           try {
-            requestClose(
-                String.format("%s exception in channel read", e));
+            requestClose(String.format("%s exception in channel read", e));
           } catch (Exception ignored) {
           }
           return;
         }
-      } // for
+      }
     } finally {
       if (!isHandShakeReader) {
         synchronized (stateLock) {
@@ -1840,8 +1703,7 @@ public class Connection implements Runnable {
           getConduit().getSocketCreator().createSSLEngine(address.getHostName(), address.getPort());
 
       int packetBufferSize = engine.getSession().getPacketBufferSize();
-      if (inputBuffer == null
-          || (inputBuffer.capacity() < packetBufferSize)) {
+      if (inputBuffer == null || inputBuffer.capacity() < packetBufferSize) {
         // TLS has a minimum input buffer size constraint
         if (inputBuffer != null) {
           getBufferPool().releaseReceiveBuffer(inputBuffer);
@@ -1866,9 +1728,9 @@ public class Connection implements Runnable {
    * initiate suspect processing if a shared/ordered connection is lost and we're not shutting down
    */
   private void initiateSuspicionIfSharedUnordered() {
-    if (this.isReceiver && this.handshakeRead && !this.preserveOrder && this.sharedResource) {
-      if (!this.owner.getConduit().getCancelCriterion().isCancelInProgress()) {
-        this.owner.getDM().getDistribution().suspectMember(this.getRemoteAddress(),
+    if (isReceiver && handshakeRead && !preserveOrder && sharedResource) {
+      if (!owner.getConduit().getCancelCriterion().isCancelInProgress()) {
+        owner.getDM().getDistribution().suspectMember(getRemoteAddress(),
             INITIATING_SUSPECT_PROCESSING);
       }
     }
@@ -1889,61 +1751,61 @@ public class Connection implements Runnable {
     }
 
     msg = msg.toLowerCase();
-    return (msg.contains("forcibly closed")) || (msg.contains("reset by peer"))
-        || (msg.contains("connection reset"));
+    return msg.contains("forcibly closed")
+        || msg.contains("reset by peer")
+        || msg.contains("connection reset");
   }
 
   private static boolean validMsgType(int msgType) {
-    return msgType == NORMAL_MSG_TYPE || msgType == CHUNKED_MSG_TYPE
+    return msgType == NORMAL_MSG_TYPE
+        || msgType == CHUNKED_MSG_TYPE
         || msgType == END_CHUNKED_MSG_TYPE;
   }
 
   private void closeAllMsgDestreamers() {
-    synchronized (this.destreamerLock) {
-      if (this.idleMsgDestreamer != null) {
-        this.idleMsgDestreamer.close();
-        this.idleMsgDestreamer = null;
-      }
-      if (this.destreamerMap != null) {
-        Iterator it = this.destreamerMap.values().iterator();
-        while (it.hasNext()) {
-          MsgDestreamer md = (MsgDestreamer) it.next();
+    synchronized (destreamerLock) {
+      if (idleMsgDestreamer != null) {
+        idleMsgDestreamer.close();
+        idleMsgDestreamer = null;
+      }
+      if (destreamerMap != null) {
+        for (Object o : destreamerMap.values()) {
+          MsgDestreamer md = (MsgDestreamer) o;
           md.close();
         }
-        this.destreamerMap = null;
+        destreamerMap = null;
       }
     }
   }
 
-  MsgDestreamer obtainMsgDestreamer(short msgId, final Version v) {
-    synchronized (this.destreamerLock) {
-      if (this.destreamerMap == null) {
-        this.destreamerMap = new HashMap();
+  private MsgDestreamer obtainMsgDestreamer(short msgId, final Version v) {
+    synchronized (destreamerLock) {
+      if (destreamerMap == null) {
+        destreamerMap = new HashMap();
       }
-      Short key = new Short(msgId);
-      MsgDestreamer result = (MsgDestreamer) this.destreamerMap.get(key);
+      Short key = msgId;
+      MsgDestreamer result = (MsgDestreamer) destreamerMap.get(key);
       if (result == null) {
-        result = this.idleMsgDestreamer;
+        result = idleMsgDestreamer;
         if (result != null) {
-          this.idleMsgDestreamer = null;
+          idleMsgDestreamer = null;
         } else {
-          result = new MsgDestreamer(this.owner.getConduit().getStats(),
-              this.conduit.getCancelCriterion(), v);
+          result =
+              new MsgDestreamer(owner.getConduit().getStats(), conduit.getCancelCriterion(), v);
         }
         result.setName(p2pReaderName() + " msgId=" + msgId);
-        this.destreamerMap.put(key, result);
+        destreamerMap.put(key, result);
       }
       return result;
     }
   }
 
-  void releaseMsgDestreamer(short msgId, MsgDestreamer md) {
-    Short key = new Short(msgId);
-    synchronized (this.destreamerLock) {
-      this.destreamerMap.remove(key);
-      if (this.idleMsgDestreamer == null) {
+  private void releaseMsgDestreamer(short msgId, MsgDestreamer md) {
+    synchronized (destreamerLock) {
+      destreamerMap.remove(msgId);
+      if (idleMsgDestreamer == null) {
         md.reset();
-        this.idleMsgDestreamer = md;
+        idleMsgDestreamer = md;
       } else {
         md.close();
       }
@@ -1956,13 +1818,12 @@ public class Connection implements Runnable {
       ReplySender dm = new DirectReplySender(this);
       ReplyMessage.send(getRemoteAddress(), rpId, exception, dm);
     } else if (rpId != 0) {
-      DistributionManager dm = this.owner.getDM();
+      DistributionManager dm = owner.getDM();
       dm.getExecutors().getWaitingThreadPool()
           .execute(() -> ReplyMessage.send(getRemoteAddress(), rpId, exception, dm));
     }
   }
 
-
   /**
    * sends a serialized message to the other end of this connection. This is used by the
    * DirectChannel in GemFire when the message is going to be sent to multiple recipients.
@@ -1972,20 +1833,19 @@ public class Connection implements Runnable {
   void sendPreserialized(ByteBuffer buffer, boolean cacheContentChanges,
       DistributionMessage msg) throws IOException, ConnectionException {
     if (!connected) {
-      throw new ConnectionException(
-          String.format("Not connected to %s", this.remoteAddr));
+      throw new ConnectionException(String.format("Not connected to %s", remoteAddr));
     }
-    if (this.batchFlusher != null) {
+    if (batchFlusher != null) {
       batchSend(buffer);
       return;
     }
-    final boolean origSocketInUse = this.socketInUse;
+    final boolean origSocketInUse = socketInUse;
     byte originalState;
     synchronized (stateLock) {
-      originalState = this.connectionState;
-      this.connectionState = STATE_SENDING;
+      originalState = connectionState;
+      connectionState = STATE_SENDING;
     }
-    this.socketInUse = true;
+    socketInUse = true;
     try {
       SocketChannel channel = getSocket().getChannel();
       writeFully(channel, buffer, false, msg);
@@ -1994,70 +1854,68 @@ public class Connection implements Runnable {
       }
     } finally {
       accessed();
-      this.socketInUse = origSocketInUse;
+      socketInUse = origSocketInUse;
       synchronized (stateLock) {
-        this.connectionState = originalState;
+        connectionState = originalState;
       }
     }
   }
 
   /**
-   * If <code>use</code> is true then "claim" the connection for our use. If <code>use</code> is
-   * false then "release" the connection. Fixes bug 37657.
-   *
-   * @return true if connection was already in use at time of call; false if not.
+   * If {@code use} is true then "claim" the connection for our use. If {@code use} is
+   * false then "release" the connection.
    */
-  public boolean setInUse(boolean use, long startTime, long ackWaitThreshold, long ackSAThreshold,
+  public void setInUse(boolean use, long startTime, long ackWaitThreshold, long ackSAThreshold,
       List connectionGroup) {
     // just do the following; EVEN if the connection has been closed
-    final boolean origSocketInUse = this.socketInUse;
     synchronized (this) {
       if (use && (ackWaitThreshold > 0 || ackSAThreshold > 0)) {
         // set times that events should be triggered
-        this.transmissionStartTime = startTime;
-        this.ackWaitTimeout = ackWaitThreshold;
-        this.ackSATimeout = ackSAThreshold;
-        this.ackConnectionGroup = connectionGroup;
-        this.ackThreadName = Thread.currentThread().getName();
+        transmissionStartTime = startTime;
+        ackWaitTimeout = ackWaitThreshold;
+        ackSATimeout = ackSAThreshold;
+        ackConnectionGroup = connectionGroup;
+        ackThreadName = Thread.currentThread().getName();
       } else {
-        this.ackWaitTimeout = 0;
-        this.ackSATimeout = 0;
-        this.ackConnectionGroup = null;
-        this.ackThreadName = null;
+        ackWaitTimeout = 0;
+        ackSATimeout = 0;
+        ackConnectionGroup = null;
+        ackThreadName = null;
       }
-      synchronized (this.stateLock) {
-        this.connectionState = STATE_IDLE;
+      synchronized (stateLock) {
+        connectionState = STATE_IDLE;
       }
-      this.socketInUse = use;
+      socketInUse = use;
     }
     if (!use) {
       accessed();
     }
-    return origSocketInUse;
   }
 
   /**
    * For testing we want to configure the connection without having to read a handshake
    */
+  @VisibleForTesting
   void setSharedUnorderedForTest() {
-    this.preserveOrder = false;
-    this.sharedResource = true;
-    this.handshakeRead = true;
+    preserveOrder = false;
+    sharedResource = true;
+    handshakeRead = true;
   }
 
-
-  /** ensure that a task is running to monitor transmission and reading of acks */
+  /**
+   * ensure that a task is running to monitor transmission and reading of acks
+   */
   synchronized void scheduleAckTimeouts() {
     if (ackTimeoutTask == null) {
-      final long msAW = this.owner.getDM().getConfig().getAckWaitThreshold() * 1000L;
-      final long msSA = this.owner.getDM().getConfig().getAckSevereAlertThreshold() * 1000L;
+      final long msAW = owner.getDM().getConfig().getAckWaitThreshold() * 1000L;
+      final long msSA = owner.getDM().getConfig().getAckSevereAlertThreshold() * 1000L;
       ackTimeoutTask = new SystemTimer.SystemTimerTask() {
         @Override
         public void run2() {
           if (owner.isClosed()) {
             return;
           }
-          byte connState = -1;
+          byte connState;
           synchronized (stateLock) {
             connState = connectionState;
           }
@@ -2083,12 +1941,11 @@ public class Connection implements Runnable {
           }
           List group = ackConnectionGroup;
           if (sentAlert && group != null) {
-            // since transmission and ack-receipt are performed serially, we don't
-            // want to complain about all receivers out just because one was slow. We therefore
-            // reset
-            // the time stamps and give others more time
-            for (Iterator it = group.iterator(); it.hasNext();) {
-              Connection con = (Connection) it.next();
+            // since transmission and ack-receipt are performed serially, we don't want to complain
+            // about all receivers out just because one was slow. We therefore reset the time stamps
+            // and give others more time
+            for (Object o : group) {
+              Connection con = (Connection) o;
               if (con != Connection.this) {
                 con.transmissionStartTime += con.ackSATimeout;
               }
@@ -2110,10 +1967,12 @@ public class Connection implements Runnable {
     }
   }
 
-  /** ack-wait-threshold and ack-severe-alert-threshold processing */
+  /**
+   * ack-wait-threshold and ack-severe-alert-threshold processing
+   */
   private boolean doSevereAlertProcessing() {
     long now = System.currentTimeMillis();
-    if (ackSATimeout > 0 && (transmissionStartTime + ackWaitTimeout + ackSATimeout) <= now) {
+    if (ackSATimeout > 0 && transmissionStartTime + ackWaitTimeout + ackSATimeout <= now) {
       logger.fatal("{} seconds have elapsed waiting for a response from {} for thread {}",
           (ackWaitTimeout + ackSATimeout) / 1000L,
           getRemoteAddress(),
@@ -2121,17 +1980,18 @@ public class Connection implements Runnable {
       // turn off subsequent checks by setting the timeout to zero, then boot the member
       ackSATimeout = 0;
       return true;
-    } else if (!ackTimedOut && (0 < ackWaitTimeout)
-        && (transmissionStartTime + ackWaitTimeout) <= now) {
+    }
+    if (!ackTimedOut && 0 < ackWaitTimeout
+        && transmissionStartTime + ackWaitTimeout <= now) {
       logger.warn("{} seconds have elapsed waiting for a response from {} for thread {}",
           ackWaitTimeout / 1000L, getRemoteAddress(), ackThreadName);
       ackTimedOut = true;
 
-      final String state = (connectionState == Connection.STATE_SENDING)
+      final String state = connectionState == Connection.STATE_SENDING
           ? "Sender has been unable to transmit a message within ack-wait-threshold seconds"
           : "Sender has been unable to receive a response to a message within ack-wait-threshold seconds";
       if (ackSATimeout > 0) {
-        this.owner.getDM().getDistribution()
+        owner.getDM().getDistribution()
             .suspectMembers(Collections.singleton(getRemoteAddress()), state);
       }
     }
@@ -2140,7 +2000,7 @@ public class Connection implements Runnable {
 
   private boolean addToQueue(ByteBuffer buffer, DistributionMessage msg, boolean force)
       throws ConnectionException {
-    final DMStats stats = this.owner.getConduit().getStats();
+    final DMStats stats = owner.getConduit().getStats();
     long start = DistributionStats.getStatTime();
     try {
       ConflationKey ck = null;
@@ -2148,10 +2008,9 @@ public class Connection implements Runnable {
         ck = msg.getConflationKey();
       }
       Object objToQueue = null;
-      // if we can conflate delay the copy to see if we can reuse
-      // an already allocated buffer.
+      // if we can conflate delay the copy to see if we can reuse an already allocated buffer.
       final int newBytes = buffer.remaining();
-      final int origBufferPos = buffer.position(); // to fix bug 34832
+      final int origBufferPos = buffer.position();
       if (ck == null || !ck.allowsConflation()) {
         // do this outside of sync for multi thread perf
         ByteBuffer newbb = ByteBuffer.allocate(newBytes);
@@ -2159,15 +2018,14 @@ public class Connection implements Runnable {
         newbb.flip();
         objToQueue = newbb;
       }
-      synchronized (this.outgoingQueue) {
-        if (this.disconnectRequested) {
+      synchronized (outgoingQueue) {
+        if (disconnectRequested) {
           buffer.position(origBufferPos);
           // we have given up so just drop this message.
-          throw new ConnectionException(String.format("Forced disconnect sent to %s",
-              this.remoteAddr));
+          throw new ConnectionException(String.format("Forced disconnect sent to %s", remoteAddr));
         }
-        if (!force && !this.asyncQueuingInProgress) {
-          // reset buffer since we will be sending it. This fixes bug 34832
+        if (!force && !asyncQueuingInProgress) {
+          // reset buffer since we will be sending it
           buffer.position(origBufferPos);
           // the pusher emptied the queue so don't add since we are not forced to.
           return false;
@@ -2176,25 +2034,29 @@ public class Connection implements Runnable {
         if (ck != null) {
           if (ck.allowsConflation()) {
             objToQueue = ck;
-            Object oldValue = this.conflatedKeys.put(ck, ck);
+            Object oldValue = conflatedKeys.put(ck, ck);
             if (oldValue != null) {
               ConflationKey oldck = (ConflationKey) oldValue;
               ByteBuffer oldBuffer = oldck.getBuffer();
               // need to always do this to allow old buffer to be gc'd
               oldck.setBuffer(null);
+
               // remove the conflated key from current spot in queue
+
               // Note we no longer remove from the queue because the search
               // can be expensive on large queues. Instead we just wait for
               // the queue removal code to find the oldck and ignore it since
               // its buffer is null
+
               // We do a quick check of the last thing in the queue
               // and if it has the same identity of our last thing then
               // remove it
-              if (this.outgoingQueue.getLast() == oldck) {
-                this.outgoingQueue.removeLast();
+
+              if (outgoingQueue.getLast() == oldck) {
+                outgoingQueue.removeLast();
               }
               int oldBytes = oldBuffer.remaining();
-              this.queuedBytes -= oldBytes;
+              queuedBytes -= oldBytes;
               stats.incAsyncQueueSize(-oldBytes);
               stats.incAsyncConflatedMsgs();
               didConflation = true;
@@ -2220,23 +2082,23 @@ public class Connection implements Runnable {
             }
           } else {
             // just forget about having a conflatable operation
-            /* Object removedVal = */ this.conflatedKeys.remove(ck);
+            conflatedKeys.remove(ck);
           }
         }
-        {
-          long newQueueSize = newBytes + this.queuedBytes;
-          if (newQueueSize > this.asyncMaxQueueSize) {
-            logger.warn("Queued bytes {} exceeds max of {}, asking slow receiver {} to disconnect.",
-                newQueueSize, this.asyncMaxQueueSize, this.remoteAddr);
-            stats.incAsyncQueueSizeExceeded(1);
-            disconnectSlowReceiver();
-            // reset buffer since we will be sending it
-            buffer.position(origBufferPos);
-            return false;
-          }
+
+        long newQueueSize = newBytes + queuedBytes;
+        if (newQueueSize > asyncMaxQueueSize) {
+          logger.warn("Queued bytes {} exceeds max of {}, asking slow receiver {} to disconnect.",
+              newQueueSize, asyncMaxQueueSize, remoteAddr);
+          stats.incAsyncQueueSizeExceeded(1);
+          disconnectSlowReceiver();
+          // reset buffer since we will be sending it
+          buffer.position(origBufferPos);
+          return false;
         }
-        this.outgoingQueue.addLast(objToQueue);
-        this.queuedBytes += newBytes;
+
+        outgoingQueue.addLast(objToQueue);
+        queuedBytes += newBytes;
         stats.incAsyncQueueSize(newBytes);
         if (!didConflation) {
           stats.incAsyncQueuedMsgs();
@@ -2260,62 +2122,57 @@ public class Connection implements Runnable {
       throws ConnectionException {
     if (!addToQueue(buffer, msg, true)) {
       return false;
-    } else {
-      startMessagePusher();
-      return true;
     }
+    startMessagePusher();
+    return true;
   }
 
-  private final Object pusherSync = new Object();
-
   private void startMessagePusher() {
-    synchronized (this.pusherSync) {
-      while (this.pusherThread != null) {
+    synchronized (pusherSync) {
+      while (pusherThread != null) {
         // wait for previous pusher thread to exit
         boolean interrupted = Thread.interrupted();
         try {
-          this.pusherSync.wait(); // spurious wakeup ok
+          pusherSync.wait(); // spurious wakeup ok
         } catch (InterruptedException ex) {
           interrupted = true;
-          this.owner.getConduit().getCancelCriterion().checkCancelInProgress(ex);
+          owner.getConduit().getCancelCriterion().checkCancelInProgress(ex);
         } finally {
           if (interrupted) {
             Thread.currentThread().interrupt();
           }
         }
       }
-      this.asyncQueuingInProgress = true;
-      this.pusherThread =
-          new LoggingThread("P2P async pusher to " + this.remoteAddr, this::runMessagePusher);
-    } // synchronized
-    this.pusherThread.start();
+      asyncQueuingInProgress = true;
+      pusherThread = new LoggingThread("P2P async pusher to " + remoteAddr, this::runMessagePusher);
+    }
+    pusherThread.start();
   }
 
-  private ByteBuffer takeFromOutgoingQueue() throws InterruptedException {
-    ByteBuffer result = null;
-    final DMStats stats = this.owner.getConduit().getStats();
+  private ByteBuffer takeFromOutgoingQueue() {
+    final DMStats stats = owner.getConduit().getStats();
     long start = DistributionStats.getStatTime();
     try {
-      synchronized (this.outgoingQueue) {
-        if (this.disconnectRequested) {
+      ByteBuffer result = null;
+      synchronized (outgoingQueue) {
+        if (disconnectRequested) {
           // don't bother with anymore work since we are done
-          this.asyncQueuingInProgress = false;
-          this.outgoingQueue.notifyAll();
+          asyncQueuingInProgress = false;
+          outgoingQueue.notifyAll();
           return null;
         }
-        // Object o = this.outgoingQueue.poll();
         do {
-          if (this.outgoingQueue.isEmpty()) {
+          if (outgoingQueue.isEmpty()) {
             break;
           }
-          Object o = this.outgoingQueue.removeFirst();
+          Object o = outgoingQueue.removeFirst();
           if (o == null) {
             break;
           }
           if (o instanceof ConflationKey) {
             result = ((ConflationKey) o).getBuffer();
             if (result != null) {
-              this.conflatedKeys.remove(o);
+              conflatedKeys.remove(o);
             } else {
               // if result is null then this same key will be found later in the
               // queue so we just need to skip this entry
@@ -2325,13 +2182,13 @@ public class Connection implements Runnable {
             result = (ByteBuffer) o;
           }
           int newBytes = result.remaining();
-          this.queuedBytes -= newBytes;
+          queuedBytes -= newBytes;
           stats.incAsyncQueueSize(-newBytes);
           stats.incAsyncDequeuedMsgs();
         } while (result == null);
         if (result == null) {
-          this.asyncQueuingInProgress = false;
-          this.outgoingQueue.notifyAll();
+          asyncQueuingInProgress = false;
+          outgoingQueue.notifyAll();
         }
       }
       return result;
@@ -2342,50 +2199,43 @@ public class Connection implements Runnable {
     }
   }
 
-  private boolean disconnectRequested = false;
-
-
   /**
    * @since GemFire 4.2.2
    */
   private void disconnectSlowReceiver() {
-    synchronized (this.outgoingQueue) {
-      if (this.disconnectRequested) {
+    synchronized (outgoingQueue) {
+      if (disconnectRequested) {
         // only ask once
         return;
       }
-      this.disconnectRequested = true;
+      disconnectRequested = true;
     }
-    DistributionManager dm = this.owner.getDM();
+    DistributionManager dm = owner.getDM();
     if (dm == null) {
-      this.owner.removeEndpoint(this.remoteAddr,
-          "no distribution manager");
+      owner.removeEndpoint(remoteAddr, "no distribution manager");
       return;
     }
-    dm.getDistribution().requestMemberRemoval(this.remoteAddr,
+    dm.getDistribution().requestMemberRemoval(remoteAddr,
         "Disconnected as a slow-receiver");
     // Ok, we sent the message, the coordinator should kick the member out
     // immediately and inform this process with a new view.
-    // Let's wait
-    // for that to happen and if it doesn't in X seconds
-    // then remove the endpoint.
-    final int FORCE_TIMEOUT = 3000;
-    while (dm.getOtherDistributionManagerIds().contains(this.remoteAddr)) {
+    // Let's wait for that to happen and if it doesn't in X seconds then remove the endpoint.
+    while (dm.getOtherDistributionManagerIds().contains(remoteAddr)) {
       try {
         Thread.sleep(50);
       } catch (InterruptedException ie) {
         Thread.currentThread().interrupt();
-        this.owner.getConduit().getCancelCriterion().checkCancelInProgress(ie);
+        owner.getConduit().getCancelCriterion().checkCancelInProgress(ie);
         return;
       }
     }
-    this.owner.removeEndpoint(this.remoteAddr,
+    owner.removeEndpoint(remoteAddr,
         "Force disconnect timed out");
-    if (dm.getOtherDistributionManagerIds().contains(this.remoteAddr)) {
+    if (dm.getOtherDistributionManagerIds().contains(remoteAddr)) {
       if (logger.isDebugEnabled()) {
-        logger.debug("Force disconnect timed out after waiting {} seconds", (FORCE_TIMEOUT / 1000));
+        final int FORCE_TIMEOUT = 3000;
+        logger.debug("Force disconnect timed out after waiting {} seconds", FORCE_TIMEOUT / 1000);
       }
-      return;
     }
   }
 
@@ -2394,7 +2244,7 @@ public class Connection implements Runnable {
    */
   private void runMessagePusher() {
     try {
-      final DMStats stats = this.owner.getConduit().getStats();
+      final DMStats stats = owner.getConduit().getStats();
       final long threadStart = stats.startAsyncThread();
       try {
         stats.incAsyncQueues(1);
@@ -2402,10 +2252,10 @@ public class Connection implements Runnable {
 
         try {
           int flushId = 0;
-          while (this.asyncQueuingInProgress && this.connected) {
+          while (asyncQueuingInProgress && connected) {
             if (SystemFailure.getFailure() != null) {
               // Allocate no objects here!
-              Socket s = this.socket;
+              Socket s = socket;
               if (s != null) {
                 try {
                   logger.debug("closing socket", new Exception("closing socket"));
@@ -2415,19 +2265,19 @@ public class Connection implements Runnable {
                   // don't care
                 }
               }
-              SystemFailure.checkFailure(); // throws
+              SystemFailure.checkFailure();
             }
-            if (this.owner.getConduit().getCancelCriterion().isCancelInProgress()) {
+            if (owner.getConduit().getCancelCriterion().isCancelInProgress()) {
               break;
             }
             flushId++;
             long flushStart = stats.startAsyncQueueFlush();
             try {
-              long curQueuedBytes = this.queuedBytes;
-              if (curQueuedBytes > this.asyncMaxQueueSize) {
+              long curQueuedBytes = queuedBytes;
+              if (curQueuedBytes > asyncMaxQueueSize) {
                 logger.warn(
                     "Queued bytes {} exceeds max of {}, asking slow receiver {} to disconnect.",
-                    curQueuedBytes, this.asyncMaxQueueSize, this.remoteAddr);
+                    curQueuedBytes, asyncMaxQueueSize, remoteAddr);
                 stats.incAsyncQueueSizeExceeded(1);
                 disconnectSlowReceiver();
                 return;
@@ -2441,28 +2291,21 @@ public class Connection implements Runnable {
                 return;
               }
               writeFully(channel, bb, true, null);
-              // We should not add messagesSent here according to Bruce.
-              // The counts are increased elsewhere.
-              // messagesSent++;
+              // We should not add messagesSent. The counts are increased elsewhere.
               accessed();
             } finally {
               stats.endAsyncQueueFlush(flushStart);
             }
-          } // while
+          }
         } finally {
           // need to force this to false before doing the requestClose calls
-          synchronized (this.outgoingQueue) {
-            this.asyncQueuingInProgress = false;
-            this.outgoingQueue.notifyAll();
+          synchronized (outgoingQueue) {
+            asyncQueuingInProgress = false;
+            outgoingQueue.notifyAll();
           }
         }
-      } catch (InterruptedException ex) {
-        // someone wants us to stop.
-        // No need to set interrupt bit, we're quitting.
-        // No need to throw an error, we're quitting.
       } catch (IOException ex) {
-        final String err =
-            String.format("P2P pusher io exception for %s", this);
+        String err = String.format("P2P pusher io exception for %s", this);
         if (!isSocketClosed()) {
           if (logger.isDebugEnabled() && !isIgnorableIOException(ex)) {
             logger.debug(err, ex);
@@ -2472,17 +2315,15 @@ public class Connection implements Runnable {
           requestClose(err + ": " + ex);
         } catch (Exception ignore) {
         }
-      } catch (CancelException ex) { // bug 37367
-        final String err = String.format("P2P pusher %s caught CacheClosedException: %s",
-            this, ex);
+      } catch (CancelException ex) {
+        String err = String.format("P2P pusher %s caught CacheClosedException: %s", this, ex);
         logger.debug(err);
         try {
           requestClose(err);
         } catch (Exception ignore) {
         }
-        return;
       } catch (Exception ex) {
-        this.owner.getConduit().getCancelCriterion().checkCancelInProgress(ex); // bug 37101
+        owner.getConduit().getCancelCriterion().checkCancelInProgress(ex);
         if (!isSocketClosed()) {
           logger.fatal(String.format("P2P pusher exception: %s", ex), ex);
         }
@@ -2491,8 +2332,8 @@ public class Connection implements Runnable {
         } catch (Exception ignore) {
         }
       } finally {
-        stats.incAsyncQueueSize(-this.queuedBytes);
-        this.queuedBytes = 0;
+        stats.incAsyncQueueSize(-queuedBytes);
+        queuedBytes = 0;
         stats.endAsyncThread(threadStart);
         stats.incAsyncThreads(-1);
         stats.incAsyncQueues(-1);
@@ -2502,9 +2343,9 @@ public class Connection implements Runnable {
         }
       }
     } finally {
-      synchronized (this.pusherSync) {
-        this.pusherThread = null;
-        this.pusherSync.notify();
+      synchronized (pusherSync) {
+        pusherThread = null;
+        pusherSync.notifyAll();
       }
     }
   }
@@ -2519,34 +2360,26 @@ public class Connection implements Runnable {
     }
     // only use sync writes if:
     // we are already queuing
-    if (this.asyncQueuingInProgress) {
+    if (asyncQueuingInProgress) {
       // it will just tack this msg onto the outgoing queue
       return true;
     }
     // or we are a receiver
-    if (this.isReceiver) {
+    if (isReceiver) {
       return true;
     }
     // or we are an unordered connection
-    if (!this.preserveOrder) {
+    if (!preserveOrder) {
       return true;
     }
     // or the receiver does not allow queuing
-    if (this.asyncDistributionTimeout == 0) {
+    if (asyncDistributionTimeout == 0) {
       return true;
     }
     // OTHERWISE return false and let caller send async
     return false;
   }
 
-  /**
-   * If true then act as if the socket buffer is full and start async queuing
-   */
-  @MutableForTesting
-  public static volatile boolean FORCE_ASYNC_QUEUE = false;
-
-  private static final int MAX_WAIT_TIME = (1 << 5); // ms (must be a power of 2)
-
   private void writeAsync(SocketChannel channel, ByteBuffer buffer, boolean forceAsync,
       DistributionMessage p_msg, final DMStats stats) throws IOException {
     DistributionMessage msg = p_msg;
@@ -2556,10 +2389,10 @@ public class Connection implements Runnable {
     int retries = 0;
     int totalAmtWritten = 0;
     try {
-      synchronized (this.outLock) {
+      synchronized (outLock) {
         if (!forceAsync) {
           // check one more time while holding outLock in case a pusher was created
-          if (this.asyncQueuingInProgress) {
+          if (asyncQueuingInProgress) {
             if (addToQueue(buffer, msg, false)) {
               return;
             }
@@ -2569,19 +2402,19 @@ public class Connection implements Runnable {
         socketWriteStarted = true;
         startSocketWrite = stats.startSocketWrite(false);
         long now = System.currentTimeMillis();
-        int waitTime = 1;
         long distributionTimeoutTarget = 0;
         // if asyncDistributionTimeout == 1 then we want to start queuing
         // as soon as we do a non blocking socket write that returns 0
-        if (this.asyncDistributionTimeout != 1) {
-          distributionTimeoutTarget = now + this.asyncDistributionTimeout;
+        if (asyncDistributionTimeout != 1) {
+          distributionTimeoutTarget = now + asyncDistributionTimeout;
         }
-        long queueTimeoutTarget = now + this.asyncQueueTimeout;
+        long queueTimeoutTarget = now + asyncQueueTimeout;
         channel.configureBlocking(false);
         try {
           ByteBuffer wrappedBuffer = ioFilter.wrap(buffer);
+          int waitTime = 1;
           do {
-            this.owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
+            owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
             retries++;
             int amtWritten;
             if (FORCE_ASYNC_QUEUE) {
@@ -2600,10 +2433,10 @@ public class Connection implements Runnable {
                           "Starting async pusher to handle async queue because distribution-timeout is 1 and the last socket write would have blocked.");
                     } else {
                       long blockedMs = now - distributionTimeoutTarget;
-                      blockedMs += this.asyncDistributionTimeout;
+                      blockedMs += asyncDistributionTimeout;
                       logger.debug(
                           "Blocked for {}ms which is longer than the max of {}ms so starting async pusher to handle async queue.",
-                          blockedMs, this.asyncDistributionTimeout);
+                          blockedMs, asyncDistributionTimeout);
                     }
                   }
                   stats.incAsyncDistributionTimeoutExceeded();
@@ -2621,32 +2454,30 @@ public class Connection implements Runnable {
                 timeoutTarget = distributionTimeoutTarget;
               } else {
                 boolean disconnectNeeded = false;
-                long curQueuedBytes = this.queuedBytes;
-                if (curQueuedBytes > this.asyncMaxQueueSize) {
+                long curQueuedBytes = queuedBytes;
+                if (curQueuedBytes > asyncMaxQueueSize) {
                   logger.warn(
                       "Queued bytes {} exceeds max of {}, asking slow receiver {} to disconnect.",
-                      curQueuedBytes,
-                      this.asyncMaxQueueSize, this.remoteAddr);
+                      curQueuedBytes, asyncMaxQueueSize, remoteAddr);
                   stats.incAsyncQueueSizeExceeded(1);
                   disconnectNeeded = true;
                 }
                 if (now > queueTimeoutTarget) {
-                  // we have waited long enough
-                  // the pusher has been idle too long!
+                  // we have waited long enough the pusher has been idle too long!
                   long blockedMs = now - queueTimeoutTarget;
-                  blockedMs += this.asyncQueueTimeout;
+                  blockedMs += asyncQueueTimeout;
                   logger.warn(
                       "Blocked for {}ms which is longer than the max of {}ms, asking slow receiver {} to disconnect.",
                       blockedMs,
-                      this.asyncQueueTimeout, this.remoteAddr);
+                      asyncQueueTimeout, remoteAddr);
                   stats.incAsyncQueueTimeouts(1);
                   disconnectNeeded = true;
                 }
                 if (disconnectNeeded) {
                   disconnectSlowReceiver();
-                  synchronized (this.outgoingQueue) {
-                    this.asyncQueuingInProgress = false;
-                    this.outgoingQueue.notifyAll(); // for bug 42330
+                  synchronized (outgoingQueue) {
+                    asyncQueuingInProgress = false;
+                    outgoingQueue.notifyAll();
                   }
                   return;
                 }
@@ -2669,7 +2500,7 @@ public class Connection implements Runnable {
                     Thread.sleep(msToWait);
                   } catch (InterruptedException ex) {
                     interrupted = true;
-                    this.owner.getConduit().getCancelCriterion().checkCancelInProgress(ex);
+                    owner.getConduit().getCancelCriterion().checkCancelInProgress(ex);
                   } finally {
                     if (interrupted) {
                       Thread.currentThread().interrupt();
@@ -2685,7 +2516,7 @@ public class Connection implements Runnable {
             else {
               totalAmtWritten += amtWritten;
               // reset queueTimeoutTarget since we made some progress
-              queueTimeoutTarget = System.currentTimeMillis() + this.asyncQueueTimeout;
+              queueTimeoutTarget = System.currentTimeMillis() + asyncQueueTimeout;
               waitTime = 1;
             }
           } while (wrappedBuffer.remaining() > 0);
@@ -2709,23 +2540,24 @@ public class Connection implements Runnable {
    * @param forceAsync true if we need to force a blocking async write.
    * @throws ConnectionException if the conduit has stopped
    */
+  @VisibleForTesting
   void writeFully(SocketChannel channel, ByteBuffer buffer, boolean forceAsync,
       DistributionMessage msg) throws IOException, ConnectionException {
-    final DMStats stats = this.owner.getConduit().getStats();
-    if (!this.sharedResource) {
+    final DMStats stats = owner.getConduit().getStats();
+    if (!sharedResource) {
       stats.incTOSentMsg();
     }
     if (useSyncWrites(forceAsync)) {
-      if (this.asyncQueuingInProgress) {
+      if (asyncQueuingInProgress) {
         if (addToQueue(buffer, msg, false)) {
           return;
         }
         // fall through
       }
       long startLock = stats.startSocketLock();
-      synchronized (this.outLock) {
+      synchronized (outLock) {
         stats.endSocketLock(startLock);
-        if (this.asyncQueuingInProgress) {
+        if (asyncQueuingInProgress) {
           if (addToQueue(buffer, msg, false)) {
             return;
           }
@@ -2742,18 +2574,20 @@ public class Connection implements Runnable {
           }
         }
 
-      } // synchronized
+      }
     } else {
       writeAsync(channel, buffer, forceAsync, msg, stats);
     }
   }
 
-  /** gets the buffer for receiving message length bytes */
+  /**
+   * gets the buffer for receiving message length bytes
+   */
   private ByteBuffer getInputBuffer() {
     if (inputBuffer == null) {
-      int allocSize = this.recvBufferSize;
+      int allocSize = recvBufferSize;
       if (allocSize == -1) {
-        allocSize = this.owner.getConduit().tcpBufferSize;
+        allocSize = owner.getConduit().tcpBufferSize;
       }
       inputBuffer = getBufferPool().acquireDirectReceiveBuffer(allocSize);
     }
@@ -2761,47 +2595,20 @@ public class Connection implements Runnable {
   }
 
   /**
-   * stateLock is used to synchronize state changes.
-   */
-  private final Object stateLock = new Object();
-
-  /** for timeout processing, this is the current state of the connection */
-  private byte connectionState = STATE_IDLE;
-
-  /* ~~~~~~~~~~~~~ connection states ~~~~~~~~~~~~~~~ */
-  /** the connection is idle, but may be in use */
-  private static final byte STATE_IDLE = 0;
-  /** the connection is in use and is transmitting data */
-  private static final byte STATE_SENDING = 1;
-  /** the connection is in use and is done transmitting */
-  private static final byte STATE_POST_SENDING = 2;
-  /** the connection is in use and is reading a direct-ack */
-  private static final byte STATE_READING_ACK = 3;
-  /** the connection is in use and has finished reading a direct-ack */
-  private static final byte STATE_RECEIVED_ACK = 4;
-  /** the connection is in use and is reading a message */
-  private static final byte STATE_READING = 5;
-  /* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ */
-
-  /** set to true if we exceeded the ack-wait-threshold waiting for a response */
-  private volatile boolean ackTimedOut;
-
-  /**
    * @throws SocketTimeoutException if wait expires.
    * @throws ConnectionException if ack is not received
    */
   public void readAck(final DirectReplyProcessor processor)
       throws SocketTimeoutException, ConnectionException {
     if (isSocketClosed()) {
-      throw new ConnectionException(
-          "connection is closed");
+      throw new ConnectionException("connection is closed");
     }
-    synchronized (this.stateLock) {
-      this.connectionState = STATE_READING_ACK;
+    synchronized (stateLock) {
+      connectionState = STATE_READING_ACK;
     }
 
-    boolean origSocketInUse = this.socketInUse;
-    this.socketInUse = true;
+    boolean origSocketInUse = socketInUse;
+    socketInUse = true;
     MsgReader msgReader = null;
     DMStats stats = owner.getConduit().getStats();
     final Version version = getRemoteVersion();
@@ -2810,7 +2617,7 @@ public class Connection implements Runnable {
 
       Header header = msgReader.readHeader();
 
-      ReplyMessage msg = null;
+      ReplyMessage msg;
       int len;
       if (header.getMessageType() == NORMAL_MSG_TYPE) {
         msg = (ReplyMessage) msgReader.readMessage(header);
@@ -2832,7 +2639,7 @@ public class Connection implements Runnable {
       // about performance, we'll skip those checks. Skipping them
       // should be legit, because we just sent a message so we know
       // the member is already in our view, etc.
-      ClusterDistributionManager dm = (ClusterDistributionManager) owner.getDM();
+      DistributionManager dm = owner.getDM();
       msg.setBytesRead(len);
       msg.setSender(remoteAddr);
       stats.incReceivedMessages(1L);
@@ -2854,39 +2661,36 @@ public class Connection implements Runnable {
       }
       try {
         requestClose(err + ": " + e);
-      } catch (Exception ex) {
+      } catch (Exception ignored) {
       }
-      throw new ConnectionException(
-          String.format("Unable to read direct ack because: %s", e));
+      throw new ConnectionException(String.format("Unable to read direct ack because: %s", e));
     } catch (ConnectionException e) {
-      this.owner.getConduit().getCancelCriterion().checkCancelInProgress(e);
+      owner.getConduit().getCancelCriterion().checkCancelInProgress(e);
       throw e;
     } catch (Exception e) {
-      this.owner.getConduit().getCancelCriterion().checkCancelInProgress(e);
+      owner.getConduit().getCancelCriterion().checkCancelInProgress(e);
       if (!isSocketClosed()) {
         logger.fatal("ack read exception", e);
       }
       try {
         requestClose(String.format("ack read exception: %s", e));
-      } catch (Exception ex) {
+      } catch (Exception ignored) {
       }
-      throw new ConnectionException(
-          String.format("Unable to read direct ack because: %s", e));
+      throw new ConnectionException(String.format("Unable to read direct ack because: %s", e));
     } finally {
       stats.incProcessedMessages(1L);
       accessed();
-      this.socketInUse = origSocketInUse;
-      if (this.ackTimedOut) {
-        logger.info("Finished waiting for reply from {}",
-            getRemoteAddress());
-        this.ackTimedOut = false;
+      socketInUse = origSocketInUse;
+      if (ackTimedOut) {
+        logger.info("Finished waiting for reply from {}", getRemoteAddress());
+        ackTimedOut = false;
       }
       if (msgReader != null) {
         msgReader.close();
       }
     }
     synchronized (stateLock) {
-      this.connectionState = STATE_RECEIVED_ACK;
+      connectionState = STATE_RECEIVED_ACK;
     }
   }
 
@@ -2895,7 +2699,6 @@ public class Connection implements Runnable {
    * deserialized and passed to TCPConduit for further processing
    */
   private void processInputBuffer() throws ConnectionException, IOException {
-
     inputBuffer.flip();
 
     ByteBuffer peerDataBuffer = ioFilter.unwrap(inputBuffer);
@@ -2904,7 +2707,7 @@ public class Connection implements Runnable {
     boolean done = false;
 
     while (!done && connected) {
-      this.owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
+      owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
       int remaining = peerDataBuffer.remaining();
       if (lengthSet || remaining >= MSG_HEADER_BYTES) {
         if (!lengthSet) {
@@ -2922,7 +2725,7 @@ public class Connection implements Runnable {
           int oldLimit = peerDataBuffer.limit();
           peerDataBuffer.limit(startPos + messageLength);
 
-          if (this.handshakeRead) {
+          if (handshakeRead) {
             try {
               readMessage(peerDataBuffer);
             } catch (SerializationException e) {
@@ -2932,16 +2735,15 @@ public class Connection implements Runnable {
           } else {
             ByteBufferInputStream bbis = new ByteBufferInputStream(peerDataBuffer);
             DataInputStream dis = new DataInputStream(bbis);
-            if (!this.isReceiver) {
+            if (!isReceiver) {
               // we read the handshake and then stop processing since we don't want
               // to process the input buffer anymore in a handshake thread
               readHandshakeForSender(dis, peerDataBuffer);
               return;
-            } else {
-              if (readHandshakeForReceiver(dis)) {
-                ioFilter.doneReading(peerDataBuffer);
-                return;
-              }
+            }
+            if (readHandshakeForReceiver(dis)) {
+              ioFilter.doneReading(peerDataBuffer);
+              return;
             }
           }
           if (!connected) {
@@ -2965,7 +2767,7 @@ public class Connection implements Runnable {
     }
   }
 
-  private boolean readHandshakeForReceiver(DataInputStream dis) {
+  private boolean readHandshakeForReceiver(DataInput dis) {
     try {
       byte b = dis.readByte();
       if (b != 0) {
@@ -2981,29 +2783,26 @@ public class Connection implements Runnable {
                 "Detected wrong version of GemFire product during handshake. Expected %s but found %s",
                 HANDSHAKE_VERSION, handshakeByte));
       }
-      InternalDistributedMember remote = DSFIDFactory.readInternalDistributedMember(dis);
-      setRemoteAddr(remote);
-      this.sharedResource = dis.readBoolean();
-      this.preserveOrder = dis.readBoolean();
-      this.uniqueId = dis.readLong();
+      remoteAddr = DSFIDFactory.readInternalDistributedMember(dis);
+      sharedResource = dis.readBoolean();
+      preserveOrder = dis.readBoolean();
+      uniqueId = dis.readLong();
       // read the product version ordinal for on-the-fly serialization
       // transformations (for rolling upgrades)
-      this.remoteVersion = Version.readVersion(dis, true);
+      remoteVersion = Version.readVersion(dis, true);
       int dominoNumber = 0;
-      if (this.remoteVersion == null
-          || (this.remoteVersion.compareTo(Version.GFE_80) >= 0)) {
+      if (remoteVersion == null
+          || remoteVersion.compareTo(Version.GFE_80) >= 0) {
         dominoNumber = dis.readInt();
-        if (this.sharedResource) {
+        if (sharedResource) {
           dominoNumber = 0;
         }
         dominoCount.set(dominoNumber);
-        // this.senderName = dis.readUTF();
       }
-      if (!this.sharedResource) {
+      if (!sharedResource) {
         if (tipDomino()) {
-          logger.info(
-              "thread owned receiver forcing itself to send on thread owned sockets");
-          // bug #49565 - if domino count is >= 2 use shared resources.
+          logger.info("thread owned receiver forcing itself to send on thread owned sockets");
+          // if domino count is >= 2 use shared resources.
           // Also see DistributedCacheOperation#supportsDirectAck
         } else { // if (dominoNumber < 2) {
           ConnectionTable.threadWantsOwnResources();
@@ -3012,62 +2811,54 @@ public class Connection implements Runnable {
                 "thread-owned receiver with domino count of {} will prefer sending on thread-owned sockets",
                 dominoNumber);
           }
-          // } else {
-          // ConnectionTable.threadWantsSharedResources();
         }
-        this.conduit.getStats().incThreadOwnedReceivers(1L, dominoNumber);
-        // Because this thread is not shared resource, it will be used for direct
-        // ack. Direct ack messages can be large. This call will resize the send
-        // buffer.
-        setSendBufferSize(this.socket);
+        conduit.getStats().incThreadOwnedReceivers(1L, dominoNumber);
+        // Because this thread is not shared resource, it will be used for direct ack.
+        // Direct ack messages can be large. This call will resize the send buffer.
+        setSendBufferSize(socket);
       }
-      // String name = owner.getDM().getConfig().getName();
-      // if (name == null) {
-      // name = "pid="+OSProcess.getId();
-      // }
       setThreadName(dominoNumber);
     } catch (Exception e) {
-      this.owner.getConduit().getCancelCriterion().checkCancelInProgress(e); // bug 37101
+      owner.getConduit().getCancelCriterion().checkCancelInProgress(e); // bug 37101
       logger.fatal("Error deserializing P2P handshake message", e);
-      this.readerShuttingDown = true;
+      readerShuttingDown = true;
       requestClose("Error deserializing P2P handshake message");
       return true;
     }
     if (logger.isDebugEnabled()) {
-      logger.debug("P2P handshake remoteAddr is {}{}", this.remoteAddr,
-          (this.remoteVersion != null ? " (" + this.remoteVersion + ')' : ""));
+      logger.debug("P2P handshake remoteAddr is {}{}", remoteAddr,
+          remoteVersion != null ? " (" + remoteVersion + ')' : "");
     }
     try {
-      String authInit = System.getProperty(
-          DistributionConfigImpl.SECURITY_SYSTEM_PREFIX + SECURITY_PEER_AUTH_INIT);
-      boolean isSecure = authInit != null && authInit.length() != 0;
+      String authInit = System.getProperty(SECURITY_SYSTEM_PREFIX + SECURITY_PEER_AUTH_INIT);
+      boolean isSecure = authInit != null && !authInit.isEmpty();
 
       if (isSecure) {
-        if (owner.getConduit().waitForMembershipCheck(this.remoteAddr)) {
-          sendOKHandshakeReply(); // fix for bug 33224
+        if (owner.getConduit().waitForMembershipCheck(remoteAddr)) {
+          sendOKHandshakeReply();
           notifyHandshakeWaiter(true);
         } else {
-          // ARB: check if we need notifyHandshakeWaiter() call.
+          // check if we need notifyHandshakeWaiter() call.
           notifyHandshakeWaiter(false);
           logger.warn("{} timed out during a membership check.",
               p2pReaderName());
           return true;
         }
       } else {
-        sendOKHandshakeReply(); // fix for bug 33224
+        sendOKHandshakeReply();
         try {
           notifyHandshakeWaiter(true);
         } catch (Exception e) {
           logger.fatal("Uncaught exception from listener", e);
         }
       }
-      this.finishedConnecting = true;
+      finishedConnecting = true;
     } catch (IOException ex) {
       final String err = "Failed sending handshake reply";
       if (logger.isDebugEnabled()) {
         logger.debug(err, ex);
       }
-      this.readerShuttingDown = true;
+      readerShuttingDown = true;
       requestClose(err + ": " + ex);
       return true;
     }
@@ -3084,13 +2875,13 @@ public class Connection implements Runnable {
     messageId = peerDataBuffer.getShort();
     directAck = (messageType & DIRECT_ACK_BIT) != 0;
     if (directAck) {
-      messageType &= ~DIRECT_ACK_BIT; // clear the ack bit
+      // clear the ack bit
+      messageType &= ~DIRECT_ACK_BIT;
     }
-    // Following validation fixes bug 31145
     if (!validMsgType(messageType)) {
       Integer nioMessageTypeInteger = (int) messageType;
       logger.fatal("Unknown P2P message type: {}", nioMessageTypeInteger);
-      this.readerShuttingDown = true;
+      readerShuttingDown = true;
       requestClose(String.format("Unknown P2P message type: %s",
           nioMessageTypeInteger));
       return true;
@@ -3104,16 +2895,16 @@ public class Connection implements Runnable {
 
   private void readMessage(ByteBuffer peerDataBuffer) {
     if (messageType == NORMAL_MSG_TYPE) {
-      this.owner.getConduit().getStats().incMessagesBeingReceived(true, messageLength);
+      owner.getConduit().getStats().incMessagesBeingReceived(true, messageLength);
       ByteBufferInputStream bbis =
           remoteVersion == null ? new ByteBufferInputStream(peerDataBuffer)
               : new VersionedByteBufferInputStream(peerDataBuffer, remoteVersion);
-      DistributionMessage msg;
       try {
         ReplyProcessor21.initMessageRPId();
         // add serialization stats
-        long startSer = this.owner.getConduit().getStats().startMsgDeserialization();
+        long startSer = owner.getConduit().getStats().startMsgDeserialization();
         int startingPosition = peerDataBuffer.position();
+        DistributionMessage msg;
         try {
           msg = (DistributionMessage) InternalDataSerializer.readDSFID(bbis);
         } catch (SerializationException e) {
@@ -3123,19 +2914,19 @@ public class Connection implements Runnable {
               peerDataBuffer.capacity(), messageLength);
           throw e;
         }
-        this.owner.getConduit().getStats().endMsgDeserialization(startSer);
+        owner.getConduit().getStats().endMsgDeserialization(startSer);
         if (bbis.available() != 0) {
-          logger.warn("Message deserialization of {} did not read {} bytes.",
-              msg, bbis.available());
+          logger.warn("Message deserialization of {} did not read {} bytes.", msg,
+              bbis.available());
         }
         try {
           if (!dispatchMessage(msg, messageLength, directAck)) {
             directAck = false;
           }
         } catch (MemberShunnedException e) {
-          directAck = false; // don't respond (bug39117)
+          directAck = false; // don't respond
         } catch (Exception de) {
-          this.owner.getConduit().getCancelCriterion().checkCancelInProgress(de);
+          owner.getConduit().getCancelCriterion().checkCancelInProgress(de);
           logger.fatal("Error dispatching message", de);
         } catch (ThreadDeath td) {
           throw td;
@@ -3166,8 +2957,7 @@ public class Connection implements Runnable {
         // error condition, so you also need to check to see if the JVM
         // is still usable:
         SystemFailure.checkFailure();
-        sendFailureReply(ReplyProcessor21.getMessageRPId(),
-            "Error deserializing message", t,
+        sendFailureReply(ReplyProcessor21.getMessageRPId(), "Error deserializing message", t,
             directAck);
         if (t instanceof ThreadDeath) {
           throw (ThreadDeath) t;
@@ -3184,15 +2974,16 @@ public class Connection implements Runnable {
       }
     } else if (messageType == CHUNKED_MSG_TYPE) {
       MsgDestreamer md = obtainMsgDestreamer(messageId, remoteVersion);
-      this.owner.getConduit().getStats().incMessagesBeingReceived(md.size() == 0,
+      owner.getConduit().getStats().incMessagesBeingReceived(md.size() == 0,
           messageLength);
       try {
         md.addChunk(peerDataBuffer, messageLength);
       } catch (IOException ex) {
+        // ignored
       }
     } else /* (nioMessageType == END_CHUNKED_MSG_TYPE) */ {
       MsgDestreamer md = obtainMsgDestreamer(messageId, remoteVersion);
-      this.owner.getConduit().getStats().incMessagesBeingReceived(md.size() == 0,
+      owner.getConduit().getStats().incMessagesBeingReceived(md.size() == 0,
           messageLength);
       try {
         md.addChunk(peerDataBuffer, messageLength);
@@ -3208,20 +2999,20 @@ public class Connection implements Runnable {
       try {
         msg = md.getMessage();
       } catch (ClassNotFoundException ex) {
-        this.owner.getConduit().getStats().decMessagesBeingReceived(md.size());
+        owner.getConduit().getStats().decMessagesBeingReceived(md.size());
         failureMsg = "ClassNotFound deserializing message";
         failureEx = ex;
         rpId = md.getRPid();
         logger.fatal("ClassNotFound deserializing message: {}", ex.toString());
       } catch (IOException ex) {
-        this.owner.getConduit().getStats().decMessagesBeingReceived(md.size());
+        owner.getConduit().getStats().decMessagesBeingReceived(md.size());
         failureMsg = "IOException deserializing message";
         failureEx = ex;
         rpId = md.getRPid();
         logger.fatal("IOException deserializing message", failureEx);
       } catch (InterruptedException ex) {
         interrupted = true;
-        this.owner.getConduit().getCancelCriterion().checkCancelInProgress(ex);
+        owner.getConduit().getCancelCriterion().checkCancelInProgress(ex);
       } catch (VirtualMachineError err) {
         SystemFailure.initiateFailure(err);
         // If this ever returns, rethrow the error. We're poisoned
@@ -3234,13 +3025,12 @@ public class Connection implements Runnable {
         // error condition, so you also need to check to see if the JVM
         // is still usable:
         SystemFailure.checkFailure();
-        this.owner.getConduit().getCancelCriterion().checkCancelInProgress(ex);
-        this.owner.getConduit().getStats().decMessagesBeingReceived(md.size());
+        owner.getConduit().getCancelCriterion().checkCancelInProgress(ex);
+        owner.getConduit().getStats().decMessagesBeingReceived(md.size());
         failureMsg = "Unexpected failure deserializing message";
         failureEx = ex;
         rpId = md.getRPid();
-        logger.fatal("Unexpected failure deserializing message",
-            failureEx);
+        logger.fatal("Unexpected failure deserializing message", failureEx);
       } finally {
         msgLength = md.size();
         releaseMsgDestreamer(messageId, md);
@@ -3257,7 +3047,7 @@ public class Connection implements Runnable {
           // not a member anymore - don't reply
           directAck = false;
         } catch (Exception de) {
-          this.owner.getConduit().getCancelCriterion().checkCancelInProgress(de);
+          owner.getConduit().getCancelCriterion().checkCancelInProgress(de);
           logger.fatal("Error dispatching message", de);
         } catch (ThreadDeath td) {
           throw td;
@@ -3283,49 +3073,47 @@ public class Connection implements Runnable {
 
   void readHandshakeForSender(DataInputStream dis, ByteBuffer peerDataBuffer) {
     try {
-      this.replyCode = dis.readUnsignedByte();
+      int replyCode = dis.readUnsignedByte();
       switch (replyCode) {
         case REPLY_CODE_OK:
           ioFilter.doneReading(peerDataBuffer);
           notifyHandshakeWaiter(true);
           return;
         case REPLY_CODE_OK_WITH_ASYNC_INFO:
-          this.asyncDistributionTimeout = dis.readInt();
-          this.asyncQueueTimeout = dis.readInt();
-          this.asyncMaxQueueSize = (long) dis.readInt() * (1024 * 1024);
-          if (this.asyncDistributionTimeout != 0) {
+          asyncDistributionTimeout = dis.readInt();
+          asyncQueueTimeout = dis.readInt();
+          asyncMaxQueueSize = (long) dis.readInt() * (1024 * 1024);
+          if (asyncDistributionTimeout != 0) {
             logger.info("{} async configuration received {}.",
                 p2pReaderName(),
-                " asyncDistributionTimeout=" + this.asyncDistributionTimeout
-                    + " asyncQueueTimeout=" + this.asyncQueueTimeout
+                " asyncDistributionTimeout=" + asyncDistributionTimeout
+                    + " asyncQueueTimeout=" + asyncQueueTimeout
                     + " asyncMaxQueueSize="
-                    + (this.asyncMaxQueueSize / (1024 * 1024)));
+                    + asyncMaxQueueSize / (1024 * 1024));
           }
           // read the product version ordinal for on-the-fly serialization
           // transformations (for rolling upgrades)
-          this.remoteVersion = Version.readVersion(dis, true);
+          remoteVersion = Version.readVersion(dis, true);
           ioFilter.doneReading(peerDataBuffer);
           notifyHandshakeWaiter(true);
           return;
         default:
           String err =
-              String.format("Unknown handshake reply code: %s messageLength: %s", this.replyCode,
-                  this.messageLength);
-          if (replyCode == 0 && logger.isDebugEnabled()) { // bug 37113
+              String.format("Unknown handshake reply code: %s messageLength: %s", replyCode,
+                  messageLength);
+          if (replyCode == 0 && logger.isDebugEnabled()) {
             logger.debug(err + " (peer probably departed ungracefully)");
           } else {
             logger.fatal(err);
           }
-          this.readerShuttingDown = true;
+          readerShuttingDown = true;
           requestClose(err);
-          return;
       }
     } catch (Exception e) {
-      this.owner.getConduit().getCancelCriterion().checkCancelInProgress(e);
+      owner.getConduit().getCancelCriterion().checkCancelInProgress(e);
       logger.fatal("Error deserializing P2P handshake reply", e);
-      this.readerShuttingDown = true;
+      readerShuttingDown = true;
       requestClose("Error deserializing P2P handshake reply");
-      return;
     } catch (ThreadDeath td) {
       throw td;
     } catch (VirtualMachineError err) {
@@ -3340,24 +3128,21 @@ public class Connection implements Runnable {
       // error condition, so you also need to check to see if the JVM
       // is still usable:
       SystemFailure.checkFailure();
-      logger.fatal("Throwable deserializing P2P handshake reply",
-          t);
-      this.readerShuttingDown = true;
+      logger.fatal("Throwable deserializing P2P handshake reply", t);
+      readerShuttingDown = true;
       requestClose("Throwable deserializing P2P handshake reply");
-      return;
     }
   }
 
   private void setThreadName(int dominoNumber) {
-    Thread.currentThread().setName(THREAD_KIND_IDENTIFIER + " for " + this.remoteAddr + " "
-        + (this.sharedResource ? "" : "un") + "shared" + " " + (this.preserveOrder ? "" : "un")
-        + "ordered" + " uid=" + this.uniqueId + (dominoNumber > 0 ? (" dom #" + dominoNumber) : "")
-        + " port=" + this.socket.getPort());
+    Thread.currentThread().setName(THREAD_KIND_IDENTIFIER + " for " + remoteAddr + " "
+        + (sharedResource ? "" : "un") + "shared" + " " + (preserveOrder ? "" : "un")
+        + "ordered" + " uid=" + uniqueId + (dominoNumber > 0 ? " dom #" + dominoNumber : "")
+        + " port=" + socket.getPort());
   }
 
   private void compactOrResizeBuffer(int messageLength) {
     final int oldBufferSize = inputBuffer.capacity();
-    final MembershipStatistics stats = this.owner.getConduit().getStats();
     int allocSize = messageLength + MSG_HEADER_BYTES;
     if (oldBufferSize < allocSize) {
       // need a bigger buffer
@@ -3390,7 +3175,7 @@ public class Connection implements Runnable {
             "We were asked to send a direct reply on a shared socket");
         msg.setReplySender(new DirectReplySender(this));
       }
-      this.owner.getConduit().messageReceived(this, msg, bytesRead);
+      owner.getConduit().messageReceived(this, msg, bytesRead);
       return true;
     } finally {
       if (msg.containsRegionContentChange()) {
@@ -3399,60 +3184,52 @@ public class Connection implements Runnable {
     }
   }
 
-  protected TCPConduit getConduit() {
-    return this.conduit;
+  TCPConduit getConduit() {
+    return conduit;
   }
 
   protected Socket getSocket() throws SocketException {
-    // fix for bug 37286
-    Socket result = this.socket;
+    Socket result = socket;
     if (result == null) {
-      throw new SocketException(
-          "socket has been closed");
+      throw new SocketException("socket has been closed");
     }
     return result;
   }
 
   boolean isSocketClosed() {
-    return this.socket.isClosed() || !this.socket.isConnected();
+    return socket.isClosed() || !socket.isConnected();
   }
 
   boolean isReceiverStopped() {
-    return this.stopped;
+    return stopped;
   }
 
   private boolean isSocketInUse() {
-    return this.socketInUse;
+    return socketInUse;
   }
 
-
   protected void accessed() {
-    this.accessed = true;
+    accessed = true;
   }
 
   /**
    * return the DM id of the member on the other side of this connection.
    */
   public InternalDistributedMember getRemoteAddress() {
-    return this.remoteAddr;
+    return remoteAddr;
   }
 
   /**
    * Return the version of the member on the other side of this connection.
    */
   Version getRemoteVersion() {
-    return this.remoteVersion;
+    return remoteVersion;
   }
 
   @Override
   public String toString() {
-    return String.valueOf(remoteAddr) + '@' + this.uniqueId
-        + (this.remoteVersion != null ? ('(' + this.remoteVersion.toString() + ')')
-            : "") /*
-                   * DEBUG + " accepted=" + this.isReceiver + " connected=" + this.connected +
-                   * " hash=" + System.identityHashCode(this) + " preserveOrder=" +
-                   * this.preserveOrder + " closing=" + isClosing() + ">"
-                   */;
+    return String.valueOf(remoteAddr) + '@' + uniqueId
+        + (remoteVersion != null ? '(' + remoteVersion.toString() + ')' : "");
   }
 
   /**
@@ -3462,7 +3239,7 @@ public class Connection implements Runnable {
    * @since GemFire 5.1
    */
   boolean getOriginatedHere() {
-    return !this.isReceiver;
+    return !isReceiver;
   }
 
   /**
@@ -3476,7 +3253,7 @@ public class Connection implements Runnable {
    * answers the unique ID of this connection in the originating VM
    */
   protected long getUniqueId() {
-    return this.uniqueId;
+    return uniqueId;
   }
 
   /**
@@ -3494,9 +3271,8 @@ public class Connection implements Runnable {
   }
 
   public void acquireSendPermission() throws ConnectionException {
-    if (!this.connected) {
-      throw new ConnectionException(
-          "connection is closed");
+    if (!connected) {
+      throw new ConnectionException("connection is closed");
     }
     if (isReaderThread()) {
       // reader threads send replies and we always want to permit those without waiting
@@ -3505,24 +3281,23 @@ public class Connection implements Runnable {
     boolean interrupted = false;
     try {
       for (;;) {
-        this.owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
+        owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
         try {
-          this.senderSem.acquire();
+          senderSem.acquire();
           break;
         } catch (InterruptedException ex) {
           interrupted = true;
         }
-      } // for
+      }
     } finally {
       if (interrupted) {
         Thread.currentThread().interrupt();
       }
     }
-    if (!this.connected) {
-      this.senderSem.release();
-      this.owner.getConduit().getCancelCriterion().checkCancelInProgress(null); // bug 37101
-      throw new ConnectionException(
-          "connection is closed");
+    if (!connected) {
+      senderSem.release();
+      owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
+      throw new ConnectionException("connection is closed");
     }
   }
 
@@ -3530,7 +3305,7 @@ public class Connection implements Runnable {
     if (isReaderThread()) {
       return;
     }
-    this.senderSem.release();
+    senderSem.release();
   }
 
   private void closeSenderSem() {
@@ -3542,4 +3317,105 @@ public class Connection implements Runnable {
     releaseSendPermission();
   }
 
+  private class BatchBufferFlusher extends Thread {
+
+    private volatile boolean flushNeeded;
+    private volatile boolean timeToStop;
+    private final DMStats stats;
+
+    BatchBufferFlusher() {
+      setDaemon(true);
+      stats = owner.getConduit().getStats();
+    }
+
+    /**
+     * Called when a message writer needs the current fillBatchBuffer flushed
+     */
+    void flushBuffer(ByteBuffer bb) {
+      final long start = DistributionStats.getStatTime();
+      try {
+        synchronized (this) {
+          synchronized (batchLock) {
+            if (bb != fillBatchBuffer) {
+              // it must have already been flushed. So just return and use the new fillBatchBuffer
+              return;
+            }
+          }
+          flushNeeded = true;
+          notifyAll();
+        }
+        synchronized (batchLock) {
+          // Wait for the flusher thread
+          while (bb == fillBatchBuffer) {
+            owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
+            boolean interrupted = Thread.interrupted();
+            try {
+              batchLock.wait(); // spurious wakeup ok
+            } catch (InterruptedException ex) {
+              interrupted = true;
+            } finally {
+              if (interrupted) {
+                Thread.currentThread().interrupt();
+              }
+            }
+          }
+        }
+      } finally {
+        owner.getConduit().getStats().incBatchWaitTime(start);
+      }
+    }
+
+    public void close() {
+      synchronized (this) {
+        timeToStop = true;
+        flushNeeded = true;
+        notifyAll();
+      }
+    }
+
+    @Override
+    public void run() {
+      try {
+        synchronized (this) {
+          while (!timeToStop) {
+            if (!flushNeeded && fillBatchBuffer.position() <= BATCH_BUFFER_SIZE / 2) {
+              wait(BATCH_FLUSH_MS); // spurious wakeup ok
+            }
+            if (flushNeeded || fillBatchBuffer.position() > BATCH_BUFFER_SIZE / 2) {
+              final long start = DistributionStats.getStatTime();
+              synchronized (batchLock) {
+                // This is the only block of code that will swap the buffer references
+                flushNeeded = false;
+                ByteBuffer tmp = fillBatchBuffer;
+                fillBatchBuffer = sendBatchBuffer;
+                sendBatchBuffer = tmp;
+                batchLock.notifyAll();
+              }
+              // We now own the sendBatchBuffer
+              if (sendBatchBuffer.position() > 0) {
+                final boolean origSocketInUse = socketInUse;
+                socketInUse = true;
+                try {
+                  sendBatchBuffer.flip();
+                  SocketChannel channel = getSocket().getChannel();
+                  writeFully(channel, sendBatchBuffer, false, null);
+                  sendBatchBuffer.clear();
+                } catch (IOException | ConnectionException ex) {
+                  logger.fatal("Exception flushing batch send buffer: %s", ex);
+                  readerShuttingDown = true;
+                  requestClose(String.format("Exception flushing batch send buffer: %s", ex));
+                } finally {
+                  accessed();
+                  socketInUse = origSocketInUse;
+                }
+              }
+              stats.incBatchFlushTime(start);
+            }
+          }
+        }
+      } catch (InterruptedException ex) {
+        // time for this thread to shutdown
+      }
+    }
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
index a033d5f..309f9fd 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
@@ -14,18 +14,19 @@
  */
 package org.apache.geode.internal.tcp;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.lang.ref.Reference;
 import java.lang.ref.WeakReference;
 import java.net.InetAddress;
 import java.net.Socket;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
@@ -39,11 +40,10 @@ import org.apache.geode.alerting.internal.spi.AlertingAction;
 import org.apache.geode.annotations.internal.MakeNotStatic;
 import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.DistributedSystemDisconnectedException;
-import org.apache.geode.distributed.internal.Distribution;
 import org.apache.geode.distributed.internal.DistributionManager;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.distributed.internal.membership.gms.GMSMembership;
+import org.apache.geode.distributed.internal.membership.gms.api.MemberIdentifier;
 import org.apache.geode.distributed.internal.membership.gms.api.Membership;
 import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.SystemTimer;
@@ -54,50 +54,45 @@ import org.apache.geode.logging.internal.executors.LoggingExecutors;
 import org.apache.geode.logging.internal.log4j.api.LogService;
 
 /**
- * <p>
  * ConnectionTable holds all of the Connection objects in a conduit. Connections represent a pipe
  * between two endpoints represented by generic DistributedMembers.
- * </p>
  *
  * @since GemFire 2.1
  */
 public class ConnectionTable {
   private static final Logger logger = LogService.getLogger();
 
-  /** warning when descriptor limit reached */
+  /**
+   * warning when descriptor limit reached
+   */
   @MakeNotStatic
   private static boolean ulimitWarningIssued;
 
   /**
    * true if the current thread wants non-shared resources
    */
-  @MakeNotStatic
-  private static ThreadLocal threadWantsOwnResources = new ThreadLocal();
+  private static final ThreadLocal<Boolean> threadWantsOwnResources = new ThreadLocal<>();
 
   /**
    * Used for messages whose order must be preserved Only connections used for sending messages, and
    * receiving acks, will be put in this map.
    */
-  protected final Map orderedConnectionMap = new ConcurrentHashMap();
+  private final Map orderedConnectionMap = new ConcurrentHashMap();
 
   /**
    * ordered connections local to this thread. Note that accesses to the resulting map must be
    * synchronized because of static cleanup.
    */
-  // ThreadLocal<Map>
-  protected final ThreadLocal<Map> threadOrderedConnMap;
+  static final ThreadLocal<Map> threadOrderedConnMap = new ThreadLocal<>();
 
   /**
-   * List of thread-owned ordered connection maps, for cleanup
-   *
-   * Accesses to the maps in this list need to be synchronized on their instance.
+   * List of thread-owned ordered connection maps, for cleanup. Accesses to the maps in this list
+   * need to be synchronized on their instance.
    */
   private final List threadConnMaps;
 
   /**
-   * Timer to kill idle threads
-   *
-   * guarded.By this
+   * Timer to kill idle threads. Guarded by this.
    */
   private SystemTimer idleConnTimer;
 
@@ -112,13 +107,11 @@ public class ConnectionTable {
    * Used for all non-ordered messages. Only connections used for sending messages, and receiving
    * acks, will be put in this map.
    */
-  protected final Map unorderedConnectionMap = new ConcurrentHashMap();
+  private final Map unorderedConnectionMap = new ConcurrentHashMap();
 
   /**
    * Used for all accepted connections. These connections are read only; we never send messages,
-   * except for acks; only receive.
-   *
-   * Consists of a list of Connection
+   * except for acks; only receive. Consists of a list of Connection.
    */
   private final List receivers = new ArrayList();
 
@@ -132,7 +125,7 @@ public class ConnectionTable {
   /**
    * true if this table is no longer in use
    */
-  private volatile boolean closed = false;
+  private volatile boolean closed;
 
   /**
    * Executor used by p2p reader and p2p handshaker threads.
@@ -143,13 +136,14 @@ public class ConnectionTable {
    * minutes).
    */
   private static final long READER_POOL_KEEP_ALIVE_TIME =
-      Long.getLong("p2p.READER_POOL_KEEP_ALIVE_TIME", 120).longValue();
+      Long.getLong("p2p.READER_POOL_KEEP_ALIVE_TIME", 120);
 
   private final SocketCloser socketCloser;
 
   /**
    * The most recent instance to be created
    *
+   * <p>
    * TODO this assumes no more than one instance is created at a time?
    */
   @MakeNotStatic
@@ -158,7 +152,7 @@ public class ConnectionTable {
   /**
    * A set of sockets that are in the process of being connected
    */
-  private Map connectingSockets = new HashMap();
+  private final Map connectingSockets = new HashMap();
 
   /**
    * Cause calling thread to share communication resources with other threads.
@@ -178,7 +172,7 @@ public class ConnectionTable {
   /**
    * Returns true if calling thread owns its own communication resources.
    */
-  boolean threadOwnsResources() {
+  private boolean threadOwnsResources() {
     DistributionManager d = getDM();
     if (d != null) {
       return d.getSystem().threadOwnsResources() && !AlertingAction.isThreadAlerting();
@@ -187,95 +181,87 @@ public class ConnectionTable {
   }
 
   public static Boolean getThreadOwnsResourcesRegistration() {
-    return (Boolean) threadWantsOwnResources.get();
+    return threadWantsOwnResources.get();
   }
 
   public TCPConduit getOwner() {
     return owner;
   }
 
+  public static ConnectionTable create(TCPConduit conduit) {
+    ConnectionTable ct = new ConnectionTable(conduit);
+    lastInstance.set(ct);
+    return ct;
+  }
 
-  private ConnectionTable(TCPConduit conduit) throws IOException {
-    this.owner = conduit;
-    this.idleConnTimer = (this.owner.idleConnectionTimeout != 0)
+  private ConnectionTable(TCPConduit conduit) {
+    owner = conduit;
+    idleConnTimer = owner.idleConnectionTimeout != 0
         ? new SystemTimer(conduit.getDM().getSystem(), true) : null;
-    this.threadOrderedConnMap = new ThreadLocal();
-    this.threadConnMaps = new ArrayList();
-    this.threadConnectionMap = new ConcurrentHashMap();
-    this.p2pReaderThreadPool = createThreadPoolForIO(conduit.getDM().getSystem().isShareSockets());
-    this.socketCloser = new SocketCloser();
-    this.bufferPool = new BufferPool(owner.getStats());
+    threadConnMaps = new ArrayList();
+    threadConnectionMap = new ConcurrentHashMap();
+    p2pReaderThreadPool = createThreadPoolForIO(conduit.getDM().getSystem().isShareSockets());
+    socketCloser = new SocketCloser();
+    bufferPool = new BufferPool(owner.getStats());
   }
 
   private Executor createThreadPoolForIO(boolean conserveSockets) {
     if (conserveSockets) {
       return LoggingExecutors.newThreadOnEachExecute("SharedP2PReader");
-    } else {
-      return CoreLoggingExecutors.newThreadPoolWithSynchronousFeed("UnsharedP2PReader", 1,
-          Integer.MAX_VALUE, READER_POOL_KEEP_ALIVE_TIME);
     }
+    return CoreLoggingExecutors.newThreadPoolWithSynchronousFeed("UnsharedP2PReader", 1,
+        Integer.MAX_VALUE, READER_POOL_KEEP_ALIVE_TIME);
   }
 
   /** conduit calls acceptConnection after an accept */
-  protected void acceptConnection(Socket sock, PeerConnectionFactory peerConnectionFactory)
-      throws IOException, ConnectionException, InterruptedException {
-    InetAddress connAddress = sock.getInetAddress(); // for bug 44736
+  void acceptConnection(Socket sock, PeerConnectionFactory peerConnectionFactory)
+      throws IOException, ConnectionException {
+    InetAddress connAddress = sock.getInetAddress();
     boolean finishedConnecting = false;
     Connection connection = null;
-    // boolean exceptionLogged = false;
     try {
       connection = peerConnectionFactory.createReceiver(this, sock);
 
       // check for shutdown (so it doesn't get missed in the finally block)
-      this.owner.getCancelCriterion().checkCancelInProgress(null);
+      owner.getCancelCriterion().checkCancelInProgress(null);
       finishedConnecting = true;
-    } catch (IOException ex) {
-      // check for shutdown...
-      this.owner.getCancelCriterion().checkCancelInProgress(ex);
-      logger.warn(String.format("Failed to accept connection from %s because: %s",
-          new Object[] {(connAddress != null ? connAddress : "unavailable address"), ex}));
-      throw ex;
-    } catch (ConnectionException ex) {
+    } catch (ConnectionException | IOException ex) {
       // check for shutdown...
-      this.owner.getCancelCriterion().checkCancelInProgress(ex);
-      logger.warn(String.format("Failed to accept connection from %s because: %s",
-          new Object[] {(connAddress != null ? connAddress : "unavailable address"), ex}));
+      owner.getCancelCriterion().checkCancelInProgress(ex);
+      logger.warn("Failed to accept connection from {} because: {}",
+          connAddress != null ? connAddress : "unavailable address", ex);
       throw ex;
     } finally {
-      // note: no need to call incFailedAccept here because it will be done
-      // in our caller.
+      // note: no need to call incFailedAccept here because it will be done in our caller.
       // no need to log error here since caller will log warning
 
       if (connection != null && !finishedConnecting) {
         // we must be throwing from checkCancelInProgress so close the connection
-        closeCon("cancel after accept",
-            connection);
+        closeCon("cancel after accept", connection);
         connection = null;
       }
     }
 
     if (connection != null) {
-      synchronized (this.receivers) {
-        this.owner.getStats().incReceivers();
-        if (this.closed) {
+      synchronized (receivers) {
+        owner.getStats().incReceivers();
+        if (closed) {
           closeCon("Connection table no longer in use", connection);
           return;
         }
         // If connection.stopped is false, any connection cleanup thread will not yet have acquired
         // the receiver synchronization to remove the receiver. Therefore we can safely add it here.
         if (!(connection.isSocketClosed() || connection.isReceiverStopped())) {
-          this.receivers.add(connection);
+          receivers.add(connection);
         }
       }
       if (logger.isDebugEnabled()) {
         logger.debug("Accepted {} myAddr={} theirAddr={}", connection, getConduit().getMemberId(),
-            connection.remoteAddr);
+            connection.getRemoteAddress());
       }
     }
   }
 
-
-
   /**
    * Process a newly created PendingConnection
    *
@@ -298,11 +284,11 @@ public class ConnectionTable {
     try {
       con = Connection.createSender(owner.getMembership(), this, preserveOrder, id,
           sharedResource, startTime, ackThreshold, ackSAThreshold);
-      this.owner.getStats().incSenders(sharedResource, preserveOrder);
+      owner.getStats().incSenders(sharedResource, preserveOrder);
     } finally {
       // our connection failed to notify anyone waiting for our pending con
       if (con == null) {
-        this.owner.getStats().incFailedConnect();
+        owner.getStats().incFailedConnect();
         synchronized (m) {
           Object rmObj = m.remove(id);
           if (rmObj != pc && rmObj != null) {
@@ -313,10 +299,9 @@ public class ConnectionTable {
         pc.notifyWaiters(null);
         // we must be throwing an exception
       }
-    } // finally
+    }
 
-    // Update our list of connections -- either the
-    // orderedConnectionMap or unorderedConnectionMap
+    // Update our list of connections -- either the orderedConnectionMap or unorderedConnectionMap
     //
     // Note that we added the entry _before_ we attempted the connect,
     // so it's possible something else got through in the mean time...
@@ -325,30 +310,21 @@ public class ConnectionTable {
       if (e == pc) {
         m.put(id, con);
       } else if (e == null) {
-        // someone closed our pending connection
-        // so cleanup the connection we created
-        con.requestClose(
-            "pending connection cancelled");
+        // someone closed our pending connection so cleanup the connection we created
+        con.requestClose("pending connection cancelled");
         con = null;
       } else {
         if (e instanceof Connection) {
           Connection newCon = (Connection) e;
           if (!newCon.connected) {
-            // Fix for bug 31590
-            // someone closed our pending connect
-            // so cleanup the connection we created
+            // someone closed our pending connect so cleanup the connection we created
             if (con != null) {
-              con.requestClose(
-                  "pending connection closed");
+              con.requestClose("pending connection closed");
               con = null;
             }
           } else {
-            // This should not happen. It means that someone else
-            // created the connection which should only happen if
-            // our Connection was rejected.
-            // Assert.assertTrue(false);
-            // The above assertion was commented out to try the
-            // following with bug 32680
+            // This should not happen. It means that someone else created the connection which
+            // should only happen if our Connection was rejected.
             if (con != null) {
               con.requestClose("someone else created the connection");
             }
@@ -360,7 +336,7 @@ public class ConnectionTable {
     pc.notifyWaiters(con);
     if (con != null && logger.isDebugEnabled()) {
       logger.debug("handleNewPendingConnection {} myAddr={} theirAddr={}", con,
-          getConduit().getMemberId(), con.remoteAddr);
+          getConduit().getMemberId(), con.getRemoteAddress());
     }
 
     return con;
@@ -381,17 +357,19 @@ public class ConnectionTable {
   private Connection getSharedConnection(DistributedMember id, boolean scheduleTimeout,
       boolean preserveOrder, long startTime, long ackTimeout, long ackSATimeout)
       throws IOException, DistributedSystemDisconnectedException {
-    Connection result = null;
 
-    final Map m = preserveOrder ? this.orderedConnectionMap : this.unorderedConnectionMap;
+    final Map m = preserveOrder ? orderedConnectionMap : unorderedConnectionMap;
 
-    PendingConnection pc = null; // new connection, if needed
-    Object mEntry = null; // existing connection (if we don't create a new one)
+    // new connection, if needed
+    PendingConnection pc = null;
+
+    // existing connection (if we don't create a new one)
+    Object mEntry;
 
     // Look for pending connection
     synchronized (m) {
       mEntry = m.get(id);
-      if (mEntry != null && (mEntry instanceof Connection)) {
+      if (mEntry instanceof Connection) {
         Connection existingCon = (Connection) mEntry;
         if (!existingCon.connected) {
           mEntry = null;
@@ -401,17 +379,19 @@ public class ConnectionTable {
         pc = new PendingConnection(preserveOrder, id);
         m.put(id, pc);
       }
-    } // synchronized
+    }
 
+    Connection result;
     if (pc != null) {
       if (logger.isDebugEnabled()) {
         logger.debug("created PendingConnection {}", pc);
       }
-      result = handleNewPendingConnection(id, true /* fixes bug 43386 */, preserveOrder, m, pc,
+      result = handleNewPendingConnection(id, true, preserveOrder, m, pc,
           startTime, ackTimeout, ackSATimeout);
       if (!preserveOrder && scheduleTimeout) {
         scheduleIdleTimeout(result);
       }
+
     } else { // we have existing connection
       if (mEntry instanceof PendingConnection) {
 
@@ -420,12 +400,12 @@ public class ConnectionTable {
           throw new IOException("Cannot form connection to alert listener " + id);
         }
 
-        result = ((PendingConnection) mEntry).waitForConnect(this.owner.getMembership(),
+        result = ((PendingConnection) mEntry).waitForConnect(owner.getMembership(),
             startTime, ackTimeout, ackSATimeout);
         if (logger.isDebugEnabled()) {
           if (result != null) {
             logger.debug("getSharedConnection {} myAddr={} theirAddr={}", result,
-                getConduit().getMemberId(), result.remoteAddr);
+                getConduit().getMemberId(), result.getRemoteAddress());
           } else {
             logger.debug("getSharedConnection: Connect failed");
           }
@@ -448,31 +428,30 @@ public class ConnectionTable {
    * @return the connection, or null if an error
    * @throws IOException if the connection could not be created
    */
-  Connection getThreadOwnedConnection(DistributedMember id, long startTime, long ackTimeout,
+  private Connection getThreadOwnedConnection(DistributedMember id, long startTime, long ackTimeout,
       long ackSATimeout) throws IOException, DistributedSystemDisconnectedException {
     Connection result = null;
 
     // Look for result in the thread local
-    Map m = (Map) this.threadOrderedConnMap.get();
+    Map m = threadOrderedConnMap.get();
     if (m == null) {
       // First time for this thread. Create thread local
       m = new HashMap();
-      synchronized (this.threadConnMaps) {
-        if (this.closed) {
+      synchronized (threadConnMaps) {
+        if (closed) {
           owner.getCancelCriterion().checkCancelInProgress(null);
-          throw new DistributedSystemDisconnectedException(
-              "Connection table is closed");
+          throw new DistributedSystemDisconnectedException("Connection table is closed");
         }
         // check for stale references and remove them.
-        for (Iterator it = this.threadConnMaps.iterator(); it.hasNext();) {
+        for (Iterator it = threadConnMaps.iterator(); it.hasNext();) {
           Reference r = (Reference) it.next();
           if (r.get() == null) {
             it.remove();
           }
-        } // for
-        this.threadConnMaps.add(new WeakReference(m)); // ref added for bug 38011
-      } // synchronized
-      this.threadOrderedConnMap.set(m);
+        }
+        threadConnMaps.add(new WeakReference(m));
+      }
+      threadOrderedConnMap.set(m);
     } else {
       // Consult thread local.
       synchronized (m) {
@@ -486,24 +465,22 @@ public class ConnectionTable {
       return result;
 
     // OK, we have to create a new connection.
-    result = Connection.createSender(owner.getMembership(), this, true /* preserveOrder */,
-        id, false /* shared */, startTime, ackTimeout, ackSATimeout);
+    result = Connection.createSender(owner.getMembership(), this, true, id, false, startTime,
+        ackTimeout, ackSATimeout);
     if (logger.isDebugEnabled()) {
       logger.debug("ConnectionTable: created an ordered connection: {}", result);
     }
-    this.owner.getStats().incSenders(false/* shared */, true /* preserveOrder */);
+    owner.getStats().incSenders(false, true);
 
     // Update the list of connections owned by this thread....
 
-    if (this.threadConnectionMap == null) {
+    if (threadConnectionMap == null) {
       // This instance is being destroyed; fail the operation
-      closeCon(
-          "Connection table being destroyed",
-          result);
+      closeCon("Connection table being destroyed", result);
       return null;
     }
 
-    ArrayList al = (ArrayList) this.threadConnectionMap.get(id);
+    ArrayList al = (ArrayList) threadConnectionMap.get(id);
     if (al == null) {
       // First connection for this DistributedMember. Make sure list for this
       // stub is created if it isn't already there.
@@ -511,7 +488,7 @@ public class ConnectionTable {
 
       // Since it's a concurrent map, we just try to put it and then
       // return whichever we got.
-      Object o = this.threadConnectionMap.putIfAbsent(id, al);
+      Object o = threadConnectionMap.putIfAbsent(id, al);
       if (o != null) {
         al = (ArrayList) o;
       }
@@ -534,37 +511,34 @@ public class ConnectionTable {
   /** schedule an idle-connection timeout task */
   private void scheduleIdleTimeout(Connection conn) {
     if (conn == null) {
-      // fix for bug 43529
       return;
     }
     // Set the idle timeout
-    if (this.owner.idleConnectionTimeout != 0) {
+    if (owner.idleConnectionTimeout != 0) {
       try {
         synchronized (this) {
-          if (!this.closed) {
+          if (!closed) {
             IdleConnTT task = new IdleConnTT(conn);
             conn.setIdleTimeoutTask(task);
-            this.getIdleConnTimer().scheduleAtFixedRate(task, this.owner.idleConnectionTimeout,
-                this.owner.idleConnectionTimeout);
+            getIdleConnTimer().scheduleAtFixedRate(task, owner.idleConnectionTimeout,
+                owner.idleConnectionTimeout);
           }
         }
       } catch (IllegalStateException e) {
         if (conn.isClosing()) {
-          // bug #45077 - connection is closed before we schedule the timeout task,
+          // connection is closed before we schedule the timeout task,
           // causing the task to be canceled
           return;
         }
         logger.debug("Got an illegal state exception: {}", e.getMessage(), e);
-        // Unfortunately, cancelInProgress() is not set until *after*
-        // the shutdown message has been sent, so we need to check the
-        // "closeInProgress" bit instead.
+        // Unfortunately, cancelInProgress() is not set until *after* the shutdown message has been
+        // sent, so we need to check the "closeInProgress" bit instead.
         owner.getCancelCriterion().checkCancelInProgress(null);
         Throwable cause = owner.getShutdownCause();
         if (cause == null) {
           cause = e;
         }
-        throw new DistributedSystemDisconnectedException(
-            "The distributed system is shutting down",
+        throw new DistributedSystemDisconnectedException("The distributed system is shutting down",
             cause);
       }
     }
@@ -579,17 +553,16 @@ public class ConnectionTable {
    * @param ackTimeout the ms ack-wait-threshold, or zero
    * @param ackSATimeout the ms ack-severe-alert-threshold, or zero
    * @return the new Connection, or null if a problem
-   * @throws java.io.IOException if the connection could not be created
+   * @throws IOException if the connection could not be created
    */
   protected Connection get(DistributedMember id, boolean preserveOrder, long startTime,
       long ackTimeout, long ackSATimeout)
-      throws java.io.IOException, DistributedSystemDisconnectedException {
-    if (this.closed) {
-      this.owner.getCancelCriterion().checkCancelInProgress(null);
-      throw new DistributedSystemDisconnectedException(
-          "Connection table is closed");
+      throws IOException, DistributedSystemDisconnectedException {
+    if (closed) {
+      owner.getCancelCriterion().checkCancelInProgress(null);
+      throw new DistributedSystemDisconnectedException("Connection table is closed");
     }
-    Connection result = null;
+    Connection result;
     boolean threadOwnsResources = threadOwnsResources();
     if (!preserveOrder || !threadOwnsResources) {
       result = getSharedConnection(id, threadOwnsResources, preserveOrder, startTime, ackTimeout,
@@ -603,26 +576,25 @@ public class ConnectionTable {
     return result;
   }
 
-  protected synchronized void fileDescriptorsExhausted() {
+  synchronized void fileDescriptorsExhausted() {
     if (!ulimitWarningIssued) {
       ulimitWarningIssued = true;
       logger.fatal(
           "This process is out of file descriptors.This will hamper communications and slow down the system.Any conserve-sockets setting is now being ignored.Please consider raising the descriptor limit.This alert is only issued once per process.");
       InternalDistributedSystem.getAnyInstance().setShareSockets(true);
-      threadWantsOwnResources = new ThreadLocal();
     }
   }
 
-  protected TCPConduit getConduit() {
+  TCPConduit getConduit() {
     return owner;
   }
 
-  public BufferPool getBufferPool() {
+  BufferPool getBufferPool() {
     return bufferPool;
   }
 
   public boolean isClosed() {
-    return this.closed;
+    return closed;
   }
 
   private static void closeCon(String reason, Object c) {
@@ -634,7 +606,7 @@ public class ConnectionTable {
       return;
     }
     if (c instanceof Connection) {
-      ((Connection) c).closePartialConnect(reason, beingSick); // fix for bug 31666
+      ((Connection) c).closePartialConnect(reason, beingSick);
     } else {
       ((PendingConnection) c).notifyWaiters(null);
     }
@@ -644,85 +616,76 @@ public class ConnectionTable {
    * returns the idle connection timer, or null if the connection table is closed. guarded by a sync
    * on the connection table
    */
-  protected synchronized SystemTimer getIdleConnTimer() {
-    if (this.closed) {
+  synchronized SystemTimer getIdleConnTimer() {
+    if (closed) {
       return null;
     }
-    if (this.idleConnTimer == null) {
-      this.idleConnTimer = new SystemTimer(getDM().getSystem(), true);
+    if (idleConnTimer == null) {
+      idleConnTimer = new SystemTimer(getDM().getSystem(), true);
     }
-    return this.idleConnTimer;
+    return idleConnTimer;
   }
 
   protected void close() {
-    /*
-     * NOMUX if (inputMuxManager != null) { inputMuxManager.stop(); }
-     */
-    if (this.closed) {
+    if (closed) {
       return;
     }
-    this.closed = true;
+    closed = true;
     synchronized (this) {
-      if (this.idleConnTimer != null) {
-        this.idleConnTimer.cancel();
+      if (idleConnTimer != null) {
+        idleConnTimer.cancel();
       }
     }
-    synchronized (this.orderedConnectionMap) {
-      for (Iterator it = this.orderedConnectionMap.values().iterator(); it.hasNext();) {
-        closeCon(
-            "Connection table being destroyed",
-            it.next());
+    synchronized (orderedConnectionMap) {
+      for (Object o : orderedConnectionMap.values()) {
+        closeCon("Connection table being destroyed", o);
       }
-      this.orderedConnectionMap.clear();
+      orderedConnectionMap.clear();
     }
-    synchronized (this.unorderedConnectionMap) {
-      for (Iterator it = this.unorderedConnectionMap.values().iterator(); it.hasNext();) {
-        closeCon(
-            "Connection table being destroyed",
-            it.next());
+    synchronized (unorderedConnectionMap) {
+      for (Object o : unorderedConnectionMap.values()) {
+        closeCon("Connection table being destroyed", o);
       }
-      this.unorderedConnectionMap.clear();
+      unorderedConnectionMap.clear();
     }
-    if (this.threadConnectionMap != null) {
-      this.threadConnectionMap = null;
+    if (threadConnectionMap != null) {
+      threadConnectionMap = null;
     }
-    if (this.threadConnMaps != null) {
-      synchronized (this.threadConnMaps) {
-        for (Iterator it = this.threadConnMaps.iterator(); it.hasNext();) {
-          Reference r = (Reference) it.next();
-          Map m = (Map) r.get();
-          if (m != null) {
-            synchronized (m) {
-              for (Iterator mit = m.values().iterator(); mit.hasNext();) {
-                closeCon("Connection table being destroyed", mit.next());
+    if (threadConnMaps != null) {
+      synchronized (threadConnMaps) {
+        for (Object threadConnMap : threadConnMaps) {
+          Reference reference = (Reference) threadConnMap;
+          Map map = (Map) reference.get();
+          if (map != null) {
+            synchronized (map) {
+              for (Object o : map.values()) {
+                closeCon("Connection table being destroyed", o);
               }
             }
           }
         }
-        this.threadConnMaps.clear();
+        threadConnMaps.clear();
       }
     }
-    {
-      Executor localExec = this.p2pReaderThreadPool;
-      if (localExec != null) {
-        if (localExec instanceof ExecutorService) {
-          ((ExecutorService) localExec).shutdown();
-        }
+    Executor localExec = p2pReaderThreadPool;
+    if (localExec != null) {
+      if (localExec instanceof ExecutorService) {
+        ((ExecutorService) localExec).shutdown();
       }
     }
     closeReceivers(false);
 
-    Map m = (Map) this.threadOrderedConnMap.get();
-    if (m != null) {
-      synchronized (m) {
-        m.clear();
+    Map map = threadOrderedConnMap.get();
+    if (map != null) {
+      synchronized (map) {
+        map.clear();
       }
     }
-    this.socketCloser.close();
+    socketCloser.close();
   }
 
   public void executeCommand(Runnable runnable) {
-    Executor local = this.p2pReaderThreadPool;
+    Executor local = p2pReaderThreadPool;
     if (local != null) {
       local.execute(runnable);
     }
@@ -734,14 +697,12 @@ public class ConnectionTable {
    *
    * @param beingSick a test hook to simulate a sick process
    */
-  protected void closeReceivers(boolean beingSick) {
-    synchronized (this.receivers) {
-      for (Iterator it = this.receivers.iterator(); it.hasNext();) {
+  private void closeReceivers(boolean beingSick) {
+    synchronized (receivers) {
+      for (Iterator it = receivers.iterator(); it.hasNext();) {
         Connection con = (Connection) it.next();
         if (!beingSick || con.preserveOrder) {
-          closeCon(
-              "Connection table being destroyed",
-              con, beingSick);
+          closeCon("Connection table being destroyed", con, beingSick);
           it.remove();
         }
       }
@@ -749,7 +710,6 @@ public class ConnectionTable {
       synchronized (connectingSockets) {
         for (Iterator it = connectingSockets.entrySet().iterator(); it.hasNext();) {
           Map.Entry entry = (Map.Entry) it.next();
-          // ConnectingSocketInfo info = (ConnectingSocketInfo)entry.getValue();
           try {
             ((Socket) entry.getKey()).close();
           } catch (IOException e) {
@@ -761,103 +721,90 @@ public class ConnectionTable {
     }
   }
 
-
-  protected void removeReceiver(Object con) {
-    synchronized (this.receivers) {
-      this.receivers.remove(con);
+  void removeReceiver(Object con) {
+    synchronized (receivers) {
+      receivers.remove(con);
     }
   }
 
   /**
-   * Return true if our owner already knows that this endpoint is departing
+   * remove an endpoint and notify the membership manager of the departure
    */
-  protected boolean isEndpointShuttingDown(DistributedMember id) {
-    return giveUpOnMember(owner.getDM().getDistribution(), id);
-  }
-
-  protected boolean giveUpOnMember(Distribution mgr, DistributedMember remoteAddr) {
-    return !mgr.memberExists(remoteAddr) || mgr.isShunned(remoteAddr) || mgr.shutdownInProgress();
-  }
-
-  /** remove an endpoint and notify the membership manager of the departure */
-  protected void removeEndpoint(DistributedMember stub, String reason) {
+  void removeEndpoint(DistributedMember stub, String reason) {
     removeEndpoint(stub, reason, true);
   }
 
-  protected void removeEndpoint(DistributedMember memberID, String reason,
-      boolean notifyDisconnect) {
-    if (this.closed) {
+  void removeEndpoint(DistributedMember memberID, String reason, boolean notifyDisconnect) {
+    if (closed) {
       return;
     }
     boolean needsRemoval = false;
-    synchronized (this.orderedConnectionMap) {
-      if (this.orderedConnectionMap.get(memberID) != null)
+    synchronized (orderedConnectionMap) {
+      if (orderedConnectionMap.get(memberID) != null)
         needsRemoval = true;
     }
     if (!needsRemoval) {
-      synchronized (this.unorderedConnectionMap) {
-        if (this.unorderedConnectionMap.get(memberID) != null)
+      synchronized (unorderedConnectionMap) {
+        if (unorderedConnectionMap.get(memberID) != null)
           needsRemoval = true;
       }
     }
     if (!needsRemoval) {
-      ConcurrentMap cm = this.threadConnectionMap;
+      ConcurrentMap cm = threadConnectionMap;
       if (cm != null) {
-        ArrayList al = (ArrayList) cm.get(memberID);
-        needsRemoval = al != null && al.size() > 0;
+        List al = (ArrayList) cm.get(memberID);
+        needsRemoval = al != null && !al.isEmpty();
       }
     }
 
     if (needsRemoval) {
       InternalDistributedMember remoteAddress = null;
-      synchronized (this.orderedConnectionMap) {
-        Object c = this.orderedConnectionMap.remove(memberID);
+      synchronized (orderedConnectionMap) {
+        Object c = orderedConnectionMap.remove(memberID);
         if (c instanceof Connection) {
           remoteAddress = ((Connection) c).getRemoteAddress();
         }
         closeCon(reason, c);
       }
-      synchronized (this.unorderedConnectionMap) {
-        Object c = this.unorderedConnectionMap.remove(memberID);
-        if (remoteAddress == null && (c instanceof Connection)) {
+      synchronized (unorderedConnectionMap) {
+        Object c = unorderedConnectionMap.remove(memberID);
+        if (remoteAddress == null && c instanceof Connection) {
           remoteAddress = ((Connection) c).getRemoteAddress();
         }
         closeCon(reason, c);
       }
 
-      {
-        ConcurrentMap cm = this.threadConnectionMap;
-        if (cm != null) {
-          ArrayList al = (ArrayList) cm.remove(memberID);
-          if (al != null) {
-            synchronized (al) {
-              for (Iterator it = al.iterator(); it.hasNext();) {
-                Object c = it.next();
-                if (remoteAddress == null && (c instanceof Connection)) {
-                  remoteAddress = ((Connection) c).getRemoteAddress();
-                }
-                closeCon(reason, c);
+      ConcurrentMap cm = threadConnectionMap;
+      if (cm != null) {
+        List al = (ArrayList) cm.remove(memberID);
+        if (al != null) {
+          synchronized (al) {
+            for (Object c : al) {
+              if (remoteAddress == null && c instanceof Connection) {
+                remoteAddress = ((Connection) c).getRemoteAddress();
               }
-              al.clear();
+              closeCon(reason, c);
             }
+            al.clear();
           }
         }
       }
 
       // close any sockets that are in the process of being connected
-      Set toRemove = new HashSet();
+      Collection toRemove = new HashSet();
       synchronized (connectingSockets) {
         for (Iterator it = connectingSockets.entrySet().iterator(); it.hasNext();) {
           Map.Entry entry = (Map.Entry) it.next();
           ConnectingSocketInfo info = (ConnectingSocketInfo) entry.getValue();
-          if (info.peerAddress.equals(((InternalDistributedMember) memberID).getInetAddress())) {
+          if (info.peerAddress.equals(((MemberIdentifier) memberID).getInetAddress())) {
             toRemove.add(entry.getKey());
             it.remove();
           }
         }
       }
-      for (Iterator it = toRemove.iterator(); it.hasNext();) {
-        Socket sock = (Socket) it.next();
+
+      for (Object o : toRemove) {
+        Closeable sock = (Socket) o;
         try {
           sock.close();
         } catch (IOException e) {
@@ -869,10 +816,9 @@ public class ConnectionTable {
       }
 
       // close any receivers
-      // avoid deadlock when a NIC has failed by closing connections outside
-      // of the receivers sync (bug 38731)
+      // avoid deadlock when a NIC has failed by closing connections outside of the receivers sync
       toRemove.clear();
-      synchronized (this.receivers) {
+      synchronized (receivers) {
         for (Iterator it = receivers.iterator(); it.hasNext();) {
           Connection con = (Connection) it.next();
           if (memberID.equals(con.getRemoteAddress())) {
@@ -881,8 +827,8 @@ public class ConnectionTable {
           }
         }
       }
-      for (Iterator it = toRemove.iterator(); it.hasNext();) {
-        Connection con = (Connection) it.next();
+      for (Object o : toRemove) {
+        Connection con = (Connection) o;
         closeCon(reason, con);
       }
       if (notifyDisconnect) {
@@ -893,20 +839,20 @@ public class ConnectionTable {
       }
 
       if (remoteAddress != null) {
-        this.socketCloser.releaseResourcesForAddress(remoteAddress.toString());
+        socketCloser.releaseResourcesForAddress(remoteAddress.toString());
       }
     }
   }
 
   SocketCloser getSocketCloser() {
-    return this.socketCloser;
+    return socketCloser;
   }
 
   /** check to see if there are still any receiver threads for the given end-point */
-  protected boolean hasReceiversFor(DistributedMember endPoint) {
-    synchronized (this.receivers) {
-      for (Iterator it = receivers.iterator(); it.hasNext();) {
-        Connection con = (Connection) it.next();
+  boolean hasReceiversFor(DistributedMember endPoint) {
+    synchronized (receivers) {
+      for (Object receiver : receivers) {
+        Connection con = (Connection) receiver;
         if (endPoint.equals(con.getRemoteAddress())) {
           return true;
         }
@@ -918,7 +864,7 @@ public class ConnectionTable {
   private static void removeFromThreadConMap(ConcurrentMap cm, DistributedMember stub,
       Connection c) {
     if (cm != null) {
-      ArrayList al = (ArrayList) cm.get(stub);
+      List al = (ArrayList) cm.get(stub);
       if (al != null) {
         synchronized (al) {
           al.remove(c);
@@ -927,37 +873,34 @@ public class ConnectionTable {
     }
   }
 
-  protected void removeThreadConnection(DistributedMember stub, Connection c) {
-    /*
-     * if (this.closed) { return; }
-     */
-    removeFromThreadConMap(this.threadConnectionMap, stub, c);
-    Map m = (Map) this.threadOrderedConnMap.get();
+  void removeThreadConnection(DistributedMember stub, Connection c) {
+    removeFromThreadConMap(threadConnectionMap, stub, c);
+    Map m = threadOrderedConnMap.get();
     if (m != null) {
       // Static cleanup thread might intervene, so we MUST synchronize
       synchronized (m) {
         if (m.get(stub) == c) {
           m.remove(stub);
         }
-      } // synchronized
-    } // m != null
+      }
+    }
   }
 
   void removeSharedConnection(String reason, DistributedMember stub, boolean ordered,
       Connection c) {
-    if (this.closed) {
+    if (closed) {
       return;
     }
     if (ordered) {
-      synchronized (this.orderedConnectionMap) {
-        if (this.orderedConnectionMap.get(stub) == c) {
-          closeCon(reason, this.orderedConnectionMap.remove(stub));
+      synchronized (orderedConnectionMap) {
+        if (orderedConnectionMap.get(stub) == c) {
+          closeCon(reason, orderedConnectionMap.remove(stub));
         }
       }
     } else {
-      synchronized (this.unorderedConnectionMap) {
-        if (this.unorderedConnectionMap.get(stub) == c) {
-          closeCon(reason, this.unorderedConnectionMap.remove(stub));
+      synchronized (unorderedConnectionMap) {
+        if (unorderedConnectionMap.get(stub) == c) {
+          closeCon(reason, unorderedConnectionMap.remove(stub));
         }
       }
     }
@@ -986,8 +929,8 @@ public class ConnectionTable {
     lastInstance.set(null);
   }
 
-  public void removeAndCloseThreadOwnedSockets() {
-    Map m = (Map) this.threadOrderedConnMap.get();
+  void removeAndCloseThreadOwnedSockets() {
+    Map m = threadOrderedConnMap.get();
     if (m != null) {
       // Static cleanup may intervene; we MUST synchronize.
       synchronized (m) {
@@ -996,11 +939,11 @@ public class ConnectionTable {
           Map.Entry me = (Map.Entry) it.next();
           DistributedMember stub = (DistributedMember) me.getKey();
           Connection c = (Connection) me.getValue();
-          removeFromThreadConMap(this.threadConnectionMap, stub, c);
+          removeFromThreadConMap(threadConnectionMap, stub, c);
           it.remove();
           closeCon("thread finalization", c);
-        } // while
-      } // synchronized m
+        }
+      }
     }
   }
 
@@ -1010,7 +953,6 @@ public class ConnectionTable {
       return;
     }
     ct.removeAndCloseThreadOwnedSockets();
-    // lastInstance = null;
   }
 
   /**
@@ -1019,9 +961,8 @@ public class ConnectionTable {
    *
    * @since GemFire 5.1
    */
-  protected void getThreadOwnedOrderedConnectionState(DistributedMember member, Map result) {
-
-    ConcurrentMap cm = this.threadConnectionMap;
+  void getThreadOwnedOrderedConnectionState(DistributedMember member, Map result) {
+    ConcurrentMap cm = threadConnectionMap;
     if (cm != null) {
       ArrayList al = (ArrayList) cm.get(member);
       if (al != null) {
@@ -1029,10 +970,10 @@ public class ConnectionTable {
           al = new ArrayList(al);
         }
 
-        for (Iterator it = al.iterator(); it.hasNext();) {
-          Connection conn = (Connection) it.next();
+        for (Object o : al) {
+          Connection conn = (Connection) o;
           if (!conn.isSharedResource() && conn.getOriginatedHere() && conn.getPreserveOrder()) {
-            result.put(Long.valueOf(conn.getUniqueId()), Long.valueOf(conn.getMessagesSent()));
+            result.put(conn.getUniqueId(), conn.getMessagesSent());
           }
         }
       }
@@ -1042,21 +983,23 @@ public class ConnectionTable {
   /**
    * wait for the given incoming connections to receive at least the associated number of messages
    */
-  protected void waitForThreadOwnedOrderedConnectionState(DistributedMember member,
-      Map connectionStates) throws InterruptedException {
-    if (Thread.interrupted())
-      throw new InterruptedException(); // wisest to do this before the synchronize below
-    List r = null;
+  void waitForThreadOwnedOrderedConnectionState(DistributedMember member, Map connectionStates)
+      throws InterruptedException {
+    if (Thread.interrupted()) {
+      // wisest to do this before the synchronize below
+      throw new InterruptedException();
+    }
+    List r;
     synchronized (receivers) {
       r = new ArrayList(receivers);
     }
-    for (Iterator it = r.iterator(); it.hasNext();) {
-      Connection con = (Connection) it.next();
+    for (Object o : r) {
+      Connection con = (Connection) o;
       if (!con.stopped && !con.isClosing() && !con.getOriginatedHere() && con.getPreserveOrder()
           && member.equals(con.getRemoteAddress())) {
-        Long state = (Long) connectionStates.remove(Long.valueOf(con.getUniqueId()));
+        Long state = (Long) connectionStates.remove(con.getUniqueId());
         if (state != null) {
-          long count = state.longValue();
+          long count = state;
           while (!con.stopped && !con.isClosing() && con.getMessagesReceived() < count) {
             if (logger.isDebugEnabled()) {
               logger.debug("Waiting for connection {}/{} currently={} need={}",
@@ -1067,7 +1010,7 @@ public class ConnectionTable {
         }
       }
     }
-    if (connectionStates.size() > 0) {
+    if (!connectionStates.isEmpty()) {
       if (logger.isDebugEnabled()) {
         StringBuffer sb = new StringBuffer(1000);
         sb.append("These connections from ");
@@ -1086,41 +1029,29 @@ public class ConnectionTable {
   }
 
   protected DistributionManager getDM() {
-    return this.owner.getDM();
+    return owner.getDM();
   }
 
-  // public boolean isShuttingDown() {
-  // return this.owner.isShuttingDown();
-  // }
-
-  // protected void cleanupHighWater() {
-  // cleanup(highWater);
-  // }
-
-  // protected void cleanupLowWater() {
-  // cleanup(lowWater);
-  // }
-
-  // private void cleanup(int maxConnections) {
-  /*
-   * if (maxConnections == 0 || maxConnections >= connections.size()) { return; } while
-   * (connections.size() > maxConnections) { Connection oldest = null; synchronized(connections) {
-   * for (Iterator iter = connections.values().iterator(); iter.hasNext(); ) { Connection c =
-   * (Connection)iter.next(); if (oldest == null || c.getTimeStamp() < oldest.getTimeStamp()) {
-   * oldest = c; } } } // sanity check - don't close anything fresher than 10 seconds or // we'll
-   * start thrashing if (oldest.getTimeStamp() > (System.currentTimeMillis() - 10000)) { if
-   * (owner.lowWaterConnectionCount > 0) { owner.lowWaterConnectionCount += 10; } if
-   * (owner.highWaterConnectionCount > 0) { owner.highWaterConnectionCount += 10; } new Object[] {
-   * owner.lowWaterConnectionCount, owner.highWaterConnectionCount }); break; } if (oldest != null)
-   * { oldest.close(); } }
-   */
-  // }
+  /** keep track of a socket that is trying to connect() for shutdown purposes */
+  void addConnectingSocket(Socket socket, InetAddress addr) {
+    synchronized (connectingSockets) {
+      connectingSockets.put(socket, new ConnectingSocketInfo(addr));
+    }
+  }
+
+  /** remove a socket from the tracked set. It should be connected at this point */
+  void removeConnectingSocket(Socket socket) {
+    synchronized (connectingSockets) {
+      connectingSockets.remove(socket);
+    }
+  }
+
+  int getNumberOfReceivers() {
+    return receivers.size();
+  }
+
+  private class PendingConnection {
 
-  /*
-   * public void dumpConnectionTable() { Iterator iter = connectionMap.keySet().iterator(); while
-   * (iter.hasNext()) { Object key = iter.next(); Object val = connectionMap.get(key); } }
-   */
-  private /* static */ class PendingConnection {
     /**
      * true if this connection is still pending
      */
@@ -1129,7 +1060,7 @@ public class ConnectionTable {
     /**
      * the connection we are waiting on
      */
-    private Connection conn = null;
+    private Connection conn;
 
     /**
      * whether the connection preserves message ordering
@@ -1143,10 +1074,10 @@ public class ConnectionTable {
 
     private final Thread connectingThread;
 
-    public PendingConnection(boolean preserveOrder, DistributedMember id) {
+    private PendingConnection(boolean preserveOrder, DistributedMember id) {
       this.preserveOrder = preserveOrder;
       this.id = id;
-      this.connectingThread = Thread.currentThread();
+      connectingThread = Thread.currentThread();
     }
 
     /**
@@ -1154,17 +1085,18 @@ public class ConnectionTable {
      *
      * @param c the new connection
      */
-    public synchronized void notifyWaiters(Connection c) {
-      if (!this.pending)
-        return; // already done.
+    private synchronized void notifyWaiters(Connection c) {
+      if (!pending) {
+        return;
+      }
 
-      this.conn = c;
-      this.pending = false;
+      conn = c;
+      pending = false;
       if (logger.isDebugEnabled()) {
         logger.debug("Notifying waiters that pending {} connection to {} is ready; {}",
-            ((this.preserveOrder) ? "ordered" : "unordered"), this.id, this);
+            preserveOrder ? "ordered" : "unordered", id, this);
       }
-      this.notifyAll();
+      notifyAll();
     }
 
     /**
@@ -1176,24 +1108,23 @@ public class ConnectionTable {
      * @param ackSATimeout the ms ack-severe-alert-threshold, or zero
      * @return the new connection
      */
-    public synchronized Connection waitForConnect(Membership mgr, long startTime,
-        long ackTimeout, long ackSATimeout) throws IOException {
+    private synchronized Connection waitForConnect(Membership mgr, long startTime, long ackTimeout,
+        long ackSATimeout) {
       if (connectingThread == Thread.currentThread()) {
         throw new ReenteredConnectException("This thread is already trying to connect");
       }
 
-      final Map m = this.preserveOrder ? orderedConnectionMap : unorderedConnectionMap;
+      final Map m = preserveOrder ? orderedConnectionMap : unorderedConnectionMap;
 
-      boolean severeAlertIssued = false;
-      boolean suspected = false;
       DistributedMember targetMember = null;
       if (ackSATimeout > 0) {
-        targetMember = this.id;
+        targetMember = id;
       }
 
-      int attempt = 0;
-      for (;;) {
-        if (!this.pending) {
+      boolean suspected = false;
+      boolean severeAlertIssued = false;
+      for (int attempt = 0;;) {
+        if (!pending) {
           break;
         }
         getConduit().getCancelCriterion().checkCancelInProgress(null);
@@ -1201,7 +1132,7 @@ public class ConnectionTable {
         // wait a little bit...
         boolean interrupted = Thread.interrupted();
         try {
-          this.wait(100); // spurious wakeup ok
+          wait(100);
         } catch (InterruptedException ignore) {
           interrupted = true;
           getConduit().getCancelCriterion().checkCancelInProgress(ignore);
@@ -1211,8 +1142,9 @@ public class ConnectionTable {
           }
         }
 
-        if (!this.pending)
+        if (!pending) {
           break;
+        }
 
         // Still pending...
         long now = System.currentTimeMillis();
@@ -1225,22 +1157,19 @@ public class ConnectionTable {
             severeAlertIssued = true;
           } else if (!suspected) {
             logger.warn("Unable to form a TCP/IP connection to %s in over %s seconds",
-                this.id, (ackTimeout) / 1000);
-            ((GMSMembership) mgr).suspectMember((InternalDistributedMember) targetMember,
+                id, ackTimeout / 1000);
+            mgr.suspectMember(targetMember,
                 "Unable to form a TCP/IP connection in a reasonable amount of time");
             suspected = true;
           }
         }
 
-        Object e;
-        // synchronized (m) {
-        e = m.get(this.id);
-        // }
+        Object e = m.get(id);
         if (e == this) {
           attempt += 1;
-          if (logger.isDebugEnabled() && (attempt % 20 == 1)) {
+          if (logger.isDebugEnabled() && attempt % 20 == 1) {
             logger.debug("Waiting for pending connection to complete: {} connection to {}; {}",
-                ((this.preserveOrder) ? "ordered" : "unordered"), this.id, this);
+                preserveOrder ? "ordered" : "unordered", id, this);
           }
           continue;
         }
@@ -1254,17 +1183,16 @@ public class ConnectionTable {
           // We were removed
           notifyWaiters(null);
           break;
-        } else if (e instanceof Connection) {
+        }
+        if (e instanceof Connection) {
           notifyWaiters((Connection) e);
           break;
-        } else {
-          // defer to the new instance
-          return ((PendingConnection) e).waitForConnect(mgr, startTime, ackTimeout, ackSATimeout);
         }
+        // defer to the new instance
+        return ((PendingConnection) e).waitForConnect(mgr, startTime, ackTimeout, ackSATimeout);
 
-      } // for
-      return this.conn;
-
+      }
+      return conn;
     }
 
     public String toString() {
@@ -1272,28 +1200,27 @@ public class ConnectionTable {
     }
   }
 
-
   private static class IdleConnTT extends SystemTimer.SystemTimerTask {
 
     private Connection c;
 
-    IdleConnTT(Connection c) {
+    private IdleConnTT(Connection c) {
       this.c = c;
     }
 
     @Override
     public boolean cancel() {
-      Connection con = this.c;
+      Connection con = c;
       if (con != null) {
         con.cleanUpOnIdleTaskCancel();
       }
-      this.c = null;
+      c = null;
       return super.cancel();
     }
 
     @Override
     public void run2() {
-      Connection con = this.c;
+      Connection con = c;
       if (con != null) {
         if (con.checkForIdleTimeout()) {
           cancel();
@@ -1302,38 +1229,12 @@ public class ConnectionTable {
     }
   }
 
-  public static ConnectionTable create(TCPConduit conduit) throws IOException {
-    ConnectionTable ct = new ConnectionTable(conduit);
-    lastInstance.set(ct);
-    return ct;
-  }
-
-  /** keep track of a socket that is trying to connect() for shutdown purposes */
-  public void addConnectingSocket(Socket socket, InetAddress addr) {
-    synchronized (connectingSockets) {
-      connectingSockets.put(socket, new ConnectingSocketInfo(addr));
-    }
-  }
-
-  /** remove a socket from the tracked set. It should be connected at this point */
-  public void removeConnectingSocket(Socket socket) {
-    synchronized (connectingSockets) {
-      connectingSockets.remove(socket);
-    }
-  }
-
-
   private static class ConnectingSocketInfo {
-    InetAddress peerAddress;
-    Thread connectingThread;
 
-    public ConnectingSocketInfo(InetAddress addr) {
-      this.peerAddress = addr;
-      this.connectingThread = Thread.currentThread();
-    }
-  }
+    private final InetAddress peerAddress;
 
-  public int getNumberOfReceivers() {
-    return receivers.size();
+    private ConnectingSocketInfo(InetAddress addr) {
+      peerAddress = addr;
+    }
   }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
index 365bf35..de36b1a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
@@ -14,12 +14,17 @@
  */
 package org.apache.geode.internal.tcp;
 
+import static org.apache.geode.distributed.internal.DistributionConfig.DEFAULT_MEMBERSHIP_PORT_RANGE;
+import static org.apache.geode.distributed.internal.DistributionConfig.DEFAULT_SOCKET_BUFFER_SIZE;
+import static org.apache.geode.distributed.internal.DistributionConfig.DEFAULT_SOCKET_LEASE_TIME;
+
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.SocketException;
+import java.net.UnknownHostException;
 import java.nio.channels.ClosedByInterruptException;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.ServerSocketChannel;
@@ -56,26 +61,21 @@ import org.apache.geode.logging.internal.log4j.api.LogService;
  * <p>
  * TCPConduit manages a server socket and a collection of connections to other systems. Connections
  * are identified by DistributedMember IDs. These types of messages are currently supported:
- * </p>
  *
- * <pre>
- * <p>
- * DistributionMessage - message is delivered to the server's
- * ServerDelegate
  * <p>
- * </pre>
+ * DistributionMessage - message is delivered to the server's ServerDelegate
+ *
  * <p>
  * In the current implementation, ServerDelegate is the DirectChannel used by the GemFire
  * DistributionManager to send and receive messages.
+ *
  * <p>
  * If the ServerDelegate is null, DistributionMessages are ignored by the TCPConduit.
- * </p>
  *
  * @since GemFire 2.0
  */
 
 public class TCPConduit implements Runnable {
-
   private static final Logger logger = LogService.getLogger();
 
   /**
@@ -93,6 +93,7 @@ public class TCPConduit implements Runnable {
    * enough. Setting it too low can also have adverse effects because backlog overflows
    * aren't handled well by most tcp/ip implementations, causing connect timeouts instead
    * of expected ServerRefusedConnection exceptions.
+   *
    * <p>
    * Normally the backlog isn't that important because if it's full of connection requests
    * a SYN "cookie" mechanism is used to bypass the backlog queue. If this is turned off
@@ -104,45 +105,24 @@ public class TCPConduit implements Runnable {
   /**
    * use javax.net.ssl.SSLServerSocketFactory?
    */
-  boolean useSSL;
+  private final boolean useSSL;
 
   /**
    * The socket producer used by the cluster
    */
   private final SocketCreator socketCreator;
 
-
-  private Membership membership;
+  private final Membership membership;
 
   static {
     init();
   }
 
-  public Membership getMembership() {
-    return membership;
-  }
-
-  public static int getBackLog() {
-    return BACKLOG;
-  }
-
-  public static void init() {
-    // only use direct buffers if we are using nio
-    LISTENER_CLOSE_TIMEOUT = Integer.getInteger("p2p.listenerCloseTimeout", 60000);
-    // note: bug 37730 concerned this defaulting to 50
-    BACKLOG = Integer.getInteger("p2p.backlog", 1280);
-    if (Boolean.getBoolean("p2p.oldIO")) {
-      logger.warn("detected use of p2p.oldIO setting - this is no longer supported");
-    }
-  }
-
-  ///////////////// permanent conduit state
-
   /**
    * the size of OS TCP/IP buffers, not set by default
    */
-  int tcpBufferSize = DistributionConfig.DEFAULT_SOCKET_BUFFER_SIZE;
-  int idleConnectionTimeout = DistributionConfig.DEFAULT_SOCKET_LEASE_TIME;
+  int tcpBufferSize = DEFAULT_SOCKET_BUFFER_SIZE;
+  int idleConnectionTimeout = DEFAULT_SOCKET_LEASE_TIME;
 
   /**
    * port is the tcp/ip port that this conduit binds to. If it is zero, a port from
@@ -151,8 +131,8 @@ public class TCPConduit implements Runnable {
    */
   private int port;
 
-  private int[] tcpPortRange = new int[] {DistributionConfig.DEFAULT_MEMBERSHIP_PORT_RANGE[0],
-      DistributionConfig.DEFAULT_MEMBERSHIP_PORT_RANGE[1]};
+  private final int[] tcpPortRange =
+      new int[] {DEFAULT_MEMBERSHIP_PORT_RANGE[0], DEFAULT_MEMBERSHIP_PORT_RANGE[1]};
 
   /**
    * The java groups address that this conduit is associated with
@@ -183,14 +163,12 @@ public class TCPConduit implements Runnable {
    */
   private DistributionConfig config;
 
-  ////////////////// runtime state that is re-initialized on a restart
-
   /**
    * server socket address
    */
   private InetSocketAddress id;
 
-  protected volatile boolean stopped;
+  private volatile boolean stopped;
 
   /**
    * the listener thread
@@ -213,14 +191,20 @@ public class TCPConduit implements Runnable {
   private ConnectionTable conTable;
 
   /**
+   * the reason for a shutdown, if abnormal
+   */
+  private volatile Exception shutdownCause;
+
+  private final Stopper stopper = new Stopper();
+
+  /**
    * <p>
    * creates a new TCPConduit bound to the given InetAddress and port. The given ServerDelegate will
    * receive any DistributionMessages passed to the conduit.
-   * </p>
+   *
    * <p>
    * This constructor forces the conduit to ignore the following system properties and look for them
    * only in the <i>props</i> argument:
-   * </p>
    *
    * <pre>
    * p2p.tcpBufferSize
@@ -234,35 +218,28 @@ public class TCPConduit implements Runnable {
     this.address = address;
     this.isBindAddress = isBindAddress;
     this.port = port;
-    this.directChannel = receiver;
-    this.stats = null;
-    this.config = null;
-    this.membership = mgr;
+    directChannel = receiver;
+    stats = null;
+    config = null;
+    membership = mgr;
     if (directChannel != null) {
-      this.stats = directChannel.getDMStats();
-      this.config = directChannel.getDMConfig();
+      stats = directChannel.getDMStats();
+      config = directChannel.getDMConfig();
     }
-    if (this.getStats() == null) {
-      this.stats = new LonerDistributionManager.DummyDMStats();
+    if (getStats() == null) {
+      stats = new LonerDistributionManager.DummyDMStats();
     }
 
-    try {
-      this.conTable = ConnectionTable.create(this);
-    } catch (IOException io) {
-      throw new ConnectionException(
-          "Unable to initialize connection table",
-          io);
-    }
+    conTable = ConnectionTable.create(this);
 
-    this.socketCreator =
+    socketCreator =
         SocketCreatorFactory.getSocketCreatorForComponent(SecurableCommunicationChannel.CLUSTER);
-    this.useSSL = socketCreator.useSSL();
+    useSSL = socketCreator.useSSL();
 
-    InetAddress addr = address;
-    if (addr == null) {
+    if (address == null) {
       try {
-        addr = SocketCreator.getLocalHost();
-      } catch (java.net.UnknownHostException e) {
+        SocketCreator.getLocalHost();
+      } catch (UnknownHostException e) {
         throw new ConnectionException("Unable to resolve localHost address", e);
       }
     }
@@ -270,25 +247,39 @@ public class TCPConduit implements Runnable {
     startAcceptor();
   }
 
+  public static void init() {
+    // only use direct buffers if we are using nio
+    LISTENER_CLOSE_TIMEOUT = Integer.getInteger("p2p.listenerCloseTimeout", 60000);
+    BACKLOG = Integer.getInteger("p2p.backlog", 1280);
+    if (Boolean.getBoolean("p2p.oldIO")) {
+      logger.warn("detected use of p2p.oldIO setting - this is no longer supported");
+    }
+  }
+
+  public Membership getMembership() {
+    return membership;
+  }
+
+  public static int getBackLog() {
+    return BACKLOG;
+  }
 
   /**
    * parse instance-level properties from the given object
    */
   private void parseProperties(Properties p) {
     if (p != null) {
-      String s;
-      s = p.getProperty("p2p.tcpBufferSize", "" + tcpBufferSize);
+      String s = p.getProperty("p2p.tcpBufferSize", String.valueOf(tcpBufferSize));
       try {
         tcpBufferSize = Integer.parseInt(s);
       } catch (Exception e) {
-        logger.warn("exception parsing p2p.tcpBufferSize",
-            e);
+        logger.warn("exception parsing p2p.tcpBufferSize", e);
       }
       if (tcpBufferSize < Connection.SMALL_BUFFER_SIZE) {
         // enforce minimum
         tcpBufferSize = Connection.SMALL_BUFFER_SIZE;
       }
-      s = p.getProperty("p2p.idleConnectionTimeout", "" + idleConnectionTimeout);
+      s = p.getProperty("p2p.idleConnectionTimeout", String.valueOf(idleConnectionTimeout));
       try {
         idleConnectionTimeout = Integer.parseInt(s);
       } catch (Exception e) {
@@ -306,27 +297,20 @@ public class TCPConduit implements Runnable {
       try {
         tcpPortRange[1] = Integer.parseInt(s);
       } catch (Exception e) {
-        logger.warn("Exception parsing membership-port-range end port.",
-            e);
+        logger.warn("Exception parsing membership-port-range end port.", e);
       }
-
     }
   }
 
   /**
-   * the reason for a shutdown, if abnormal
-   */
-  private volatile Exception shutdownCause;
-
-  /**
    * binds the server socket and gets threads going
    */
   private void startAcceptor() throws ConnectionException {
-    int localPort;
-    int p = this.port;
+    int p = port;
 
     createServerSocket();
 
+    int localPort;
     try {
       localPort = socket.getLocalPort();
 
@@ -349,7 +333,7 @@ public class TCPConduit implements Runnable {
       String s = "While creating ServerSocket on port " + p;
       throw new ConnectionException(s, io);
     }
-    this.port = localPort;
+    port = localPort;
   }
 
   /**
@@ -357,16 +341,15 @@ public class TCPConduit implements Runnable {
    * this.bindAddress, which must be set before invoking this method.
    */
   private void createServerSocket() {
-    int serverPort = this.port;
+    int serverPort = port;
     int connectionRequestBacklog = BACKLOG;
-    InetAddress bindAddress = this.address;
+    InetAddress bindAddress = address;
 
     try {
       if (serverPort <= 0) {
-
         socket = socketCreator.createServerSocketUsingPortRange(bindAddress,
-            connectionRequestBacklog, isBindAddress,
-            true, 0, tcpPortRange);
+            connectionRequestBacklog, isBindAddress, true, 0, tcpPortRange);
+
       } else {
         ServerSocketChannel channel = ServerSocketChannel.open();
         socket = channel.socket();
@@ -383,8 +366,7 @@ public class TCPConduit implements Runnable {
         int newSize = socket.getReceiveBufferSize();
         if (newSize != tcpBufferSize) {
           logger.info("{} is {} instead of the requested {}",
-              "Listener receiverBufferSize", newSize,
-              tcpBufferSize);
+              "Listener receiverBufferSize", newSize, tcpBufferSize);
         }
       } catch (SocketException ex) {
         logger.warn("Failed to set listener receiverBufferSize to {}",
@@ -444,7 +426,9 @@ public class TCPConduit implements Runnable {
     conTable = null;
   }
 
-  /* stops the conduit, closing all tcp/ip connections */
+  /**
+   * stops the conduit, closing all tcp/ip connections
+   */
   public void stop(Exception cause) {
     if (!stopped) {
       stopped = true;
@@ -454,16 +438,15 @@ public class TCPConduit implements Runnable {
         logger.trace(LogMarker.DM_VERBOSE, "Shutting down conduit");
       }
       try {
-        // set timeout endpoint here since interrupt() has been known
-        // to hang
+        // set timeout endpoint here since interrupt() has been known to hang
         long timeout = System.currentTimeMillis() + LISTENER_CLOSE_TIMEOUT;
-        Thread t = this.thread;
+        Thread t = thread;
         if (channel != null) {
           channel.close();
-          // NOTE: do not try to interrupt the listener thread at this point.
-          // Doing so interferes with the channel's socket logic.
+          // NOTE: do not try to interrupt the listener thread at this point;
+          // doing so interferes with the channel's socket logic.
         } else {
-          ServerSocket s = this.socket;
+          ServerSocket s = socket;
           if (s != null) {
             s.close();
           }
@@ -473,7 +456,7 @@ public class TCPConduit implements Runnable {
         }
 
         do {
-          t = this.thread;
+          t = thread;
           if (t == null || !t.isAlive()) {
             break;
           }
@@ -490,7 +473,7 @@ public class TCPConduit implements Runnable {
       }
 
       // close connections after shutting down acceptor to fix bug 30695
-      this.conTable.close();
+      conTable.close();
 
       socket = null;
       thread = null;
@@ -499,15 +482,6 @@ public class TCPConduit implements Runnable {
   }
 
   /**
-   * Returns whether or not this conduit is stopped
-   *
-   * @since GemFire 3.0
-   */
-  public boolean isStopped() {
-    return this.stopped;
-  }
-
-  /**
    * starts the conduit again after it's been stopped. This will clear the server map if the
    * conduit's port is zero (wildcard bind)
    */
@@ -515,20 +489,14 @@ public class TCPConduit implements Runnable {
     if (!stopped) {
       return;
     }
-    this.stats = null;
+    stats = null;
     if (directChannel != null) {
-      this.stats = directChannel.getDMStats();
+      stats = directChannel.getDMStats();
     }
-    if (this.getStats() == null) {
-      this.stats = new LonerDistributionManager.DummyDMStats();
-    }
-    try {
-      this.conTable = ConnectionTable.create(this);
-    } catch (IOException io) {
-      throw new ConnectionException(
-          "Unable to initialize connection table",
-          io);
+    if (getStats() == null) {
+      stats = new LonerDistributionManager.DummyDMStats();
     }
+    conTable = ConnectionTable.create(this);
     startAcceptor();
   }
 
@@ -552,9 +520,6 @@ public class TCPConduit implements Runnable {
       if (Thread.currentThread().isInterrupted()) {
         break;
       }
-      if (stopper.isCancelInProgress()) {
-        break; // part of bug 37271
-      }
 
       Socket othersock = null;
       try {
@@ -566,6 +531,7 @@ public class TCPConduit implements Runnable {
               othersock.close();
             }
           } catch (Exception e) {
+            // ignored
           }
           continue;
         }
@@ -577,7 +543,7 @@ public class TCPConduit implements Runnable {
       } catch (ClosedChannelException | CancelException e) {
         break;
       } catch (IOException e) {
-        this.getStats().incFailedAccept();
+        getStats().incFailedAccept();
 
         try {
           if (othersock != null) {
@@ -589,7 +555,7 @@ public class TCPConduit implements Runnable {
 
         if (!stopped) {
           if (e instanceof SocketException && "Socket closed".equalsIgnoreCase(e.getMessage())) {
-            // safe to ignore; see bug 31156
+            // safe to ignore
             if (!socket.isClosed()) {
               logger.warn("ServerSocket threw 'socket closed' exception but says it is not closed",
                   e);
@@ -597,8 +563,7 @@ public class TCPConduit implements Runnable {
                 socket.close();
                 createServerSocket();
               } catch (IOException ioe) {
-                logger.fatal("Unable to close and recreate server socket",
-                    ioe);
+                logger.fatal("Unable to close and recreate server socket", ioe);
                 // post 5.1.0x, this should force shutdown
                 try {
                   Thread.sleep(5000);
@@ -621,8 +586,8 @@ public class TCPConduit implements Runnable {
       }
 
       if (!stopped && socket.isClosed()) {
-        // NOTE: do not check for distributed system closing here. Messaging
-        // may need to occur during the closing of the DS or cache
+        // NOTE: do not check for distributed system closing here.
+        // Messaging may need to occur during the closing of the DS or cache
         logger.warn("ServerSocket closed - reopening");
         try {
           createServerSocket();
@@ -630,7 +595,7 @@ public class TCPConduit implements Runnable {
           logger.warn(ex.getMessage(), ex);
         }
       }
-    } // for
+    }
 
     if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) {
       logger.trace("Stopped P2P Listener on  {}", id);
@@ -638,29 +603,29 @@ public class TCPConduit implements Runnable {
   }
 
   private ConnectionTable getConTable() {
-    ConnectionTable result = this.conTable;
+    ConnectionTable result = conTable;
     if (result == null) {
       stopper.checkCancelInProgress(null);
-      throw new DistributedSystemDisconnectedException(
-          "tcp layer has been shutdown");
+      throw new DistributedSystemDisconnectedException("tcp layer has been shutdown");
     }
     return result;
   }
 
-  private void acceptConnection(Socket othersock) {
+  private void acceptConnection(Socket otherSocket) {
     try {
-      getConTable().acceptConnection(othersock, new PeerConnectionFactory());
+      getConTable().acceptConnection(otherSocket, new PeerConnectionFactory());
     } catch (IOException | ConnectionException io) {
       // exception is logged by the Connection
       if (!stopped) {
-        this.getStats().incFailedAccept();
+        getStats().incFailedAccept();
       }
     } catch (CancelException e) {
+      // ignored
     } catch (Exception e) {
       if (!stopped) {
-        this.getStats().incFailedAccept();
+        getStats().incFailedAccept();
         logger.warn("Failed to accept connection from {} because {}",
-            othersock.getInetAddress(), e);
+            otherSocket.getInetAddress(), e);
       }
     }
   }
@@ -690,17 +655,16 @@ public class TCPConduit implements Runnable {
    *
    * @param bytesRead number of bytes read off of network to get this message
    */
-  protected void messageReceived(Connection receiver, DistributionMessage message, int bytesRead) {
+  void messageReceived(Connection receiver, DistributionMessage message, int bytesRead) {
     if (logger.isTraceEnabled()) {
       logger.trace("{} received {} from {}", id, message, receiver);
     }
 
     if (directChannel != null) {
-      DistributionMessage msg = message;
-      msg.setBytesRead(bytesRead);
-      msg.setSender(receiver.getRemoteAddress());
-      msg.setSharedReceiver(receiver.isSharedResource());
-      directChannel.receive(msg, bytesRead);
+      message.setBytesRead(bytesRead);
+      message.setSender(receiver.getRemoteAddress());
+      message.setSharedReceiver(receiver.isSharedResource());
+      directChannel.receive(message, bytesRead);
     }
   }
 
@@ -722,7 +686,7 @@ public class TCPConduit implements Runnable {
    * Gets the local member ID that identifies this conduit
    */
   public InternalDistributedMember getMemberId() {
-    return this.localAddr;
+    return localAddr;
   }
 
   public void setMemberId(InternalDistributedMember addr) {
@@ -733,8 +697,6 @@ public class TCPConduit implements Runnable {
     return config;
   }
 
-
-
   /**
    * Return a connection to the given member. This method must continue to attempt to create a
    * connection to the given member as long as that member is in the membership view and the system
@@ -752,30 +714,26 @@ public class TCPConduit implements Runnable {
    */
   public Connection getConnection(InternalDistributedMember memberAddress,
       final boolean preserveOrder, boolean retry, long startTime, long ackTimeout,
-      long ackSATimeout) throws java.io.IOException, DistributedSystemDisconnectedException {
+      long ackSATimeout) throws IOException, DistributedSystemDisconnectedException {
     if (stopped) {
-      throw new DistributedSystemDisconnectedException(
-          "The conduit is stopped");
+      throw new DistributedSystemDisconnectedException("The conduit is stopped");
     }
 
-    Connection conn = null;
     InternalDistributedMember memberInTrouble = null;
-    boolean breakLoop = false;
-    for (;;) {
+    Connection conn = null;
+    for (boolean breakLoop = false;;) {
       stopper.checkCancelInProgress(null);
       boolean interrupted = Thread.interrupted();
       try {
-        // If this is the second time through this loop, we had
-        // problems. Tear down the connection so that it gets
-        // rebuilt.
+        // If this is the second time through this loop, we had problems.
+        // Tear down the connection so that it gets rebuilt.
         if (retry || conn != null) { // not first time in loop
           if (!membership.memberExists(memberAddress)
               || membership.isShunned(memberAddress)
               || membership.shutdownInProgress()) {
-            throw new IOException(
-                "TCP/IP connection lost and member is not in view");
+            throw new IOException("TCP/IP connection lost and member is not in view");
           }
-          // bug35953: Member is still in view; we MUST NOT give up!
+          // Member is still in view; we MUST NOT give up!
 
           // Pause just a tiny bit...
           try {
@@ -789,8 +747,7 @@ public class TCPConduit implements Runnable {
           if (!membership.memberExists(memberAddress)
               || membership.isShunned(memberAddress)) {
             // OK, the member left. Just register an error.
-            throw new IOException(
-                "TCP/IP connection lost and member is not in view");
+            throw new IOException("TCP/IP connection lost and member is not in view");
           }
 
           // Print a warning (once)
@@ -804,7 +761,7 @@ public class TCPConduit implements Runnable {
           }
 
           // Close the connection (it will get rebuilt later).
-          this.getStats().incReconnectAttempts();
+          getStats().incReconnectAttempts();
           if (conn != null) {
             try {
               if (logger.isDebugEnabled()) {
@@ -815,6 +772,7 @@ public class TCPConduit implements Runnable {
             } catch (CancelException ex) {
               throw ex;
             } catch (Exception ex) {
+              // ignored
             }
           }
         } // not first time in loop
@@ -822,8 +780,7 @@ public class TCPConduit implements Runnable {
         Exception problem = null;
         try {
           // Get (or regenerate) the connection
-          // bug36202: this could generate a ConnectionException, so it
-          // must be caught and retried
+          // this could generate a ConnectionException, so it must be caught and retried
           boolean retryForOldConnection;
           boolean debugRetry = false;
           do {
@@ -855,13 +812,13 @@ public class TCPConduit implements Runnable {
           // Race condition between acquiring the connection and attempting
           // to use it: another thread closed it.
           problem = e;
-          // [sumedh] No need to retry since Connection.createSender has already
+          // No need to retry since Connection.createSender has already
           // done retries and now member is really unreachable for some reason
           // even though it may be in the view
           breakLoop = true;
         } catch (IOException e) {
           problem = e;
-          // bug #43962 don't keep trying to connect to an alert listener
+          // don't keep trying to connect to an alert listener
           if (AlertingAction.isThreadAlerting()) {
             if (logger.isDebugEnabled()) {
               logger.debug("Giving up connecting to alert listener {}", memberAddress);
@@ -877,11 +834,10 @@ public class TCPConduit implements Runnable {
             // Bracket our original warning
             if (memberInTrouble != null) {
               // make this msg info to bracket warning
-              logger.info("Ending reconnect attempt because {} has disappeared.",
-                  memberInTrouble);
+              logger.info("Ending reconnect attempt because {} has disappeared.", memberInTrouble);
             }
-            throw new IOException(String.format("Peer has disappeared from view: %s",
-                memberAddress));
+            throw new IOException(
+                String.format("Peer has disappeared from view: %s", memberAddress));
           } // left the view
 
           if (membership.shutdownInProgress()) { // shutdown in progress
@@ -899,8 +855,7 @@ public class TCPConduit implements Runnable {
           // Log the warning. We wait until now, because we want
           // to have m defined for a nice message...
           if (memberInTrouble == null) {
-            logger.warn("Error sending message to {} (will reattempt): {}",
-                memberAddress, problem);
+            logger.warn("Error sending message to {} (will reattempt): {}", memberAddress, problem);
             memberInTrouble = memberAddress;
           } else {
             if (logger.isDebugEnabled()) {
@@ -910,21 +865,17 @@ public class TCPConduit implements Runnable {
 
           if (breakLoop) {
             if (!problem.getMessage().startsWith("Cannot form connection to alert listener")) {
-              logger.warn("Throwing IOException after finding breakLoop=true",
-                  problem);
+              logger.warn("Throwing IOException after finding breakLoop=true", problem);
             }
             if (problem instanceof IOException) {
               throw (IOException) problem;
-            } else {
-              IOException ioe = new IOException(String.format("Problem connecting to %s",
-                  memberAddress));
-              ioe.initCause(problem);
-              throw ioe;
             }
+            throw new IOException(
+                String.format("Problem connecting to %s", memberAddress), problem);
           }
           // Retry the operation (indefinitely)
           continue;
-        } // problem != null
+        }
         // Success!
 
         // Make sure our logging is bracketed if there was a problem
@@ -940,12 +891,12 @@ public class TCPConduit implements Runnable {
           Thread.currentThread().interrupt();
         }
       }
-    } // for(;;)
+    }
   }
 
   @Override
   public String toString() {
-    return "" + id;
+    return String.valueOf(id);
   }
 
   /**
@@ -956,7 +907,7 @@ public class TCPConduit implements Runnable {
   }
 
   public void removeEndpoint(DistributedMember mbr, String reason, boolean notifyDisconnect) {
-    ConnectionTable ct = this.conTable;
+    ConnectionTable ct = conTable;
     if (ct == null) {
       return;
     }
@@ -967,8 +918,8 @@ public class TCPConduit implements Runnable {
    * check to see if there are still any receiver threads for the given end-point
    */
   public boolean hasReceiversFor(DistributedMember endPoint) {
-    ConnectionTable ct = this.conTable;
-    return (ct != null) && ct.hasReceiversFor(endPoint);
+    ConnectionTable ct = conTable;
+    return ct != null && ct.hasReceiversFor(endPoint);
   }
 
   /**
@@ -983,10 +934,38 @@ public class TCPConduit implements Runnable {
   }
 
   public BufferPool getBufferPool() {
-    return this.conTable.getBufferPool();
+    return conTable.getBufferPool();
+  }
+
+  public CancelCriterion getCancelCriterion() {
+    return stopper;
+  }
+
+  /**
+   * if the conduit is disconnected due to an abnormal condition, this will describe the reason
+   *
+   * @return exception that caused disconnect
+   */
+  Exception getShutdownCause() {
+    return shutdownCause;
+  }
+
+  /**
+   * returns the SocketCreator that should be used to produce sockets for TCPConduit connections.
+   */
+  protected SocketCreator getSocketCreator() {
+    return socketCreator;
   }
 
-  protected class Stopper extends CancelCriterion {
+  /**
+   * Called by Connection before handshake reply is sent. Returns true if member is part of
+   * view, false if membership is not confirmed before timeout.
+   */
+  boolean waitForMembershipCheck(InternalDistributedMember remoteId) {
+    return membership.waitForNewMember(remoteId);
+  }
+
+  private class Stopper extends CancelCriterion {
 
     @Override
     public String cancelInProgress() {
@@ -994,7 +973,7 @@ public class TCPConduit implements Runnable {
       if (dm == null) {
         return "no distribution manager";
       }
-      if (TCPConduit.this.stopped) {
+      if (stopped) {
         return "Conduit has been stopped";
       }
       return null;
@@ -1015,40 +994,8 @@ public class TCPConduit implements Runnable {
         return result;
       }
       // We know we've been stopped; generate the exception
-      result = new DistributedSystemDisconnectedException("Conduit has been stopped");
-      result.initCause(e);
+      result = new DistributedSystemDisconnectedException("Conduit has been stopped", e);
       return result;
     }
   }
-
-  private final Stopper stopper = new Stopper();
-
-  public CancelCriterion getCancelCriterion() {
-    return stopper;
-  }
-
-
-  /**
-   * if the conduit is disconnected due to an abnormal condition, this will describe the reason
-   *
-   * @return exception that caused disconnect
-   */
-  public Exception getShutdownCause() {
-    return this.shutdownCause;
-  }
-
-  /**
-   * returns the SocketCreator that should be used to produce sockets for TCPConduit connections.
-   */
-  protected SocketCreator getSocketCreator() {
-    return socketCreator;
-  }
-
-  /**
-   * ARB: Called by Connection before handshake reply is sent. Returns true if member is part of
-   * view, false if membership is not confirmed before timeout.
-   */
-  public boolean waitForMembershipCheck(InternalDistributedMember remoteId) {
-    return membership.waitForNewMember(remoteId);
-  }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionJUnitTest.java
deleted file mode 100755
index d303090..0000000
--- a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionJUnitTest.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.internal.tcp;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.isNull;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.UnknownHostException;
-import java.nio.channels.SocketChannel;
-
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.geode.CancelCriterion;
-import org.apache.geode.alerting.internal.spi.AlertingAction;
-import org.apache.geode.distributed.internal.DMStats;
-import org.apache.geode.distributed.internal.Distribution;
-import org.apache.geode.distributed.internal.DistributionConfig;
-import org.apache.geode.distributed.internal.DistributionManager;
-import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.net.BufferPool;
-import org.apache.geode.internal.net.SocketCloser;
-import org.apache.geode.internal.net.SocketCreator;
-import org.apache.geode.test.junit.categories.MembershipTest;
-
-@Category({MembershipTest.class})
-public class ConnectionJUnitTest {
-
-  /**
-   * Test whether suspicion is raised about a member that closes its shared/unordered TCPConduit
-   * connection
-   */
-  @Test
-  public void testSuspicionRaised() throws Exception {
-    // this test has to create a lot of mocks because Connection
-    // uses a lot of objects
-
-    // mock the socket
-    ConnectionTable table = mock(ConnectionTable.class);
-    DistributionManager distMgr = mock(DistributionManager.class);
-    Distribution membership = mock(Distribution.class);
-    TCPConduit conduit = mock(TCPConduit.class);
-    DMStats stats = mock(DMStats.class);
-
-    // mock the connection table and conduit
-
-    when(table.getConduit()).thenReturn(conduit);
-    when(table.getBufferPool()).thenReturn(new BufferPool(stats));
-
-    CancelCriterion stopper = mock(CancelCriterion.class);
-    when(stopper.cancelInProgress()).thenReturn(null);
-    when(conduit.getCancelCriterion()).thenReturn(stopper);
-
-    when(conduit.getSocketId())
-        .thenReturn(new InetSocketAddress(SocketCreator.getLocalHost(), 10337));
-
-    // mock the distribution manager and membership manager
-    when(distMgr.getDistribution()).thenReturn(membership);
-    when(conduit.getDM()).thenReturn(distMgr);
-    when(conduit.getStats()).thenReturn(stats);
-    when(table.getDM()).thenReturn(distMgr);
-    SocketCloser closer = mock(SocketCloser.class);
-    when(table.getSocketCloser()).thenReturn(closer);
-
-    SocketChannel channel = SocketChannel.open();
-
-    Connection conn = new Connection(table, channel.socket());
-    conn.setSharedUnorderedForTest();
-    conn.run();
-    verify(membership).suspectMember(isNull(InternalDistributedMember.class), any(String.class));
-  }
-
-  @Test
-  public void connectTimeoutIsShortWhenAlerting() throws UnknownHostException {
-    ConnectionTable table = mock(ConnectionTable.class);
-    TCPConduit conduit = mock(TCPConduit.class);
-    when(table.getConduit()).thenReturn(conduit);
-    when(conduit.getSocketId())
-        .thenReturn(new InetSocketAddress(SocketCreator.getLocalHost(), 12345));
-    DistributionConfig config = mock(DistributionConfig.class);
-    when(config.getMemberTimeout()).thenReturn(100);
-    Connection connection = new Connection(table, mock(Socket.class));
-    int normalTimeout = connection.getP2PConnectTimeout(config);
-    assertThat(normalTimeout).isEqualTo(600);
-    AlertingAction.execute(() -> {
-      assertThat(connection.getP2PConnectTimeout(config)).isEqualTo(100);
-    });
-
-  }
-}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTableTest.java b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTableTest.java
index ff46493..06112f7 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTableTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTableTest.java
@@ -12,7 +12,6 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-
 package org.apache.geode.internal.tcp;
 
 import static org.junit.Assert.assertEquals;
@@ -34,9 +33,9 @@ import org.apache.geode.distributed.internal.DistributionManager;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.test.junit.categories.MembershipTest;
 
-
-@Category({MembershipTest.class})
+@Category(MembershipTest.class)
 public class ConnectionTableTest {
+
   private ConnectionTable connectionTable;
   private Socket socket;
   private PeerConnectionFactory factory;
@@ -71,8 +70,8 @@ public class ConnectionTableTest {
   @Test
   public void testConnectionsClosedDuringCreateAreNotAddedAsReceivers() throws Exception {
     when(connection.isReceiverStopped()).thenReturn(false);
-    when(connection.isSocketClosed()).thenReturn(true); // Pretend this closed as soon at it was
-                                                        // created
+    // Pretend this closed as soon at it was created
+    when(connection.isSocketClosed()).thenReturn(true);
 
     connectionTable.acceptConnection(socket, factory);
     assertEquals(0, connectionTable.getNumberOfReceivers());
@@ -80,9 +79,11 @@ public class ConnectionTableTest {
 
   @Test
   public void testThreadStoppedNotAddedAsReceivers() throws Exception {
-    when(connection.isSocketClosed()).thenReturn(false); // connection is not closed
+    // connection is not closed
+    when(connection.isSocketClosed()).thenReturn(false);
 
-    when(connection.isReceiverStopped()).thenReturn(true);// but receiver is stopped
+    // but receiver is stopped
+    when(connection.isReceiverStopped()).thenReturn(true);
 
     connectionTable.acceptConnection(socket, factory);
     assertEquals(0, connectionTable.getNumberOfReceivers());
@@ -90,19 +91,20 @@ public class ConnectionTableTest {
 
   @Test
   public void testSocketNotClosedAddedAsReceivers() throws Exception {
-    when(connection.isSocketClosed()).thenReturn(false);// connection is not closed
+    // connection is not closed
+    when(connection.isSocketClosed()).thenReturn(false);
 
     connectionTable.acceptConnection(socket, factory);
     assertEquals(1, connectionTable.getNumberOfReceivers());
   }
 
   @Test
-  public void testThreadOwnedSocketsAreRemoved() throws Exception {
+  public void testThreadOwnedSocketsAreRemoved() {
     Boolean wantsResources = ConnectionTable.getThreadOwnsResourcesRegistration();
     ConnectionTable.threadWantsOwnResources();
     try {
       Map<DistributedMember, Connection> threadConnectionMap = new HashMap<>();
-      connectionTable.threadOrderedConnMap.set(threadConnectionMap);
+      ConnectionTable.threadOrderedConnMap.set(threadConnectionMap);
       ConnectionTable.releaseThreadsSockets();
       assertEquals(0, threadConnectionMap.size());
     } finally {
diff --git a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTest.java b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTest.java
index 77160c8..a5dfec8 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTest.java
@@ -14,24 +14,40 @@
  */
 package org.apache.geode.internal.tcp;
 
+import static org.apache.geode.internal.net.SocketCreator.getLocalHost;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.isNull;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
 
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.alerting.internal.spi.AlertingAction;
+import org.apache.geode.distributed.internal.DMStats;
+import org.apache.geode.distributed.internal.Distribution;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.distributed.internal.DistributionManager;
 import org.apache.geode.distributed.internal.DistributionMessage;
+import org.apache.geode.internal.net.BufferPool;
+import org.apache.geode.internal.net.SocketCloser;
 import org.apache.geode.test.junit.categories.MembershipTest;
 
-@Category({MembershipTest.class})
+@Category(MembershipTest.class)
 public class ConnectionTest {
 
   @Test
-  public void shouldBeMockable() throws Exception {
+  public void canBeMocked() throws Exception {
     Connection mockConnection = mock(Connection.class);
     SocketChannel channel = null;
     ByteBuffer buffer = null;
@@ -43,4 +59,58 @@ public class ConnectionTest {
     verify(mockConnection, times(1)).writeFully(channel, buffer, forceAsync,
         mockDistributionMessage);
   }
+
+  /**
+   * Test whether suspicion is raised about a member that closes its shared/unordered TCPConduit
+   * connection
+   */
+  @Test
+  public void testSuspicionRaised() throws Exception {
+    ConnectionTable connectionTable = mock(ConnectionTable.class);
+    Distribution distribution = mock(Distribution.class);
+    DistributionManager distributionManager = mock(DistributionManager.class);
+    DMStats dmStats = mock(DMStats.class);
+    CancelCriterion stopper = mock(CancelCriterion.class);
+    SocketCloser socketCloser = mock(SocketCloser.class);
+    TCPConduit tcpConduit = mock(TCPConduit.class);
+
+    when(connectionTable.getBufferPool()).thenReturn(new BufferPool(dmStats));
+    when(connectionTable.getConduit()).thenReturn(tcpConduit);
+    when(connectionTable.getDM()).thenReturn(distributionManager);
+    when(connectionTable.getSocketCloser()).thenReturn(socketCloser);
+    when(distributionManager.getDistribution()).thenReturn(distribution);
+    when(stopper.cancelInProgress()).thenReturn(null);
+    when(tcpConduit.getCancelCriterion()).thenReturn(stopper);
+    when(tcpConduit.getDM()).thenReturn(distributionManager);
+    when(tcpConduit.getSocketId()).thenReturn(new InetSocketAddress(getLocalHost(), 10337));
+    when(tcpConduit.getStats()).thenReturn(dmStats);
+
+    SocketChannel channel = SocketChannel.open();
+
+    Connection connection = new Connection(connectionTable, channel.socket());
+    connection.setSharedUnorderedForTest();
+    connection.run();
+
+    verify(distribution).suspectMember(isNull(), anyString());
+  }
+
+  @Test
+  public void connectTimeoutIsShortWhenAlerting() throws UnknownHostException {
+    ConnectionTable connectionTable = mock(ConnectionTable.class);
+    DistributionConfig distributionConfig = mock(DistributionConfig.class);
+    TCPConduit tcpConduit = mock(TCPConduit.class);
+
+    when(connectionTable.getConduit()).thenReturn(tcpConduit);
+    when(distributionConfig.getMemberTimeout()).thenReturn(100);
+    when(tcpConduit.getSocketId()).thenReturn(new InetSocketAddress(getLocalHost(), 12345));
+
+    Connection connection = new Connection(connectionTable, mock(Socket.class));
+
+    int normalTimeout = connection.getP2PConnectTimeout(distributionConfig);
+    assertThat(normalTimeout).isEqualTo(600);
+
+    AlertingAction.execute(() -> {
+      assertThat(connection.getP2PConnectTimeout(distributionConfig)).isEqualTo(100);
+    });
+  }
 }