You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2019/01/14 21:01:30 UTC

[geode] 01/02: GEODE-2113 Implement SSL over NIO

This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch feature/GEODE-2113e
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 830e234929ec86bacdeab85cd66d079f8a1437f7
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Thu Jan 3 15:06:09 2019 -0800

    GEODE-2113 Implement SSL over NIO
    
    This removes old-I/O use in TCPConduit peer-to-peer communications.
    This was used for SSL/TLS secure commuications but Java has had an
    SSLEngine implementation that allows you to implement secure communications
    on new-I/O SocketChannels or any other transport mechanism.
    
    A new NioSSLEngine class wraps the JDK's SSLEngine and provides the
    SSL handshake as well as encryption/decryption of messages. SocketCreator
    performs the SSL handshake and returns a NioSslEngine that TCPConduit
    then uses for messaging.
    
    The SSL handshake needs to be done in Connection.java now because the
    ByteBuffer used to do the handshake is also used for reading messages
    in Receivers. Because of this the Handshake pool in TCPConduit became
    obsolete and I deleted it.
    
    I've also done a lot of cleanup of compilation warnings in Connection.java
    and removed references to "NIO". The primary SSL/TLS changes in that class
    are in writeFully (renamed from nioWriteFully) and processBuffer (renamed
    from processNIOBuffer).
    
    While testing I noticed some places where we're creating non-daemon
    threads that were keeping DUnit ChildVM processes from exiting.  I've
    changed these places to use daemon threads.  Very few threads in Geode
    should be non-daemon.
    
    Porting client/server to use NioSSLEngine will be done under a separate
    ticket and a different version of NioEngine may be created to secure
    UDP messaging.
    
    (cherry picked from commit a075b0e1a13a7a57378973bbfb7f14a63f29bf87)
---
 .../java/org/apache/geode/ClusterSSLDUnitTest.java |  249 +++
 .../CacheServerSSLConnectionDUnitTest.java         |  101 +-
 ...tServerHostNameVerificationDistributedTest.java |    5 +
 ...ToDataThrowsRuntimeExceptionRegressionTest.java |    3 -
 .../internal/cache/ConcurrentMapOpsDUnitTest.java  |    1 -
 .../internal/net/SSLSocketIntegrationTest.java     |  153 +-
 .../distributed/internal/DistributionStats.java    |    2 +-
 .../distributed/internal/direct/DirectChannel.java |   26 +-
 .../membership/gms/membership/GMSJoinLeave.java    |    2 +-
 .../membership/gms/mgr/GMSMembershipManager.java   |    3 -
 .../geode/internal/cache/EntryEventImpl.java       |   28 +-
 .../geode/internal/cache/PRQueryProcessor.java     |    2 +-
 .../monitoring/executor/AbstractExecutor.java      |    7 +-
 .../geode/internal/{tcp => net}/Buffers.java       |  101 +-
 .../{tcp/OioMsgReader.java => net/NioEngine.java}  |   37 +-
 .../org/apache/geode/internal/net/NioFilter.java   |   76 +
 .../apache/geode/internal/net/NioSslEngine.java    |  356 ++++
 .../apache/geode/internal/net/SocketCreator.java   |  106 +-
 .../apache/geode/internal/statistics/VMStats.java  |    2 +-
 .../statistics/platform/LinuxProcFsStatistics.java |  206 +-
 .../apache/geode/internal/stats50/VMStats50.java   |    2 +-
 .../org/apache/geode/internal/tcp/Connection.java  | 2003 ++++++++------------
 .../apache/geode/internal/tcp/ConnectionTable.java |   14 +-
 .../apache/geode/internal/tcp/MsgDestreamer.java   |   15 -
 .../apache/geode/internal/tcp/MsgOutputStream.java |    3 +-
 .../org/apache/geode/internal/tcp/MsgReader.java   |   90 +-
 .../org/apache/geode/internal/tcp/MsgStreamer.java |    1 +
 .../apache/geode/internal/tcp/NIOMsgReader.java    |  109 --
 .../geode/internal/tcp/PeerConnectionFactory.java  |    1 +
 .../org/apache/geode/internal/tcp/TCPConduit.java  |  231 +--
 .../apache/geode/internal/util/DscodeHelper.java   |    6 +-
 .../management/internal/FederatingManager.java     |    2 +-
 .../sanctioned-geode-core-serializables.txt        |    1 +
 .../org/apache/geode/internal/net/BuffersTest.java |   54 +
 .../geode/internal/net/NioSslEngineTest.java       |  495 +++++
 .../geode/internal/tcp/ConnectionJUnitTest.java    |   16 +-
 .../apache/geode/internal/tcp/ConnectionTest.java  |    4 +-
 .../util/PluckStacksJstackGeneratedDump.txt        |   18 +-
 .../resources/org/apache/geode/server.keystore     |  Bin 0 -> 1256 bytes
 39 files changed, 2666 insertions(+), 1865 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/ClusterSSLDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/ClusterSSLDUnitTest.java
new file mode 100644
index 0000000..dbca857
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/ClusterSSLDUnitTest.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode;
+
+import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_CLUSTER_CONFIGURATION;
+import static org.apache.geode.distributed.ConfigurationProperties.NAME;
+import static org.apache.geode.distributed.ConfigurationProperties.SOCKET_BUFFER_SIZE;
+import static org.apache.geode.distributed.ConfigurationProperties.SOCKET_LEASE_TIME;
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_ENABLED_COMPONENTS;
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_KEYSTORE;
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_KEYSTORE_PASSWORD;
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_PROTOCOLS;
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_REQUIRE_AUTHENTICATION;
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_TRUSTSTORE;
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_TRUSTSTORE_PASSWORD;
+import static org.apache.geode.distributed.ConfigurationProperties.USE_CLUSTER_CONFIGURATION;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.distributed.ConfigurationProperties;
+import org.apache.geode.distributed.Locator;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.Host;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.DistributedRule;
+import org.apache.geode.test.junit.categories.BackwardCompatibilityTest;
+import org.apache.geode.test.junit.categories.MembershipTest;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
+import org.apache.geode.test.version.VersionManager;
+import org.apache.geode.util.test.TestUtil;
+
+@Category({MembershipTest.class, BackwardCompatibilityTest.class})
+public class ClusterSSLDUnitTest implements java.io.Serializable {
+
+  private static final int NUM_SERVERS = 2;
+  private static final int SMALL_BUFFER_SIZE = 8000;
+
+  private static final long serialVersionUID = -3438183140385150550L;
+
+  @Rule
+  public DistributedRule distributedRule =
+      DistributedRule.builder().withVMCount(NUM_SERVERS + 1).build();
+
+  @Rule
+  public final SerializableTestName testName = new SerializableTestName();
+
+  final String regionName = testName.getMethodName() + "_Region";
+
+  private static Cache cache;
+
+  @After
+  public void teardown() throws Exception {
+    if (cache != null) {
+      try {
+        if (!cache.isClosed()) {
+          cache.close();
+        }
+      } finally {
+        cache = null;
+      }
+    }
+  }
+
+
+  @Test
+  public void createEntryWithConserveSockets() throws Exception {
+    int locatorPort = createLocator(VM.getVM(0));
+    for (int i = 1; i <= NUM_SERVERS; i++) {
+      createSSLEnabledCacheAndRegion(VM.getVM(i), locatorPort, true);
+    }
+    performCreate(VM.getVM(1));
+    for (int i = 1; i <= NUM_SERVERS; i++) {
+      verifyCreatedEntry(VM.getVM(i));
+    }
+    performUpdate(VM.getVM(1));
+    for (int i = 1; i <= NUM_SERVERS; i++) {
+      verifyUpdatedEntry(VM.getVM(i));
+    }
+  }
+
+  @Test
+  public void createEntryWithThreadOwnedSockets() throws Exception {
+    int locatorPort = createLocator(VM.getVM(0));
+    for (int i = 1; i <= NUM_SERVERS; i++) {
+      createSSLEnabledCacheAndRegion(VM.getVM(i), locatorPort, false);
+    }
+    performCreate(VM.getVM(1));
+    for (int i = 1; i <= NUM_SERVERS; i++) {
+      verifyCreatedEntry(VM.getVM(i));
+    }
+    performUpdate(VM.getVM(1));
+    for (int i = 1; i <= NUM_SERVERS; i++) {
+      verifyUpdatedEntry(VM.getVM(i));
+    }
+  }
+
+  @Test
+  public void createEntryWithThreadOwnedSocketsAndBigMessage() throws Exception {
+    int locatorPort = createLocator(VM.getVM(0));
+    for (int i = 1; i <= NUM_SERVERS; i++) {
+      createSSLEnabledCacheAndRegion(VM.getVM(i), locatorPort, false);
+    }
+    performCreateWithLargeValue(VM.getVM(1));
+    for (int i = 1; i <= NUM_SERVERS; i++) {
+      verifyCreatedEntry(VM.getVM(i));
+    }
+  }
+
+  @Test
+  public void performARollingUpgrade() throws Exception {
+    List<String> testVersions = VersionManager.getInstance().getVersionsWithoutCurrent();
+    Collections.sort(testVersions);
+    String testVersion = testVersions.get(testVersions.size() - 1);
+
+    // create a cluster with the previous version of Geode
+    VM locatorVM = Host.getHost(0).getVM(testVersion, 0);
+    VM server1VM = Host.getHost(0).getVM(testVersion, 1);
+    int locatorPort = createLocator(locatorVM);
+    createSSLEnabledCacheAndRegion(server1VM, locatorPort, true);
+    performCreate(VM.getVM(1));
+
+    // roll the locator to the current version
+    locatorVM.invoke("stop locator", () -> Locator.getLocator().stop());
+    locatorVM = Host.getHost(0).getVM(VersionManager.CURRENT_VERSION, 0);
+    locatorVM.invoke("roll locator to current version", () -> {
+      // if you need to debug SSL communications use this property:
+      // System.setProperty("javax.net.debug", "all");
+      Properties props = getDistributedSystemProperties();
+      // locator must restart with the same port so that it reconnects to the server
+      GeodeAwaitility.await().atMost(15, TimeUnit.SECONDS)
+          .until(() -> Locator.startLocatorAndDS(locatorPort, new File(""), props) != null);
+      assertThat(Locator.getLocator().getDistributedSystem().getAllOtherMembers().size())
+          .isGreaterThan(0);
+    });
+
+    // start server2 with current version
+    VM server2VM = Host.getHost(0).getVM(VersionManager.CURRENT_VERSION, 2);
+    createSSLEnabledCacheAndRegion(server2VM, locatorPort, true);
+
+    // roll server1 to the current version
+    server1VM.invoke("stop server1", () -> {
+      cache.close();
+    });
+    server1VM = Host.getHost(0).getVM(VersionManager.CURRENT_VERSION, 1);
+    createSSLEnabledCacheAndRegion(server1VM, locatorPort, true);
+
+
+    verifyCreatedEntry(server1VM);
+    verifyCreatedEntry(server2VM);
+  }
+
+  private void createSSLEnabledCacheAndRegion(VM memberVM, int locatorPort,
+      boolean conserveSockets) {
+    memberVM.invoke("start cache and create region", () -> {
+      cache = createCache(locatorPort, conserveSockets);
+      cache.createRegionFactory(RegionShortcut.REPLICATE).create(regionName);
+    });
+  }
+
+
+  private void performCreate(VM memberVM) {
+    memberVM.invoke("perform create", () -> cache
+        .getRegion(regionName).put("testKey", "testValue"));
+  }
+
+  private void performUpdate(VM memberVM) {
+    memberVM.invoke("perform update", () -> cache
+        .getRegion(regionName).put("testKey", "updatedTestValue"));
+  }
+
+  private void performCreateWithLargeValue(VM memberVM) {
+    memberVM.invoke("perform create", () -> {
+      byte[] value = new byte[SMALL_BUFFER_SIZE];
+      Arrays.fill(value, (byte) 1);
+      cache.getRegion(regionName).put("testKey", value);
+    });
+  }
+
+  private void verifyCreatedEntry(VM memberVM) {
+    memberVM.invoke("verify entry created", () -> Assert.assertTrue(cache
+        .getRegion(regionName).containsKey("testKey")));
+  }
+
+  private void verifyUpdatedEntry(VM memberVM) {
+    memberVM.invoke("verify entry updated", () -> Assert.assertTrue(cache
+        .getRegion(regionName).containsValue("updatedTestValue")));
+  }
+
+  private int createLocator(VM memberVM) {
+    return memberVM.invoke("create locator", () -> {
+      // if you need to debug SSL communications use this property:
+      // System.setProperty("javax.net.debug", "all");
+      return Locator.startLocatorAndDS(0, new File(""), getDistributedSystemProperties()).getPort();
+    });
+  }
+
+  private Cache createCache(int locatorPort, boolean conserveSockets) {
+    // if you need to debug SSL communications use this property:
+    // System.setProperty("javax.net.debug", "all");
+    Properties properties = getDistributedSystemProperties();
+    properties.put(ConfigurationProperties.LOCATORS, "localhost[" + locatorPort + "]");
+    properties.put(ConfigurationProperties.CONSERVE_SOCKETS, "" + conserveSockets);
+    return new CacheFactory(properties).create();
+  }
+
+  public Properties getDistributedSystemProperties() {
+    Properties properties = new Properties();
+    properties.put(ENABLE_CLUSTER_CONFIGURATION, "false");
+    properties.put(USE_CLUSTER_CONFIGURATION, "false");
+    properties.put(SSL_ENABLED_COMPONENTS, "cluster");
+    properties.put(SSL_KEYSTORE, TestUtil.getResourcePath(this.getClass(), "server.keystore"));
+    properties.put(SSL_TRUSTSTORE, TestUtil.getResourcePath(this.getClass(), "server.keystore"));
+    properties.put(SSL_PROTOCOLS, "TLSv1.2");
+    properties.put(SSL_KEYSTORE_PASSWORD, "password");
+    properties.put(SSL_TRUSTSTORE_PASSWORD, "password");
+    properties.put(SSL_REQUIRE_AUTHENTICATION, "true");
+    properties.put(SOCKET_LEASE_TIME, "10000");
+    properties.put(NAME, "vm" + VM.getCurrentVMNum());
+    properties.put(SOCKET_BUFFER_SIZE, "" + SMALL_BUFFER_SIZE);
+    return properties;
+  }
+}
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache/client/internal/CacheServerSSLConnectionDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache/client/internal/CacheServerSSLConnectionDUnitTest.java
index 58ba260..986d04e 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/cache/client/internal/CacheServerSSLConnectionDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/cache/client/internal/CacheServerSSLConnectionDUnitTest.java
@@ -23,7 +23,6 @@ import static org.apache.geode.distributed.ConfigurationProperties.CLUSTER_SSL_P
 import static org.apache.geode.distributed.ConfigurationProperties.CLUSTER_SSL_REQUIRE_AUTHENTICATION;
 import static org.apache.geode.distributed.ConfigurationProperties.CLUSTER_SSL_TRUSTSTORE;
 import static org.apache.geode.distributed.ConfigurationProperties.CLUSTER_SSL_TRUSTSTORE_PASSWORD;
-import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
 import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
 import static org.apache.geode.distributed.ConfigurationProperties.SERVER_SSL_CIPHERS;
 import static org.apache.geode.distributed.ConfigurationProperties.SERVER_SSL_ENABLED;
@@ -49,6 +48,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
@@ -77,6 +77,7 @@ import org.apache.geode.cache.client.ClientRegionFactory;
 import org.apache.geode.cache.client.ClientRegionShortcut;
 import org.apache.geode.cache.client.NoAvailableServersException;
 import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.distributed.Locator;
 import org.apache.geode.internal.security.SecurableCommunicationChannel;
 import org.apache.geode.security.AuthenticationRequiredException;
 import org.apache.geode.test.dunit.AsyncInvocation;
@@ -138,13 +139,20 @@ public class CacheServerSSLConnectionDUnitTest extends JUnit4DistributedTestCase
 
   @AfterClass
   public static void postClass() {
-    Invoke.invokeInEveryVM(() -> instance = null);
+    Invoke.invokeInEveryVM(() -> {
+      if (instance.cache != null) {
+        instance.cache.close();
+      }
+      instance = null;
+    });
+    if (instance.cache != null) {
+      instance.cache.close();
+    }
     instance = null;
   }
 
   public Cache createCache(Properties props) throws Exception {
     props.setProperty(MCAST_PORT, "0");
-    props.setProperty(LOCATORS, "");
     cache = new CacheFactory(props).create();
     if (cache == null) {
       throw new Exception("CacheFactory.create() returned null ");
@@ -170,30 +178,21 @@ public class CacheServerSSLConnectionDUnitTest extends JUnit4DistributedTestCase
   }
 
   @SuppressWarnings("rawtypes")
-  public void setUpServerVM(final boolean cacheServerSslenabled) throws Exception {
+  public void setUpServerVM(final boolean cacheServerSslenabled, int optionalLocatorPort)
+      throws Exception {
     System.setProperty("javax.net.debug", "ssl,handshake");
 
     Properties gemFireProps = new Properties();
+    if (optionalLocatorPort > 0) {
+      gemFireProps.put("locators", "localhost[" + optionalLocatorPort + "]");
+    }
 
     String cacheServerSslprotocols = "any";
     String cacheServerSslciphers = "any";
     boolean cacheServerSslRequireAuth = true;
     if (!useOldSSLSettings) {
-      gemFireProps.put(SSL_ENABLED_COMPONENTS,
-          SecurableCommunicationChannel.CLUSTER + "," + SecurableCommunicationChannel.SERVER);
-      gemFireProps.put(SSL_PROTOCOLS, cacheServerSslprotocols);
-      gemFireProps.put(SSL_CIPHERS, cacheServerSslciphers);
-      gemFireProps.put(SSL_REQUIRE_AUTHENTICATION, String.valueOf(cacheServerSslRequireAuth));
-
-      String keyStore =
-          TestUtil.getResourcePath(CacheServerSSLConnectionDUnitTest.class, SERVER_KEY_STORE);
-      String trustStore =
-          TestUtil.getResourcePath(CacheServerSSLConnectionDUnitTest.class, SERVER_TRUST_STORE);
-      gemFireProps.put(SSL_KEYSTORE_TYPE, "jks");
-      gemFireProps.put(SSL_KEYSTORE, keyStore);
-      gemFireProps.put(SSL_KEYSTORE_PASSWORD, "password");
-      gemFireProps.put(SSL_TRUSTSTORE, trustStore);
-      gemFireProps.put(SSL_TRUSTSTORE_PASSWORD, "password");
+      getNewSSLSettings(gemFireProps, cacheServerSslprotocols, cacheServerSslciphers,
+          cacheServerSslRequireAuth);
     } else {
       gemFireProps.put(CLUSTER_SSL_ENABLED, String.valueOf(cacheServerSslenabled));
       gemFireProps.put(CLUSTER_SSL_PROTOCOLS, cacheServerSslprotocols);
@@ -222,6 +221,25 @@ public class CacheServerSSLConnectionDUnitTest extends JUnit4DistributedTestCase
     r.put("serverkey", "servervalue");
   }
 
+  private void getNewSSLSettings(Properties gemFireProps, String cacheServerSslprotocols,
+      String cacheServerSslciphers, boolean cacheServerSslRequireAuth) {
+    gemFireProps.put(SSL_ENABLED_COMPONENTS,
+        SecurableCommunicationChannel.CLUSTER + "," + SecurableCommunicationChannel.SERVER);
+    gemFireProps.put(SSL_PROTOCOLS, cacheServerSslprotocols);
+    gemFireProps.put(SSL_CIPHERS, cacheServerSslciphers);
+    gemFireProps.put(SSL_REQUIRE_AUTHENTICATION, String.valueOf(cacheServerSslRequireAuth));
+
+    String keyStore =
+        TestUtil.getResourcePath(CacheServerSSLConnectionDUnitTest.class, SERVER_KEY_STORE);
+    String trustStore =
+        TestUtil.getResourcePath(CacheServerSSLConnectionDUnitTest.class, SERVER_TRUST_STORE);
+    gemFireProps.put(SSL_KEYSTORE_TYPE, "jks");
+    gemFireProps.put(SSL_KEYSTORE, keyStore);
+    gemFireProps.put(SSL_KEYSTORE_PASSWORD, "password");
+    gemFireProps.put(SSL_TRUSTSTORE, trustStore);
+    gemFireProps.put(SSL_TRUSTSTORE_PASSWORD, "password");
+  }
+
   public void setUpClientVM(String host, int port, boolean cacheServerSslenabled,
       boolean cacheServerSslRequireAuth, String keyStore, String trustStore, boolean subscription,
       boolean clientHasTrustedKeystore) {
@@ -286,6 +304,7 @@ public class CacheServerSSLConnectionDUnitTest extends JUnit4DistributedTestCase
 
     ClientCacheFactory clientCacheFactory = new ClientCacheFactory(gemFireProps);
     clientCacheFactory.setPoolSubscriptionEnabled(subscription).addPoolServer(host, port);
+    clientCacheFactory.setPoolRetryAttempts(5);
     clientCache = clientCacheFactory.create();
 
     ClientRegionFactory<String, String> regionFactory =
@@ -308,8 +327,9 @@ public class CacheServerSSLConnectionDUnitTest extends JUnit4DistributedTestCase
   }
 
 
-  public static void setUpServerVMTask(boolean cacheServerSslenabled) throws Exception {
-    instance.setUpServerVM(cacheServerSslenabled);
+  public static void setUpServerVMTask(boolean cacheServerSslenabled, int optionalLocatorPort)
+      throws Exception {
+    instance.setUpServerVM(cacheServerSslenabled, optionalLocatorPort);
   }
 
   public static int createServerTask() throws Exception {
@@ -371,20 +391,35 @@ public class CacheServerSSLConnectionDUnitTest extends JUnit4DistributedTestCase
     final Host host = Host.getHost(0);
     VM serverVM = host.getVM(1);
     VM clientVM = host.getVM(2);
+    VM serverVM2 = host.getVM(3);
 
     boolean cacheServerSslenabled = true;
     boolean cacheClientSslenabled = true;
     boolean cacheClientSslRequireAuth = true;
 
-    serverVM.invoke(() -> setUpServerVMTask(cacheServerSslenabled));
-    int port = serverVM.invoke(() -> createServerTask());
+    Properties locatorProps = new Properties();
+    String cacheServerSslprotocols = "any";
+    String cacheServerSslciphers = "any";
+    boolean cacheServerSslRequireAuth = true;
+    getNewSSLSettings(locatorProps, cacheServerSslprotocols, cacheServerSslciphers,
+        cacheServerSslRequireAuth);
+    Locator locator = Locator.startLocatorAndDS(0, new File(""), locatorProps);
+    int locatorPort = locator.getPort();
+    try {
+      serverVM.invoke(() -> setUpServerVMTask(cacheServerSslenabled, locatorPort));
+      int port = serverVM.invoke(() -> createServerTask());
+      serverVM2.invoke(() -> setUpServerVMTask(cacheServerSslenabled, locatorPort));
+      serverVM2.invoke(() -> createServerTask());
 
-    String hostName = host.getHostName();
+      String hostName = host.getHostName();
 
-    clientVM.invoke(() -> setUpClientVMTask(hostName, port, cacheClientSslenabled,
-        cacheClientSslRequireAuth, CLIENT_KEY_STORE, CLIENT_TRUST_STORE, true));
-    clientVM.invoke(() -> doClientRegionTestTask());
-    serverVM.invoke(() -> doServerRegionTestTask());
+      clientVM.invoke(() -> setUpClientVMTask(hostName, port, cacheClientSslenabled,
+          cacheClientSslRequireAuth, CLIENT_KEY_STORE, CLIENT_TRUST_STORE, true));
+      clientVM.invoke(() -> doClientRegionTestTask());
+      serverVM.invoke(() -> doServerRegionTestTask());
+    } finally {
+      locator.stop();
+    }
   }
 
   /**
@@ -413,7 +448,7 @@ public class CacheServerSSLConnectionDUnitTest extends JUnit4DistributedTestCase
     boolean cacheClientSslenabled = true;
     boolean cacheClientSslRequireAuth = true;
 
-    serverVM.invoke(() -> setUpServerVMTask(cacheServerSslenabled));
+    serverVM.invoke(() -> setUpServerVMTask(cacheServerSslenabled, 0));
     int port = serverVM.invoke(() -> createServerTask());
 
     String hostName = host.getHostName();
@@ -464,7 +499,7 @@ public class CacheServerSSLConnectionDUnitTest extends JUnit4DistributedTestCase
     boolean cacheClientSslenabled = false;
     boolean cacheClientSslRequireAuth = true;
 
-    serverVM.invoke(() -> setUpServerVMTask(cacheServerSslenabled));
+    serverVM.invoke(() -> setUpServerVMTask(cacheServerSslenabled, 0));
     serverVM.invoke(() -> createServerTask());
 
     Object array[] = (Object[]) serverVM.invoke(() -> getCacheServerEndPointTask());
@@ -511,7 +546,7 @@ public class CacheServerSSLConnectionDUnitTest extends JUnit4DistributedTestCase
     IgnoredException.addIgnoredException("SSLHandshakeException");
     IgnoredException.addIgnoredException("ValidatorException");
 
-    serverVM.invoke(() -> setUpServerVMTask(cacheServerSslenabled));
+    serverVM.invoke(() -> setUpServerVMTask(cacheServerSslenabled, 0));
     serverVM.invoke(() -> createServerTask());
 
     Object array[] = (Object[]) serverVM.invoke(() -> getCacheServerEndPointTask());
@@ -534,7 +569,7 @@ public class CacheServerSSLConnectionDUnitTest extends JUnit4DistributedTestCase
     boolean cacheClientSslenabled = true;
     boolean cacheClientSslRequireAuth = false;
 
-    serverVM.invoke(() -> setUpServerVMTask(cacheServerSslenabled));
+    serverVM.invoke(() -> setUpServerVMTask(cacheServerSslenabled, 0));
     serverVM.invoke(() -> createServerTask());
 
     Object array[] = (Object[]) serverVM.invoke(() -> getCacheServerEndPointTask());
@@ -567,7 +602,7 @@ public class CacheServerSSLConnectionDUnitTest extends JUnit4DistributedTestCase
     boolean cacheClientSslenabled = true;
     boolean cacheClientSslRequireAuth = true;
 
-    serverVM.invoke(() -> setUpServerVMTask(cacheServerSslenabled));
+    serverVM.invoke(() -> setUpServerVMTask(cacheServerSslenabled, 0));
     serverVM.invoke(() -> createServerTask());
 
     Object array[] = (Object[]) serverVM.invoke(() -> getCacheServerEndPointTask());
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache/client/internal/ClientServerHostNameVerificationDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache/client/internal/ClientServerHostNameVerificationDistributedTest.java
index eda8d9c..5565360 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/cache/client/internal/ClientServerHostNameVerificationDistributedTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/cache/client/internal/ClientServerHostNameVerificationDistributedTest.java
@@ -89,6 +89,7 @@ public class ClientServerHostNameVerificationDistributedTest {
 
   @Test
   public void expectConnectionFailureWhenNoHostNameInLocatorKey() throws Exception {
+
     CertificateBuilder locatorCertificate = new CertificateBuilder()
         .commonName("locator");
 
@@ -105,6 +106,7 @@ public class ClientServerHostNameVerificationDistributedTest {
 
   @Test
   public void expectConnectionFailureWhenWrongHostNameInLocatorKey() throws Exception {
+
     CertificateBuilder locatorCertificate = new CertificateBuilder()
         .commonName("locator")
         .sanDnsName("example.com");;
@@ -199,10 +201,13 @@ public class ClientServerHostNameVerificationDistributedTest {
       ClientRegionFactory<String, String> regionFactory =
           clientCache.createClientRegionFactory(ClientRegionShortcut.PROXY);
 
+      IgnoredException.addIgnoredException("Connection reset");
+      IgnoredException.addIgnoredException("java.io.IOException");
       if (expectedExceptionOnClient != null) {
         IgnoredException.addIgnoredException("javax.net.ssl.SSLHandshakeException");
         IgnoredException.addIgnoredException("java.net.SocketException");
         IgnoredException.addIgnoredException("java.security.cert.CertificateException");
+        IgnoredException.addIgnoredException("java.net.ssl.SSLProtocolException");
 
         Region<String, String> clientRegion = regionFactory.create("region");
         assertThatExceptionOfType(expectedExceptionOnClient)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/ValueToDataThrowsRuntimeExceptionRegressionTest.java b/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/ValueToDataThrowsRuntimeExceptionRegressionTest.java
index e2166b0..7e469b9 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/ValueToDataThrowsRuntimeExceptionRegressionTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/ValueToDataThrowsRuntimeExceptionRegressionTest.java
@@ -97,12 +97,10 @@ public class ValueToDataThrowsRuntimeExceptionRegressionTest extends JUnit4Cache
       Invoke.invokeInEveryVM(new SerializableCallable() {
         @Override
         public Object call() throws Exception {
-          System.getProperties().remove("p2p.oldIO");
           System.getProperties().remove("p2p.nodirectBuffers");
           return null;
         }
       });
-      System.getProperties().remove("p2p.oldIO");
       System.getProperties().remove("p2p.nodirectBuffers");
     }
   }
@@ -110,7 +108,6 @@ public class ValueToDataThrowsRuntimeExceptionRegressionTest extends JUnit4Cache
   @Override
   public Properties getDistributedSystemProperties() {
     Properties props = new Properties();
-    System.setProperty("p2p.oldIO", "true");
     props.setProperty(CONSERVE_SOCKETS, "true");
     // props.setProperty(DistributionConfig.ConfigurationProperties.MCAST_PORT, "12333");
     // props.setProperty(DistributionConfig.DISABLE_TCP_NAME, "true");
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ConcurrentMapOpsDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ConcurrentMapOpsDUnitTest.java
index 777114c..43e2e4a 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ConcurrentMapOpsDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ConcurrentMapOpsDUnitTest.java
@@ -769,7 +769,6 @@ public class ConcurrentMapOpsDUnitTest extends JUnit4CacheTestCase {
         getCache().getLogger().fine("SWAP:doingRemove");
         assertTrue(r.remove("key0", "value"));
 
-        getCache().getLogger().fine("Bruce:doingExtraRemoves.  Bug #47010");
         DestroyOp.TEST_HOOK_ENTRY_NOT_FOUND = false;
         assertTrue(r.remove("key0") == null);
         assertTrue(DestroyOp.TEST_HOOK_ENTRY_NOT_FOUND);
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java
index 59a8355..9ab1bb1 100755
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java
@@ -22,24 +22,34 @@ import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
 import static org.apache.geode.internal.security.SecurableCommunicationChannel.CLUSTER;
 import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
 
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.net.ConnectException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
+import java.net.SocketException;
 import java.net.SocketTimeoutException;
 import java.net.URL;
+import java.nio.ByteBuffer;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
 import java.util.Properties;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import javax.net.ssl.SSLContext;
@@ -55,9 +65,12 @@ import org.junit.rules.ErrorCollector;
 import org.junit.rules.TemporaryFolder;
 import org.junit.rules.TestName;
 
+import org.apache.geode.distributed.internal.DMStats;
 import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.distributed.internal.DistributionConfigImpl;
+import org.apache.geode.internal.ByteBufferOutputStream;
 import org.apache.geode.internal.security.SecurableCommunicationChannel;
+import org.apache.geode.internal.tcp.ByteBufferInputStream;
 import org.apache.geode.test.junit.categories.MembershipTest;
 
 /**
@@ -104,6 +117,8 @@ public class SSLSocketIntegrationTest {
     System.setProperty("javax.net.ssl.trustStorePassword", "password");
     System.setProperty("javax.net.ssl.keyStore", keystore.getCanonicalPath());
     System.setProperty("javax.net.ssl.keyStorePassword", "password");
+    // System.setProperty("javax.net.debug", "ssl,handshake");
+
 
     Properties properties = new Properties();
     properties.setProperty(MCAST_PORT, "0");
@@ -174,6 +189,116 @@ public class SSLSocketIntegrationTest {
     assertThat(this.messageFromClient.get()).isEqualTo(MESSAGE);
   }
 
+  @Test
+  public void testSecuredSocketTransmissionShouldWorkUsingNIO() throws Exception {
+    ServerSocketChannel serverChannel = ServerSocketChannel.open();
+    serverSocket = serverChannel.socket();
+
+    InetSocketAddress addr = new InetSocketAddress(localHost, 0);
+    serverSocket.bind(addr, 10);
+    int serverPort = this.serverSocket.getLocalPort();
+
+    SocketCreator clusterSocketCreator =
+        SocketCreatorFactory.getSocketCreatorForComponent(SecurableCommunicationChannel.CLUSTER);
+    this.serverThread = startServerNIO(serverSocket, 15000);
+
+    await().until(() -> serverThread.isAlive());
+
+    SocketChannel clientChannel = SocketChannel.open();
+    clientChannel.connect(new InetSocketAddress(localHost, serverPort));
+    clientSocket = clientChannel.socket();
+    NioSslEngine engine =
+        clusterSocketCreator.handshakeSSLSocketChannel(clientSocket.getChannel(),
+            clusterSocketCreator.createSSLEngine("localhost", 1234), 0, true,
+            ByteBuffer.allocate(65535), mock(DMStats.class));
+    clientChannel.configureBlocking(true);
+
+    // transmit expected string from Client to Server
+    writeMessageToNIOSSLServer(clientChannel, engine);
+    writeMessageToNIOSSLServer(clientChannel, engine);
+    // this is the real assertion of this test
+    await().until(() -> {
+      return !serverThread.isAlive();
+    });
+    assertNull(serverException);
+    // assertThat(this.messageFromClient.get()).isEqualTo(MESSAGE);
+  }
+
+  private void writeMessageToNIOSSLServer(SocketChannel clientChannel, NioSslEngine engine)
+      throws IOException {
+    System.out.println("client sending Hello World message to server");
+    ByteBufferOutputStream bbos = new ByteBufferOutputStream(5000);
+    DataOutputStream dos = new DataOutputStream(bbos);
+    dos.writeUTF("Hello world");
+    dos.flush();
+    bbos.flush();
+    ByteBuffer buffer = bbos.getContentBuffer();
+    System.out.println(
+        "client buffer position is " + buffer.position() + " and limit is " + buffer.limit());
+    ByteBuffer wrappedBuffer = engine.wrap(buffer);
+    System.out.println("client wrapped buffer position is " + wrappedBuffer.position()
+        + " and limit is " + wrappedBuffer.limit());
+    int bytesWritten = clientChannel.write(wrappedBuffer);
+    System.out.println("client bytes written is " + bytesWritten);
+  }
+
+  private Thread startServerNIO(final ServerSocket serverSocket, int timeoutMillis)
+      throws Exception {
+    Thread serverThread = new Thread(new MyThreadGroup(this.testName.getMethodName()), () -> {
+      NioSslEngine engine = null;
+      Socket socket = null;
+      try {
+        ByteBuffer buffer = ByteBuffer.allocate(65535);
+
+        socket = serverSocket.accept();
+        SocketCreator sc = SocketCreatorFactory.getSocketCreatorForComponent(CLUSTER);
+        engine =
+            sc.handshakeSSLSocketChannel(socket.getChannel(), sc.createSSLEngine("localhost", 1234),
+                timeoutMillis,
+                false,
+                ByteBuffer.allocate(500),
+                mock(DMStats.class));
+
+        readMessageFromNIOSSLClient(socket, buffer, engine);
+        readMessageFromNIOSSLClient(socket, buffer, engine);
+      } catch (Throwable throwable) {
+        throwable.printStackTrace(System.out);
+        serverException = throwable;
+      } finally {
+        if (engine != null && socket != null) {
+          final NioSslEngine nioSslEngine = engine;
+          engine.close(socket.getChannel());
+          assertThatThrownBy(() -> {
+            nioSslEngine.unwrap(ByteBuffer.wrap(new byte[0]));
+          })
+              .isInstanceOf(IllegalStateException.class);
+        }
+      }
+    }, this.testName.getMethodName() + "-server");
+
+    serverThread.start();
+    return serverThread;
+  }
+
+  private void readMessageFromNIOSSLClient(Socket socket, ByteBuffer buffer, NioSslEngine engine)
+      throws IOException {
+    int bytesRead = socket.getChannel().read(buffer);
+    buffer.flip();
+    System.out.println("server bytes read is " + bytesRead + ": buffer position is "
+        + buffer.position() + " and limit is " + buffer.limit());
+    ByteBuffer unwrapped = engine.unwrap(buffer);
+    unwrapped.flip();
+    System.out.println("server unwrapped buffer position is " + unwrapped.position()
+        + " and limit is " + unwrapped.limit());
+    ByteBufferInputStream bbis = new ByteBufferInputStream(unwrapped);
+    DataInputStream dis = new DataInputStream(bbis);
+    String welcome = dis.readUTF();
+    engine.doneReading(unwrapped);
+    assertThat(welcome).isEqualTo("Hello world");
+    System.out.println("server read Hello World message from client");
+  }
+
+
   @Test(expected = SocketTimeoutException.class)
   public void handshakeCanTimeoutOnServer() throws Throwable {
     this.serverSocket = this.socketCreator.createServerSocket(0, 0, this.localHost);
@@ -187,6 +312,33 @@ public class SSLSocketIntegrationTest {
     throw serverException;
   }
 
+  @Test(expected = SocketTimeoutException.class)
+  public void handshakeWithPeerCanTimeout() throws Throwable {
+    ServerSocketChannel serverChannel = ServerSocketChannel.open();
+    serverSocket = serverChannel.socket();
+
+    InetSocketAddress addr = new InetSocketAddress(localHost, 0);
+    serverSocket.bind(addr, 10);
+    int serverPort = this.serverSocket.getLocalPort();
+
+    this.serverThread = startServerNIO(this.serverSocket, 1000);
+
+    Socket socket = new Socket();
+    await().atMost(10, TimeUnit.SECONDS).until(() -> {
+      try {
+        socket.connect(new InetSocketAddress(localHost, serverPort));
+      } catch (ConnectException e) {
+        return false;
+      } catch (SocketException e) {
+        return true; // server socket was closed
+      }
+      return true;
+    });
+    await().untilAsserted(() -> assertFalse(serverThread.isAlive()));
+    assertNotNull(serverException);
+    throw serverException;
+  }
+
   @Test
   public void configureClientSSLSocketCanTimeOut() throws Exception {
     final Semaphore serverCoordination = new Semaphore(0);
@@ -271,7 +423,6 @@ public class SSLSocketIntegrationTest {
 
   private Thread startServer(final ServerSocket serverSocket, int timeoutMillis) throws Exception {
     Thread serverThread = new Thread(new MyThreadGroup(this.testName.getMethodName()), () -> {
-      long startTime = System.currentTimeMillis();
       try {
         Socket socket = serverSocket.accept();
         SocketCreatorFactory.getSocketCreatorForComponent(CLUSTER).handshakeIfSocketIsSSL(socket,
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java
index ad109bd..9fa699f 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java
@@ -23,8 +23,8 @@ import org.apache.geode.StatisticsType;
 import org.apache.geode.StatisticsTypeFactory;
 import org.apache.geode.internal.NanoTimer;
 import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.internal.net.Buffers;
 import org.apache.geode.internal.statistics.StatisticsTypeFactoryImpl;
-import org.apache.geode.internal.tcp.Buffers;
 import org.apache.geode.internal.util.Breadcrumbs;
 
 /**
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
index dbb4068..7d6d046 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
@@ -101,15 +101,6 @@ public class DirectChannel {
   }
 
   /**
-   * when the initial number of members is known, this method is invoked to ensure that connections
-   * to those members can be established in a reasonable amount of time. See bug 39848
-   *
-   */
-  public void setMembershipSize(int numberOfMembers) {
-    conduit.setMaximumHandshakePoolSize(numberOfMembers);
-  }
-
-  /**
    * Returns the cancel criterion for the channel, which will note if the channel is abnormally
    * closing
    */
@@ -480,20 +471,9 @@ public class DirectChannel {
       if (con.isSharedResource()) {
         continue;
       }
-      int msToWait = (int) (ackTimeout - (System.currentTimeMillis() - startTime));
-      // if the wait threshold has already been reached during transmission
-      // of the message, set a small wait period just to make sure the
-      // acks haven't already come back
-      if (msToWait <= 0) {
-        msToWait = 10;
-      }
-      long msInterval = ackSDTimeout;
-      if (msInterval <= 0) {
-        msInterval = Math.max(ackTimeout, 1000);
-      }
       try {
         try {
-          con.readAck(msToWait, msInterval, processor);
+          con.readAck(processor);
         } catch (SocketTimeoutException ex) {
           handleAckTimeout(ackTimeout, ackSDTimeout, con, processor);
         }
@@ -688,7 +668,7 @@ public class DirectChannel {
       // wait for ack-severe-alert-threshold period first, then wait forever
       if (ackSATimeout > 0) {
         try {
-          c.readAck((int) ackSATimeout, ackSATimeout, processor);
+          c.readAck(processor);
           return;
         } catch (SocketTimeoutException e) {
           Object[] args = new Object[] {Long.valueOf((ackSATimeout + ackTimeout) / 1000),
@@ -699,7 +679,7 @@ public class DirectChannel {
         }
       }
       try {
-        c.readAck(0, 0, processor);
+        c.readAck(processor);
       } catch (SocketTimeoutException ex) {
         // this can never happen when called with timeout of 0
         logger.error(String.format("Unexpected timeout while waiting for ack from %s",
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/membership/GMSJoinLeave.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/membership/GMSJoinLeave.java
index c70351a..d5a4569 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/membership/GMSJoinLeave.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/membership/GMSJoinLeave.java
@@ -2693,7 +2693,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
       logger.debug("checking availability of these members: {}", checkers);
       ExecutorService svc =
           LoggingExecutors.newFixedThreadPool("Geode View Creator verification thread ",
-              false, suspects.size());
+              true, suspects.size());
       try {
         long giveUpTime = System.currentTimeMillis() + viewAckTimeout;
         // submit the tasks that will remove dead members from the suspects collection
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
index 5eb36fd..fc01cbb 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
@@ -2475,9 +2475,6 @@ public class GMSMembershipManager implements MembershipManager, Manager {
   @Override
   public void installView(NetView v) {
     if (latestViewId < 0 && !isConnected()) {
-      if (this.directChannel != null) {
-        this.directChannel.setMembershipSize(v.getMembers().size());
-      }
       latestViewId = v.getViewId();
       latestView = v;
       logger.debug("MembershipManager: initial view is {}", latestView);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
index ff27adf..7a2f244 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
@@ -2101,21 +2101,23 @@ public class EntryEventImpl implements InternalEntryEvent, InternalCacheEvent,
     buf.append(getRegion().getFullPath());
     buf.append(";key=");
     buf.append(this.getKey());
-    buf.append(";oldValue=");
-    try {
-      synchronized (this.offHeapLock) {
-        ArrayUtils.objectStringNonRecursive(basicGetOldValue(), buf);
+    if (Boolean.getBoolean("gemfire.insecure-logvalues")) {
+      buf.append(";oldValue=");
+      try {
+        synchronized (this.offHeapLock) {
+          ArrayUtils.objectStringNonRecursive(basicGetOldValue(), buf);
+        }
+      } catch (IllegalStateException ignore) {
+        buf.append("OFFHEAP_VALUE_FREED");
       }
-    } catch (IllegalStateException ignore) {
-      buf.append("OFFHEAP_VALUE_FREED");
-    }
-    buf.append(";newValue=");
-    try {
-      synchronized (this.offHeapLock) {
-        ArrayUtils.objectStringNonRecursive(basicGetNewValue(), buf);
+      buf.append(";newValue=");
+      try {
+        synchronized (this.offHeapLock) {
+          ArrayUtils.objectStringNonRecursive(basicGetNewValue(), buf);
+        }
+      } catch (IllegalStateException ignore) {
+        buf.append("OFFHEAP_VALUE_FREED");
       }
-    } catch (IllegalStateException ignore) {
-      buf.append("OFFHEAP_VALUE_FREED");
     }
     buf.append(";callbackArg=");
     buf.append(this.getRawCallbackArgument());
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PRQueryProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PRQueryProcessor.java
index 867d0cd..49183dc 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PRQueryProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PRQueryProcessor.java
@@ -329,7 +329,7 @@ public class PRQueryProcessor {
     static synchronized void initializeExecutorService() {
       if (execService == null || execService.isShutdown() || execService.isTerminated()) {
         int numThreads = (TEST_NUM_THREADS > 1 ? TEST_NUM_THREADS : NUM_THREADS);
-        execService = LoggingExecutors.newFixedThreadPool("PRQueryProcessor", false, numThreads);
+        execService = LoggingExecutors.newFixedThreadPool("PRQueryProcessor", true, numThreads);
       }
     }
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/AbstractExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/AbstractExecutor.java
index 6864e29..8f98926 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/AbstractExecutor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/AbstractExecutor.java
@@ -60,14 +60,15 @@ public abstract class AbstractExecutor {
 
     StringBuilder strb = new StringBuilder();
 
-    strb.append("Thread <").append(this.threadID).append("> that was executed at <")
+    strb.append("Thread <").append(this.threadID).append("> (0x")
+        .append(Long.toHexString(this.threadID)).append(") that was executed at <")
         .append(dateFormat.format(this.getStartTime())).append("> has been stuck for <")
         .append((float) stuckTime / 1000)
         .append(" seconds> and number of thread monitor iteration <")
         .append(this.numIterationsStuck).append("> ").append(System.lineSeparator());
     if (logThreadDetails) {
       strb.append("Thread Name <").append(thread.getThreadName()).append(">")
-          .append(System.lineSeparator()).append("Thread state <").append(thread.getThreadState())
+          .append(" state <").append(thread.getThreadState())
           .append(">").append(System.lineSeparator());
 
       if (thread.getLockName() != null)
@@ -75,7 +76,7 @@ public abstract class AbstractExecutor {
             .append(System.lineSeparator());
 
       if (thread.getLockOwnerName() != null)
-        strb.append("Owned By <").append(thread.getLockOwnerName()).append("> and ID <")
+        strb.append("Owned By <").append(thread.getLockOwnerName()).append("> with ID <")
             .append(thread.getLockOwnerId()).append(">").append(System.lineSeparator());
     }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Buffers.java b/geode-core/src/main/java/org/apache/geode/internal/net/Buffers.java
similarity index 59%
rename from geode-core/src/main/java/org/apache/geode/internal/tcp/Buffers.java
rename to geode-core/src/main/java/org/apache/geode/internal/net/Buffers.java
index abb7fdb..c9e7bcb 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Buffers.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/Buffers.java
@@ -12,12 +12,11 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-package org.apache.geode.internal.tcp;
+package org.apache.geode.internal.net;
 
 import java.lang.ref.SoftReference;
 import java.nio.ByteBuffer;
 import java.util.IdentityHashMap;
-import java.util.Iterator;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.geode.distributed.internal.DMStats;
@@ -25,29 +24,45 @@ import org.apache.geode.internal.Assert;
 
 public class Buffers {
   /**
+   * Buffers may be acquired from the Buffers pool
+   * or they may be allocated using Buffer.allocate(). This enum is used
+   * to note the different types. Tracked buffers come from the Buffers pool
+   * and need to be released when we're done using them.
+   */
+  public enum BufferType {
+    UNTRACKED, TRACKED_SENDER, TRACKED_RECEIVER
+  }
+
+  /**
    * A list of soft references to byte buffers.
    */
-  private static final ConcurrentLinkedQueue bufferQueue = new ConcurrentLinkedQueue();
+  private static final ConcurrentLinkedQueue<BBSoftReference> bufferQueue =
+      new ConcurrentLinkedQueue<>();
+
+  /**
+   * use direct ByteBuffers instead of heap ByteBuffers for NIO operations
+   */
+  public static boolean useDirectBuffers = !Boolean.getBoolean("p2p.nodirectBuffers");
 
   /**
    * Should only be called by threads that have currently acquired send permission.
    *
    * @return a byte buffer to be used for sending on this connection.
    */
-  static ByteBuffer acquireSenderBuffer(int size, DMStats stats) {
+  public static ByteBuffer acquireSenderBuffer(int size, DMStats stats) {
     return acquireBuffer(size, stats, true);
   }
 
-  static ByteBuffer acquireReceiveBuffer(int size, DMStats stats) {
+  public static ByteBuffer acquireReceiveBuffer(int size, DMStats stats) {
     return acquireBuffer(size, stats, false);
   }
 
-  static ByteBuffer acquireBuffer(int size, DMStats stats, boolean send) {
+  private static ByteBuffer acquireBuffer(int size, DMStats stats, boolean send) {
     ByteBuffer result;
-    if (TCPConduit.useDirectBuffers) {
+    if (useDirectBuffers) {
       IdentityHashMap<BBSoftReference, BBSoftReference> alreadySeen = null; // keys are used like a
                                                                             // set
-      BBSoftReference ref = (BBSoftReference) bufferQueue.poll();
+      BBSoftReference ref = bufferQueue.poll();
       while (ref != null) {
         ByteBuffer bb = ref.getBB();
         if (bb == null) {
@@ -68,7 +83,7 @@ public class Buffers {
           // wasn't big enough so put it back in the queue
           Assert.assertTrue(bufferQueue.offer(ref));
           if (alreadySeen == null) {
-            alreadySeen = new IdentityHashMap<BBSoftReference, BBSoftReference>();
+            alreadySeen = new IdentityHashMap<>();
           }
           if (alreadySeen.put(ref, ref) != null) {
             // if it returns non-null then we have already seen this item
@@ -77,7 +92,7 @@ public class Buffers {
             break;
           }
         }
-        ref = (BBSoftReference) bufferQueue.poll();
+        ref = bufferQueue.poll();
       }
       result = ByteBuffer.allocateDirect(size);
     } else {
@@ -85,26 +100,67 @@ public class Buffers {
       result = ByteBuffer.allocate(size);
     }
     if (send) {
-      stats.incSenderBufferSize(size, TCPConduit.useDirectBuffers);
+      stats.incSenderBufferSize(size, useDirectBuffers);
     } else {
-      stats.incReceiverBufferSize(size, TCPConduit.useDirectBuffers);
+      stats.incReceiverBufferSize(size, useDirectBuffers);
     }
     return result;
   }
 
-  static void releaseSenderBuffer(ByteBuffer bb, DMStats stats) {
+  public static void releaseSenderBuffer(ByteBuffer bb, DMStats stats) {
     releaseBuffer(bb, stats, true);
   }
 
-  static void releaseReceiveBuffer(ByteBuffer bb, DMStats stats) {
+  public static void releaseReceiveBuffer(ByteBuffer bb, DMStats stats) {
     releaseBuffer(bb, stats, false);
   }
 
+  static ByteBuffer expandBuffer(Buffers.BufferType type, ByteBuffer existing,
+      int desiredCapacity, DMStats stats) {
+    if (existing.capacity() >= desiredCapacity) {
+      existing.compact();
+      return existing;
+    }
+    ByteBuffer newBuffer = acquireBuffer(type, desiredCapacity, stats);
+    newBuffer.clear();
+    existing.flip();
+    newBuffer.put(existing);
+    releaseBuffer(type, existing, stats);
+    return newBuffer;
+  }
+
+  private static ByteBuffer acquireBuffer(Buffers.BufferType type, int capacity, DMStats stats) {
+    switch (type) {
+      case UNTRACKED:
+        return ByteBuffer.allocate(capacity);
+      case TRACKED_SENDER:
+        return Buffers.acquireSenderBuffer(capacity, stats);
+      case TRACKED_RECEIVER:
+        return Buffers.acquireReceiveBuffer(capacity, stats);
+    }
+    throw new IllegalArgumentException("Unexpected buffer type " + type.toString());
+  }
+
+  static void releaseBuffer(Buffers.BufferType type, ByteBuffer buffer, DMStats stats) {
+    switch (type) {
+      case UNTRACKED:
+        return;
+      case TRACKED_SENDER:
+        Buffers.releaseSenderBuffer(buffer, stats);
+        return;
+      case TRACKED_RECEIVER:
+        Buffers.releaseReceiveBuffer(buffer, stats);
+        return;
+    }
+    throw new IllegalArgumentException("Unexpected buffer type " + type.toString());
+  }
+
+
   /**
    * Releases a previously acquired buffer.
    */
-  static void releaseBuffer(ByteBuffer bb, DMStats stats, boolean send) {
-    if (TCPConduit.useDirectBuffers) {
+  private static void releaseBuffer(ByteBuffer bb, DMStats stats, boolean send) {
+    if (useDirectBuffers) {
       BBSoftReference bbRef = new BBSoftReference(bb, send);
       bufferQueue.offer(bbRef);
     } else {
@@ -117,11 +173,8 @@ public class Buffers {
   }
 
   public static void initBufferStats(DMStats stats) { // fixes 46773
-    if (TCPConduit.useDirectBuffers) {
-      @SuppressWarnings("unchecked")
-      Iterator<BBSoftReference> it = (Iterator<BBSoftReference>) bufferQueue.iterator();
-      while (it.hasNext()) {
-        BBSoftReference ref = it.next();
+    if (useDirectBuffers) {
+      for (BBSoftReference ref : bufferQueue) {
         if (ref.getBB() != null) {
           if (ref.getSend()) { // fix bug 46773
             stats.incSenderBufferSize(ref.getSize(), true);
@@ -142,7 +195,7 @@ public class Buffers {
     private int size;
     private final boolean send;
 
-    public BBSoftReference(ByteBuffer bb, boolean send) {
+    BBSoftReference(ByteBuffer bb, boolean send) {
       super(bb);
       this.size = bb.capacity();
       this.send = send;
@@ -152,7 +205,7 @@ public class Buffers {
       return this.size;
     }
 
-    public synchronized int consumeSize() {
+    synchronized int consumeSize() {
       int result = this.size;
       this.size = 0;
       return result;
@@ -163,7 +216,7 @@ public class Buffers {
     }
 
     public ByteBuffer getBB() {
-      return (ByteBuffer) super.get();
+      return super.get();
     }
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/OioMsgReader.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioEngine.java
similarity index 51%
rename from geode-core/src/main/java/org/apache/geode/internal/tcp/OioMsgReader.java
rename to geode-core/src/main/java/org/apache/geode/internal/net/NioEngine.java
index 3ec4298..28cd3d6 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/OioMsgReader.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioEngine.java
@@ -12,28 +12,41 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-package org.apache.geode.internal.tcp;
+package org.apache.geode.internal.net;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
 
-import org.apache.geode.internal.Version;
+import org.apache.geode.distributed.internal.DMStats;
 
 /**
- * A message reader which reads from the socket using the old io.
- *
+ * A pass-through implementation of NioFilter. Use this if you don't need
+ * secure communications.
  */
-public class OioMsgReader extends MsgReader {
+public class NioEngine implements NioFilter {
+
+  public NioEngine() {}
+
+  @Override
+  public ByteBuffer wrap(ByteBuffer buffer) {
+    return buffer;
+  }
 
-  public OioMsgReader(Connection conn, Version version) {
-    super(conn, version);
+  @Override
+  public ByteBuffer unwrap(ByteBuffer wrappedBuffer) {
+    wrappedBuffer.position(wrappedBuffer.limit());
+    return wrappedBuffer;
+  }
+
+  @Override
+  public ByteBuffer getUnwrappedBuffer(ByteBuffer wrappedBuffer) {
+    return wrappedBuffer;
   }
 
   @Override
-  public ByteBuffer readAtLeast(int bytes) throws IOException {
-    byte[] buffer = new byte[bytes];
-    conn.readFully(conn.getSocket().getInputStream(), buffer, bytes);
-    return ByteBuffer.wrap(buffer);
+  public ByteBuffer ensureUnwrappedCapacity(int amount, ByteBuffer wrappedBuffer,
+      Buffers.BufferType bufferType,
+      DMStats stats) {
+    return Buffers.expandBuffer(bufferType, wrappedBuffer, amount, stats);
   }
 
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java
new file mode 100644
index 0000000..bb879e5
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.net;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+
+import org.apache.geode.distributed.internal.DMStats;
+
+/**
+ * Prior to transmitting a buffer or processing a received buffer
+ * a NioFilter should be called to wrap (transmit) or unwrap (received)
+ * the buffer in case SSL is being used.
+ */
+public interface NioFilter {
+
+  /**
+   * wrap bytes for transmission to another process
+   */
+  ByteBuffer wrap(ByteBuffer buffer) throws IOException;
+
+  /**
+   * unwrap bytes received from another process. The unwrapped
+   * buffer should be flipped before reading. When done reading invoke
+   * doneReading() to reset for future read ops
+   */
+  ByteBuffer unwrap(ByteBuffer wrappedBuffer) throws IOException;
+
+  /**
+   * You must invoke this when done reading from the unwrapped buffer
+   */
+  default void doneReading(ByteBuffer unwrappedBuffer) {
+    if (unwrappedBuffer.position() != 0) {
+      unwrappedBuffer.compact();
+    } else {
+      unwrappedBuffer.position(unwrappedBuffer.limit());
+      unwrappedBuffer.limit(unwrappedBuffer.capacity());
+    }
+  }
+
+  /**
+   * invoke this method when you are done using the NioFilter
+   *
+   */
+  default void close(SocketChannel socketChannel) {
+    // nothing by default
+  }
+
+  /**
+   * returns the unwrapped byte buffer associated with the given wrapped buffer
+   */
+  ByteBuffer getUnwrappedBuffer(ByteBuffer wrappedBuffer);
+
+  /**
+   * ensures that the unwrapped buffer associated with the given wrapped buffer has
+   * sufficient capacity for the given amount of bytes. This may compact the
+   * buffer or it may return a new buffer.
+   */
+  ByteBuffer ensureUnwrappedCapacity(int amount, ByteBuffer wrappedBuffer,
+      Buffers.BufferType bufferType,
+      DMStats stats);
+
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
new file mode 100644
index 0000000..32f4978
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.net;
+
+import static javax.net.ssl.SSLEngineResult.HandshakeStatus.FINISHED;
+import static javax.net.ssl.SSLEngineResult.HandshakeStatus.NEED_TASK;
+import static javax.net.ssl.SSLEngineResult.HandshakeStatus.NEED_UNWRAP;
+import static javax.net.ssl.SSLEngineResult.Status.BUFFER_OVERFLOW;
+import static javax.net.ssl.SSLEngineResult.Status.BUFFER_UNDERFLOW;
+import static javax.net.ssl.SSLEngineResult.Status.OK;
+import static org.apache.geode.internal.net.Buffers.releaseBuffer;
+
+import java.io.IOException;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLHandshakeException;
+import javax.net.ssl.SSLSession;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.GemFireIOException;
+import org.apache.geode.distributed.internal.DMStats;
+import org.apache.geode.internal.logging.LogService;
+
+
+/**
+ * NioSslEngine uses an SSLEngine to bind SSL logic to a data source. This class is not thread
+ * safe. Its use should be confined to one thread or should be protected by external
+ * synchronization.
+ */
+public class NioSslEngine implements NioFilter {
+  private static final Logger logger = LogService.getLogger();
+
+  private final DMStats stats;
+
+  private boolean closed;
+
+  SSLEngine engine;
+
+  /**
+   * myNetData holds bytes wrapped by the SSLEngine
+   */
+  ByteBuffer myNetData;
+
+  /**
+   * peerAppData holds the last unwrapped data from a peer
+   */
+  ByteBuffer peerAppData;
+
+  /**
+   * buffer used to receive data during TLS handshake
+   */
+  ByteBuffer handshakeBuffer;
+
+  NioSslEngine(SSLEngine engine, DMStats stats) {
+    this.stats = stats;
+    SSLSession session = engine.getSession();
+    int appBufferSize = session.getApplicationBufferSize();
+    int packetBufferSize = engine.getSession().getPacketBufferSize();
+    this.myNetData = ByteBuffer.allocate(packetBufferSize);
+    this.peerAppData = ByteBuffer.allocate(appBufferSize);
+    this.engine = engine;
+  }
+
+  /**
+   * This will throw an SSLHandshakeException if the handshake doesn't terminate in a FINISHED
+   * state. It may throw other IOExceptions caused by I/O operations
+   */
+  public boolean handshake(SocketChannel socketChannel, int timeout,
+      ByteBuffer peerNetData)
+      throws IOException, InterruptedException {
+
+    if (peerNetData.capacity() < engine.getSession().getPacketBufferSize()) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Allocating new buffer for SSL handshake");
+      }
+      this.handshakeBuffer =
+          Buffers.acquireReceiveBuffer(engine.getSession().getPacketBufferSize(), stats);
+    } else {
+      this.handshakeBuffer = peerNetData;
+    }
+    this.handshakeBuffer.clear();
+
+    ByteBuffer myAppData = ByteBuffer.wrap(new byte[0]);
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("Starting TLS handshake with {}.  Timeout is {}ms", socketChannel.socket(),
+          timeout);
+    }
+
+    long timeoutNanos = -1;
+    if (timeout > 0) {
+      timeoutNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout);
+    }
+
+    // Begin handshake
+    engine.beginHandshake();
+    SSLEngineResult.HandshakeStatus status = engine.getHandshakeStatus();
+    SSLEngineResult engineResult = null;
+
+    // Process handshaking message
+    while (status != FINISHED &&
+        status != SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING) {
+      if (socketChannel.socket().isClosed()) {
+        logger.info("Handshake terminated because socket is closed");
+        throw new SocketException("handshake terminated - socket is closed");
+      }
+
+      if (timeoutNanos > 0) {
+        if (timeoutNanos < System.nanoTime()) {
+          logger.info("TLS handshake is timing out");
+          throw new SocketTimeoutException("handshake timed out");
+        }
+      }
+
+      switch (status) {
+        case NEED_UNWRAP:
+          // Receive handshaking data from peer
+          int dataRead = socketChannel.read(handshakeBuffer);
+
+          // Process incoming handshaking data
+          handshakeBuffer.flip();
+          engineResult = engine.unwrap(handshakeBuffer, peerAppData);
+          handshakeBuffer.compact();
+          status = engineResult.getHandshakeStatus();
+
+          // if we're not finished, there's nothing to process and no data was read let's hang out
+          // for a little
+          if (peerAppData.remaining() == 0 && dataRead == 0 && status == NEED_UNWRAP) {
+            Thread.sleep(10);
+          }
+
+          if (engineResult.getStatus() == BUFFER_OVERFLOW) {
+            peerAppData =
+                expandBuffer(Buffers.BufferType.UNTRACKED, peerAppData, peerAppData.capacity() * 2,
+                    stats);
+          }
+          break;
+
+        case NEED_WRAP:
+          // Empty the local network packet buffer.
+          myNetData.clear();
+
+          // Generate handshaking data
+          engineResult = engine.wrap(myAppData, myNetData);
+          status = engineResult.getHandshakeStatus();
+
+          // Check status
+          switch (engineResult.getStatus()) {
+            case BUFFER_OVERFLOW:
+              myNetData =
+                  expandBuffer(Buffers.BufferType.TRACKED_SENDER, myNetData,
+                      myNetData.capacity() * 2, stats);
+              break;
+            case OK:
+              myNetData.flip();
+              // Send the handshaking data to peer
+              while (myNetData.hasRemaining()) {
+                socketChannel.write(myNetData);
+              }
+              break;
+            case CLOSED:
+              break;
+            default:
+              logger.info("handshake terminated with illegal state(1) due to {}", status);
+              throw new IllegalStateException(
+                  "Unknown SSLEngineResult status: " + engineResult.getStatus());
+          }
+          break;
+        case NEED_TASK:
+          // Handle blocking tasks
+          handleBlockingTasks();
+          status = engine.getHandshakeStatus();
+          break;
+        default:
+          logger.info("handshake terminated with illegal state(2) due to {}", status);
+          throw new IllegalStateException("Unknown SSL Handshake state: " + status);
+      }
+      Thread.sleep(10);
+    }
+    if (status != FINISHED) {
+      logger.info("handshake terminated with exception due to {}", status);
+      throw new SSLHandshakeException("SSL Handshake terminated with status " + status);
+    }
+    if (logger.isDebugEnabled()) {
+      if (engineResult != null) {
+        logger.debug("TLS handshake successful.  result={} and handshakeResult={}",
+            engineResult.getStatus(), engine.getHandshakeStatus());
+      } else {
+        logger.debug("TLS handshake successful.  handshakeResult={}",
+            engine.getHandshakeStatus());
+      }
+    }
+    return true;
+  }
+
+  ByteBuffer expandBuffer(Buffers.BufferType type, ByteBuffer existing,
+      int desiredCapacity, DMStats stats) {
+    return Buffers.expandBuffer(type, existing, desiredCapacity, stats);
+  }
+
+  void checkClosed() {
+    if (closed) {
+      throw new IllegalStateException("NioSslEngine has been closed");
+    }
+  }
+
+  void handleBlockingTasks() {
+    Runnable task;
+    while ((task = engine.getDelegatedTask()) != null) {
+      // these tasks could be run in other threads but the SSLEngine will block until they finish
+      task.run();
+    }
+  }
+
+  @Override
+  public synchronized ByteBuffer wrap(ByteBuffer appData) throws IOException {
+    checkClosed();
+
+    myNetData.clear();
+
+    while (appData.hasRemaining()) {
+      // ensure we have lots of capacity since encrypted data might
+      // be larger than the app data
+      int remaining = myNetData.capacity() - myNetData.position();
+
+      if (remaining < (appData.remaining() * 2)) {
+        int newCapacity = expandedCapacity(appData, myNetData);
+        myNetData = expandBuffer(Buffers.BufferType.TRACKED_SENDER, myNetData, newCapacity, stats);
+      }
+
+      SSLEngineResult wrapResult = engine.wrap(appData, myNetData);
+
+      if (wrapResult.getHandshakeStatus() == NEED_TASK) {
+        handleBlockingTasks();
+      }
+
+      if (wrapResult.getStatus() != OK) {
+        throw new SSLException("Error encrypting data: " + wrapResult);
+      }
+    }
+
+    myNetData.flip();
+
+    return myNetData;
+  }
+
+  @Override
+  public synchronized ByteBuffer unwrap(ByteBuffer wrappedBuffer) throws IOException {
+    checkClosed();
+
+    // note that we do not clear peerAppData as it may hold a partial
+    // message. TcpConduit, for instance, uses message chunking to
+    // transmit large payloads and we may have read a partial chunk
+    // during the previous unwrap
+    while (wrappedBuffer.hasRemaining()) {
+      int remaining = peerAppData.capacity() - peerAppData.position();
+      if (remaining < wrappedBuffer.remaining() * 2) {
+        int newCapacity = expandedCapacity(peerAppData, wrappedBuffer);
+        peerAppData = expandBuffer(Buffers.BufferType.UNTRACKED, peerAppData, newCapacity, stats);
+      }
+      SSLEngineResult unwrapResult = engine.unwrap(wrappedBuffer, peerAppData);
+      if (unwrapResult.getStatus() == BUFFER_UNDERFLOW) {
+        // partial data - need to read more. When this happens the SSLEngine will not have
+        // changed the buffer position
+        wrappedBuffer.compact();
+        return peerAppData;
+      }
+
+      if (unwrapResult.getStatus() != OK) {
+        throw new SSLException("Error decrypting data: " + unwrapResult);
+      }
+    }
+    wrappedBuffer.clear();
+    return peerAppData;
+  }
+
+  @Override
+  public ByteBuffer getUnwrappedBuffer(ByteBuffer wrappedBuffer) {
+    return peerAppData;
+  }
+
+  @Override
+  public ByteBuffer ensureUnwrappedCapacity(int amount, ByteBuffer wrappedBuffer,
+      Buffers.BufferType bufferType,
+      DMStats stats) {
+    peerAppData = Buffers.expandBuffer(bufferType, peerAppData, amount, this.stats);
+    return peerAppData;
+  }
+
+
+  @Override
+  public void close(SocketChannel socketChannel) {
+    if (closed) {
+      return;
+    }
+    try {
+
+      if (!engine.isOutboundDone()) {
+        ByteBuffer empty = ByteBuffer.wrap(new byte[0]);
+        engine.closeOutbound();
+
+        // clear the buffer to receive a CLOSE message from the SSLEngine
+        myNetData.clear();
+
+        // Get close message
+        SSLEngineResult result = engine.wrap(empty, myNetData);
+
+        if (result.getStatus() != SSLEngineResult.Status.CLOSED) {
+          throw new SSLHandshakeException(
+              "Error closing SSL session.  Status=" + result.getStatus());
+        }
+
+        // Send close message to peer
+        myNetData.flip();
+        while (myNetData.hasRemaining()) {
+          socketChannel.write(myNetData);
+        }
+      }
+    } catch (ClosedChannelException e) {
+      // we can't send a close message if the channel is closed
+    } catch (IOException e) {
+      throw new GemFireIOException("exception closing SSL session", e);
+    } finally {
+      releaseBuffer(Buffers.BufferType.TRACKED_SENDER, myNetData, stats);
+      releaseBuffer(Buffers.BufferType.UNTRACKED, peerAppData, stats);
+      this.closed = true;
+    }
+  }
+
+  private int expandedCapacity(ByteBuffer sourceBuffer, ByteBuffer targetBuffer) {
+    return Math.max(targetBuffer.position() + sourceBuffer.remaining() * 2,
+        targetBuffer.capacity() * 2);
+  }
+
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCreator.java b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCreator.java
index f6dc962..3db82c7 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCreator.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCreator.java
@@ -29,7 +29,9 @@ import java.net.SocketAddress;
 import java.net.SocketException;
 import java.net.SocketTimeoutException;
 import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
 import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
 import java.security.GeneralSecurityException;
 import java.security.KeyStore;
 import java.security.KeyStoreException;
@@ -63,6 +65,7 @@ import javax.net.ssl.KeyManager;
 import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLException;
 import javax.net.ssl.SSLHandshakeException;
 import javax.net.ssl.SSLParameters;
 import javax.net.ssl.SSLPeerUnverifiedException;
@@ -83,6 +86,7 @@ import org.apache.geode.admin.internal.InetAddressUtil;
 import org.apache.geode.cache.wan.GatewaySender;
 import org.apache.geode.cache.wan.GatewayTransportFilter;
 import org.apache.geode.distributed.ClientSocketFactory;
+import org.apache.geode.distributed.internal.DMStats;
 import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.distributed.internal.DistributionConfigImpl;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
@@ -163,11 +167,6 @@ public class SocketCreator {
   public static volatile boolean use_client_host_name = true;
 
   /**
-   * True if this SocketCreator has been initialized and is ready to use
-   */
-  private boolean ready = false;
-
-  /**
    * Only print this SocketCreator's config once
    */
   private boolean configShown = false;
@@ -226,6 +225,9 @@ public class SocketCreator {
             SocketCreator.useIPv6Addresses = true;
           }
         }
+        if (inetAddress == null) {
+          inetAddress = InetAddress.getLocalHost();
+        }
       }
     } catch (UnknownHostException e) {
     }
@@ -339,7 +341,6 @@ public class SocketCreator {
           .equals(sslConfig.getSecuredCommunicationChannel())) {
         if (this.sslConfig.isEnabled()) {
           System.setProperty("p2p.useSSL", "true");
-          System.setProperty("p2p.oldIO", "true");
           System.setProperty("p2p.nodirectBuffers", "true");
         } else {
           System.setProperty("p2p.useSSL", "false");
@@ -358,7 +359,7 @@ public class SocketCreator {
       org.apache.geode.internal.tcp.TCPConduit.init();
 
       initializeClientSocketFactory();
-      this.ready = true;
+
     } catch (VirtualMachineError err) {
       SystemFailure.initiateFailure(err);
       // If this ever returns, rethrow the error. We're poisoned
@@ -534,6 +535,7 @@ public class SocketCreator {
           System.getProperty("user.home") + System.getProperty("file.separator") + ".keystore";
     }
 
+
     FileInputStream fileInputStream = new FileInputStream(keyStoreFilePath);
     String passwordString = sslConfig.getKeystorePassword();
     char[] password = null;
@@ -637,6 +639,12 @@ public class SocketCreator {
     }
 
     @Override
+    public String chooseEngineClientAlias(String[] keyTypes, Principal[] principals,
+        SSLEngine sslEngine) {
+      return delegate.chooseEngineClientAlias(keyTypes, principals, sslEngine);
+    }
+
+    @Override
     public String chooseEngineServerAlias(final String keyType, final Principal[] principals,
         final SSLEngine sslEngine) {
       if (!StringUtils.isEmpty(this.keyAlias)) {
@@ -866,14 +874,6 @@ public class SocketCreator {
   }
 
   /**
-   * Return a client socket. This method is used by peers.
-   */
-  public Socket connectForServer(InetAddress inetadd, int port, int socketBufferSize)
-      throws IOException {
-    return connect(inetadd, port, 0, null, false, socketBufferSize);
-  }
-
-  /**
    * Return a client socket, timing out if unable to connect and timeout > 0 (millis). The parameter
    * <i>timeout</i> is ignored if SSL is being used, as there is no timeout argument in the ssl
    * socket factory
@@ -962,6 +962,76 @@ public class SocketCreator {
   }
 
   /**
+   * Returns an SSLEngine that can be used to perform TLS handshakes and communication
+   */
+  public SSLEngine createSSLEngine(String hostName, int port) {
+    return sslContext.createSSLEngine(hostName, port);
+  }
+
+  /**
+   * @see <a
+   *      href=https://docs.oracle.com/javase/8/docs/technotes/guides/security/jsse/JSSERefGuide.html#SSLENG">JSSE
+   *      Reference Guide</a>
+   *
+   * @param socketChannel the socket's NIO channel
+   * @param engine the sslEngine (see createSSLEngine)
+   * @param timeout handshake timeout in milliseconds. No timeout if <= 0
+   * @param clientSocket set to true if you initiated the connect(), false if you accepted it
+   * @param peerNetBuffer the buffer to use in reading data fron socketChannel. This should also be
+   *        used in subsequent I/O operations
+   * @return The SSLEngine to be used in processing data for sending/receiving from the channel
+   */
+  public NioSslEngine handshakeSSLSocketChannel(SocketChannel socketChannel, SSLEngine engine,
+      int timeout,
+      boolean clientSocket,
+      ByteBuffer peerNetBuffer,
+      DMStats stats)
+      throws IOException {
+    engine.setUseClientMode(clientSocket);
+    while (!socketChannel.finishConnect()) {
+      try {
+        Thread.sleep(50);
+      } catch (InterruptedException e) {
+        if (!socketChannel.socket().isClosed()) {
+          socketChannel.close();
+        }
+        throw new IOException("Interrupted while performing handshake", e);
+      }
+    }
+
+    NioSslEngine nioSslEngine = new NioSslEngine(engine, stats);
+
+    boolean blocking = socketChannel.isBlocking();
+    if (blocking) {
+      socketChannel.configureBlocking(false);
+    }
+
+    try {
+      nioSslEngine.handshake(socketChannel, timeout, peerNetBuffer);
+    } catch (SSLException e) {
+      if (!socketChannel.socket().isClosed()) {
+        socketChannel.close();
+      }
+      logger.warn("SSL handshake exception", e);
+      throw e;
+    } catch (InterruptedException e) {
+      if (!socketChannel.socket().isClosed()) {
+        socketChannel.close();
+      }
+      throw new IOException("SSL handshake interrupted");
+    } finally {
+      if (blocking) {
+        try {
+          socketChannel.configureBlocking(true);
+        } catch (IOException ignored) {
+          // problem setting the socket back to blocking mode but the socket's going to be closed
+        }
+      }
+    }
+    return nioSslEngine;
+  }
+
+  /**
    * Use this method to perform the SSL handshake on a newly accepted socket. Non-SSL
    * sockets are ignored by this method.
    *
@@ -1079,13 +1149,13 @@ public class SocketCreator {
         }
       } catch (SSLHandshakeException ex) {
         logger
-            .fatal(String.format("SSL Error in connecting to peer %s[%s].",
-                new Object[] {socket.getInetAddress(), Integer.valueOf(socket.getPort())}),
+            .fatal(String.format("Problem forming SSL connection to %s[%s].",
+                socket.getInetAddress(), Integer.valueOf(socket.getPort())),
                 ex);
         throw ex;
       } catch (SSLPeerUnverifiedException ex) {
         if (this.sslConfig.isRequireAuth()) {
-          logger.fatal("SSL Error in authenticating peer.", ex);
+          logger.fatal("SSL authentication exception.", ex);
           throw ex;
         }
       }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/statistics/VMStats.java b/geode-core/src/main/java/org/apache/geode/internal/statistics/VMStats.java
index b54d692..27fc264 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/statistics/VMStats.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/statistics/VMStats.java
@@ -36,7 +36,7 @@ public class VMStats implements VMStatsContract {
             f.createIntGauge("cpus", "Number of cpus available to the java VM on its machine.",
                 "cpus", true),
             f.createLongGauge("freeMemory",
-                "An approximation fo the total amount of memory currently available for future allocated objects, measured in bytes.",
+                "An approximation of the total amount of memory currently available for future allocated objects, measured in bytes.",
                 "bytes", true),
             f.createLongGauge("totalMemory",
                 "The total amount of memory currently available for current and future objects, measured in bytes.",
diff --git a/geode-core/src/main/java/org/apache/geode/internal/statistics/platform/LinuxProcFsStatistics.java b/geode-core/src/main/java/org/apache/geode/internal/statistics/platform/LinuxProcFsStatistics.java
index 4dc1639..c6749d4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/statistics/platform/LinuxProcFsStatistics.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/statistics/platform/LinuxProcFsStatistics.java
@@ -55,7 +55,7 @@ public class LinuxProcFsStatistics {
   private static int sys_cpus;
   private static boolean hasProcVmStat;
   private static boolean hasDiskStats;
-  static SpaceTokenizer st;
+  static SpaceTokenizer tokenizer;
 
   /** The number of non-process files in /proc */
   private static int nonPidFilesInProc;
@@ -83,13 +83,12 @@ public class LinuxProcFsStatistics {
     cpuStatSingleton = new CpuStat();
     hasProcVmStat = new File("/proc/vmstat").exists();
     hasDiskStats = new File("/proc/diskstats").exists();
-    st = new SpaceTokenizer();
+    tokenizer = new SpaceTokenizer();
     return 0;
   }
 
   public static void close() { // TODO: was package-protected
     cpuStatSingleton = null;
-    st = null;
   }
 
   public static void readyRefresh() { // TODO: was package-protected
@@ -117,10 +116,11 @@ public class LinuxProcFsStatistics {
       if (line == null) {
         return;
       }
-      st.setString(line);
-      st.skipTokens(22);
-      ints[LinuxProcessStats.imageSizeINT] = (int) (st.nextTokenAsLong() / OneMeg);
-      ints[LinuxProcessStats.rssSizeINT] = (int) ((st.nextTokenAsLong() * pageSize) / OneMeg);
+      tokenizer.setString(line);
+      tokenizer.skipTokens(22);
+      ints[LinuxProcessStats.imageSizeINT] = (int) (tokenizer.nextTokenAsLong() / OneMeg);
+      ints[LinuxProcessStats.rssSizeINT] =
+          (int) ((tokenizer.nextTokenAsLong() * pageSize) / OneMeg);
     } catch (NoSuchElementException nsee) {
       // It might just be a case of the process going away while we
       // where trying to get its stats.
@@ -132,7 +132,7 @@ public class LinuxProcFsStatistics {
       // So for now lets just ignore the failure and leave the stats
       // as they are.
     } finally {
-      st.releaseResources();
+      tokenizer.releaseResources();
       if (br != null)
         try {
           br.close();
@@ -143,6 +143,10 @@ public class LinuxProcFsStatistics {
 
   public static void refreshSystem(int[] ints, long[] longs, double[] doubles) { // TODO: was
                                                                                  // package-protected
+    if (cpuStatSingleton == null) {
+      // stats have been closed or haven't been properly initialized
+      return;
+    }
     ints[LinuxSystemStats.processesINT] = getProcessCount();
     ints[LinuxSystemStats.cpusINT] = sys_cpus;
     InputStreamReader isr = null;
@@ -207,7 +211,7 @@ public class LinuxProcFsStatistics {
     if (hasProcVmStat) {
       getVmStats(longs);
     }
-    st.releaseResources();
+    tokenizer.releaseResources();
   }
 
   // Example of /proc/loadavg
@@ -222,14 +226,14 @@ public class LinuxProcFsStatistics {
       if (line == null) {
         return;
       }
-      st.setString(line);
-      doubles[LinuxSystemStats.loadAverage1DOUBLE] = st.nextTokenAsDouble();
-      doubles[LinuxSystemStats.loadAverage5DOUBLE] = st.nextTokenAsDouble();
-      doubles[LinuxSystemStats.loadAverage15DOUBLE] = st.nextTokenAsDouble();
+      tokenizer.setString(line);
+      doubles[LinuxSystemStats.loadAverage1DOUBLE] = tokenizer.nextTokenAsDouble();
+      doubles[LinuxSystemStats.loadAverage5DOUBLE] = tokenizer.nextTokenAsDouble();
+      doubles[LinuxSystemStats.loadAverage15DOUBLE] = tokenizer.nextTokenAsDouble();
     } catch (NoSuchElementException nsee) {
     } catch (IOException ioe) {
     } finally {
-      st.releaseResources();
+      tokenizer.releaseResources();
       if (br != null)
         try {
           br.close();
@@ -287,41 +291,41 @@ public class LinuxProcFsStatistics {
       while ((line = br.readLine()) != null) {
         try {
           if (line.startsWith("MemTotal: ")) {
-            st.setString(line);
-            st.skipToken(); // Burn initial token
-            ints[LinuxSystemStats.physicalMemoryINT] = (int) (st.nextTokenAsLong() / 1024);
+            tokenizer.setString(line);
+            tokenizer.skipToken(); // Burn initial token
+            ints[LinuxSystemStats.physicalMemoryINT] = (int) (tokenizer.nextTokenAsLong() / 1024);
           } else if (line.startsWith("MemFree: ")) {
-            st.setString(line);
-            st.skipToken(); // Burn initial token
-            ints[LinuxSystemStats.freeMemoryINT] = (int) (st.nextTokenAsLong() / 1024);
+            tokenizer.setString(line);
+            tokenizer.skipToken(); // Burn initial token
+            ints[LinuxSystemStats.freeMemoryINT] = (int) (tokenizer.nextTokenAsLong() / 1024);
           } else if (line.startsWith("SharedMem: ")) {
-            st.setString(line);
-            st.skipToken(); // Burn initial token
-            ints[LinuxSystemStats.sharedMemoryINT] = (int) (st.nextTokenAsLong() / 1024);
+            tokenizer.setString(line);
+            tokenizer.skipToken(); // Burn initial token
+            ints[LinuxSystemStats.sharedMemoryINT] = (int) (tokenizer.nextTokenAsLong() / 1024);
           } else if (line.startsWith("Buffers: ")) {
-            st.setString(line);
-            st.nextToken(); // Burn initial token
-            ints[LinuxSystemStats.bufferMemoryINT] = (int) (st.nextTokenAsLong() / 1024);
+            tokenizer.setString(line);
+            tokenizer.nextToken(); // Burn initial token
+            ints[LinuxSystemStats.bufferMemoryINT] = (int) (tokenizer.nextTokenAsLong() / 1024);
           } else if (line.startsWith("SwapTotal: ")) {
-            st.setString(line);
-            st.skipToken(); // Burn initial token
-            ints[LinuxSystemStats.allocatedSwapINT] = (int) (st.nextTokenAsLong() / 1024);
+            tokenizer.setString(line);
+            tokenizer.skipToken(); // Burn initial token
+            ints[LinuxSystemStats.allocatedSwapINT] = (int) (tokenizer.nextTokenAsLong() / 1024);
           } else if (line.startsWith("SwapFree: ")) {
-            st.setString(line);
-            st.skipToken(); // Burn initial token
-            ints[LinuxSystemStats.unallocatedSwapINT] = (int) (st.nextTokenAsLong() / 1024);
+            tokenizer.setString(line);
+            tokenizer.skipToken(); // Burn initial token
+            ints[LinuxSystemStats.unallocatedSwapINT] = (int) (tokenizer.nextTokenAsLong() / 1024);
           } else if (line.startsWith("Cached: ")) {
-            st.setString(line);
-            st.skipToken(); // Burn initial token
-            ints[LinuxSystemStats.cachedMemoryINT] = (int) (st.nextTokenAsLong() / 1024);
+            tokenizer.setString(line);
+            tokenizer.skipToken(); // Burn initial token
+            ints[LinuxSystemStats.cachedMemoryINT] = (int) (tokenizer.nextTokenAsLong() / 1024);
           } else if (line.startsWith("Dirty: ")) {
-            st.setString(line);
-            st.skipToken(); // Burn initial token
-            ints[LinuxSystemStats.dirtyMemoryINT] = (int) (st.nextTokenAsLong() / 1024);
+            tokenizer.setString(line);
+            tokenizer.skipToken(); // Burn initial token
+            ints[LinuxSystemStats.dirtyMemoryINT] = (int) (tokenizer.nextTokenAsLong() / 1024);
           } else if (line.startsWith("Inact_dirty: ")) { // 2.4 kernels
-            st.setString(line);
-            st.skipToken(); // Burn initial token
-            ints[LinuxSystemStats.dirtyMemoryINT] = (int) (st.nextTokenAsLong() / 1024);
+            tokenizer.setString(line);
+            tokenizer.skipToken(); // Burn initial token
+            ints[LinuxSystemStats.dirtyMemoryINT] = (int) (tokenizer.nextTokenAsLong() / 1024);
           }
         } catch (NoSuchElementException nsee) {
           // ignore and let that stat not to be updated this time
@@ -329,7 +333,7 @@ public class LinuxProcFsStatistics {
       }
     } catch (IOException ioe) {
     } finally {
-      st.releaseResources();
+      tokenizer.releaseResources();
       if (br != null)
         try {
           br.close();
@@ -343,38 +347,38 @@ public class LinuxProcFsStatistics {
    * ListenOverflows=20 ListenDrops=21
    */
   private static void getNetStatStats(long[] longs, int[] ints) {
-    InputStreamReader isr;
-    BufferedReader br = null;
-    try {
-      isr = new InputStreamReader(new FileInputStream("/proc/net/netstat"));
-      br = new BufferedReader(isr);
+    try (InputStreamReader isr = new InputStreamReader(new FileInputStream("/proc/net/netstat"))) {
+      BufferedReader br = new BufferedReader(isr);
       String line;
       do {
         br.readLine(); // header
         line = br.readLine();
       } while (line != null && !line.startsWith("TcpExt:"));
 
-      st.setString(line);
-      st.skipTokens(1);
-      long tcpSyncookiesSent = st.nextTokenAsLong();
-      long tcpSyncookiesRecv = st.nextTokenAsLong();
-      st.skipTokens(17);
-      long tcpListenOverflows = st.nextTokenAsLong();
-      long tcpListenDrops = st.nextTokenAsLong();
+      tokenizer.setString(line);
+      tokenizer.skipTokens(1);
+      long tcpSyncookiesSent = tokenizer.nextTokenAsLong();
+      long tcpSyncookiesRecv = tokenizer.nextTokenAsLong();
+      tokenizer.skipTokens(17);
+      long tcpListenOverflows = tokenizer.nextTokenAsLong();
+      long tcpListenDrops = tokenizer.nextTokenAsLong();
 
       longs[LinuxSystemStats.tcpExtSynCookiesRecvLONG] = tcpSyncookiesRecv;
       longs[LinuxSystemStats.tcpExtSynCookiesSentLONG] = tcpSyncookiesSent;
       longs[LinuxSystemStats.tcpExtListenDropsLONG] = tcpListenDrops;
       longs[LinuxSystemStats.tcpExtListenOverflowsLONG] = tcpListenOverflows;
 
+      br.close();
+      br = null;
       if (!soMaxConnProcessed) {
-        br.close();
-        isr = new InputStreamReader(new FileInputStream("/proc/sys/net/core/somaxconn"));
-        br = new BufferedReader(isr);
-        line = br.readLine();
-        st.setString(line);
-        soMaxConn = st.nextTokenAsInt();
-        soMaxConnProcessed = true;
+        try (InputStreamReader soMaxConnIsr =
+            new InputStreamReader(new FileInputStream("/proc/sys/net/core/somaxconn"))) {
+          BufferedReader br2 = new BufferedReader(soMaxConnIsr);
+          line = br2.readLine();
+          tokenizer.setString(line);
+          soMaxConn = tokenizer.nextTokenAsInt();
+          soMaxConnProcessed = true;
+        }
       }
 
       ints[LinuxSystemStats.tcpSOMaxConnINT] = soMaxConn;
@@ -382,13 +386,7 @@ public class LinuxProcFsStatistics {
     } catch (NoSuchElementException nsee) {
     } catch (IOException ioe) {
     } finally {
-      st.releaseResources();
-      if (br != null) {
-        try {
-          br.close();
-        } catch (IOException ignore) {
-        }
-      }
+      tokenizer.releaseResources();
     }
   }
 
@@ -415,18 +413,18 @@ public class LinuxProcFsStatistics {
       while ((line = br.readLine()) != null) {
         int index = line.indexOf(":");
         boolean isloopback = (line.indexOf("lo:") != -1);
-        st.setString(line.substring(index + 1).trim());
-        long recv_bytes = st.nextTokenAsLong();
-        long recv_packets = st.nextTokenAsLong();
-        long recv_errs = st.nextTokenAsLong();
-        long recv_drop = st.nextTokenAsLong();
-        st.skipTokens(4); // fifo, frame, compressed, multicast
-        long xmit_bytes = st.nextTokenAsLong();
-        long xmit_packets = st.nextTokenAsLong();
-        long xmit_errs = st.nextTokenAsLong();
-        long xmit_drop = st.nextTokenAsLong();
-        st.skipToken(); // fifo
-        long xmit_colls = st.nextTokenAsLong();
+        tokenizer.setString(line.substring(index + 1).trim());
+        long recv_bytes = tokenizer.nextTokenAsLong();
+        long recv_packets = tokenizer.nextTokenAsLong();
+        long recv_errs = tokenizer.nextTokenAsLong();
+        long recv_drop = tokenizer.nextTokenAsLong();
+        tokenizer.skipTokens(4); // fifo, frame, compressed, multicast
+        long xmit_bytes = tokenizer.nextTokenAsLong();
+        long xmit_packets = tokenizer.nextTokenAsLong();
+        long xmit_errs = tokenizer.nextTokenAsLong();
+        long xmit_drop = tokenizer.nextTokenAsLong();
+        tokenizer.skipToken(); // fifo
+        long xmit_colls = tokenizer.nextTokenAsLong();
 
         if (isloopback) {
           lo_recv_packets = recv_packets;
@@ -463,7 +461,7 @@ public class LinuxProcFsStatistics {
     } catch (NoSuchElementException nsee) {
     } catch (IOException ioe) {
     } finally {
-      st.releaseResources();
+      tokenizer.releaseResources();
       if (br != null)
         try {
           br.close();
@@ -523,22 +521,22 @@ public class LinuxProcFsStatistics {
         br.readLine(); // Discard header info
       }
       while ((line = br.readLine()) != null) {
-        st.setString(line);
+        tokenizer.setString(line);
         {
           // " 8 1 sdb" on 2.6
           // " 8 1 452145145 sdb" on 2.4
-          String tok = st.nextToken();
+          String tok = tokenizer.nextToken();
           if (tok.length() == 0 || Character.isWhitespace(tok.charAt(0))) {
             // skip over first token since it is whitespace
-            tok = st.nextToken();
+            tok = tokenizer.nextToken();
           }
           // skip first token it is some number
-          tok = st.nextToken();
+          tok = tokenizer.nextToken();
           // skip second token it is some number
-          tok = st.nextToken();
+          tok = tokenizer.nextToken();
           if (!hasDiskStats) {
             // skip third token it is some number
-            tok = st.nextToken();
+            tok = tokenizer.nextToken();
           }
           // Now tok should be the device name.
           if (Character.isDigit(tok.charAt(tok.length() - 1))) {
@@ -547,20 +545,20 @@ public class LinuxProcFsStatistics {
             continue;
           }
         }
-        long tmp_readsCompleted = st.nextTokenAsLong();
-        long tmp_readsMerged = st.nextTokenAsLong();
-        long tmp_sectorsRead = st.nextTokenAsLong();
-        long tmp_timeReading = st.nextTokenAsLong();
-        if (st.hasMoreTokens()) {
+        long tmp_readsCompleted = tokenizer.nextTokenAsLong();
+        long tmp_readsMerged = tokenizer.nextTokenAsLong();
+        long tmp_sectorsRead = tokenizer.nextTokenAsLong();
+        long tmp_timeReading = tokenizer.nextTokenAsLong();
+        if (tokenizer.hasMoreTokens()) {
           // If we are on 2.6 then we might only have 4 longs; if so ignore this line
           // Otherwise we should have 11 long tokens.
-          long tmp_writesCompleted = st.nextTokenAsLong();
-          long tmp_writesMerged = st.nextTokenAsLong();
-          long tmp_sectorsWritten = st.nextTokenAsLong();
-          long tmp_timeWriting = st.nextTokenAsLong();
-          long tmp_iosInProgress = st.nextTokenAsLong();
-          long tmp_timeIosInProgress = st.nextTokenAsLong();
-          long tmp_ioTime = st.nextTokenAsLong();
+          long tmp_writesCompleted = tokenizer.nextTokenAsLong();
+          long tmp_writesMerged = tokenizer.nextTokenAsLong();
+          long tmp_sectorsWritten = tokenizer.nextTokenAsLong();
+          long tmp_timeWriting = tokenizer.nextTokenAsLong();
+          long tmp_iosInProgress = tokenizer.nextTokenAsLong();
+          long tmp_timeIosInProgress = tokenizer.nextTokenAsLong();
+          long tmp_ioTime = tokenizer.nextTokenAsLong();
           readsCompleted += tmp_readsCompleted;
           readsMerged += tmp_readsMerged;
           sectorsRead += tmp_sectorsRead;
@@ -591,7 +589,7 @@ public class LinuxProcFsStatistics {
       // NoSuchElementException line=" + line, nsee);
     } catch (IOException ioe) {
     } finally {
-      st.releaseResources();
+      tokenizer.releaseResources();
       if (br != null)
         try {
           br.close();
@@ -698,8 +696,8 @@ public class LinuxProcFsStatistics {
     }
 
     public int[] calculateStats(String newStatLine) {
-      st.setString(newStatLine);
-      st.skipToken(); // cpu name
+      tokenizer.setString(newStatLine);
+      tokenizer.skipToken(); // cpu name
       final int MAX_CPU_STATS = CPU.values().length;
       /*
        * newer kernels now have 10 columns for cpu in /proc/stat. This number may increase even
@@ -712,8 +710,8 @@ public class LinuxProcFsStatistics {
       int actualCpuStats = 0;
       long unaccountedCpuUtilization = 0;
 
-      while (st.hasMoreTokens()) {
-        newStats.add(st.nextTokenAsLong());
+      while (tokenizer.hasMoreTokens()) {
+        newStats.add(tokenizer.nextTokenAsLong());
         actualCpuStats++;
       }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/stats50/VMStats50.java b/geode-core/src/main/java/org/apache/geode/internal/stats50/VMStats50.java
index 79e6879..06c4999 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/stats50/VMStats50.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/stats50/VMStats50.java
@@ -215,7 +215,7 @@ public class VMStats50 implements VMStatsContract {
     sds.add(f.createLongCounter("unloadedClasses",
         "Total number of classes unloaded since vm started.", "classes", true));
     sds.add(f.createLongGauge("freeMemory",
-        "An approximation fo the total amount of memory currently available for future allocated objects, measured in bytes.",
+        "An approximation of the total amount of memory currently available for future allocated objects, measured in bytes.",
         "bytes", true));
     sds.add(f.createLongGauge("totalMemory",
         "The total amount of memory currently available for current and future objects, measured in bytes.",
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index f234ee7..28745b6 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -16,16 +16,11 @@ package org.apache.geode.internal.tcp;
 
 import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_PEER_AUTH_INIT;
 
-import java.io.BufferedInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InterruptedIOException;
-import java.io.OutputStream;
 import java.net.ConnectException;
-import java.net.Inet6Address;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.SocketException;
@@ -45,6 +40,9 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLException;
+
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.CancelException;
@@ -69,7 +67,6 @@ import org.apache.geode.distributed.internal.direct.DirectChannel;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.distributed.internal.membership.MembershipManager;
 import org.apache.geode.internal.Assert;
-import org.apache.geode.internal.ByteArrayDataInput;
 import org.apache.geode.internal.DSFIDFactory;
 import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.SystemTimer;
@@ -78,6 +75,9 @@ import org.apache.geode.internal.Version;
 import org.apache.geode.internal.alerting.AlertingAction;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.logging.LoggingThread;
+import org.apache.geode.internal.net.Buffers;
+import org.apache.geode.internal.net.NioEngine;
+import org.apache.geode.internal.net.NioFilter;
 import org.apache.geode.internal.net.SocketCreator;
 import org.apache.geode.internal.tcp.MsgReader.Header;
 import org.apache.geode.internal.util.concurrent.ReentrantSemaphore;
@@ -91,27 +91,25 @@ import org.apache.geode.internal.util.concurrent.ReentrantSemaphore;
 public class Connection implements Runnable {
   private static final Logger logger = LogService.getLogger();
 
-  private static final int INITIAL_CAPACITY =
-      Integer.getInteger("p2p.readerBufferSize", 32768).intValue();
   private static int P2P_CONNECT_TIMEOUT;
   private static boolean IS_P2P_CONNECT_TIMEOUT_INITIALIZED = false;
 
-  public static final int NORMAL_MSG_TYPE = 0x4c;
-  public static final int CHUNKED_MSG_TYPE = 0x4d; // a chunk of one logical msg
-  public static final int END_CHUNKED_MSG_TYPE = 0x4e; // last in a series of chunks
-  public static final int DIRECT_ACK_BIT = 0x20;
+  static final int NORMAL_MSG_TYPE = 0x4c;
+  static final int CHUNKED_MSG_TYPE = 0x4d; // a chunk of one logical msg
+  static final int END_CHUNKED_MSG_TYPE = 0x4e; // last in a series of chunks
+  static final int DIRECT_ACK_BIT = 0x20;
 
-  public static final int MSG_HEADER_SIZE_OFFSET = 0;
-  public static final int MSG_HEADER_TYPE_OFFSET = 4;
-  public static final int MSG_HEADER_ID_OFFSET = 5;
-  public static final int MSG_HEADER_BYTES = 7;
+  static final int MSG_HEADER_SIZE_OFFSET = 0;
+  static final int MSG_HEADER_TYPE_OFFSET = 4;
+  static final int MSG_HEADER_ID_OFFSET = 5;
+  static final int MSG_HEADER_BYTES = 7;
 
   /**
    * Small buffer used for send socket buffer on receiver connections and receive buffer on sender
    * connections.
    */
   public static final int SMALL_BUFFER_SIZE =
-      Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "SMALL_BUFFER_SIZE", 4096).intValue();
+      Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "SMALL_BUFFER_SIZE", 4096);
 
   /** counter to give connections a unique id */
   private static AtomicLong idCounter = new AtomicLong(1);
@@ -124,6 +122,7 @@ public class Connection implements Runnable {
   private final ConnectionTable owner;
 
   private final TCPConduit conduit;
+  private NioFilter ioFilter;
 
   /**
    * Set to false once run() is terminating. Using this instead of Thread.isAlive as the reader
@@ -141,7 +140,12 @@ public class Connection implements Runnable {
   /** The idle timeout timer task for this connection */
   private SystemTimerTask idleTask;
 
-  private static final ThreadLocal isReaderThread = new ThreadLocal();
+  private static final ThreadLocal<Boolean> isReaderThread = new ThreadLocal<Boolean>() {
+    @Override
+    public Boolean initialValue() {
+      return Boolean.FALSE;
+    }
+  };
 
   public static void makeReaderThread() {
     // mark this thread as a reader thread
@@ -153,13 +157,8 @@ public class Connection implements Runnable {
   }
 
   // return true if this thread is a reader thread
-  public static boolean isReaderThread() {
-    Object o = isReaderThread.get();
-    if (o == null) {
-      return false;
-    } else {
-      return ((Boolean) o).booleanValue();
-    }
+  private static boolean isReaderThread() {
+    return isReaderThread.get();
   }
 
   private int getP2PConnectTimeout() {
@@ -182,10 +181,15 @@ public class Connection implements Runnable {
   private static final boolean DOMINO_THREAD_OWNED_SOCKETS =
       Boolean.getBoolean("p2p.ENABLE_DOMINO_THREAD_OWNED_SOCKETS");
 
-  private static final ThreadLocal isDominoThread = new ThreadLocal();
+  private static final ThreadLocal<Boolean> isDominoThread = new ThreadLocal<Boolean>() {
+    @Override
+    public Boolean initialValue() {
+      return Boolean.FALSE;
+    }
+  };
 
   // return true if this thread is a reader thread
-  public static boolean tipDomino() {
+  private static boolean tipDomino() {
     if (DOMINO_THREAD_OWNED_SOCKETS) {
       // mark this thread as one who wants to send ALL on TO sockets
       ConnectionTable.threadWantsOwnResources();
@@ -197,25 +201,17 @@ public class Connection implements Runnable {
   }
 
   public static boolean isDominoThread() {
-    Object o = isDominoThread.get();
-    if (o == null) {
-      return false;
-    } else {
-      return ((Boolean) o).booleanValue();
-    }
+    return isDominoThread.get();
   }
 
   /** the socket entrusted to this connection */
   private final Socket socket;
 
-  /** the non-NIO output stream */
-  OutputStream output;
-
   /** output stream/channel lock */
   private final Object outLock = new Object();
 
   /** the ID string of the conduit (for logging) */
-  String conduitIdStr;
+  private String conduitIdStr;
 
   /** Identifies the java group member on the other side of the connection. */
   InternalDistributedMember remoteAddr;
@@ -223,7 +219,7 @@ public class Connection implements Runnable {
   /**
    * Identifies the version of the member on the other side of the connection.
    */
-  Version remoteVersion;
+  private Version remoteVersion;
 
   /**
    * True if this connection was accepted by a listening socket. This makes it a receiver. False if
@@ -286,16 +282,16 @@ public class Connection implements Runnable {
   /**
    * Number of bytes in the outgoingQueue. Used to control capacity.
    */
-  private long queuedBytes = 0;
+  private long queuedBytes;
 
   /** used for async writes */
-  Thread pusherThread;
+  private Thread pusherThread;
 
   /**
    * The maximum number of concurrent senders sending a message to a single recipient.
    */
   private static final int MAX_SENDERS = Integer
-      .getInteger("p2p.maxConnectionSenders", DirectChannel.DEFAULT_CONCURRENCY_LEVEL).intValue();
+      .getInteger("p2p.maxConnectionSenders", DirectChannel.DEFAULT_CONCURRENCY_LEVEL);
   /**
    * This semaphore is used to throttle how many threads will try to do sends on this connection
    * concurrently. A thread must acquire this semaphore before it is allowed to start serializing
@@ -304,10 +300,10 @@ public class Connection implements Runnable {
   private final Semaphore senderSem = new ReentrantSemaphore(MAX_SENDERS);
 
   /** Set to true once the handshake has been read */
-  volatile boolean handshakeRead = false;
-  volatile boolean handshakeCancelled = false;
+  private volatile boolean handshakeRead;
+  private volatile boolean handshakeCancelled;
 
-  private volatile int replyCode = 0;
+  private volatile int replyCode;
 
   private static final byte REPLY_CODE_OK = (byte) 69;
   private static final byte REPLY_CODE_OK_WITH_ASYNC_INFO = (byte) 70;
@@ -323,7 +319,7 @@ public class Connection implements Runnable {
   /** set to true once a close begins */
   private final AtomicBoolean closing = new AtomicBoolean(false);
 
-  volatile boolean readerShuttingDown = false;
+  private volatile boolean readerShuttingDown = false;
 
   /** whether the socket is connected */
   volatile boolean connected = false;
@@ -331,10 +327,10 @@ public class Connection implements Runnable {
   /**
    * Set to true once a connection finishes its constructor
    */
-  volatile boolean finishedConnecting = false;
+  private volatile boolean finishedConnecting = false;
 
-  volatile boolean accessed = true;
-  volatile boolean socketInUse = false;
+  private volatile boolean accessed = true;
+  private volatile boolean socketInUse = false;
   volatile boolean timedOut = false;
 
   /**
@@ -346,7 +342,7 @@ public class Connection implements Runnable {
    * millisecond clock at the time message transmission started, if doing forced-disconnect
    * processing
    */
-  long transmissionStartTime;
+  private long transmissionStartTime;
 
   /** ack wait timeout - if socketInUse, use this to trigger SUSPECT processing */
   private long ackWaitTimeout;
@@ -358,38 +354,42 @@ public class Connection implements Runnable {
    * other connections participating in the current transmission. we notify them if ackSATimeout
    * expires to keep all members from generating alerts when only one is slow
    */
-  List ackConnectionGroup;
+  private List ackConnectionGroup;
 
   /** name of thread that we're currently performing an operation in (may be null) */
-  String ackThreadName;
+  private String ackThreadName;
 
-  /** the buffer used for NIO message receipt */
-  ByteBuffer nioInputBuffer;
+  /** the buffer used for message receipt */
+  private ByteBuffer inputBuffer;
 
   /** the length of the next message to be dispatched */
-  int nioMessageLength;
+  private int messageLength;
 
   /** the type of message being received */
-  byte nioMessageType;
+  private byte messageType;
+
+  /**
+   * when messages are chunked by a MsgStreamer we track the destreamers on
+   * the receiving side using a message identifier
+   */
+  private short messageId;
+
+  /** whether the length of the next message has been established */
+  private boolean lengthSet = false;
 
   /** used to lock access to destreamer data */
   private final Object destreamerLock = new Object();
 
   /** caches a msg destreamer that is currently not being used */
-  MsgDestreamer idleMsgDestreamer;
+  private MsgDestreamer idleMsgDestreamer;
 
   /**
-   * used to map a msgId to a MsgDestreamer which are used for destreaming chunked messages using
-   * nio
+   * used to map a msgId to a MsgDestreamer which are used for destreaming chunked messages
    */
-  HashMap destreamerMap;
+  private HashMap destreamerMap;
 
-  boolean directAck;
+  private boolean directAck;
 
-  short nioMsgId;
-
-  /** whether the length of the next message has been established */
-  boolean nioLengthSet = false;
 
   /** is this connection used for serial message delivery? */
   boolean preserveOrder = false;
@@ -422,7 +422,7 @@ public class Connection implements Runnable {
     setSocketBufferSize(sock, false, requestedSize);
   }
 
-  public int getReceiveBufferSize() {
+  int getReceiveBufferSize() {
     return recvBufferSize;
   }
 
@@ -447,7 +447,6 @@ public class Connection implements Runnable {
           } else {
             sock.setReceiveBufferSize(requestedSize);
           }
-        } else {
         }
       } catch (SocketException ignore) {
       }
@@ -461,7 +460,7 @@ public class Connection implements Runnable {
         if (actualSize < requestedSize) {
           logger.info("Socket {} is {} instead of the requested {}.",
               (send ? "send buffer size" : "receive buffer size"),
-              Integer.valueOf(actualSize), Integer.valueOf(requestedSize));
+              actualSize, requestedSize);
         } else if (actualSize > requestedSize) {
           if (logger.isTraceEnabled()) {
             logger.trace("Socket {} buffer size is {} instead of the requested {}",
@@ -488,7 +487,7 @@ public class Connection implements Runnable {
   /**
    * Returns the size of the send buffer on this connection's socket.
    */
-  public int getSendBufferSize() {
+  int getSendBufferSize() {
     int result = this.sendBufferSize;
     if (result != -1) {
       return result;
@@ -504,32 +503,12 @@ public class Connection implements Runnable {
   }
 
   /**
-   * creates a connection that we accepted (it was initiated by an explicit connect being done on
-   * the other side). We will only receive data on this socket; never send.
-   */
-  protected static Connection createReceiver(ConnectionTable table, Socket socket)
-      throws IOException, ConnectionException {
-    Connection connection = new Connection(table, socket);
-    boolean readerStarted = false;
-    try {
-      connection.startReader(table);
-      readerStarted = true;
-    } finally {
-      if (!readerStarted) {
-        connection.closeForReconnect(
-            "could not start reader thread");
-      }
-    }
-    connection.waitForHandshake();
-    connection.finishedConnecting = true;
-    return connection;
-  }
-
-  /**
-   * creates a connection that we accepted (it was initiated by an explicit connect being done on
+   * creates a "reader" connection that we accepted (it was initiated by an explicit connect being
+   * done on
    * the other side).
    */
-  protected Connection(ConnectionTable t, Socket socket) throws IOException, ConnectionException {
+  protected Connection(ConnectionTable t, Socket socket)
+      throws ConnectionException {
     if (t == null) {
       throw new IllegalArgumentException(
           "Null ConnectionTable");
@@ -552,22 +531,10 @@ public class Connection implements Runnable {
       // unable to get the settings we want. Don't log an error because it will
       // likely happen a lot
     }
-    if (!useNIO()) {
-      try {
-        // this.output = new BufferedOutputStream(socket.getOutputStream(), SMALL_BUFFER_SIZE);
-        this.output = socket.getOutputStream();
-      } catch (IOException io) {
-        logger.fatal("Unable to get P2P connection streams", io);
-        t.getSocketCloser().asyncClose(socket, this.remoteAddr.toString(), null);
-        throw io;
-      }
-    }
   }
 
-  protected void initReceiver() {
+  void initReceiver() {
     this.startReader(owner);
-    this.waitForHandshake();
-    this.finishedConnecting = true;
   }
 
   void setIdleTimeoutTask(SystemTimerTask task) {
@@ -578,7 +545,7 @@ public class Connection implements Runnable {
   /**
    * Returns true if an idle connection was detected.
    */
-  public boolean checkForIdleTimeout() {
+  boolean checkForIdleTimeout() {
     if (isSocketClosed()) {
       return true;
     }
@@ -614,7 +581,6 @@ public class Connection implements Runnable {
     return isIdle;
   }
 
-  private static byte[] okHandshakeBytes;
   private static ByteBuffer okHandshakeBuf;
   static {
     int msglen = 1; // one byte for reply code
@@ -630,14 +596,13 @@ public class Connection implements Runnable {
     bytes[MSG_HEADER_BYTES] = REPLY_CODE_OK;
     int allocSize = bytes.length;
     ByteBuffer bb;
-    if (TCPConduit.useDirectBuffers) {
+    if (Buffers.useDirectBuffers) {
       bb = ByteBuffer.allocateDirect(allocSize);
     } else {
       bb = ByteBuffer.allocate(allocSize);
     }
     bb.put(bytes);
     okHandshakeBuf = bb;
-    okHandshakeBytes = bytes;
   }
 
   /**
@@ -645,38 +610,37 @@ public class Connection implements Runnable {
    */
   public static final int MAX_MSG_SIZE = 0x00ffffff;
 
-  public static int calcHdrSize(int byteSize) {
+  static int calcHdrSize(int byteSize) {
     if (byteSize > MAX_MSG_SIZE) {
       throw new IllegalStateException(String.format("tcp message exceeded max size of %s",
-          Integer.valueOf(MAX_MSG_SIZE)));
+          MAX_MSG_SIZE));
     }
     int hdrSize = byteSize;
     hdrSize |= (HANDSHAKE_VERSION << 24);
     return hdrSize;
   }
 
-  public static int calcMsgByteSize(int hdrSize) {
+  static int calcMsgByteSize(int hdrSize) {
     return hdrSize & MAX_MSG_SIZE;
   }
 
-  public static byte calcHdrVersion(int hdrSize) throws IOException {
+  static byte calcHdrVersion(int hdrSize) throws IOException {
     byte ver = (byte) (hdrSize >> 24);
     if (ver != HANDSHAKE_VERSION) {
       throw new IOException(
           String.format(
               "Detected wrong version of GemFire product during handshake. Expected %s but found %s",
-              new Object[] {new Byte(HANDSHAKE_VERSION), new Byte(ver)}));
+              HANDSHAKE_VERSION, ver));
     }
     return ver;
   }
 
   private void sendOKHandshakeReply() throws IOException, ConnectionException {
-    byte[] my_okHandshakeBytes = null;
-    ByteBuffer my_okHandshakeBuf = null;
+    ByteBuffer my_okHandshakeBuf;
     if (this.isReceiver) {
       DistributionConfig cfg = owner.getConduit().config;
       ByteBuffer bb;
-      if (useNIO() && TCPConduit.useDirectBuffers) {
+      if (Buffers.useDirectBuffers) {
         bb = ByteBuffer.allocateDirect(128);
       } else {
         bb = ByteBuffer.allocate(128);
@@ -692,35 +656,17 @@ public class Connection implements Runnable {
       Version.writeOrdinal(bb, Version.CURRENT.ordinal(), true);
       // now set the msg length into position 0
       bb.putInt(0, calcHdrSize(bb.position() - MSG_HEADER_BYTES));
-      if (useNIO()) {
-        my_okHandshakeBuf = bb;
-        bb.flip();
-      } else {
-        my_okHandshakeBytes = new byte[bb.position()];
-        bb.flip();
-        bb.get(my_okHandshakeBytes);
-      }
+      my_okHandshakeBuf = bb;
+      bb.flip();
     } else {
       my_okHandshakeBuf = okHandshakeBuf;
-      my_okHandshakeBytes = okHandshakeBytes;
-    }
-    if (useNIO()) {
-      assert my_okHandshakeBuf != null;
-      synchronized (my_okHandshakeBuf) {
-        my_okHandshakeBuf.position(0);
-        nioWriteFully(getSocket().getChannel(), my_okHandshakeBuf, false, null);
-      }
-    } else {
-      synchronized (outLock) {
-        assert my_okHandshakeBytes != null;
-        this.output.write(my_okHandshakeBytes, 0, my_okHandshakeBytes.length);
-        this.output.flush();
-      }
     }
+    my_okHandshakeBuf.position(0);
+    writeFully(getSocket().getChannel(), my_okHandshakeBuf, false, null);
   }
 
   private static final int HANDSHAKE_TIMEOUT_MS =
-      Integer.getInteger("p2p.handshakeTimeoutMs", 59000).intValue();
+      Integer.getInteger("p2p.handshakeTimeoutMs", 59000);
   // private static final byte HANDSHAKE_VERSION = 1; // 501
   // public static final byte HANDSHAKE_VERSION = 2; // cbb5x_PerfScale
   // public static final byte HANDSHAKE_VERSION = 3; // durable_client
@@ -730,7 +676,7 @@ public class Connection implements Runnable {
   // NOTICE: handshake_version should not be changed anymore. Use the gemfire
   // version transmitted with the handshake bits and handle old handshakes
   // based on that
-  public static final byte HANDSHAKE_VERSION = 7; // product version exchange during handshake
+  private static final byte HANDSHAKE_VERSION = 7; // product version exchange during handshake
 
   /**
    * @throws ConnectionException if the conduit has stopped
@@ -766,7 +712,7 @@ public class Connection implements Runnable {
                     String.format(
                         "Connection handshake with %s timed out after waiting %s milliseconds.",
 
-                        peerName, Integer.valueOf(HANDSHAKE_TIMEOUT_MS)));
+                        peerName, HANDSHAKE_TIMEOUT_MS));
               } else {
                 peerName = "socket " + this.socket.getRemoteSocketAddress().toString() + ":"
                     + this.socket.getPort();
@@ -774,7 +720,7 @@ public class Connection implements Runnable {
               throw new ConnectionException(
                   String.format(
                       "Connection handshake with %s timed out after waiting %s milliseconds.",
-                      peerName, Integer.valueOf(HANDSHAKE_TIMEOUT_MS)));
+                      peerName, HANDSHAKE_TIMEOUT_MS));
             } else {
               success = this.handshakeRead;
             }
@@ -828,24 +774,27 @@ public class Connection implements Runnable {
   /**
    * asynchronously close this connection
    *
-   * @param beingSick test hook to simulate sickness in communications & membership
+   * @param beingSickForTests test hook to simulate sickness in communications & membership
    */
-  private void asyncClose(boolean beingSick) {
+  private void asyncClose(boolean beingSickForTests) {
     // note: remoteAddr may be null if this is a receiver that hasn't finished its handshake
 
     // we do the close in a background thread because the operation may hang if
     // there is a problem with the network. See bug #46659
 
+    releaseInputBuffer();
+
     // if simulating sickness, sockets must be closed in-line so that tests know
     // that the vm is sick when the beSick operation completes
-    if (beingSick) {
+    if (beingSickForTests) {
       prepareForAsyncClose();
     } else {
       if (this.asyncCloseCalled.compareAndSet(false, true)) {
         Socket s = this.socket;
         if (s != null && !s.isClosed()) {
           prepareForAsyncClose();
-          this.owner.getSocketCloser().asyncClose(s, String.valueOf(this.remoteAddr), null);
+          this.owner.getSocketCloser().asyncClose(s, String.valueOf(this.remoteAddr),
+              () -> ioFilter.close(s.getChannel()));
         }
       }
     }
@@ -881,7 +830,7 @@ public class Connection implements Runnable {
     }
   }
 
-  private void handshakeNio() throws IOException {
+  private void handshakeFromNewSender() throws IOException {
     waitForAddressCompletion();
 
     InternalDistributedMember myAddr = this.owner.getConduit().getMemberId();
@@ -914,42 +863,7 @@ public class Connection implements Runnable {
     // }
     connectHandshake.setMessageHeader(NORMAL_MSG_TYPE, ClusterDistributionManager.STANDARD_EXECUTOR,
         MsgIdGenerator.NO_MSG_ID);
-    nioWriteFully(getSocket().getChannel(), connectHandshake.getContentBuffer(), false, null);
-  }
-
-  private void handshakeStream() throws IOException {
-    waitForAddressCompletion();
-
-    this.output = getSocket().getOutputStream();
-    ByteArrayOutputStream baos = new ByteArrayOutputStream(CONNECT_HANDSHAKE_SIZE);
-    DataOutputStream os = new DataOutputStream(baos);
-    InternalDistributedMember myAddr = owner.getConduit().getMemberId();
-    os.writeByte(0);
-    os.writeByte(HANDSHAKE_VERSION);
-    // NOTE: if you add or remove code in this section bump HANDSHAKE_VERSION
-    InternalDataSerializer.invokeToData(myAddr, os);
-    os.writeBoolean(this.sharedResource);
-    os.writeBoolean(this.preserveOrder);
-    os.writeLong(this.uniqueId);
-    Version.CURRENT.writeOrdinal(os, true);
-    os.writeInt(dominoCount.get() + 1);
-    os.flush();
-
-    byte[] msg = baos.toByteArray();
-    int len = calcHdrSize(msg.length);
-    byte[] lenbytes = new byte[MSG_HEADER_BYTES];
-    lenbytes[MSG_HEADER_SIZE_OFFSET] = (byte) ((len / 0x1000000) & 0xff);
-    lenbytes[MSG_HEADER_SIZE_OFFSET + 1] = (byte) ((len / 0x10000) & 0xff);
-    lenbytes[MSG_HEADER_SIZE_OFFSET + 2] = (byte) ((len / 0x100) & 0xff);
-    lenbytes[MSG_HEADER_SIZE_OFFSET + 3] = (byte) (len & 0xff);
-    lenbytes[MSG_HEADER_TYPE_OFFSET] = (byte) NORMAL_MSG_TYPE;
-    lenbytes[MSG_HEADER_ID_OFFSET] = (byte) ((MsgIdGenerator.NO_MSG_ID >> 8) & 0xff);
-    lenbytes[MSG_HEADER_ID_OFFSET + 1] = (byte) (MsgIdGenerator.NO_MSG_ID & 0xff);
-    synchronized (outLock) {
-      this.output.write(lenbytes, 0, lenbytes.length);
-      this.output.write(msg, 0, msg.length);
-      this.output.flush();
-    }
+    writeFully(getSocket().getChannel(), connectHandshake.getContentBuffer(), false, null);
   }
 
   /**
@@ -958,20 +872,18 @@ public class Connection implements Runnable {
    */
   private void attemptHandshake(ConnectionTable connTable) throws IOException {
     // send HANDSHAKE
-    // send this server's port. It's expected on the other side
-    if (useNIO()) {
-      handshakeNio();
-    } else {
-      handshakeStream();
+    // send this member's information. It's expected on the other side
+    if (logger.isDebugEnabled()) {
+      logger.debug("starting peer-to-peer handshake on socket {}", socket);
     }
-
+    handshakeFromNewSender();
     startReader(connTable); // this reader only reads the handshake and then exits
     waitForHandshake(); // waiting for reply
   }
 
   /** time between connection attempts */
   private static final int RECONNECT_WAIT_TIME = Integer
-      .getInteger(DistributionConfig.GEMFIRE_PREFIX + "RECONNECT_WAIT_TIME", 2000).intValue();
+      .getInteger(DistributionConfig.GEMFIRE_PREFIX + "RECONNECT_WAIT_TIME", 2000);
 
   /**
    * creates a new connection to a remote server. We are initiating this connection; the other side
@@ -1079,7 +991,8 @@ public class Connection implements Runnable {
             connectionErrorLogged = true; // otherwise change to use 100ms intervals causes a lot of
                                           // these
             logger.info("Connection: shared={} ordered={} failed to connect to peer {} because: {}",
-                sharedResource, preserveOrder, remoteAddr, ioe);
+                sharedResource, preserveOrder, remoteAddr,
+                ioe.getCause() != null ? ioe.getCause() : ioe);
           }
         } // IOException
         finally {
@@ -1106,10 +1019,7 @@ public class Connection implements Runnable {
             }
           } catch (ConnectionException e) {
             if (giveUpOnMember(mgr, remoteAddr)) {
-              IOException ioe =
-                  new IOException("Handshake failed");
-              ioe.initCause(e);
-              throw ioe;
+              throw new IOException("Handshake failed", e);
             }
             t.getConduit().getCancelCriterion().checkCancelInProgress(null);
             logger.info(
@@ -1208,80 +1118,69 @@ public class Connection implements Runnable {
 
     InetSocketAddress addr =
         new InetSocketAddress(remoteAddr.getInetAddress(), remoteAddr.getDirectChannelPort());
-    if (useNIO()) {
-      SocketChannel channel = SocketChannel.open();
-      this.owner.addConnectingSocket(channel.socket(), addr.getAddress());
-      try {
-        channel.socket().setTcpNoDelay(true);
+    SocketChannel channel = SocketChannel.open();
+    this.owner.addConnectingSocket(channel.socket(), addr.getAddress());
 
-        channel.socket().setKeepAlive(SocketCreator.ENABLE_TCP_KEEP_ALIVE);
+    try {
+      channel.socket().setTcpNoDelay(true);
+      channel.socket().setKeepAlive(SocketCreator.ENABLE_TCP_KEEP_ALIVE);
+
+      /*
+       * If conserve-sockets is false, the socket can be used for receiving responses, so set the
+       * receive buffer accordingly.
+       */
+      if (!sharedResource) {
+        setReceiveBufferSize(channel.socket(), this.owner.getConduit().tcpBufferSize);
+      } else {
+        setReceiveBufferSize(channel.socket(), SMALL_BUFFER_SIZE); // make small since only
+        // receive ack messages
+      }
+      setSendBufferSize(channel.socket());
+      channel.configureBlocking(true);
 
-        /*
-         * If conserve-sockets is false, the socket can be used for receiving responses, so set the
-         * receive buffer accordingly.
-         */
-        if (!sharedResource) {
-          setReceiveBufferSize(channel.socket(), this.owner.getConduit().tcpBufferSize);
-        } else {
-          setReceiveBufferSize(channel.socket(), SMALL_BUFFER_SIZE); // make small since only
-                                                                     // receive ack messages
-        }
-        setSendBufferSize(channel.socket());
-        channel.configureBlocking(true);
+      int connectTime = getP2PConnectTimeout();
+
+      try {
 
-        int connectTime = getP2PConnectTimeout();
+        channel.socket().connect(addr, connectTime);
 
+        createIoFilter(channel, true);
+
+      } catch (NullPointerException e) {
+        // bug #45044 - jdk 1.7 sometimes throws an NPE here
+        ConnectException c = new ConnectException("Encountered bug #45044 - retrying");
+        c.initCause(e);
+        // prevent a hot loop by sleeping a little bit
         try {
-          channel.socket().connect(addr, connectTime);
-        } catch (NullPointerException e) {
-          // bug #45044 - jdk 1.7 sometimes throws an NPE here
-          ConnectException c = new ConnectException("Encountered bug #45044 - retrying");
-          c.initCause(e);
-          // prevent a hot loop by sleeping a little bit
-          try {
-            Thread.sleep(1000);
-          } catch (InterruptedException ie) {
-            Thread.currentThread().interrupt();
-          }
-          throw c;
-        } catch (CancelledKeyException | ClosedSelectorException e) {
-          // bug #44469: for some reason NIO throws this runtime exception
-          // instead of an IOException on timeouts
-          ConnectException c = new ConnectException(
-              String.format("Attempt timed out after %s milliseconds",
-                  new Object[] {connectTime}));
-          c.initCause(e);
-          throw c;
+          Thread.sleep(1000);
+        } catch (InterruptedException ie) {
+          Thread.currentThread().interrupt();
         }
-      } finally {
-        this.owner.removeConnectingSocket(channel.socket());
-      }
-      this.socket = channel.socket();
-    } else {
-      if (TCPConduit.useSSL) {
-        int socketBufferSize =
-            sharedResource ? SMALL_BUFFER_SIZE : this.owner.getConduit().tcpBufferSize;
-        this.socket = owner.getConduit().getSocketCreator().connectForServer(
-            remoteAddr.getInetAddress(), remoteAddr.getDirectChannelPort(), socketBufferSize);
-        // Set the receive buffer size local fields. It has already been set in the socket.
-        setSocketBufferSize(this.socket, false, socketBufferSize, true);
-        setSendBufferSize(this.socket);
-      } else {
-        Socket s = new Socket();
-        this.socket = s;
-        s.setTcpNoDelay(true);
-        s.setKeepAlive(SocketCreator.ENABLE_TCP_KEEP_ALIVE);
-        setReceiveBufferSize(s, SMALL_BUFFER_SIZE);
-        setSendBufferSize(s);
-        s.connect(addr, 0);
+        throw c;
+      } catch (SSLException e) {
+        ConnectException c = new ConnectException("Problem connecting to peer " + addr);
+        c.initCause(e);
+        throw c;
+      } catch (CancelledKeyException | ClosedSelectorException e) {
+        // bug #44469: for some reason NIO throws this runtime exception
+        // instead of an IOException on timeouts
+        ConnectException c = new ConnectException(
+            String.format("Attempt timed out after %s milliseconds",
+                connectTime));
+        c.initCause(e);
+        throw c;
       }
+    } finally {
+      this.owner.removeConnectingSocket(channel.socket());
     }
+    this.socket = channel.socket();
+
     if (logger.isDebugEnabled()) {
       logger.debug("Connection: connected to {} with IP address {}", remoteAddr, addr);
     }
     try {
       getSocket().setTcpNoDelay(true);
-    } catch (SocketException e) {
+    } catch (SocketException ignored) {
     }
   }
 
@@ -1293,20 +1192,15 @@ public class Connection implements Runnable {
    */
   private static final boolean BATCH_SENDS = Boolean.getBoolean("p2p.batchSends");
   private static final int BATCH_BUFFER_SIZE =
-      Integer.getInteger("p2p.batchBufferSize", 1024 * 1024).intValue();
-  private static final int BATCH_FLUSH_MS = Integer.getInteger("p2p.batchFlushTime", 50).intValue();
-  private Object batchLock;
+      Integer.getInteger("p2p.batchBufferSize", 1024 * 1024);
+  private static final int BATCH_FLUSH_MS = Integer.getInteger("p2p.batchFlushTime", 50);
+  private final Object batchLock = new Object();
   private ByteBuffer fillBatchBuffer;
   private ByteBuffer sendBatchBuffer;
   private BatchBufferFlusher batchFlusher;
 
   private void createBatchSendBuffer() {
-    // batch send buffer isn't needed if old-io is being used
-    if (!this.useNIO) {
-      return;
-    }
-    this.batchLock = new Object();
-    if (TCPConduit.useDirectBuffers) {
+    if (Buffers.useDirectBuffers) {
       this.fillBatchBuffer = ByteBuffer.allocateDirect(BATCH_BUFFER_SIZE);
       this.sendBatchBuffer = ByteBuffer.allocateDirect(BATCH_BUFFER_SIZE);
     } else {
@@ -1317,7 +1211,7 @@ public class Connection implements Runnable {
     this.batchFlusher.start();
   }
 
-  public void cleanUpOnIdleTaskCancel() {
+  void cleanUpOnIdleTaskCancel() {
     // Make sure receivers are removed from the connection table, this should always be a noop, but
     // is done here as a failsafe.
     if (isReceiver) {
@@ -1325,13 +1219,17 @@ public class Connection implements Runnable {
     }
   }
 
+  public void setInputBuffer(ByteBuffer buffer) {
+    this.inputBuffer = buffer;
+  }
+
   private class BatchBufferFlusher extends Thread {
     private volatile boolean flushNeeded = false;
     private volatile boolean timeToStop = false;
     private DMStats stats;
 
 
-    public BatchBufferFlusher() {
+    BatchBufferFlusher() {
       setDaemon(true);
       this.stats = owner.getConduit().getStats();
     }
@@ -1339,7 +1237,7 @@ public class Connection implements Runnable {
     /**
      * Called when a message writer needs the current fillBatchBuffer flushed
      */
-    public void flushBuffer(ByteBuffer bb) {
+    void flushBuffer(ByteBuffer bb) {
       final long start = DistributionStats.getStatTime();
       try {
         synchronized (this) {
@@ -1408,7 +1306,7 @@ public class Connection implements Runnable {
                 try {
                   sendBatchBuffer.flip();
                   SocketChannel channel = getSocket().getChannel();
-                  nioWriteFully(channel, sendBatchBuffer, false, null);
+                  writeFully(channel, sendBatchBuffer, false, null);
                   sendBatchBuffer.clear();
                 } catch (IOException | ConnectionException ex) {
                   logger.fatal("Exception flushing batch send buffer: %s", ex);
@@ -1548,15 +1446,12 @@ public class Connection implements Runnable {
               }
             }
           }
-          if (logger.isDebugEnabled()) {
-            logger.debug("Closing socket for {}", this);
-          }
         } else if (!forceRemoval) {
           removeEndpoint = false;
         }
         // make sure our socket is closed
         asyncClose(false);
-        nioLengthSet = false;
+        lengthSet = false;
       } // synchronized
 
       // moved the call to notifyHandshakeWaiter out of the above
@@ -1655,6 +1550,9 @@ public class Connection implements Runnable {
 
   /** starts a reader thread */
   private void startReader(ConnectionTable connTable) {
+    if (logger.isDebugEnabled()) {
+      logger.debug("Starting thread for " + p2pReaderName());
+    }
     Assert.assertTrue(!this.isRunning);
     stopped = false;
     this.isRunning = true;
@@ -1672,29 +1570,23 @@ public class Connection implements Runnable {
     ConnectionTable.threadWantsSharedResources();
     makeReaderThread(this.isReceiver);
     try {
-      if (useNIO()) {
-        runNioReader();
-      } else {
-        runOioReader();
-      }
+      readMessages();
     } finally {
       // bug36060: do the socket close within a finally block
       if (logger.isDebugEnabled()) {
         logger.debug("Stopping {} for {}", p2pReaderName(), remoteAddr);
       }
-      initiateSuspicionIfSharedUnordered();
       if (this.isReceiver) {
+        initiateSuspicionIfSharedUnordered();
         if (!this.sharedResource) {
           this.conduit.getStats().incThreadOwnedReceivers(-1L, dominoCount.get());
         }
         asyncClose(false);
         this.owner.removeAndCloseThreadOwnedSockets();
-      }
-      ByteBuffer tmp = this.nioInputBuffer;
-      if (tmp != null) {
-        this.nioInputBuffer = null;
-        final DMStats stats = this.owner.getConduit().getStats();
-        Buffers.releaseReceiveBuffer(tmp, stats);
+
+        if (this.isSharedResource()) {
+          releaseInputBuffer();
+        }
       }
       // make sure that if the reader thread exits we notify a thread waiting
       // for the handshake.
@@ -1708,6 +1600,15 @@ public class Connection implements Runnable {
     } // finally
   }
 
+  private void releaseInputBuffer() {
+    ByteBuffer tmp = this.inputBuffer;
+    if (tmp != null) {
+      this.inputBuffer = null;
+      final DMStats stats = this.owner.getConduit().getStats();
+      Buffers.releaseReceiveBuffer(tmp, stats);
+    }
+  }
+
   private String p2pReaderName() {
     StringBuilder sb = new StringBuilder(64);
     if (this.isReceiver) {
@@ -1722,18 +1623,23 @@ public class Connection implements Runnable {
     return sb.toString();
   }
 
-  private void runNioReader() {
+  private void readMessages() {
     // take a snapshot of uniqueId to detect reconnect attempts; see bug 37592
-    SocketChannel channel = null;
+    SocketChannel channel;
     try {
       channel = getSocket().getChannel();
+      socket.setSoTimeout(0);
+      socket.setTcpNoDelay(true);
+      if (ioFilter == null) {
+        createIoFilter(channel, false);
+      }
       channel.configureBlocking(true);
     } catch (ClosedChannelException e) {
       // bug 37693: the channel was asynchronously closed. Our work
       // is done.
       try {
         requestClose(
-            "runNioReader caught closed channel");
+            "readMessages caught closed channel");
       } catch (Exception ignore) {
       }
       return; // exit loop and thread
@@ -1741,15 +1647,16 @@ public class Connection implements Runnable {
       if (stopped || owner.getConduit().getCancelCriterion().isCancelInProgress()) {
         try {
           requestClose(
-              "runNioReader caught shutdown");
+              "readMessages caught shutdown");
         } catch (Exception ignore) {
         }
         return; // bug37520: exit loop (and thread)
       }
-      logger.fatal("Failed setting channel to blocking mode {}", ex);
+      logger.info("Failed initializing socket for message {}: {}",
+          (this.isReceiver ? "receiver" : "sender"), ex.getMessage());
       this.readerShuttingDown = true;
       try {
-        requestClose(String.format("Failed setting channel to blocking mode %s",
+        requestClose(String.format("Failed initializing socket %s",
             ex));
       } catch (Exception ignore) {
       }
@@ -1759,13 +1666,16 @@ public class Connection implements Runnable {
     if (!stopped) {
       // Assert.assertTrue(owner != null, "How did owner become null");
       if (logger.isDebugEnabled()) {
-        logger.debug("Starting {}", p2pReaderName());
+        logger.debug("Starting {} on {}", p2pReaderName(), socket);
       }
     }
     // we should not change the state of the connection if we are a handshake reader thread
     // as there is a race between this thread and the application thread doing direct ack
     // fix for #40869
     boolean isHandShakeReader = false;
+    // if we're using SSL/TLS the input buffer may already have data to process
+    boolean skipInitialRead = getInputBuffer().position() > 0;
+    boolean isInitialRead = true;
     try {
       for (;;) {
         if (stopped) {
@@ -1776,6 +1686,7 @@ public class Connection implements Runnable {
           Socket s = this.socket;
           if (s != null) {
             try {
+              ioFilter.close(s.getChannel());
               s.close();
             } catch (IOException e) {
               // don't care
@@ -1788,18 +1699,28 @@ public class Connection implements Runnable {
         }
 
         try {
-          ByteBuffer buff = getNIOBuffer();
+          ByteBuffer buff = getInputBuffer();
           synchronized (stateLock) {
             connectionState = STATE_READING;
           }
-          int amt = channel.read(buff);
+          int amountRead;
+          if (!isInitialRead) {
+            amountRead = channel.read(buff);
+          } else {
+            isInitialRead = false;
+            if (!skipInitialRead) {
+              amountRead = channel.read(buff);
+            } else {
+              amountRead = buff.position();
+            }
+          }
           synchronized (stateLock) {
             connectionState = STATE_IDLE;
           }
-          if (amt == 0) {
+          if (amountRead == 0) {
             continue;
           }
-          if (amt < 0) {
+          if (amountRead < 0) {
             this.readerShuttingDown = true;
             try {
               requestClose("SocketChannel.read returned EOF");
@@ -1811,13 +1732,14 @@ public class Connection implements Runnable {
             return;
           }
 
-          processNIOBuffer();
+          processInputBuffer();
+
           if (!this.isReceiver && (this.handshakeRead || this.handshakeCancelled)) {
             if (logger.isDebugEnabled()) {
               if (this.handshakeRead) {
-                logger.debug("{} handshake has been read {}", p2pReaderName(), this);
+                logger.debug("handshake has been read {}", this);
               } else {
-                logger.debug("{} handshake has been cancelled {}", p2pReaderName(), this);
+                logger.debug("handshake has been cancelled {}", this);
               }
             }
             isHandShakeReader = true;
@@ -1832,7 +1754,7 @@ public class Connection implements Runnable {
           try {
             requestClose(
                 String.format("CacheClosed in channel read: %s", e));
-          } catch (Exception ex) {
+          } catch (Exception ignored) {
           }
           return;
         } catch (ClosedChannelException e) {
@@ -1840,7 +1762,7 @@ public class Connection implements Runnable {
           try {
             requestClose(String.format("ClosedChannelException in channel read: %s",
                 e));
-          } catch (Exception ex) {
+          } catch (Exception ignored) {
           }
           return;
         } catch (IOException e) {
@@ -1863,7 +1785,7 @@ public class Connection implements Runnable {
           try {
             requestClose(
                 String.format("IOException in channel read: %s", e));
-          } catch (Exception ex) {
+          } catch (Exception ignored) {
           }
           return;
 
@@ -1876,7 +1798,7 @@ public class Connection implements Runnable {
           try {
             requestClose(
                 String.format("%s exception in channel read", e));
-          } catch (Exception ex) {
+          } catch (Exception ignored) {
           }
           return;
         }
@@ -1888,12 +1810,39 @@ public class Connection implements Runnable {
         }
       }
       if (logger.isDebugEnabled()) {
-        logger.debug("{} runNioReader terminated id={} from {}", p2pReaderName(), conduitIdStr,
-            remoteAddr);
+        logger.debug("readMessages terminated id={} from {} isHandshakeReader={}", conduitIdStr,
+            remoteAddr, isHandShakeReader);
       }
     }
   }
 
+  private void createIoFilter(SocketChannel channel, boolean clientSocket) throws IOException {
+    if (TCPConduit.useSSL && channel != null) {
+      InetSocketAddress address = (InetSocketAddress) channel.getRemoteAddress();
+      SSLEngine engine =
+          getConduit().getSocketCreator().createSSLEngine(address.getHostName(), address.getPort());
+
+      if (!clientSocket) {
+        engine.setWantClientAuth(true);
+        engine.setNeedClientAuth(true);
+      }
+
+      if (inputBuffer == null
+          || (inputBuffer.capacity() < engine.getSession().getPacketBufferSize())) {
+        // TLS has a minimum input buffer size constraint
+        if (inputBuffer != null) {
+          Buffers.releaseReceiveBuffer(inputBuffer, getConduit().getStats());
+        }
+        inputBuffer = Buffers.acquireReceiveBuffer(engine.getSession().getPacketBufferSize(),
+            getConduit().getStats());
+      }
+      ioFilter = getConduit().getSocketCreator().handshakeSSLSocketChannel(channel, engine,
+          getConduit().idleConnectionTimeout, clientSocket, inputBuffer, getConduit().getStats());
+    } else {
+      ioFilter = new NioEngine();
+    }
+  }
+
   /**
    * initiate suspect processing if a shared/ordered connection is lost and we're not shutting down
    */
@@ -1910,7 +1859,7 @@ public class Connection implements Runnable {
    * checks to see if an exception should not be logged: i.e., "forcibly closed", "reset by peer",
    * or "connection reset"
    */
-  public static boolean isIgnorableIOException(Exception e) {
+  private static boolean isIgnorableIOException(Exception e) {
     if (e instanceof ClosedChannelException) {
       return true;
     }
@@ -1994,465 +1943,9 @@ public class Connection implements Runnable {
     }
   }
 
-  private void runOioReader() {
-    InputStream input = null;
-    try {
-      if (logger.isDebugEnabled()) {
-        logger.debug("Socket is of type: {}", getSocket().getClass());
-      }
-      input = new BufferedInputStream(getSocket().getInputStream(), INITIAL_CAPACITY);
-    } catch (IOException io) {
-      if (stopped || owner.getConduit().getCancelCriterion().isCancelInProgress()) {
-        return; // bug 37520: exit run loop (and thread)
-      }
-      logger.fatal("Unable to get input stream", io);
-      stopped = true;
-    }
-
-    if (!stopped) {
-      Assert.assertTrue(owner != null,
-          "owner should not be null");
-      if (logger.isDebugEnabled()) {
-        logger.debug("Starting {}", p2pReaderName());
-      }
-    }
-
-    byte[] headerBytes = new byte[MSG_HEADER_BYTES];
-
-    final ByteArrayDataInput dis = new ByteArrayDataInput();
-    while (!stopped) {
-      try {
-        if (SystemFailure.getFailure() != null) {
-          // Allocate no objects here!
-          Socket s = this.socket;
-          if (s != null) {
-            try {
-              s.close();
-            } catch (IOException e) {
-              // don't care
-            }
-          }
-          SystemFailure.checkFailure(); // throws
-        }
-        if (this.owner.getConduit().getCancelCriterion().isCancelInProgress()) {
-          break;
-        }
-        int len = 0;
-        if (readFully(input, headerBytes, headerBytes.length) < 0) {
-          stopped = true;
-          continue;
-        }
-        // long recvNanos = DistributionStats.getStatTime();
-        len = ((headerBytes[MSG_HEADER_SIZE_OFFSET] & 0xff) * 0x1000000)
-            + ((headerBytes[MSG_HEADER_SIZE_OFFSET + 1] & 0xff) * 0x10000)
-            + ((headerBytes[MSG_HEADER_SIZE_OFFSET + 2] & 0xff) * 0x100)
-            + (headerBytes[MSG_HEADER_SIZE_OFFSET + 3] & 0xff);
-        /* byte msgHdrVersion = */ calcHdrVersion(len);
-        len = calcMsgByteSize(len);
-        int msgType = headerBytes[MSG_HEADER_TYPE_OFFSET];
-        short msgId = (short) (((headerBytes[MSG_HEADER_ID_OFFSET] & 0xff) << 8)
-            + (headerBytes[MSG_HEADER_ID_OFFSET + 1] & 0xff));
-        boolean myDirectAck = (msgType & DIRECT_ACK_BIT) != 0;
-        if (myDirectAck) {
-          msgType &= ~DIRECT_ACK_BIT; // clear the bit
-        }
-        // Following validation fixes bug 31145
-        if (!validMsgType(msgType)) {
-          logger.fatal("Unknown P2P message type: {}", Integer.valueOf(msgType));
-          this.readerShuttingDown = true;
-          requestClose(String.format("Unknown P2P message type: %s",
-              Integer.valueOf(msgType)));
-          break;
-        }
-        if (logger.isTraceEnabled())
-          logger.trace("{} reading {} bytes", conduitIdStr, len);
-        byte[] bytes = new byte[len];
-        if (readFully(input, bytes, len) < 0) {
-          stopped = true;
-          continue;
-        }
-        boolean interrupted = Thread.interrupted();
-        try {
-          if (this.handshakeRead) {
-            if (msgType == NORMAL_MSG_TYPE) {
-              // DMStats stats = this.owner.getConduit().stats;
-              // long start = DistributionStats.getStatTime();
-              this.owner.getConduit().getStats().incMessagesBeingReceived(true, len);
-              dis.initialize(bytes, this.remoteVersion);
-              DistributionMessage msg = null;
-              try {
-                ReplyProcessor21.initMessageRPId();
-                long startSer = this.owner.getConduit().getStats().startMsgDeserialization();
-                msg = (DistributionMessage) InternalDataSerializer.readDSFID(dis);
-                this.owner.getConduit().getStats().endMsgDeserialization(startSer);
-                if (dis.available() != 0) {
-                  logger.warn("Message deserialization of {} did not read {} bytes.",
-                      msg, Integer.valueOf(dis.available()));
-                }
-                // stats.incBatchCopyTime(start);
-                try {
-                  // start = DistributionStats.getStatTime();
-                  if (!dispatchMessage(msg, len, myDirectAck)) {
-                    continue;
-                  }
-                  // stats.incBatchSendTime(start);
-                } catch (MemberShunnedException e) {
-                  continue;
-                } catch (Exception de) {
-                  this.owner.getConduit().getCancelCriterion().checkCancelInProgress(de); // bug
-                                                                                          // 37101
-                  logger.fatal("Error dispatching message", de);
-                }
-              } catch (VirtualMachineError err) {
-                SystemFailure.initiateFailure(err);
-                // If this ever returns, rethrow the error. We're poisoned
-                // now, so don't let this thread continue.
-                throw err;
-              } catch (Throwable e) {
-                // Whenever you catch Error or Throwable, you must also
-                // catch VirtualMachineError (see above). However, there is
-                // _still_ a possibility that you are dealing with a cascading
-                // error condition, so you also need to check to see if the JVM
-                // is still usable:
-                SystemFailure.checkFailure();
-                // In particular I want OutOfMem to be caught here
-                if (!myDirectAck) {
-                  String reason =
-                      "Error deserializing message";
-                  sendFailureReply(ReplyProcessor21.getMessageRPId(), reason, e, myDirectAck);
-                }
-                if (e instanceof CancelException) {
-                  if (!(e instanceof CacheClosedException)) {
-                    // Just log a message if we had trouble deserializing due to
-                    // CacheClosedException; see bug 43543
-                    throw (CancelException) e;
-                  }
-                }
-                logger.fatal("Error deserializing message", e);
-                // requestClose();
-                // return;
-              } finally {
-                ReplyProcessor21.clearMessageRPId();
-              }
-            } else if (msgType == CHUNKED_MSG_TYPE) {
-              MsgDestreamer md = obtainMsgDestreamer(msgId, remoteVersion);
-              this.owner.getConduit().getStats().incMessagesBeingReceived(md.size() == 0, len);
-              try {
-                md.addChunk(bytes);
-              } catch (IOException ex) {
-                logger.fatal("Failed handling chunk message", ex);
-              }
-            } else /* (messageType == END_CHUNKED_MSG_TYPE) */ {
-              MsgDestreamer md = obtainMsgDestreamer(msgId, remoteVersion);
-              this.owner.getConduit().getStats().incMessagesBeingReceived(md.size() == 0, len);
-              try {
-                md.addChunk(bytes);
-              } catch (IOException ex) {
-                logger.fatal("Failed handling end chunk message", ex);
-              }
-              DistributionMessage msg = null;
-              int msgLength = 0;
-              String failureMsg = null;
-              Throwable failureEx = null;
-              int rpId = 0;
-              try {
-                msg = md.getMessage();
-              } catch (ClassNotFoundException ex) {
-                this.owner.getConduit().getStats().decMessagesBeingReceived(md.size());
-                failureEx = ex;
-                rpId = md.getRPid();
-                logger.warn("ClassNotFound deserializing message: {}", ex.toString());
-              } catch (IOException ex) {
-                this.owner.getConduit().getStats().decMessagesBeingReceived(md.size());
-                failureMsg = "IOException deserializing message";
-                failureEx = ex;
-                rpId = md.getRPid();
-                logger.fatal("IOException deserializing message", failureEx);
-              } catch (InterruptedException ex) {
-                Thread.currentThread().interrupt();
-                throw ex; // caught by outer try
-              } catch (VirtualMachineError err) {
-                SystemFailure.initiateFailure(err);
-                // If this ever returns, rethrow the error. We're poisoned
-                // now, so don't let this thread continue.
-                throw err;
-              } catch (Throwable ex) {
-                // Whenever you catch Error or Throwable, you must also
-                // catch VirtualMachineError (see above). However, there is
-                // _still_ a possibility that you are dealing with a cascading
-                // error condition, so you also need to check to see if the JVM
-                // is still usable:
-                SystemFailure.checkFailure();
-                this.owner.getConduit().getStats().decMessagesBeingReceived(md.size());
-                failureMsg = "Unexpected failure deserializing message";
-                failureEx = ex;
-                rpId = md.getRPid();
-                logger.fatal("Unexpected failure deserializing message",
-                    failureEx);
-              } finally {
-                msgLength = md.size();
-                releaseMsgDestreamer(msgId, md);
-              }
-              if (msg != null) {
-                try {
-                  if (!dispatchMessage(msg, msgLength, myDirectAck)) {
-                    continue;
-                  }
-                } catch (MemberShunnedException e) {
-                  continue;
-                } catch (Exception de) {
-                  this.owner.getConduit().getCancelCriterion().checkCancelInProgress(de);
-                  logger.fatal("Error dispatching message", de);
-                } catch (ThreadDeath td) {
-                  throw td;
-                } catch (VirtualMachineError err) {
-                  SystemFailure.initiateFailure(err);
-                  // If this ever returns, rethrow the error. We're poisoned
-                  // now, so don't let this thread continue.
-                  throw err;
-                } catch (Throwable t) {
-                  // Whenever you catch Error or Throwable, you must also
-                  // catch VirtualMachineError (see above). However, there is
-                  // _still_ a possibility that you are dealing with a cascading
-                  // error condition, so you also need to check to see if the JVM
-                  // is still usable:
-                  SystemFailure.checkFailure();
-                  logger.fatal("Throwable dispatching message", t);
-                }
-              } else if (failureEx != null) {
-                sendFailureReply(rpId, failureMsg, failureEx, myDirectAck);
-              }
-            }
-          } else {
-            dis.initialize(bytes, null);
-            if (!this.isReceiver) {
-              this.replyCode = dis.readUnsignedByte();
-              if (this.replyCode != REPLY_CODE_OK
-                  && this.replyCode != REPLY_CODE_OK_WITH_ASYNC_INFO) {
-                Integer replyCodeInteger = Integer.valueOf(this.replyCode);
-                String err = String.format("Unknown handshake reply code: %s",
-                    replyCodeInteger);
-
-                if (this.replyCode == 0) { // bug 37113
-                  if (logger.isDebugEnabled()) {
-                    logger.debug("{} (peer probably departed ungracefully)", err);
-                  }
-                } else {
-                  logger.fatal("Unknown handshake reply code: {}",
-                      replyCodeInteger);
-                }
-                this.readerShuttingDown = true;
-                requestClose(err);
-                break;
-              }
-              if (this.replyCode == REPLY_CODE_OK_WITH_ASYNC_INFO) {
-                this.asyncDistributionTimeout = dis.readInt();
-                this.asyncQueueTimeout = dis.readInt();
-                this.asyncMaxQueueSize = (long) dis.readInt() * (1024 * 1024);
-                if (this.asyncDistributionTimeout != 0) {
-                  logger.info("{} async configuration received {}.",
-                      p2pReaderName(),
-                      " asyncDistributionTimeout=" + this.asyncDistributionTimeout
-                          + " asyncQueueTimeout=" + this.asyncQueueTimeout
-                          + " asyncMaxQueueSize="
-                          + (this.asyncMaxQueueSize / (1024 * 1024)));
-                }
-                // read the product version ordinal for on-the-fly serialization
-                // transformations (for rolling upgrades)
-                this.remoteVersion = Version.readVersion(dis, true);
-              }
-              notifyHandshakeWaiter(true);
-            } else {
-              byte b = dis.readByte();
-              if (b != 0) {
-                throw new IllegalStateException(
-                    String.format(
-                        "Detected old version (pre 5.0.1) of GemFire or non-GemFire during handshake due to initial byte being %s",
-                        new Byte(b)));
-              }
-              byte handshakeByte = dis.readByte();
-              if (handshakeByte != HANDSHAKE_VERSION) {
-                throw new IllegalStateException(
-                    String.format(
-                        "Detected wrong version of GemFire product during handshake. Expected %s but found %s",
-
-                        new Object[] {new Byte(HANDSHAKE_VERSION), new Byte(handshakeByte)}));
-              }
-              InternalDistributedMember remote = DSFIDFactory.readInternalDistributedMember(dis);
-              setRemoteAddr(remote);
-              Thread.currentThread().setName(String.format("P2P message reader for %s on port %s",
-                  this.remoteAddr, this.socket.getPort()));
-              this.sharedResource = dis.readBoolean();
-              this.preserveOrder = dis.readBoolean();
-              this.uniqueId = dis.readLong();
-              // read the product version ordinal for on-the-fly serialization
-              // transformations (for rolling upgrades)
-              this.remoteVersion = Version.readVersion(dis, true);
-              int dominoNumber = 0;
-              if (this.remoteVersion == null
-                  || (this.remoteVersion.compareTo(Version.GFE_80) >= 0)) {
-                dominoNumber = dis.readInt();
-                if (this.sharedResource) {
-                  dominoNumber = 0;
-                }
-                dominoCount.set(dominoNumber);
-                // this.senderName = dis.readUTF();
-                setThreadName(dominoNumber);
-              }
-
-              if (!this.sharedResource) {
-                if (tipDomino()) {
-                  logger
-                      .info("thread owned receiver forcing itself to send on thread owned sockets");
-                  // bug #49565 - if domino count is >= 2 use shared resources.
-                  // Also see DistributedCacheOperation#supportsDirectAck
-                } else { // if (dominoNumber < 2){
-                  ConnectionTable.threadWantsOwnResources();
-                  if (logger.isDebugEnabled()) {
-                    logger.debug(
-                        "thread-owned receiver with domino count of {} will prefer sending on thread-owned sockets",
-                        dominoNumber);
-                  }
-                  // } else {
-                  // ConnectionTable.threadWantsSharedResources();
-                  // logger.fine("thread-owned receiver with domino count of " + dominoNumber + "
-                  // will prefer shared sockets");
-                }
-                this.conduit.getStats().incThreadOwnedReceivers(1L, dominoNumber);
-              }
-
-              if (logger.isDebugEnabled()) {
-                logger.debug("{} remoteAddr is {} {}", p2pReaderName(), this.remoteAddr,
-                    (this.remoteVersion != null ? " (" + this.remoteVersion + ')' : ""));
-              }
-
-              String authInit = System.getProperty(
-                  DistributionConfigImpl.SECURITY_SYSTEM_PREFIX + SECURITY_PEER_AUTH_INIT);
-              boolean isSecure = authInit != null && authInit.length() != 0;
-
-              if (isSecure) {
-                // ARB: wait till member authentication has been confirmed?
-                if (owner.getConduit().waitForMembershipCheck(this.remoteAddr)) {
-                  sendOKHandshakeReply(); // fix for bug 33224
-                  notifyHandshakeWaiter(true);
-                } else {
-                  // ARB: throw exception??
-                  notifyHandshakeWaiter(false);
-                  logger.warn("{} timed out during a membership check.",
-                      p2pReaderName());
-                }
-              } else {
-                sendOKHandshakeReply(); // fix for bug 33224
-                notifyHandshakeWaiter(true);
-              }
-            }
-            if (!this.isReceiver && (this.handshakeRead || this.handshakeCancelled)) {
-              if (logger.isDebugEnabled()) {
-                if (this.handshakeRead) {
-                  logger.debug("{} handshake has been read {}", p2pReaderName(), this);
-                } else {
-                  logger.debug("{} handshake has been cancelled {}", p2pReaderName(), this);
-                }
-              }
-              // Once we have read the handshake the reader can go away
-              break;
-            }
-            continue;
-          }
-        } catch (InterruptedException e) {
-          interrupted = true;
-          this.owner.getConduit().getCancelCriterion().checkCancelInProgress(e);
-          logger.fatal(String.format("%s Stray interrupt reading message", p2pReaderName()), e);
-          continue;
-        } catch (Exception ioe) {
-          this.owner.getConduit().getCancelCriterion().checkCancelInProgress(ioe); // bug 37101
-          if (!stopped) {
-            logger.fatal(String.format("%s Error reading message", p2pReaderName()), ioe);
-          }
-          continue;
-        } finally {
-          if (interrupted) {
-            Thread.currentThread().interrupt();
-          }
-        }
-      } catch (CancelException e) {
-        if (logger.isDebugEnabled()) {
-          String ccMsg = p2pReaderName() + " Cancelled: " + this;
-          if (e.getMessage() != null) {
-            ccMsg += ": " + e.getMessage();
-          }
-          logger.debug(ccMsg);
-        }
-        this.readerShuttingDown = true;
-        try {
-          requestClose(
-              String.format("CacheClosed in channel read: %s", e));
-        } catch (Exception ex) {
-        }
-        this.stopped = true;
-      } catch (IOException io) {
-        boolean closed = isSocketClosed() || "Socket closed".equalsIgnoreCase(io.getMessage()); // needed
-                                                                                                // for
-                                                                                                // Solaris
-                                                                                                // jdk
-                                                                                                // 1.4.2_08
-        if (!closed) {
-          if (logger.isDebugEnabled() && !isIgnorableIOException(io)) {
-            logger.debug("{} io exception for {}", p2pReaderName(), this, io);
-          }
-        }
-        this.readerShuttingDown = true;
-        try {
-          requestClose(String.format("IOException received: %s", io));
-        } catch (Exception ex) {
-        }
-
-        if (closed) {
-          stopped = true;
-        } else {
-          // sleep a bit to avoid a hot error loop
-          try {
-            Thread.sleep(1000);
-          } catch (InterruptedException ie) {
-            Thread.currentThread().interrupt();
-            if (this.owner.getConduit().getCancelCriterion().isCancelInProgress()) {
-              return;
-            }
-            break;
-          }
-        }
-      } // IOException
-      catch (Exception e) {
-        if (this.owner.getConduit().getCancelCriterion().isCancelInProgress()) {
-          return; // bug 37101
-        }
-        if (!stopped && !(e instanceof InterruptedException)) {
-          logger.fatal(String.format("%s exception received",
-              p2pReaderName()), e);
-        }
-        if (isSocketClosed()) {
-          stopped = true;
-        } else {
-          this.readerShuttingDown = true;
-          try {
-            requestClose(String.format("%s exception received", e));
-          } catch (Exception ex) {
-          }
-
-          // sleep a bit to avoid a hot error loop
-          try {
-            Thread.sleep(1000);
-          } catch (InterruptedException ie) {
-            Thread.currentThread().interrupt();
-            break;
-          }
-        }
-      }
-    }
-  }
 
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DE_MIGHT_IGNORE")
-  int readFully(InputStream input, byte[] buffer, int len) throws IOException {
+  void readFully(InputStream input, byte[] buffer, int len) throws IOException {
     int bytesSoFar = 0;
     while (bytesSoFar < len) {
       this.owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
@@ -2465,9 +1958,9 @@ public class Connection implements Runnable {
           this.readerShuttingDown = true;
           try {
             requestClose("Stream read returned non-positive length");
-          } catch (Exception ex) {
+          } catch (Exception ignored) {
           }
-          return -1;
+          return;
         }
         bytesSoFar += bytesThisTime;
       } catch (InterruptedIOException io) {
@@ -2475,7 +1968,7 @@ public class Connection implements Runnable {
         this.readerShuttingDown = true;
         try {
           requestClose("Current thread interrupted");
-        } catch (Exception ex) {
+        } catch (Exception ignored) {
         }
         Thread.currentThread().interrupt();
         this.owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
@@ -2485,7 +1978,6 @@ public class Connection implements Runnable {
         }
       }
     } // while
-    return len;
   }
 
   /**
@@ -2494,7 +1986,7 @@ public class Connection implements Runnable {
    *
    * @throws ConnectionException if the conduit has stopped
    */
-  public void sendPreserialized(ByteBuffer buffer, boolean cacheContentChanges,
+  void sendPreserialized(ByteBuffer buffer, boolean cacheContentChanges,
       DistributionMessage msg) throws IOException, ConnectionException {
     if (!connected) {
       throw new ConnectionException(
@@ -2512,21 +2004,8 @@ public class Connection implements Runnable {
     }
     this.socketInUse = true;
     try {
-      if (useNIO()) {
-        SocketChannel channel = getSocket().getChannel();
-        nioWriteFully(channel, buffer, false, msg);
-      } else {
-        if (buffer.hasArray()) {
-          this.output.write(buffer.array(), buffer.arrayOffset(),
-              buffer.limit() - buffer.position());
-        } else {
-          byte[] bytesToWrite = getBytesToWrite(buffer);
-          synchronized (outLock) {
-            this.output.write(bytesToWrite);
-            this.output.flush();
-          }
-        }
-      }
+      SocketChannel channel = getSocket().getChannel();
+      writeFully(channel, buffer, false, msg);
       if (cacheContentChanges) {
         messagesSent++;
       }
@@ -2577,7 +2056,7 @@ public class Connection implements Runnable {
   /**
    * For testing we want to configure the connection without having to read a handshake
    */
-  protected void setSharedUnorderedForTest() {
+  void setSharedUnorderedForTest() {
     this.preserveOrder = false;
     this.sharedResource = true;
     this.handshakeRead = true;
@@ -2585,7 +2064,7 @@ public class Connection implements Runnable {
 
 
   /** ensure that a task is running to monitor transmission and reading of acks */
-  public synchronized void scheduleAckTimeouts() {
+  synchronized void scheduleAckTimeouts() {
     if (ackTimeoutTask == null) {
       final long msAW = this.owner.getDM().getConfig().getAckWaitThreshold() * 1000L;
       final long msSA = this.owner.getDM().getConfig().getAckSevereAlertThreshold() * 1000L;
@@ -2649,11 +2128,11 @@ public class Connection implements Runnable {
   }
 
   /** ack-wait-threshold and ack-severe-alert-threshold processing */
-  protected boolean doSevereAlertProcessing() {
+  private boolean doSevereAlertProcessing() {
     long now = System.currentTimeMillis();
     if (ackSATimeout > 0 && (transmissionStartTime + ackWaitTimeout + ackSATimeout) <= now) {
       logger.fatal("{} seconds have elapsed waiting for a response from {} for thread {}",
-          Long.valueOf((ackWaitTimeout + ackSATimeout) / 1000),
+          (ackWaitTimeout + ackSATimeout) / 1000L,
           getRemoteAddress(),
           ackThreadName);
       // turn off subsequent checks by setting the timeout to zero, then boot the member
@@ -2662,7 +2141,7 @@ public class Connection implements Runnable {
     } else if (!ackTimedOut && (0 < ackWaitTimeout)
         && (transmissionStartTime + ackWaitTimeout) <= now) {
       logger.warn("{} seconds have elapsed waiting for a response from {} for thread {}",
-          Long.valueOf(ackWaitTimeout / 1000), getRemoteAddress(), ackThreadName);
+          ackWaitTimeout / 1000L, getRemoteAddress(), ackThreadName);
       ackTimedOut = true;
 
       final String state = (connectionState == Connection.STATE_SENDING)
@@ -2676,12 +2155,6 @@ public class Connection implements Runnable {
     return false;
   }
 
-  private static byte[] getBytesToWrite(ByteBuffer buffer) {
-    byte[] bytesToWrite = new byte[buffer.limit()];
-    buffer.get(bytesToWrite);
-    return bytesToWrite;
-  }
-
   private boolean addToQueue(ByteBuffer buffer, DistributionMessage msg, boolean force)
       throws ConnectionException {
     final DMStats stats = this.owner.getConduit().getStats();
@@ -2806,20 +2279,20 @@ public class Connection implements Runnable {
     if (!addToQueue(buffer, msg, true)) {
       return false;
     } else {
-      startNioPusher();
+      startMessagePusher();
       return true;
     }
   }
 
-  private final Object nioPusherSync = new Object();
+  private final Object pusherSync = new Object();
 
-  private void startNioPusher() {
-    synchronized (this.nioPusherSync) {
+  private void startMessagePusher() {
+    synchronized (this.pusherSync) {
       while (this.pusherThread != null) {
         // wait for previous pusher thread to exit
         boolean interrupted = Thread.interrupted();
         try {
-          this.nioPusherSync.wait(); // spurious wakeup ok
+          this.pusherSync.wait(); // spurious wakeup ok
         } catch (InterruptedException ex) {
           interrupted = true;
           this.owner.getConduit().getCancelCriterion().checkCancelInProgress(ex);
@@ -2831,7 +2304,7 @@ public class Connection implements Runnable {
       }
       this.asyncQueuingInProgress = true;
       this.pusherThread =
-          new LoggingThread("P2P async pusher to " + this.remoteAddr, this::runNioPusher);
+          new LoggingThread("P2P async pusher to " + this.remoteAddr, this::runMessagePusher);
     } // synchronized
     this.pusherThread.start();
   }
@@ -2937,7 +2410,7 @@ public class Connection implements Runnable {
   /**
    * have the pusher thread check for queue overflow and for idle time exceeded
    */
-  protected void runNioPusher() {
+  private void runMessagePusher() {
     try {
       final DMStats stats = this.owner.getConduit().getStats();
       final long threadStart = stats.startAsyncThread();
@@ -2953,6 +2426,8 @@ public class Connection implements Runnable {
               Socket s = this.socket;
               if (s != null) {
                 try {
+                  logger.debug("closing socket", new Exception("closing socket"));
+                  ioFilter.close(s.getChannel());
                   s.close();
                 } catch (IOException e) {
                   // don't care
@@ -2983,7 +2458,7 @@ public class Connection implements Runnable {
                 }
                 return;
               }
-              nioWriteFully(channel, bb, true, null);
+              writeFully(channel, bb, true, null);
               // We should not add messagesSent here according to Bruce.
               // The counts are increased elsewhere.
               // messagesSent++;
@@ -3017,7 +2492,7 @@ public class Connection implements Runnable {
         }
       } catch (CancelException ex) { // bug 37367
         final String err = String.format("P2P pusher %s caught CacheClosedException: %s",
-            new Object[] {this, ex});
+            this, ex);
         logger.debug(err);
         try {
           requestClose(err);
@@ -3040,14 +2515,14 @@ public class Connection implements Runnable {
         stats.incAsyncThreads(-1);
         stats.incAsyncQueues(-1);
         if (logger.isDebugEnabled()) {
-          logger.debug("runNioPusher terminated id={} from {}/{}", conduitIdStr, remoteAddr,
+          logger.debug("runMessagePusher terminated id={} from {}/{}", conduitIdStr, remoteAddr,
               remoteAddr);
         }
       }
     } finally {
-      synchronized (this.nioPusherSync) {
+      synchronized (this.pusherSync) {
         this.pusherThread = null;
-        this.nioPusherSync.notify();
+        this.pusherSync.notify();
       }
     }
   }
@@ -3121,6 +2596,7 @@ public class Connection implements Runnable {
         long queueTimeoutTarget = now + this.asyncQueueTimeout;
         channel.configureBlocking(false);
         try {
+          ByteBuffer wrappedBuffer = ioFilter.wrap(buffer);
           do {
             this.owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
             retries++;
@@ -3128,7 +2604,7 @@ public class Connection implements Runnable {
             if (FORCE_ASYNC_QUEUE) {
               amtWritten = 0;
             } else {
-              amtWritten = channel.write(buffer);
+              amtWritten = channel.write(wrappedBuffer);
             }
             if (amtWritten == 0) {
               now = System.currentTimeMillis();
@@ -3155,7 +2631,7 @@ public class Connection implements Runnable {
                     // the partial msg a candidate for conflation.
                     msg = null;
                   }
-                  if (handleBlockedWrite(buffer, msg)) {
+                  if (handleBlockedWrite(wrappedBuffer, msg)) {
                     return;
                   }
                 }
@@ -3166,8 +2642,8 @@ public class Connection implements Runnable {
                 if (curQueuedBytes > this.asyncMaxQueueSize) {
                   logger.warn(
                       "Queued bytes {} exceeds max of {}, asking slow receiver {} to disconnect.",
-                      Long.valueOf(curQueuedBytes),
-                      Long.valueOf(this.asyncMaxQueueSize), this.remoteAddr);
+                      curQueuedBytes,
+                      this.asyncMaxQueueSize, this.remoteAddr);
                   stats.incAsyncQueueSizeExceeded(1);
                   disconnectNeeded = true;
                 }
@@ -3178,8 +2654,8 @@ public class Connection implements Runnable {
                   blockedMs += this.asyncQueueTimeout;
                   logger.warn(
                       "Blocked for {}ms which is longer than the max of {}ms, asking slow receiver {} to disconnect.",
-                      Long.valueOf(blockedMs),
-                      Integer.valueOf(this.asyncQueueTimeout), this.remoteAddr);
+                      blockedMs,
+                      this.asyncQueueTimeout, this.remoteAddr);
                   stats.incAsyncQueueTimeouts(1);
                   disconnectNeeded = true;
                 }
@@ -3229,7 +2705,7 @@ public class Connection implements Runnable {
               queueTimeoutTarget = System.currentTimeMillis() + this.asyncQueueTimeout;
               waitTime = 1;
             }
-          } while (buffer.remaining() > 0);
+          } while (wrappedBuffer.remaining() > 0);
         } finally {
           channel.configureBlocking(true);
         }
@@ -3245,12 +2721,12 @@ public class Connection implements Runnable {
   }
 
   /**
-   * nioWriteFully implements a blocking write on a channel that is in non-blocking mode.
+   * writeFully implements a blocking write on a channel that is in non-blocking mode.
    *
    * @param forceAsync true if we need to force a blocking async write.
    * @throws ConnectionException if the conduit has stopped
    */
-  protected void nioWriteFully(SocketChannel channel, ByteBuffer buffer, boolean forceAsync,
+  void writeFully(SocketChannel channel, ByteBuffer buffer, boolean forceAsync,
       DistributionMessage msg) throws IOException, ConnectionException {
     final DMStats stats = this.owner.getConduit().getStats();
     if (!this.sharedResource) {
@@ -3272,17 +2748,20 @@ public class Connection implements Runnable {
           }
           // fall through
         }
+        ByteBuffer wrappedBuffer = ioFilter.wrap(buffer);
+
         do {
           int amtWritten = 0;
           long start = stats.startSocketWrite(true);
           try {
             // this.writerThread = Thread.currentThread();
-            amtWritten = channel.write(buffer);
+            amtWritten = channel.write(wrappedBuffer);
           } finally {
             stats.endSocketWrite(true, start, amtWritten, 0);
             // this.writerThread = null;
           }
-        } while (buffer.remaining() > 0);
+        } while (wrappedBuffer.remaining() > 0);
+
       } // synchronized
     } else {
       writeAsync(channel, buffer, forceAsync, msg, stats);
@@ -3290,16 +2769,15 @@ public class Connection implements Runnable {
   }
 
   /** gets the buffer for receiving message length bytes */
-  protected ByteBuffer getNIOBuffer() {
-    final DMStats stats = this.owner.getConduit().getStats();
-    if (nioInputBuffer == null) {
+  private ByteBuffer getInputBuffer() {
+    if (inputBuffer == null) {
       int allocSize = this.recvBufferSize;
       if (allocSize == -1) {
         allocSize = this.owner.getConduit().tcpBufferSize;
       }
-      nioInputBuffer = Buffers.acquireReceiveBuffer(allocSize, stats);
+      inputBuffer = Buffers.acquireReceiveBuffer(allocSize, this.owner.getConduit().getStats());
     }
-    return nioInputBuffer;
+    return inputBuffer;
   }
 
   /**
@@ -3312,30 +2790,28 @@ public class Connection implements Runnable {
 
   /* ~~~~~~~~~~~~~ connection states ~~~~~~~~~~~~~~~ */
   /** the connection is idle, but may be in use */
-  protected static final byte STATE_IDLE = 0;
+  private static final byte STATE_IDLE = 0;
   /** the connection is in use and is transmitting data */
-  protected static final byte STATE_SENDING = 1;
+  private static final byte STATE_SENDING = 1;
   /** the connection is in use and is done transmitting */
-  protected static final byte STATE_POST_SENDING = 2;
+  private static final byte STATE_POST_SENDING = 2;
   /** the connection is in use and is reading a direct-ack */
-  protected static final byte STATE_READING_ACK = 3;
+  private static final byte STATE_READING_ACK = 3;
   /** the connection is in use and has finished reading a direct-ack */
-  protected static final byte STATE_RECEIVED_ACK = 4;
+  private static final byte STATE_RECEIVED_ACK = 4;
   /** the connection is in use and is reading a message */
-  protected static final byte STATE_READING = 5;
+  private static final byte STATE_READING = 5;
   /* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ */
 
   /** set to true if we exceeded the ack-wait-threshold waiting for a response */
-  protected volatile boolean ackTimedOut;
+  private volatile boolean ackTimedOut;
 
   /**
-   * @param msToWait number of milliseconds to wait for an ack. If 0 then wait forever.
-   * @param msInterval interval between checks
-   * @throws SocketTimeoutException if msToWait expires.
-   * @throws ConnectionException if ack is not received (fixes bug 34312)
+   * @throws SocketTimeoutException if wait expires.
+   * @throws ConnectionException if ack is not received
    */
-  public void readAck(final int msToWait, final long msInterval,
-      final DirectReplyProcessor processor) throws SocketTimeoutException, ConnectionException {
+  public void readAck(final DirectReplyProcessor processor)
+      throws SocketTimeoutException, ConnectionException {
     if (isSocketClosed()) {
       throw new ConnectionException(
           "connection is closed");
@@ -3350,28 +2826,24 @@ public class Connection implements Runnable {
     DMStats stats = owner.getConduit().getStats();
     final Version version = getRemoteVersion();
     try {
-      if (useNIO()) {
-        msgReader = new NIOMsgReader(this, version);
-      } else {
-        msgReader = new OioMsgReader(this, version);
-      }
+      msgReader = new MsgReader(this, ioFilter, getInputBuffer(), version);
 
       Header header = msgReader.readHeader();
 
       ReplyMessage msg;
       int len;
-      if (header.getNioMessageType() == NORMAL_MSG_TYPE) {
+      if (header.getMessageType() == NORMAL_MSG_TYPE) {
         msg = (ReplyMessage) msgReader.readMessage(header);
-        len = header.getNioMessageLength();
+        len = header.getMessageLength();
       } else {
-        MsgDestreamer destreamer = obtainMsgDestreamer(header.getNioMessageId(), version);
-        while (header.getNioMessageType() == CHUNKED_MSG_TYPE) {
+        MsgDestreamer destreamer = obtainMsgDestreamer(header.getMessageId(), version);
+        while (header.getMessageType() == CHUNKED_MSG_TYPE) {
           msgReader.readChunk(header, destreamer);
           header = msgReader.readHeader();
         }
         msgReader.readChunk(header, destreamer);
         msg = (ReplyMessage) destreamer.getMessage();
-        releaseMsgDestreamer(header.getNioMessageId(), destreamer);
+        releaseMsgDestreamer(header.getMessageId(), destreamer);
         len = destreamer.size();
       }
       // I'd really just like to call dispatchMessage here. However,
@@ -3442,391 +2914,48 @@ public class Connection implements Runnable {
    * processes the current NIO buffer. If there are complete messages in the buffer, they are
    * deserialized and passed to TCPConduit for further processing
    */
-  private void processNIOBuffer() throws ConnectionException, IOException {
-    if (nioInputBuffer != null) {
-      nioInputBuffer.flip();
-    }
+  private void processInputBuffer() throws ConnectionException, IOException {
+    inputBuffer.flip();
+
+    ByteBuffer peerDataBuffer = ioFilter.unwrap(inputBuffer);
+    peerDataBuffer.flip();
+
     boolean done = false;
 
     while (!done && connected) {
       this.owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
       // long startTime = DistributionStats.getStatTime();
-      int remaining = nioInputBuffer.remaining();
-      if (nioLengthSet || remaining >= MSG_HEADER_BYTES) {
-        if (!nioLengthSet) {
-          int headerStartPos = nioInputBuffer.position();
-          nioMessageLength = nioInputBuffer.getInt();
-          /* nioMessageVersion = */ calcHdrVersion(nioMessageLength);
-          nioMessageLength = calcMsgByteSize(nioMessageLength);
-          nioMessageType = nioInputBuffer.get();
-          nioMsgId = nioInputBuffer.getShort();
-          directAck = (nioMessageType & DIRECT_ACK_BIT) != 0;
-          if (directAck) {
-            nioMessageType &= ~DIRECT_ACK_BIT; // clear the ack bit
-          }
-          // Following validation fixes bug 31145
-          if (!validMsgType(nioMessageType)) {
-            Integer nioMessageTypeInteger = Integer.valueOf(nioMessageType);
-            logger.fatal("Unknown P2P message type: {}", nioMessageTypeInteger);
-            this.readerShuttingDown = true;
-            requestClose(String.format("Unknown P2P message type: %s",
-                nioMessageTypeInteger));
+      int remaining = peerDataBuffer.remaining();
+      if (lengthSet || remaining >= MSG_HEADER_BYTES) {
+        if (!lengthSet) {
+          if (readMessageHeader(peerDataBuffer)) {
             break;
           }
-          nioLengthSet = true;
-          // keep the header "in" the buffer until we have read the entire msg.
-          // Trust me: this will reduce copying on large messages.
-          nioInputBuffer.position(headerStartPos);
-        }
-        if (remaining >= nioMessageLength + MSG_HEADER_BYTES) {
-          nioLengthSet = false;
-          nioInputBuffer.position(nioInputBuffer.position() + MSG_HEADER_BYTES);
+        }
+        if (remaining >= messageLength + MSG_HEADER_BYTES) {
+          lengthSet = false;
+          peerDataBuffer.position(peerDataBuffer.position() + MSG_HEADER_BYTES);
           // don't trust the message deserialization to leave the position in
           // the correct spot. Some of the serialization uses buffered
           // streams that can leave the position at the wrong spot
-          int startPos = nioInputBuffer.position();
-          int oldLimit = nioInputBuffer.limit();
-          nioInputBuffer.limit(startPos + nioMessageLength);
+          int startPos = peerDataBuffer.position();
+          int oldLimit = peerDataBuffer.limit();
+          peerDataBuffer.limit(startPos + messageLength);
+
           if (this.handshakeRead) {
-            if (nioMessageType == NORMAL_MSG_TYPE) {
-              this.owner.getConduit().getStats().incMessagesBeingReceived(true, nioMessageLength);
-              ByteBufferInputStream bbis =
-                  remoteVersion == null ? new ByteBufferInputStream(nioInputBuffer)
-                      : new VersionedByteBufferInputStream(nioInputBuffer, remoteVersion);
-              DistributionMessage msg = null;
-              try {
-                ReplyProcessor21.initMessageRPId();
-                // add serialization stats
-                long startSer = this.owner.getConduit().getStats().startMsgDeserialization();
-                msg = (DistributionMessage) InternalDataSerializer.readDSFID(bbis);
-                this.owner.getConduit().getStats().endMsgDeserialization(startSer);
-                if (bbis.available() != 0) {
-                  logger.warn("Message deserialization of {} did not read {} bytes.",
-                      msg, Integer.valueOf(bbis.available()));
-                }
-                try {
-                  if (!dispatchMessage(msg, nioMessageLength, directAck)) {
-                    directAck = false;
-                  }
-                } catch (MemberShunnedException e) {
-                  directAck = false; // don't respond (bug39117)
-                } catch (Exception de) {
-                  this.owner.getConduit().getCancelCriterion().checkCancelInProgress(de);
-                  logger.fatal("Error dispatching message", de);
-                } catch (ThreadDeath td) {
-                  throw td;
-                } catch (VirtualMachineError err) {
-                  SystemFailure.initiateFailure(err);
-                  // If this ever returns, rethrow the error. We're poisoned
-                  // now, so don't let this thread continue.
-                  throw err;
-                } catch (Throwable t) {
-                  // Whenever you catch Error or Throwable, you must also
-                  // catch VirtualMachineError (see above). However, there is
-                  // _still_ a possibility that you are dealing with a cascading
-                  // error condition, so you also need to check to see if the JVM
-                  // is still usable:
-                  SystemFailure.checkFailure();
-                  logger.fatal("Throwable dispatching message", t);
-                }
-              } catch (VirtualMachineError err) {
-                SystemFailure.initiateFailure(err);
-                // If this ever returns, rethrow the error. We're poisoned
-                // now, so don't let this thread continue.
-                throw err;
-              } catch (Throwable t) {
-                // Whenever you catch Error or Throwable, you must also
-                // catch VirtualMachineError (see above). However, there is
-                // _still_ a possibility that you are dealing with a cascading
-                // error condition, so you also need to check to see if the JVM
-                // is still usable:
-                SystemFailure.checkFailure();
-                sendFailureReply(ReplyProcessor21.getMessageRPId(),
-                    "Error deserializing message", t,
-                    directAck);
-                if (t instanceof ThreadDeath) {
-                  throw (ThreadDeath) t;
-                }
-                if (t instanceof CancelException) {
-                  if (!(t instanceof CacheClosedException)) {
-                    // Just log a message if we had trouble deserializing due to
-                    // CacheClosedException; see bug 43543
-                    throw (CancelException) t;
-                  }
-                }
-                logger.fatal("Error deserializing message", t);
-              } finally {
-                ReplyProcessor21.clearMessageRPId();
-              }
-            } else if (nioMessageType == CHUNKED_MSG_TYPE) {
-              MsgDestreamer md = obtainMsgDestreamer(nioMsgId, remoteVersion);
-              this.owner.getConduit().getStats().incMessagesBeingReceived(md.size() == 0,
-                  nioMessageLength);
-              try {
-                md.addChunk(nioInputBuffer, nioMessageLength);
-              } catch (IOException ex) {
-                logger.fatal("Failed handling chunk message", ex);
-              }
-            } else /* (nioMessageType == END_CHUNKED_MSG_TYPE) */ {
-              // logger.info("END_CHUNK msgId="+nioMsgId);
-              MsgDestreamer md = obtainMsgDestreamer(nioMsgId, remoteVersion);
-              this.owner.getConduit().getStats().incMessagesBeingReceived(md.size() == 0,
-                  nioMessageLength);
-              try {
-                md.addChunk(nioInputBuffer, nioMessageLength);
-              } catch (IOException ex) {
-                logger.fatal("Failed handling end chunk message", ex);
-              }
-              DistributionMessage msg = null;
-              int msgLength = 0;
-              String failureMsg = null;
-              Throwable failureEx = null;
-              int rpId = 0;
-              boolean interrupted = false;
-              try {
-                msg = md.getMessage();
-              } catch (ClassNotFoundException ex) {
-                this.owner.getConduit().getStats().decMessagesBeingReceived(md.size());
-                failureMsg = "ClassNotFound deserializing message";
-                failureEx = ex;
-                rpId = md.getRPid();
-                logger.fatal("ClassNotFound deserializing message: {}", ex.toString());
-              } catch (IOException ex) {
-                this.owner.getConduit().getStats().decMessagesBeingReceived(md.size());
-                failureMsg = "IOException deserializing message";
-                failureEx = ex;
-                rpId = md.getRPid();
-                logger.fatal("IOException deserializing message", failureEx);
-              } catch (InterruptedException ex) {
-                interrupted = true;
-                this.owner.getConduit().getCancelCriterion().checkCancelInProgress(ex);
-              } catch (VirtualMachineError err) {
-                SystemFailure.initiateFailure(err);
-                // If this ever returns, rethrow the error. We're poisoned
-                // now, so don't let this thread continue.
-                throw err;
-              } catch (Throwable ex) {
-                // Whenever you catch Error or Throwable, you must also
-                // catch VirtualMachineError (see above). However, there is
-                // _still_ a possibility that you are dealing with a cascading
-                // error condition, so you also need to check to see if the JVM
-                // is still usable:
-                SystemFailure.checkFailure();
-                this.owner.getConduit().getCancelCriterion().checkCancelInProgress(ex);
-                this.owner.getConduit().getStats().decMessagesBeingReceived(md.size());
-                failureMsg = "Unexpected failure deserializing message";
-                failureEx = ex;
-                rpId = md.getRPid();
-                logger.fatal("Unexpected failure deserializing message",
-                    failureEx);
-              } finally {
-                msgLength = md.size();
-                releaseMsgDestreamer(nioMsgId, md);
-                if (interrupted) {
-                  Thread.currentThread().interrupt();
-                }
-              }
-              if (msg != null) {
-                try {
-                  if (!dispatchMessage(msg, msgLength, directAck)) {
-                    directAck = false;
-                  }
-                } catch (MemberShunnedException e) {
-                  // not a member anymore - don't reply
-                  directAck = false;
-                } catch (Exception de) {
-                  this.owner.getConduit().getCancelCriterion().checkCancelInProgress(de);
-                  logger.fatal("Error dispatching message", de);
-                } catch (ThreadDeath td) {
-                  throw td;
-                } catch (VirtualMachineError err) {
-                  SystemFailure.initiateFailure(err);
-                  // If this ever returns, rethrow the error. We're poisoned
-                  // now, so don't let this thread continue.
-                  throw err;
-                } catch (Throwable t) {
-                  // Whenever you catch Error or Throwable, you must also
-                  // catch VirtualMachineError (see above). However, there is
-                  // _still_ a possibility that you are dealing with a cascading
-                  // error condition, so you also need to check to see if the JVM
-                  // is still usable:
-                  SystemFailure.checkFailure();
-                  logger.fatal("Throwable dispatching message", t);
-                }
-              } else if (failureEx != null) {
-                sendFailureReply(rpId, failureMsg, failureEx, directAck);
-              }
-            }
+            readMessage(peerDataBuffer);
+
           } else {
-            // read HANDSHAKE
-            ByteBufferInputStream bbis = new ByteBufferInputStream(nioInputBuffer);
+            ByteBufferInputStream bbis = new ByteBufferInputStream(peerDataBuffer);
             DataInputStream dis = new DataInputStream(bbis);
             if (!this.isReceiver) {
-              try {
-                this.replyCode = dis.readUnsignedByte();
-                if (this.replyCode == REPLY_CODE_OK_WITH_ASYNC_INFO) {
-                  this.asyncDistributionTimeout = dis.readInt();
-                  this.asyncQueueTimeout = dis.readInt();
-                  this.asyncMaxQueueSize = (long) dis.readInt() * (1024 * 1024);
-                  if (this.asyncDistributionTimeout != 0) {
-                    logger.info("{} async configuration received {}.",
-                        p2pReaderName(),
-                        " asyncDistributionTimeout=" + this.asyncDistributionTimeout
-                            + " asyncQueueTimeout=" + this.asyncQueueTimeout
-                            + " asyncMaxQueueSize="
-                            + (this.asyncMaxQueueSize / (1024 * 1024)));
-                  }
-                  // read the product version ordinal for on-the-fly serialization
-                  // transformations (for rolling upgrades)
-                  this.remoteVersion = Version.readVersion(dis, true);
-                }
-              } catch (Exception e) {
-                this.owner.getConduit().getCancelCriterion().checkCancelInProgress(e);
-                logger.fatal("Error deserializing P2P handshake reply", e);
-                this.readerShuttingDown = true;
-                requestClose("Error deserializing P2P handshake reply");
-                return;
-              } catch (ThreadDeath td) {
-                throw td;
-              } catch (VirtualMachineError err) {
-                SystemFailure.initiateFailure(err);
-                // If this ever returns, rethrow the error. We're poisoned
-                // now, so don't let this thread continue.
-                throw err;
-              } catch (Throwable t) {
-                // Whenever you catch Error or Throwable, you must also
-                // catch VirtualMachineError (see above). However, there is
-                // _still_ a possibility that you are dealing with a cascading
-                // error condition, so you also need to check to see if the JVM
-                // is still usable:
-                SystemFailure.checkFailure();
-                logger.fatal("Throwable deserializing P2P handshake reply",
-                    t);
-                this.readerShuttingDown = true;
-                requestClose("Throwable deserializing P2P handshake reply");
-                return;
-              }
-              if (this.replyCode != REPLY_CODE_OK
-                  && this.replyCode != REPLY_CODE_OK_WITH_ASYNC_INFO) {
-                String err =
-                    "Unknown handshake reply code: %s nioMessageLength: %s";
-                Object[] errArgs = new Object[] {Integer.valueOf(this.replyCode),
-                    Integer.valueOf(nioMessageLength)};
-                if (replyCode == 0 && logger.isDebugEnabled()) { // bug 37113
-                  logger.debug(
-                      String.format(err, errArgs) + " (peer probably departed ungracefully)");
-                } else {
-                  logger.fatal(err, errArgs);
-                }
-                this.readerShuttingDown = true;
-                requestClose(String.format(err, errArgs));
+              if (readHandshakeForSender(dis)) {
+                ioFilter.doneReading(peerDataBuffer);
                 return;
               }
-              notifyHandshakeWaiter(true);
             } else {
-              try {
-                byte b = dis.readByte();
-                if (b != 0) {
-                  throw new IllegalStateException(
-                      String.format(
-                          "Detected old version (pre 5.0.1) of GemFire or non-GemFire during handshake due to initial byte being %s",
-                          new Byte(b)));
-                }
-                byte handshakeByte = dis.readByte();
-                if (handshakeByte != HANDSHAKE_VERSION) {
-                  throw new IllegalStateException(
-                      String.format(
-                          "Detected wrong version of GemFire product during handshake. Expected %s but found %s",
-
-                          new Object[] {new Byte(HANDSHAKE_VERSION), new Byte(handshakeByte)}));
-                }
-                InternalDistributedMember remote = DSFIDFactory.readInternalDistributedMember(dis);
-                setRemoteAddr(remote);
-                this.sharedResource = dis.readBoolean();
-                this.preserveOrder = dis.readBoolean();
-                this.uniqueId = dis.readLong();
-                // read the product version ordinal for on-the-fly serialization
-                // transformations (for rolling upgrades)
-                this.remoteVersion = Version.readVersion(dis, true);
-                int dominoNumber = 0;
-                if (this.remoteVersion == null
-                    || (this.remoteVersion.compareTo(Version.GFE_80) >= 0)) {
-                  dominoNumber = dis.readInt();
-                  if (this.sharedResource) {
-                    dominoNumber = 0;
-                  }
-                  dominoCount.set(dominoNumber);
-                  // this.senderName = dis.readUTF();
-                }
-                if (!this.sharedResource) {
-                  if (tipDomino()) {
-                    logger.info(
-                        "thread owned receiver forcing itself to send on thread owned sockets");
-                    // bug #49565 - if domino count is >= 2 use shared resources.
-                    // Also see DistributedCacheOperation#supportsDirectAck
-                  } else { // if (dominoNumber < 2) {
-                    ConnectionTable.threadWantsOwnResources();
-                    if (logger.isDebugEnabled()) {
-                      logger.debug(
-                          "thread-owned receiver with domino count of {} will prefer sending on thread-owned sockets",
-                          dominoNumber);
-                    }
-                    // } else {
-                    // ConnectionTable.threadWantsSharedResources();
-                  }
-                  this.conduit.getStats().incThreadOwnedReceivers(1L, dominoNumber);
-                  // Because this thread is not shared resource, it will be used for direct
-                  // ack. Direct ack messages can be large. This call will resize the send
-                  // buffer.
-                  setSendBufferSize(this.socket);
-                }
-                // String name = owner.getDM().getConfig().getName();
-                // if (name == null) {
-                // name = "pid="+OSProcess.getId();
-                // }
-                setThreadName(dominoNumber);
-              } catch (Exception e) {
-                this.owner.getConduit().getCancelCriterion().checkCancelInProgress(e); // bug 37101
-                logger.fatal("Error deserializing P2P handshake message", e);
-                this.readerShuttingDown = true;
-                requestClose("Error deserializing P2P handshake message");
-                return;
-              }
-              if (logger.isDebugEnabled()) {
-                logger.debug("P2P handshake remoteAddr is {}{}", this.remoteAddr,
-                    (this.remoteVersion != null ? " (" + this.remoteVersion + ')' : ""));
-              }
-              try {
-                String authInit = System.getProperty(
-                    DistributionConfigImpl.SECURITY_SYSTEM_PREFIX + SECURITY_PEER_AUTH_INIT);
-                boolean isSecure = authInit != null && authInit.length() != 0;
-
-                if (isSecure) {
-                  if (owner.getConduit().waitForMembershipCheck(this.remoteAddr)) {
-                    sendOKHandshakeReply(); // fix for bug 33224
-                    notifyHandshakeWaiter(true);
-                  } else {
-                    // ARB: check if we need notifyHandshakeWaiter() call.
-                    notifyHandshakeWaiter(false);
-                    logger.warn("{} timed out during a membership check.",
-                        p2pReaderName());
-                    return;
-                  }
-                } else {
-                  sendOKHandshakeReply(); // fix for bug 33224
-                  try {
-                    notifyHandshakeWaiter(true);
-                  } catch (Exception e) {
-                    logger.fatal("Uncaught exception from listener", e);
-                  }
-                }
-              } catch (IOException ex) {
-                final String err = "Failed sending handshake reply";
-                if (logger.isDebugEnabled()) {
-                  logger.debug(err, ex);
-                }
-                this.readerShuttingDown = true;
-                requestClose(err + ": " + ex);
+              if (readHandshakeForReceiver(dis)) {
+                ioFilter.doneReading(peerDataBuffer);
                 return;
               }
             }
@@ -3835,22 +2964,394 @@ public class Connection implements Runnable {
             continue;
           }
           accessed();
-          nioInputBuffer.limit(oldLimit);
-          nioInputBuffer.position(startPos + nioMessageLength);
+          peerDataBuffer.limit(oldLimit);
+          peerDataBuffer.position(startPos + messageLength);
         } else {
           done = true;
-          compactOrResizeBuffer(nioMessageLength);
+          if (TCPConduit.useSSL) {
+            ioFilter.doneReading(peerDataBuffer);
+          } else {
+            compactOrResizeBuffer(messageLength);
+          }
         }
       } else {
+        ioFilter.doneReading(peerDataBuffer);
         done = true;
-        if (nioInputBuffer.position() != 0) {
-          nioInputBuffer.compact();
+      }
+    }
+  }
+
+  private boolean readHandshakeForReceiver(DataInputStream dis) {
+    try {
+      byte b = dis.readByte();
+      if (b != 0) {
+        throw new IllegalStateException(
+            String.format(
+                "Detected old version (pre 5.0.1) of GemFire or non-GemFire during handshake due to initial byte being %s",
+                b));
+      }
+      byte handshakeByte = dis.readByte();
+      if (handshakeByte != HANDSHAKE_VERSION) {
+        throw new IllegalStateException(
+            String.format(
+                "Detected wrong version of GemFire product during handshake. Expected %s but found %s",
+                HANDSHAKE_VERSION, handshakeByte));
+      }
+      InternalDistributedMember remote = DSFIDFactory.readInternalDistributedMember(dis);
+      setRemoteAddr(remote);
+      this.sharedResource = dis.readBoolean();
+      this.preserveOrder = dis.readBoolean();
+      this.uniqueId = dis.readLong();
+      // read the product version ordinal for on-the-fly serialization
+      // transformations (for rolling upgrades)
+      this.remoteVersion = Version.readVersion(dis, true);
+      int dominoNumber = 0;
+      if (this.remoteVersion == null
+          || (this.remoteVersion.compareTo(Version.GFE_80) >= 0)) {
+        dominoNumber = dis.readInt();
+        if (this.sharedResource) {
+          dominoNumber = 0;
+        }
+        dominoCount.set(dominoNumber);
+        // this.senderName = dis.readUTF();
+      }
+      if (!this.sharedResource) {
+        if (tipDomino()) {
+          logger.info(
+              "thread owned receiver forcing itself to send on thread owned sockets");
+          // bug #49565 - if domino count is >= 2 use shared resources.
+          // Also see DistributedCacheOperation#supportsDirectAck
+        } else { // if (dominoNumber < 2) {
+          ConnectionTable.threadWantsOwnResources();
+          if (logger.isDebugEnabled()) {
+            logger.debug(
+                "thread-owned receiver with domino count of {} will prefer sending on thread-owned sockets",
+                dominoNumber);
+          }
+          // } else {
+          // ConnectionTable.threadWantsSharedResources();
+        }
+        this.conduit.getStats().incThreadOwnedReceivers(1L, dominoNumber);
+        // Because this thread is not shared resource, it will be used for direct
+        // ack. Direct ack messages can be large. This call will resize the send
+        // buffer.
+        setSendBufferSize(this.socket);
+      }
+      // String name = owner.getDM().getConfig().getName();
+      // if (name == null) {
+      // name = "pid="+OSProcess.getId();
+      // }
+      setThreadName(dominoNumber);
+    } catch (Exception e) {
+      this.owner.getConduit().getCancelCriterion().checkCancelInProgress(e); // bug 37101
+      logger.fatal("Error deserializing P2P handshake message", e);
+      this.readerShuttingDown = true;
+      requestClose("Error deserializing P2P handshake message");
+      return true;
+    }
+    if (logger.isDebugEnabled()) {
+      logger.debug("P2P handshake remoteAddr is {}{}", this.remoteAddr,
+          (this.remoteVersion != null ? " (" + this.remoteVersion + ')' : ""));
+    }
+    try {
+      String authInit = System.getProperty(
+          DistributionConfigImpl.SECURITY_SYSTEM_PREFIX + SECURITY_PEER_AUTH_INIT);
+      boolean isSecure = authInit != null && authInit.length() != 0;
+
+      if (isSecure) {
+        if (owner.getConduit().waitForMembershipCheck(this.remoteAddr)) {
+          sendOKHandshakeReply(); // fix for bug 33224
+          notifyHandshakeWaiter(true);
         } else {
-          nioInputBuffer.position(nioInputBuffer.limit());
-          nioInputBuffer.limit(nioInputBuffer.capacity());
+          // ARB: check if we need notifyHandshakeWaiter() call.
+          notifyHandshakeWaiter(false);
+          logger.warn("{} timed out during a membership check.",
+              p2pReaderName());
+          return true;
+        }
+      } else {
+        sendOKHandshakeReply(); // fix for bug 33224
+        try {
+          notifyHandshakeWaiter(true);
+        } catch (Exception e) {
+          logger.fatal("Uncaught exception from listener", e);
+        }
+      }
+      this.finishedConnecting = true;
+    } catch (IOException ex) {
+      final String err = "Failed sending handshake reply";
+      if (logger.isDebugEnabled()) {
+        logger.debug(err, ex);
+      }
+      this.readerShuttingDown = true;
+      requestClose(err + ": " + ex);
+      return true;
+    }
+    return false;
+  }
+
+  private boolean readMessageHeader(ByteBuffer peerDataBuffer) throws IOException {
+    int headerStartPos = peerDataBuffer.position();
+    messageLength = peerDataBuffer.getInt();
+    /* nioMessageVersion = */
+    calcHdrVersion(messageLength);
+    messageLength = calcMsgByteSize(messageLength);
+    messageType = peerDataBuffer.get();
+    messageId = peerDataBuffer.getShort();
+    directAck = (messageType & DIRECT_ACK_BIT) != 0;
+    if (directAck) {
+      messageType &= ~DIRECT_ACK_BIT; // clear the ack bit
+    }
+    // Following validation fixes bug 31145
+    if (!validMsgType(messageType)) {
+      Integer nioMessageTypeInteger = (int) messageType;
+      logger.fatal("Unknown P2P message type: {}", nioMessageTypeInteger);
+      this.readerShuttingDown = true;
+      requestClose(String.format("Unknown P2P message type: %s",
+          nioMessageTypeInteger));
+      return true;
+    }
+    lengthSet = true;
+    // keep the header "in" the buffer until we have read the entire msg.
+    // Trust me: this will reduce copying on large messages.
+    peerDataBuffer.position(headerStartPos);
+    return false;
+  }
+
+  private void readMessage(ByteBuffer peerDataBuffer) {
+    if (messageType == NORMAL_MSG_TYPE) {
+      this.owner.getConduit().getStats().incMessagesBeingReceived(true, messageLength);
+      ByteBufferInputStream bbis =
+          remoteVersion == null ? new ByteBufferInputStream(peerDataBuffer)
+              : new VersionedByteBufferInputStream(peerDataBuffer, remoteVersion);
+      DistributionMessage msg;
+      try {
+        ReplyProcessor21.initMessageRPId();
+        // add serialization stats
+        long startSer = this.owner.getConduit().getStats().startMsgDeserialization();
+        msg = (DistributionMessage) InternalDataSerializer.readDSFID(bbis);
+        this.owner.getConduit().getStats().endMsgDeserialization(startSer);
+        if (bbis.available() != 0) {
+          logger.warn("Message deserialization of {} did not read {} bytes.",
+              msg, bbis.available());
+        }
+        try {
+          if (!dispatchMessage(msg, messageLength, directAck)) {
+            directAck = false;
+          }
+        } catch (MemberShunnedException e) {
+          directAck = false; // don't respond (bug39117)
+        } catch (Exception de) {
+          this.owner.getConduit().getCancelCriterion().checkCancelInProgress(de);
+          logger.fatal("Error dispatching message", de);
+        } catch (ThreadDeath td) {
+          throw td;
+        } catch (VirtualMachineError err) {
+          SystemFailure.initiateFailure(err);
+          // If this ever returns, rethrow the error. We're poisoned
+          // now, so don't let this thread continue.
+          throw err;
+        } catch (Throwable t) {
+          // Whenever you catch Error or Throwable, you must also
+          // catch VirtualMachineError (see above). However, there is
+          // _still_ a possibility that you are dealing with a cascading
+          // error condition, so you also need to check to see if the JVM
+          // is still usable:
+          SystemFailure.checkFailure();
+          logger.fatal("Throwable dispatching message", t);
+        }
+      } catch (VirtualMachineError err) {
+        SystemFailure.initiateFailure(err);
+        // If this ever returns, rethrow the error. We're poisoned
+        // now, so don't let this thread continue.
+        throw err;
+      } catch (Throwable t) {
+        // Whenever you catch Error or Throwable, you must also
+        // catch VirtualMachineError (see above). However, there is
+        // _still_ a possibility that you are dealing with a cascading
+        // error condition, so you also need to check to see if the JVM
+        // is still usable:
+        SystemFailure.checkFailure();
+        sendFailureReply(ReplyProcessor21.getMessageRPId(),
+            "Error deserializing message", t,
+            directAck);
+        if (t instanceof ThreadDeath) {
+          throw (ThreadDeath) t;
+        }
+        if (t instanceof CancelException) {
+          if (!(t instanceof CacheClosedException)) {
+            // Just log a message if we had trouble deserializing due to
+            // CacheClosedException; see bug 43543
+            throw (CancelException) t;
+          }
+        }
+        logger.fatal("Error deserializing message", t);
+      } finally {
+        ReplyProcessor21.clearMessageRPId();
+      }
+    } else if (messageType == CHUNKED_MSG_TYPE) {
+      MsgDestreamer md = obtainMsgDestreamer(messageId, remoteVersion);
+      this.owner.getConduit().getStats().incMessagesBeingReceived(md.size() == 0,
+          messageLength);
+      try {
+        md.addChunk(peerDataBuffer, messageLength);
+      } catch (IOException ex) {
+      }
+    } else /* (nioMessageType == END_CHUNKED_MSG_TYPE) */ {
+      // logger.info("END_CHUNK msgId="+nioMsgId);
+      MsgDestreamer md = obtainMsgDestreamer(messageId, remoteVersion);
+      this.owner.getConduit().getStats().incMessagesBeingReceived(md.size() == 0,
+          messageLength);
+      try {
+        md.addChunk(peerDataBuffer, messageLength);
+      } catch (IOException ex) {
+        logger.fatal("Failed handling end chunk message", ex);
+      }
+      DistributionMessage msg = null;
+      int msgLength;
+      String failureMsg = null;
+      Throwable failureEx = null;
+      int rpId = 0;
+      boolean interrupted = false;
+      try {
+        msg = md.getMessage();
+      } catch (ClassNotFoundException ex) {
+        this.owner.getConduit().getStats().decMessagesBeingReceived(md.size());
+        failureMsg = "ClassNotFound deserializing message";
+        failureEx = ex;
+        rpId = md.getRPid();
+        logger.fatal("ClassNotFound deserializing message: {}", ex.toString());
+      } catch (IOException ex) {
+        this.owner.getConduit().getStats().decMessagesBeingReceived(md.size());
+        failureMsg = "IOException deserializing message";
+        failureEx = ex;
+        rpId = md.getRPid();
+        logger.fatal("IOException deserializing message", failureEx);
+      } catch (InterruptedException ex) {
+        interrupted = true;
+        this.owner.getConduit().getCancelCriterion().checkCancelInProgress(ex);
+      } catch (VirtualMachineError err) {
+        SystemFailure.initiateFailure(err);
+        // If this ever returns, rethrow the error. We're poisoned
+        // now, so don't let this thread continue.
+        throw err;
+      } catch (Throwable ex) {
+        // Whenever you catch Error or Throwable, you must also
+        // catch VirtualMachineError (see above). However, there is
+        // _still_ a possibility that you are dealing with a cascading
+        // error condition, so you also need to check to see if the JVM
+        // is still usable:
+        SystemFailure.checkFailure();
+        this.owner.getConduit().getCancelCriterion().checkCancelInProgress(ex);
+        this.owner.getConduit().getStats().decMessagesBeingReceived(md.size());
+        failureMsg = "Unexpected failure deserializing message";
+        failureEx = ex;
+        rpId = md.getRPid();
+        logger.fatal("Unexpected failure deserializing message",
+            failureEx);
+      } finally {
+        msgLength = md.size();
+        releaseMsgDestreamer(messageId, md);
+        if (interrupted) {
+          Thread.currentThread().interrupt();
         }
       }
+      if (msg != null) {
+        try {
+          if (!dispatchMessage(msg, msgLength, directAck)) {
+            directAck = false;
+          }
+        } catch (MemberShunnedException e) {
+          // not a member anymore - don't reply
+          directAck = false;
+        } catch (Exception de) {
+          this.owner.getConduit().getCancelCriterion().checkCancelInProgress(de);
+          logger.fatal("Error dispatching message", de);
+        } catch (ThreadDeath td) {
+          throw td;
+        } catch (VirtualMachineError err) {
+          SystemFailure.initiateFailure(err);
+          // If this ever returns, rethrow the error. We're poisoned
+          // now, so don't let this thread continue.
+          throw err;
+        } catch (Throwable t) {
+          // Whenever you catch Error or Throwable, you must also
+          // catch VirtualMachineError (see above). However, there is
+          // _still_ a possibility that you are dealing with a cascading
+          // error condition, so you also need to check to see if the JVM
+          // is still usable:
+          SystemFailure.checkFailure();
+          logger.fatal("Throwable dispatching message", t);
+        }
+      } else if (failureEx != null) {
+        sendFailureReply(rpId, failureMsg, failureEx, directAck);
+      }
+    }
+  }
+
+  private boolean readHandshakeForSender(DataInputStream dis) {
+    try {
+      this.replyCode = dis.readUnsignedByte();
+      if (this.replyCode == REPLY_CODE_OK_WITH_ASYNC_INFO) {
+        this.asyncDistributionTimeout = dis.readInt();
+        this.asyncQueueTimeout = dis.readInt();
+        this.asyncMaxQueueSize = (long) dis.readInt() * (1024 * 1024);
+        if (this.asyncDistributionTimeout != 0) {
+          logger.info("{} async configuration received {}.",
+              p2pReaderName(),
+              " asyncDistributionTimeout=" + this.asyncDistributionTimeout
+                  + " asyncQueueTimeout=" + this.asyncQueueTimeout
+                  + " asyncMaxQueueSize="
+                  + (this.asyncMaxQueueSize / (1024 * 1024)));
+        }
+        // read the product version ordinal for on-the-fly serialization
+        // transformations (for rolling upgrades)
+        this.remoteVersion = Version.readVersion(dis, true);
+      }
+    } catch (Exception e) {
+      this.owner.getConduit().getCancelCriterion().checkCancelInProgress(e);
+      logger.fatal("Error deserializing P2P handshake reply", e);
+      this.readerShuttingDown = true;
+      requestClose("Error deserializing P2P handshake reply");
+      return true;
+    } catch (ThreadDeath td) {
+      throw td;
+    } catch (VirtualMachineError err) {
+      SystemFailure.initiateFailure(err);
+      // If this ever returns, rethrow the error. We're poisoned
+      // now, so don't let this thread continue.
+      throw err;
+    } catch (Throwable t) {
+      // Whenever you catch Error or Throwable, you must also
+      // catch VirtualMachineError (see above). However, there is
+      // _still_ a possibility that you are dealing with a cascading
+      // error condition, so you also need to check to see if the JVM
+      // is still usable:
+      SystemFailure.checkFailure();
+      logger.fatal("Throwable deserializing P2P handshake reply",
+          t);
+      this.readerShuttingDown = true;
+      requestClose("Throwable deserializing P2P handshake reply");
+      return true;
     }
+    if (this.replyCode != REPLY_CODE_OK
+        && this.replyCode != REPLY_CODE_OK_WITH_ASYNC_INFO) {
+      String err =
+          "Unknown handshake reply code: %s nioMessageLength: %s";
+      Object[] errArgs = new Object[] {this.replyCode,
+          messageLength};
+      if (replyCode == 0 && logger.isDebugEnabled()) { // bug 37113
+        logger.debug(
+            String.format(err, errArgs) + " (peer probably departed ungracefully)");
+      } else {
+        logger.fatal(err, errArgs);
+      }
+      this.readerShuttingDown = true;
+      requestClose(String.format(err, errArgs));
+      return true;
+    }
+    notifyHandshakeWaiter(true);
+    return false;
   }
 
   private void setThreadName(int dominoNumber) {
@@ -3861,28 +3362,28 @@ public class Connection implements Runnable {
   }
 
   private void compactOrResizeBuffer(int messageLength) {
-    final int oldBufferSize = nioInputBuffer.capacity();
+    final int oldBufferSize = inputBuffer.capacity();
     final DMStats stats = this.owner.getConduit().getStats();
     int allocSize = messageLength + MSG_HEADER_BYTES;
     if (oldBufferSize < allocSize) {
       // need a bigger buffer
       logger.info("Allocating larger network read buffer, new size is {} old size was {}.",
-          Integer.valueOf(allocSize), Integer.valueOf(oldBufferSize));
-      ByteBuffer oldBuffer = nioInputBuffer;
-      nioInputBuffer = Buffers.acquireReceiveBuffer(allocSize, stats);
+          allocSize, oldBufferSize);
+      ByteBuffer oldBuffer = inputBuffer;
+      inputBuffer = Buffers.acquireReceiveBuffer(allocSize, stats);
 
       if (oldBuffer != null) {
         int oldByteCount = oldBuffer.remaining();
-        nioInputBuffer.put(oldBuffer);
-        nioInputBuffer.position(oldByteCount);
+        inputBuffer.put(oldBuffer);
+        inputBuffer.position(oldByteCount);
         Buffers.releaseReceiveBuffer(oldBuffer, stats);
       }
     } else {
-      if (nioInputBuffer.position() != 0) {
-        nioInputBuffer.compact();
+      if (inputBuffer.position() != 0) {
+        inputBuffer.compact();
       } else {
-        nioInputBuffer.position(nioInputBuffer.limit());
-        nioInputBuffer.limit(nioInputBuffer.capacity());
+        inputBuffer.position(inputBuffer.limit());
+        inputBuffer.limit(inputBuffer.capacity());
       }
     }
   }
@@ -3918,11 +3419,11 @@ public class Connection implements Runnable {
     return result;
   }
 
-  public boolean isSocketClosed() {
+  boolean isSocketClosed() {
     return this.socket.isClosed() || !this.socket.isConnected();
   }
 
-  public boolean isReceiverStopped() {
+  boolean isReceiverStopped() {
     return this.stopped;
   }
 
@@ -3945,7 +3446,7 @@ public class Connection implements Runnable {
   /**
    * Return the version of the member on the other side of this connection.
    */
-  public Version getRemoteVersion() {
+  Version getRemoteVersion() {
     return this.remoteVersion;
   }
 
@@ -3966,14 +3467,14 @@ public class Connection implements Runnable {
    * @return true if the connection was initiated here
    * @since GemFire 5.1
    */
-  protected boolean getOriginatedHere() {
+  boolean getOriginatedHere() {
     return !this.isReceiver;
   }
 
   /**
    * answers whether this connection is used for ordered message delivery
    */
-  protected boolean getPreserveOrder() {
+  boolean getPreserveOrder() {
     return preserveOrder;
   }
 
@@ -3987,14 +3488,14 @@ public class Connection implements Runnable {
   /**
    * answers the number of messages received by this connection
    */
-  protected long getMessagesReceived() {
+  long getMessagesReceived() {
     return messagesReceived;
   }
 
   /**
    * answers the number of messages sent on this connection
    */
-  protected long getMessagesSent() {
+  long getMessagesSent() {
     return messagesSent;
   }
 
@@ -4047,30 +3548,4 @@ public class Connection implements Runnable {
     releaseSendPermission();
   }
 
-  boolean nioChecked;
-  boolean useNIO;
-
-  private boolean useNIO() {
-    if (TCPConduit.useSSL) {
-      return false;
-    }
-    if (this.nioChecked) {
-      return this.useNIO;
-    }
-    this.nioChecked = true;
-    this.useNIO = this.owner.getConduit().useNIO();
-    if (!this.useNIO) {
-      return false;
-    }
-    // JDK bug 6230761 - NIO can't be used with IPv6 on Windows
-    if (this.socket != null && (this.socket.getInetAddress() instanceof Inet6Address)) {
-      String os = System.getProperty("os.name");
-      if (os != null) {
-        if (os.contains("Windows")) {
-          this.useNIO = false;
-        }
-      }
-    }
-    return this.useNIO;
-  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
index 8bf3b3a..0cbc189 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
@@ -394,6 +394,9 @@ public class ConnectionTable {
     } // synchronized
 
     if (pc != null) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("created PendingConnection {}", pc);
+      }
       result = handleNewPendingConnection(id, true /* fixes bug 43386 */, preserveOrder, m, pc,
           startTime, ackTimeout, ackSATimeout);
       if (!preserveOrder && scheduleTimeout) {
@@ -1177,9 +1180,11 @@ public class ConnectionTable {
         targetMember = this.id;
       }
 
+      int attempt = 0;
       for (;;) {
-        if (!this.pending)
+        if (!this.pending) {
           break;
+        }
         getConduit().getCancelCriterion().checkCancelInProgress(null);
 
         // wait a little bit...
@@ -1221,7 +1226,8 @@ public class ConnectionTable {
         e = m.get(this.id);
         // }
         if (e == this) {
-          if (logger.isDebugEnabled()) {
+          attempt += 1;
+          if (logger.isDebugEnabled() && (attempt % 20 == 1)) {
             logger.debug("Waiting for pending connection to complete: {} connection to {}; {}",
                 ((this.preserveOrder) ? "ordered" : "unordered"), this.id, this);
           }
@@ -1249,6 +1255,10 @@ public class ConnectionTable {
       return this.conn;
 
     }
+
+    public String toString() {
+      return super.toString() + " created by " + connectingThread.getName();
+    }
   }
 
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgDestreamer.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgDestreamer.java
index 4c74c1b..2a1db17 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgDestreamer.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgDestreamer.java
@@ -143,21 +143,6 @@ public class MsgDestreamer {
   }
 
   /**
-   * Adds a chunk to be deserialize
-   *
-   * @param b a byte array contains the bytes of the chunk
-   */
-  public void addChunk(byte[] b) throws IOException {
-    // if this destreamer has failed or this chunk is empty just return
-    if (this.failure == null && b != null && b.length > 0) {
-      // logit("addChunk length=" + b.length);
-      ByteBuffer bb = ByteBuffer.wrap(b);
-      this.t.addChunk(bb, b.length);
-      this.size += b.length;
-    }
-  }
-
-  /**
    * Returns the number of bytes added to this destreamer.
    */
   public int size() {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgOutputStream.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgOutputStream.java
index 600d967..f54860b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgOutputStream.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgOutputStream.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 import org.apache.geode.DataSerializer;
 import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.ObjToByteArraySerializer;
+import org.apache.geode.internal.net.Buffers;
 
 /**
  * MsgOutputStream should no longer be used except in Connection to do the handshake. Otherwise
@@ -37,7 +38,7 @@ public class MsgOutputStream extends OutputStream implements ObjToByteArraySeria
    * The caller of this constructor is responsible for managing the allocated instance.
    */
   public MsgOutputStream(int allocSize) {
-    if (TCPConduit.useDirectBuffers) {
+    if (Buffers.useDirectBuffers) {
       this.buffer = ByteBuffer.allocateDirect(allocSize);
     } else {
       this.buffer = ByteBuffer.allocate(allocSize);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
index 45c9e98..5e9788d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
@@ -14,47 +14,68 @@
  */
 package org.apache.geode.internal.tcp;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import org.apache.logging.log4j.Logger;
+
 import org.apache.geode.distributed.internal.DMStats;
 import org.apache.geode.distributed.internal.DistributionMessage;
 import org.apache.geode.distributed.internal.ReplyProcessor21;
 import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.Version;
+import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.internal.net.Buffers;
+import org.apache.geode.internal.net.NioFilter;
 
 /**
  * This class is currently used for reading direct ack responses It should probably be used for all
  * of the reading done in Connection.
  *
  */
-public abstract class MsgReader {
+public class MsgReader {
+  private static final Logger logger = LogService.getLogger();
+
   protected final Connection conn;
   protected final Header header = new Header();
+  private final NioFilter ioFilter;
+  private final ByteBuffer peerNetData;
   private final ByteBufferInputStream bbis;
 
-  public MsgReader(Connection conn, Version version) {
+  private int lastReadPosition;
+  private int lastProcessedPosition;
+
+  public MsgReader(Connection conn, NioFilter nioFilter, ByteBuffer peerNetData, Version version) {
     this.conn = conn;
+    this.ioFilter = nioFilter;
+    this.peerNetData = peerNetData;
+    ByteBuffer buffer = ioFilter.getUnwrappedBuffer(peerNetData);
+    buffer.position(0).limit(0);
     this.bbis =
         version == null ? new ByteBufferInputStream() : new VersionedByteBufferInputStream(version);
   }
 
   public Header readHeader() throws IOException {
     ByteBuffer nioInputBuffer = readAtLeast(Connection.MSG_HEADER_BYTES);
+
     int nioMessageLength = nioInputBuffer.getInt();
     /* nioMessageVersion = */ Connection.calcHdrVersion(nioMessageLength);
     nioMessageLength = Connection.calcMsgByteSize(nioMessageLength);
     byte nioMessageType = nioInputBuffer.get();
     short nioMsgId = nioInputBuffer.getShort();
+
+    nioInputBuffer.position(nioInputBuffer.limit());
+
     boolean directAck = (nioMessageType & Connection.DIRECT_ACK_BIT) != 0;
     if (directAck) {
       // logger.info("DEBUG: msg from " + getRemoteAddress() + " is direct ack" );
       nioMessageType &= ~Connection.DIRECT_ACK_BIT; // clear the ack bit
     }
 
-    header.nioMessageLength = nioMessageLength;
-    header.nioMessageType = nioMessageType;
-    header.nioMsgId = nioMsgId;
+    header.messageLength = nioMessageLength;
+    header.messageType = nioMessageType;
+    header.messageId = nioMsgId;
     return header;
   }
 
@@ -65,8 +86,8 @@ public abstract class MsgReader {
    */
   public DistributionMessage readMessage(Header header)
       throws IOException, ClassNotFoundException, InterruptedException {
-    ByteBuffer nioInputBuffer = readAtLeast(header.nioMessageLength);
-    this.getStats().incMessagesBeingReceived(true, header.nioMessageLength);
+    ByteBuffer nioInputBuffer = readAtLeast(header.messageLength);
+    this.getStats().incMessagesBeingReceived(true, header.messageLength);
     long startSer = this.getStats().startMsgDeserialization();
     try {
       bbis.setBuffer(nioInputBuffer);
@@ -77,18 +98,44 @@ public abstract class MsgReader {
       return msg;
     } finally {
       this.getStats().endMsgDeserialization(startSer);
-      this.getStats().decMessagesBeingReceived(header.nioMessageLength);
+      this.getStats().decMessagesBeingReceived(header.messageLength);
+      ioFilter.doneReading(nioInputBuffer);
     }
   }
 
   public void readChunk(Header header, MsgDestreamer md)
       throws IOException, ClassNotFoundException, InterruptedException {
-    ByteBuffer nioInputBuffer = readAtLeast(header.nioMessageLength);
-    this.getStats().incMessagesBeingReceived(md.size() == 0, header.nioMessageLength);
-    md.addChunk(nioInputBuffer, header.nioMessageLength);
+    ByteBuffer nioInputBuffer = readAtLeast(header.messageLength);
+    this.getStats().incMessagesBeingReceived(md.size() == 0, header.messageLength);
+    md.addChunk(nioInputBuffer, header.messageLength);
   }
 
-  public abstract ByteBuffer readAtLeast(int bytes) throws IOException;
+  public ByteBuffer readAtLeast(int bytes) throws IOException {
+
+    ByteBuffer unwrappedBuffer =
+        ioFilter.ensureUnwrappedCapacity(bytes, peerNetData, Buffers.BufferType.UNTRACKED,
+            getStats());
+
+    while ((lastReadPosition - lastProcessedPosition) < bytes) {
+      unwrappedBuffer.limit(unwrappedBuffer.capacity());
+      unwrappedBuffer.position(lastReadPosition);
+
+      int amountRead = conn.getSocket().getChannel().read(peerNetData);
+      if (amountRead < 0) {
+        throw new EOFException();
+      }
+      if (amountRead > 0) {
+        peerNetData.flip();
+        unwrappedBuffer = ioFilter.unwrap(peerNetData);
+        lastReadPosition = unwrappedBuffer.position();
+      }
+    }
+    unwrappedBuffer.limit(lastProcessedPosition + bytes);
+    unwrappedBuffer.position(lastProcessedPosition);
+    lastProcessedPosition = unwrappedBuffer.limit();
+
+    return unwrappedBuffer;
+  }
 
   protected DMStats getStats() {
     return conn.getConduit().getStats();
@@ -96,22 +143,22 @@ public abstract class MsgReader {
 
   public static class Header {
 
-    int nioMessageLength;
-    byte nioMessageType;
-    short nioMsgId;
+    int messageLength;
+    byte messageType;
+    short messageId;
 
     public Header() {}
 
-    public int getNioMessageLength() {
-      return nioMessageLength;
+    public int getMessageLength() {
+      return messageLength;
     }
 
-    public byte getNioMessageType() {
-      return nioMessageType;
+    public byte getMessageType() {
+      return messageType;
     }
 
-    public short getNioMessageId() {
-      return nioMsgId;
+    public short getMessageId() {
+      return messageId;
     }
 
 
@@ -119,4 +166,5 @@ public abstract class MsgReader {
 
   public void close() {}
 
+
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java
index a09d2f2..2fb5a34 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java
@@ -38,6 +38,7 @@ import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.ObjToByteArraySerializer;
 import org.apache.geode.internal.Version;
 import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.internal.net.Buffers;
 
 /**
  * <p>
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/NIOMsgReader.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/NIOMsgReader.java
deleted file mode 100644
index a4e35a4..0000000
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/NIOMsgReader.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.internal.tcp;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.net.SocketException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-
-import org.apache.geode.internal.Version;
-
-/**
- * A message reader which reads from the socket using (blocking) nio.
- *
- */
-public class NIOMsgReader extends MsgReader {
-
-  /** the buffer used for NIO message receipt */
-  private ByteBuffer nioInputBuffer;
-  private final SocketChannel inputChannel;
-  private int lastReadPosition;
-  private int lastProcessedPosition;
-
-  public NIOMsgReader(Connection conn, Version version) throws SocketException {
-    super(conn, version);
-    this.inputChannel = conn.getSocket().getChannel();
-  }
-
-
-  @Override
-  public ByteBuffer readAtLeast(int bytes) throws IOException {
-    ensureCapacity(bytes);
-
-    while (lastReadPosition - lastProcessedPosition < bytes) {
-      nioInputBuffer.limit(nioInputBuffer.capacity());
-      nioInputBuffer.position(lastReadPosition);
-      int bytesRead = inputChannel.read(nioInputBuffer);
-      if (bytesRead < 0) {
-        throw new EOFException();
-      }
-      lastReadPosition = nioInputBuffer.position();
-    }
-    nioInputBuffer.limit(lastProcessedPosition + bytes);
-    nioInputBuffer.position(lastProcessedPosition);
-    lastProcessedPosition = nioInputBuffer.limit();
-
-    return nioInputBuffer;
-  }
-
-  /** gets the buffer for receiving message length bytes */
-  protected void ensureCapacity(int bufferSize) {
-    // Ok, so we have a buffer that's big enough
-    if (nioInputBuffer != null && nioInputBuffer.capacity() > bufferSize) {
-      if (nioInputBuffer.capacity() - lastProcessedPosition < bufferSize) {
-        nioInputBuffer.limit(lastReadPosition);
-        nioInputBuffer.position(lastProcessedPosition);
-        nioInputBuffer.compact();
-        lastReadPosition = nioInputBuffer.position();
-        lastProcessedPosition = 0;
-      }
-      return;
-    }
-
-    // otherwise, we have no buffer to a buffer that's too small
-
-    if (nioInputBuffer == null) {
-      int allocSize = conn.getReceiveBufferSize();
-      if (allocSize == -1) {
-        allocSize = conn.getConduit().tcpBufferSize;
-      }
-      if (allocSize > bufferSize) {
-        bufferSize = allocSize;
-      }
-    }
-    ByteBuffer oldBuffer = nioInputBuffer;
-    nioInputBuffer = Buffers.acquireReceiveBuffer(bufferSize, getStats());
-
-    if (oldBuffer != null) {
-      oldBuffer.limit(lastReadPosition);
-      oldBuffer.position(lastProcessedPosition);
-      nioInputBuffer.put(oldBuffer);
-      lastReadPosition = nioInputBuffer.position(); // fix for 45064
-      lastProcessedPosition = 0;
-      Buffers.releaseReceiveBuffer(oldBuffer, getStats());
-    }
-  }
-
-  @Override
-  public void close() {
-    ByteBuffer tmp = this.nioInputBuffer;
-    if (tmp != null) {
-      this.nioInputBuffer = null;
-      Buffers.releaseReceiveBuffer(tmp, getStats());
-    }
-  }
-}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/PeerConnectionFactory.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/PeerConnectionFactory.java
index 148c27a..1388061 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/PeerConnectionFactory.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/PeerConnectionFactory.java
@@ -18,6 +18,7 @@ package org.apache.geode.internal.tcp;
 import java.io.IOException;
 import java.net.Socket;
 
+
 public class PeerConnectionFactory {
   /**
    * creates a connection that we accepted (it was initiated by an explicit connect being done on
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
index 0057847..178bf93 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
@@ -15,7 +15,6 @@
 package org.apache.geode.internal.tcp;
 
 import java.io.IOException;
-import java.net.Inet6Address;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
@@ -27,11 +26,6 @@ import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 import java.util.Map;
 import java.util.Properties;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ThreadPoolExecutor;
-
-import javax.net.ssl.SSLException;
 
 import org.apache.logging.log4j.Logger;
 
@@ -50,7 +44,6 @@ import org.apache.geode.distributed.internal.membership.InternalDistributedMembe
 import org.apache.geode.distributed.internal.membership.MembershipManager;
 import org.apache.geode.internal.alerting.AlertingAction;
 import org.apache.geode.internal.logging.LogService;
-import org.apache.geode.internal.logging.LoggingExecutors;
 import org.apache.geode.internal.logging.LoggingThread;
 import org.apache.geode.internal.logging.log4j.LogMarker;
 import org.apache.geode.internal.net.SocketCreator;
@@ -110,18 +103,6 @@ public class TCPConduit implements Runnable {
   static boolean useSSL;
 
   /**
-   * Force use of Sockets rather than SocketChannels (NIO). Note from Bruce: due to a bug in the
-   * java VM, NIO cannot be used with IPv6 addresses on Windows. When that condition holds, the
-   * useNIO flag must be disregarded.
-   */
-  private static boolean USE_NIO;
-
-  /**
-   * use direct ByteBuffers instead of heap ByteBuffers for NIO operations
-   */
-  static boolean useDirectBuffers;
-
-  /**
    * The socket producer used by the cluster
    */
   private final SocketCreator socketCreator;
@@ -129,11 +110,6 @@ public class TCPConduit implements Runnable {
 
   private MembershipManager membershipManager;
 
-  /**
-   * true if NIO can be used for the server socket
-   */
-  private boolean useNIO;
-
   static {
     init();
   }
@@ -148,13 +124,13 @@ public class TCPConduit implements Runnable {
 
   public static void init() {
     useSSL = Boolean.getBoolean("p2p.useSSL");
-    // only use nio if not SSL
-    USE_NIO = !useSSL && !Boolean.getBoolean("p2p.oldIO");
     // only use direct buffers if we are using nio
-    useDirectBuffers = USE_NIO && !Boolean.getBoolean("p2p.nodirectBuffers");
-    LISTENER_CLOSE_TIMEOUT = Integer.getInteger("p2p.listenerCloseTimeout", 60000).intValue();
+    LISTENER_CLOSE_TIMEOUT = Integer.getInteger("p2p.listenerCloseTimeout", 60000);
     // note: bug 37730 concerned this defaulting to 50
-    BACKLOG = Integer.getInteger("p2p.backlog", 1280).intValue();
+    BACKLOG = Integer.getInteger("p2p.backlog", 1280);
+    if (Boolean.getBoolean("p2p.oldIO")) {
+      logger.warn("detected use of p2p.oldIO setting - this is no longer supported");
+    }
   }
 
   ///////////////// permanent conduit state
@@ -162,8 +138,8 @@ public class TCPConduit implements Runnable {
   /**
    * the size of OS TCP/IP buffers, not set by default
    */
-  public int tcpBufferSize = DistributionConfig.DEFAULT_SOCKET_BUFFER_SIZE;
-  public int idleConnectionTimeout = DistributionConfig.DEFAULT_SOCKET_LEASE_TIME;
+  int tcpBufferSize = DistributionConfig.DEFAULT_SOCKET_BUFFER_SIZE;
+  int idleConnectionTimeout = DistributionConfig.DEFAULT_SOCKET_LEASE_TIME;
 
   /**
    * port is the tcp/ip port that this conduit binds to. If it is zero, a port from
@@ -278,24 +254,12 @@ public class TCPConduit implements Runnable {
     this.socketCreator =
         SocketCreatorFactory.getSocketCreatorForComponent(SecurableCommunicationChannel.CLUSTER);
 
-    this.useNIO = USE_NIO;
-    if (this.useNIO) {
-      InetAddress addr = address;
-      if (addr == null) {
-        try {
-          addr = SocketCreator.getLocalHost();
-        } catch (java.net.UnknownHostException e) {
-          throw new ConnectionException("Unable to resolve localHost address", e);
-        }
-      }
-      // JDK bug 6230761 - NIO can't be used with IPv6 on Windows
-      if (addr instanceof Inet6Address) {
-        String os = System.getProperty("os.name");
-        if (os != null) {
-          if (os.indexOf("Windows") != -1) {
-            this.useNIO = false;
-          }
-        }
+    InetAddress addr = address;
+    if (addr == null) {
+      try {
+        addr = SocketCreator.getLocalHost();
+      } catch (java.net.UnknownHostException e) {
+        throw new ConnectionException("Unable to resolve localHost address", e);
       }
     }
 
@@ -345,53 +309,20 @@ public class TCPConduit implements Runnable {
     }
   }
 
-  private ExecutorService hsPool;
-
   /**
    * the reason for a shutdown, if abnormal
    */
   private volatile Exception shutdownCause;
 
-  private static final int HANDSHAKE_POOL_SIZE =
-      Integer.getInteger("p2p.HANDSHAKE_POOL_SIZE", 10).intValue();
-  private static final long HANDSHAKE_POOL_KEEP_ALIVE_TIME =
-      Long.getLong("p2p.HANDSHAKE_POOL_KEEP_ALIVE_TIME", 60).longValue();
-
-  /**
-   * added to fix bug 40436
-   */
-  public void setMaximumHandshakePoolSize(int maxSize) {
-    if (this.hsPool != null) {
-      ThreadPoolExecutor handshakePool = (ThreadPoolExecutor) this.hsPool;
-      if (maxSize > handshakePool.getMaximumPoolSize()) {
-        handshakePool.setMaximumPoolSize(maxSize);
-      }
-    }
-  }
-
   /**
    * binds the server socket and gets threads going
    */
   private void startAcceptor() throws ConnectionException {
     int localPort;
     int p = this.port;
-    InetAddress ba = this.address;
 
-    {
-      ExecutorService tmp_hsPool = null;
-      String threadName = "P2P-Handshaker " + ba + ":" + p + " Thread ";
-      try {
-        tmp_hsPool =
-            LoggingExecutors.newThreadPoolWithSynchronousFeedThatHandlesRejection(threadName, null,
-                null, 1, HANDSHAKE_POOL_SIZE, HANDSHAKE_POOL_KEEP_ALIVE_TIME);
-      } catch (IllegalArgumentException poolInitException) {
-        throw new ConnectionException(
-            "while creating handshake pool",
-            poolInitException);
-      }
-      this.hsPool = tmp_hsPool;
-    }
     createServerSocket();
+
     try {
       localPort = socket.getLocalPort();
 
@@ -409,7 +340,6 @@ public class TCPConduit implements Runnable {
         logger.fatal(
             "p2p.test.inhibitAcceptor was found to be set, inhibiting incoming tcp/ip connections");
         socket.close();
-        this.hsPool.shutdownNow();
       }
     } catch (IOException io) {
       String s = "While creating ServerSocket on port " + p;
@@ -428,66 +358,40 @@ public class TCPConduit implements Runnable {
     InetAddress bindAddress = this.address;
 
     try {
-      if (this.useNIO) {
-        if (serverPort <= 0) {
-
-          socket = socketCreator.createServerSocketUsingPortRange(bindAddress,
-              connectionRequestBacklog, isBindAddress,
-              this.useNIO, 0, tcpPortRange);
-        } else {
-          ServerSocketChannel channel = ServerSocketChannel.open();
-          socket = channel.socket();
-
-          InetSocketAddress inetSocketAddress =
-              new InetSocketAddress(isBindAddress ? bindAddress : null, serverPort);
-          socket.bind(inetSocketAddress, connectionRequestBacklog);
-        }
+      if (serverPort <= 0) {
 
-        if (useNIO) {
-          try {
-            // set these buffers early so that large buffers will be allocated
-            // on accepted sockets (see java.net.ServerSocket.setReceiverBufferSize javadocs)
-            socket.setReceiveBufferSize(tcpBufferSize);
-            int newSize = socket.getReceiveBufferSize();
-            if (newSize != tcpBufferSize) {
-              logger.info("{} is {} instead of the requested {}",
-                  "Listener receiverBufferSize", Integer.valueOf(newSize),
-                  Integer.valueOf(tcpBufferSize));
-            }
-          } catch (SocketException ex) {
-            logger.warn("Failed to set listener receiverBufferSize to {}",
-                tcpBufferSize);
-          }
-        }
-        channel = socket.getChannel();
+        socket = socketCreator.createServerSocketUsingPortRange(bindAddress,
+            connectionRequestBacklog, isBindAddress,
+            true, 0, tcpPortRange);
       } else {
-        try {
-          if (serverPort <= 0) {
-            socket = socketCreator.createServerSocketUsingPortRange(bindAddress,
-                connectionRequestBacklog, isBindAddress,
-                this.useNIO, this.tcpBufferSize, tcpPortRange);
-          } else {
-            socket = socketCreator.createServerSocket(serverPort, connectionRequestBacklog,
-                isBindAddress ? bindAddress : null,
-                this.tcpBufferSize);
-          }
-          int newSize = socket.getReceiveBufferSize();
-          if (newSize != this.tcpBufferSize) {
-            logger.info("Listener receiverBufferSize is {} instead of the requested {}",
-                Integer.valueOf(newSize),
-                Integer.valueOf(this.tcpBufferSize));
-          }
-        } catch (SocketException ex) {
-          logger.warn("Failed to set listener receiverBufferSize to {}",
-              this.tcpBufferSize);
+        ServerSocketChannel channel = ServerSocketChannel.open();
+        socket = channel.socket();
 
+        InetSocketAddress inetSocketAddress =
+            new InetSocketAddress(isBindAddress ? bindAddress : null, serverPort);
+        socket.bind(inetSocketAddress, connectionRequestBacklog);
+      }
+
+      try {
+        // set these buffers early so that large buffers will be allocated
+        // on accepted sockets (see java.net.ServerSocket.setReceiverBufferSize javadocs)
+        socket.setReceiveBufferSize(tcpBufferSize);
+        int newSize = socket.getReceiveBufferSize();
+        if (newSize != tcpBufferSize) {
+          logger.info("{} is {} instead of the requested {}",
+              "Listener receiverBufferSize", newSize,
+              tcpBufferSize);
         }
+      } catch (SocketException ex) {
+        logger.warn("Failed to set listener receiverBufferSize to {}",
+            tcpBufferSize);
       }
+      channel = socket.getChannel();
       port = socket.getLocalPort();
     } catch (IOException io) {
       throw new ConnectionException(
           String.format("While creating ServerSocket on port %s with address %s",
-              new Object[] {Integer.valueOf(serverPort), bindAddress}),
+              serverPort, bindAddress),
           io);
     }
   }
@@ -528,7 +432,6 @@ public class TCPConduit implements Runnable {
       // ignore, please!
     }
 
-    // this.hsPool.shutdownNow(); // I don't trust this not to allocate objects or to synchronize
     // this.conTable.close(); not safe against deadlocks
     ConnectionTable.emergencyClose();
 
@@ -550,7 +453,7 @@ public class TCPConduit implements Runnable {
         // set timeout endpoint here since interrupt() has been known
         // to hang
         long timeout = System.currentTimeMillis() + LISTENER_CLOSE_TIMEOUT;
-        Thread t = this.thread;;
+        Thread t = this.thread;
         if (channel != null) {
           channel.close();
           // NOTE: do not try to interrupt the listener thread at this point.
@@ -576,12 +479,10 @@ public class TCPConduit implements Runnable {
         if (t != null && t.isAlive()) {
           logger.warn(
               "Unable to shut down listener within {}ms.  Unable to interrupt socket.accept() due to JDK bug. Giving up.",
-              Integer.valueOf(LISTENER_CLOSE_TIMEOUT));
+              LISTENER_CLOSE_TIMEOUT);
         }
       } catch (IOException | InterruptedException e) {
         // we're already trying to shutdown, ignore
-      } finally {
-        this.hsPool.shutdownNow();
       }
 
       // close connections after shutting down acceptor to fix bug 30695
@@ -652,21 +553,8 @@ public class TCPConduit implements Runnable {
 
       Socket othersock = null;
       try {
-        if (this.useNIO) {
-          SocketChannel otherChannel = channel.accept();
-          othersock = otherChannel.socket();
-        } else {
-          try {
-            othersock = socket.accept();
-          } catch (SSLException ex) {
-            // SW: This is the case when there is a problem in P2P
-            // SSL configuration, so need to exit otherwise goes into an
-            // infinite loop just filling the logs
-            logger.warn("Stopping P2P listener due to SSL configuration problem.",
-                ex);
-            break;
-          }
-        }
+        SocketChannel otherChannel = channel.accept();
+        othersock = otherChannel.socket();
         if (stopped) {
           try {
             if (othersock != null) {
@@ -744,21 +632,6 @@ public class TCPConduit implements Runnable {
     }
   }
 
-  private void acceptConnection(final Socket othersock) {
-    try {
-      this.hsPool.execute(new Runnable() {
-        public void run() {
-          basicAcceptConnection(othersock);
-        }
-      });
-    } catch (RejectedExecutionException rejected) {
-      try {
-        othersock.close();
-      } catch (IOException ignore) {
-      }
-    }
-  }
-
   private ConnectionTable getConTable() {
     ConnectionTable result = this.conTable;
     if (result == null) {
@@ -769,17 +642,10 @@ public class TCPConduit implements Runnable {
     return result;
   }
 
-  protected void basicAcceptConnection(Socket othersock) {
+  private void acceptConnection(Socket othersock) {
     try {
-      othersock.setSoTimeout(0);
-      socketCreator.handshakeIfSocketIsSSL(othersock, idleConnectionTimeout);
       getConTable().acceptConnection(othersock, new PeerConnectionFactory());
-    } catch (IOException io) {
-      // exception is logged by the Connection
-      if (!stopped) {
-        this.getStats().incFailedAccept();
-      }
-    } catch (ConnectionException ex) {
+    } catch (IOException | ConnectionException io) {
       // exception is logged by the Connection
       if (!stopped) {
         this.getStats().incFailedAccept();
@@ -795,13 +661,6 @@ public class TCPConduit implements Runnable {
   }
 
   /**
-   * return true if "new IO" classes are being used for the server socket
-   */
-  protected boolean useNIO() {
-    return this.useNIO;
-  }
-
-  /**
    * records the current outgoing message count on all thread-owned ordered connections
    *
    * @since GemFire 5.1
diff --git a/geode-core/src/main/java/org/apache/geode/internal/util/DscodeHelper.java b/geode-core/src/main/java/org/apache/geode/internal/util/DscodeHelper.java
index 30a9f9b..8d37d4c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/util/DscodeHelper.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/util/DscodeHelper.java
@@ -30,7 +30,11 @@ public class DscodeHelper {
 
   public static DSCODE toDSCODE(final byte value) throws IOException {
     try {
-      return dscodes[value];
+      DSCODE result = dscodes[value];
+      if (result == null) {
+        throw new IOException("Unknown header byte " + value);
+      }
+      return result;
     } catch (ArrayIndexOutOfBoundsException e) {
       throw new IOException("Unknown header byte: " + value);
     }
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/FederatingManager.java b/geode-core/src/main/java/org/apache/geode/management/internal/FederatingManager.java
index 801dd6b..a058767 100755
--- a/geode-core/src/main/java/org/apache/geode/management/internal/FederatingManager.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/FederatingManager.java
@@ -108,7 +108,7 @@ public class FederatingManager extends Manager {
         logger.debug("Starting the Federating Manager.... ");
       }
 
-      pooledMembershipExecutor = LoggingExecutors.newFixedThreadPool("FederatingManager", false,
+      pooledMembershipExecutor = LoggingExecutors.newFixedThreadPool("FederatingManager", true,
           Runtime.getRuntime().availableProcessors());
 
       running = true;
diff --git a/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt b/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt
index ab0c4d1..7329c73 100644
--- a/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt
+++ b/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt
@@ -441,6 +441,7 @@ org/apache/geode/internal/memcached/ResponseStatus$5,false
 org/apache/geode/internal/memcached/ResponseStatus$6,false
 org/apache/geode/internal/memcached/commands/ClientError,true,-2426928000696680541
 org/apache/geode/internal/monitoring/ThreadsMonitoring$Mode,false
+org/apache/geode/internal/net/Buffers$BufferType,false
 org/apache/geode/internal/offheap/MemoryBlock$State,false
 org/apache/geode/internal/offheap/OffHeapStorage$1,false
 org/apache/geode/internal/offheap/OffHeapStorage$2,false
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/BuffersTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/BuffersTest.java
new file mode 100644
index 0000000..f21b754
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/BuffersTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.net;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+import java.nio.ByteBuffer;
+
+import org.junit.Test;
+
+import org.apache.geode.distributed.internal.DMStats;
+
+public class BuffersTest {
+
+  @Test
+  public void expandBuffer() throws Exception {
+    ByteBuffer buffer = ByteBuffer.allocate(256);
+    buffer.clear();
+    for (int i = 0; i < 256; i++) {
+      byte b = (byte) (i & 0xff);
+      buffer.put(b);
+    }
+    createAndVerifyNewBuffer(buffer, false);
+
+    createAndVerifyNewBuffer(buffer, true);
+  }
+
+  private void createAndVerifyNewBuffer(ByteBuffer buffer, boolean useDirectBuffer) {
+    ByteBuffer newBuffer =
+        Buffers.expandBuffer(Buffers.BufferType.UNTRACKED, buffer, 500, mock(DMStats.class));
+    assertEquals(buffer.position(), newBuffer.position());
+    assertEquals(500, newBuffer.capacity());
+    newBuffer.flip();
+    for (int i = 0; i < 256; i++) {
+      byte expected = (byte) (i & 0xff);
+      byte actual = (byte) (newBuffer.get() & 0xff);
+      assertEquals(expected, actual);
+    }
+  }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
new file mode 100644
index 0000000..7010db0
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
@@ -0,0 +1,495 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.net;
+
+import static javax.net.ssl.SSLEngineResult.HandshakeStatus.FINISHED;
+import static javax.net.ssl.SSLEngineResult.HandshakeStatus.NEED_TASK;
+import static javax.net.ssl.SSLEngineResult.HandshakeStatus.NEED_UNWRAP;
+import static javax.net.ssl.SSLEngineResult.HandshakeStatus.NEED_WRAP;
+import static javax.net.ssl.SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING;
+import static javax.net.ssl.SSLEngineResult.Status.BUFFER_OVERFLOW;
+import static javax.net.ssl.SSLEngineResult.Status.BUFFER_UNDERFLOW;
+import static javax.net.ssl.SSLEngineResult.Status.CLOSED;
+import static javax.net.ssl.SSLEngineResult.Status.OK;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.net.Socket;
+import java.net.SocketException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SocketChannel;
+import java.util.Arrays;
+import java.util.Stack;
+
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLHandshakeException;
+import javax.net.ssl.SSLSession;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.GemFireIOException;
+import org.apache.geode.distributed.internal.DMStats;
+import org.apache.geode.test.junit.categories.MembershipTest;
+
+@Category({MembershipTest.class})
+public class NioSslEngineTest {
+  private static final int netBufferSize = 10000;
+  private static final int appBufferSize = 20000;
+
+  private SSLEngine mockEngine;
+  private DMStats mockStats;
+  private SSLSession mockSession;
+  private NioSslEngine nioSslEngine;
+  private NioSslEngine spyNioSslEngine;
+
+  @Before
+  public void setUp() throws Exception {
+    mockEngine = mock(SSLEngine.class);
+
+    mockSession = mock(SSLSession.class);
+    when(mockEngine.getSession()).thenReturn(mockSession);
+    when(mockSession.getPacketBufferSize()).thenReturn(netBufferSize);
+    when(mockSession.getApplicationBufferSize()).thenReturn(appBufferSize);
+
+    mockStats = mock(DMStats.class);
+
+    nioSslEngine = new NioSslEngine(mockEngine, mockStats);
+    spyNioSslEngine = spy(nioSslEngine);
+  }
+
+  @Test
+  public void handshake() throws Exception {
+    SocketChannel mockChannel = mock(SocketChannel.class);
+    when(mockChannel.read(any(ByteBuffer.class))).thenReturn(100, 100, 100, 0);
+    Socket mockSocket = mock(Socket.class);
+    when(mockChannel.socket()).thenReturn(mockSocket);
+    when(mockSocket.isClosed()).thenReturn(false);
+
+    // initial read of handshake status followed by read of handshake status after task execution
+    when(mockEngine.getHandshakeStatus()).thenReturn(NEED_UNWRAP, NEED_WRAP);
+
+    // interleaved wraps/unwraps/task-execution
+    when(mockEngine.unwrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenReturn(
+        new SSLEngineResult(OK, NEED_WRAP, 100, 100),
+        new SSLEngineResult(BUFFER_OVERFLOW, NEED_UNWRAP, 0, 0),
+        new SSLEngineResult(OK, NEED_TASK, 100, 0));
+
+    when(mockEngine.getDelegatedTask()).thenReturn(() -> {
+    }, (Runnable) null);
+
+    when(mockEngine.wrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenReturn(
+        new SSLEngineResult(OK, NEED_UNWRAP, 100, 100),
+        new SSLEngineResult(BUFFER_OVERFLOW, NEED_WRAP, 0, 0),
+        new SSLEngineResult(CLOSED, FINISHED, 100, 0));
+
+    spyNioSslEngine.handshake(mockChannel, 10000, ByteBuffer.allocate(netBufferSize / 2));
+    verify(mockEngine, atLeast(2)).getHandshakeStatus();
+    verify(mockEngine, times(3)).wrap(any(ByteBuffer.class), any(ByteBuffer.class));
+    verify(mockEngine, times(3)).unwrap(any(ByteBuffer.class), any(ByteBuffer.class));
+    verify(spyNioSslEngine, times(2)).expandBuffer(any(Buffers.BufferType.class),
+        any(ByteBuffer.class), any(Integer.class), any(DMStats.class));
+    verify(spyNioSslEngine, times(1)).handleBlockingTasks();
+    verify(mockChannel, times(3)).read(any(ByteBuffer.class));
+  }
+
+  @Test
+  public void handshakeUsesBufferParameter() throws Exception {
+    SocketChannel mockChannel = mock(SocketChannel.class);
+    when(mockChannel.read(any(ByteBuffer.class))).thenReturn(100, 100, 100, 0);
+    Socket mockSocket = mock(Socket.class);
+    when(mockChannel.socket()).thenReturn(mockSocket);
+    when(mockSocket.isClosed()).thenReturn(false);
+
+    // initial read of handshake status followed by read of handshake status after task execution
+    when(mockEngine.getHandshakeStatus()).thenReturn(NEED_UNWRAP, NEED_WRAP);
+
+    // interleaved wraps/unwraps/task-execution
+    when(mockEngine.unwrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenReturn(
+        new SSLEngineResult(OK, NEED_WRAP, 100, 100),
+        new SSLEngineResult(BUFFER_OVERFLOW, NEED_UNWRAP, 0, 0),
+        new SSLEngineResult(OK, NEED_TASK, 100, 0));
+
+    when(mockEngine.getDelegatedTask()).thenReturn(() -> {
+    }, (Runnable) null);
+
+    when(mockEngine.wrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenReturn(
+        new SSLEngineResult(OK, NEED_UNWRAP, 100, 100),
+        new SSLEngineResult(BUFFER_OVERFLOW, NEED_WRAP, 0, 0),
+        new SSLEngineResult(CLOSED, FINISHED, 100, 0));
+
+    ByteBuffer byteBuffer = ByteBuffer.allocate(netBufferSize);
+
+    spyNioSslEngine.handshake(mockChannel, 10000, byteBuffer);
+
+    assertThat(spyNioSslEngine.handshakeBuffer).isSameAs(byteBuffer);
+  }
+
+
+  @Test
+  public void handshakeDetectsClosedSocket() throws Exception {
+    SocketChannel mockChannel = mock(SocketChannel.class);
+    when(mockChannel.read(any(ByteBuffer.class))).thenReturn(100, 100, 100, 0);
+    Socket mockSocket = mock(Socket.class);
+    when(mockChannel.socket()).thenReturn(mockSocket);
+    when(mockSocket.isClosed()).thenReturn(true);
+
+    // initial read of handshake status followed by read of handshake status after task execution
+    when(mockEngine.getHandshakeStatus()).thenReturn(NEED_UNWRAP);
+
+    ByteBuffer byteBuffer = ByteBuffer.allocate(netBufferSize);
+
+    assertThatThrownBy(() -> spyNioSslEngine.handshake(mockChannel, 10000, byteBuffer))
+        .isInstanceOf(
+            SocketException.class)
+        .hasMessageContaining("handshake terminated");
+  }
+
+  @Test
+  public void handshakeDoesNotTerminateWithFinished() throws Exception {
+    SocketChannel mockChannel = mock(SocketChannel.class);
+    when(mockChannel.read(any(ByteBuffer.class))).thenReturn(100, 100, 100, 0);
+    Socket mockSocket = mock(Socket.class);
+    when(mockChannel.socket()).thenReturn(mockSocket);
+    when(mockSocket.isClosed()).thenReturn(false);
+
+    // initial read of handshake status followed by read of handshake status after task execution
+    when(mockEngine.getHandshakeStatus()).thenReturn(NEED_UNWRAP);
+
+    // interleaved wraps/unwraps/task-execution
+    when(mockEngine.unwrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenReturn(
+        new SSLEngineResult(OK, NEED_WRAP, 100, 100));
+
+    when(mockEngine.wrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenReturn(
+        new SSLEngineResult(CLOSED, NOT_HANDSHAKING, 100, 0));
+
+    ByteBuffer byteBuffer = ByteBuffer.allocate(netBufferSize);
+
+    assertThatThrownBy(() -> spyNioSslEngine.handshake(mockChannel, 10000, byteBuffer))
+        .isInstanceOf(
+            SSLHandshakeException.class)
+        .hasMessageContaining("SSL Handshake terminated with status");
+  }
+
+
+  @Test
+  public void checkClosed() {
+    nioSslEngine.checkClosed();
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void checkClosedThrows() throws Exception {
+    when(mockEngine.wrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenReturn(
+        new SSLEngineResult(CLOSED, FINISHED, 0, 100));
+    nioSslEngine.close(mock(SocketChannel.class));
+    nioSslEngine.checkClosed();
+  }
+
+  @Test
+  public void wrap() throws Exception {
+    // make the application data too big to fit into the engine's encryption buffer
+    ByteBuffer appData = ByteBuffer.allocate(nioSslEngine.myNetData.capacity() + 100);
+    byte[] appBytes = new byte[appData.capacity()];
+    Arrays.fill(appBytes, (byte) 0x1F);
+    appData.put(appBytes);
+    appData.flip();
+
+    // create an engine that will transfer bytes from the application buffer to the encrypted buffer
+    TestSSLEngine testEngine = new TestSSLEngine();
+    testEngine.addReturnResult(
+        new SSLEngineResult(OK, NEED_TASK, appData.remaining(), appData.remaining()));
+    spyNioSslEngine.engine = testEngine;
+
+    ByteBuffer wrappedBuffer = spyNioSslEngine.wrap(appData);
+
+    verify(spyNioSslEngine, times(1)).expandBuffer(any(Buffers.BufferType.class),
+        any(ByteBuffer.class), any(Integer.class), any(DMStats.class));
+    appData.flip();
+    assertThat(wrappedBuffer).isEqualTo(appData);
+    verify(spyNioSslEngine, times(1)).handleBlockingTasks();
+  }
+
+  @Test
+  public void wrapFails() {
+    // make the application data too big to fit into the engine's encryption buffer
+    ByteBuffer appData = ByteBuffer.allocate(nioSslEngine.myNetData.capacity() + 100);
+    byte[] appBytes = new byte[appData.capacity()];
+    Arrays.fill(appBytes, (byte) 0x1F);
+    appData.put(appBytes);
+    appData.flip();
+
+    // create an engine that will transfer bytes from the application buffer to the encrypted buffer
+    TestSSLEngine testEngine = new TestSSLEngine();
+    testEngine.addReturnResult(
+        new SSLEngineResult(CLOSED, NEED_TASK, appData.remaining(), appData.remaining()));
+    spyNioSslEngine.engine = testEngine;
+
+    assertThatThrownBy(() -> spyNioSslEngine.wrap(appData)).isInstanceOf(SSLException.class)
+        .hasMessageContaining("Error encrypting data");
+  }
+
+  @Test
+  public void unwrap() throws Exception {
+    // make the application data too big to fit into the engine's encryption buffer
+    ByteBuffer wrappedData = ByteBuffer.allocate(nioSslEngine.peerAppData.capacity() + 100);
+    byte[] netBytes = new byte[wrappedData.capacity()];
+    Arrays.fill(netBytes, (byte) 0x1F);
+    wrappedData.put(netBytes);
+    wrappedData.flip();
+
+    // create an engine that will transfer bytes from the application buffer to the encrypted buffer
+    TestSSLEngine testEngine = new TestSSLEngine();
+    spyNioSslEngine.engine = testEngine;
+
+    testEngine.addReturnResult(new SSLEngineResult(OK, FINISHED, netBytes.length, netBytes.length));
+
+    ByteBuffer unwrappedBuffer = spyNioSslEngine.unwrap(wrappedData);
+    unwrappedBuffer.flip();
+
+    verify(spyNioSslEngine, times(1)).expandBuffer(any(Buffers.BufferType.class),
+        any(ByteBuffer.class), any(Integer.class), any(DMStats.class));
+    assertThat(unwrappedBuffer).isEqualTo(ByteBuffer.wrap(netBytes));
+  }
+
+  @Test
+  public void unwrapWithBufferUnderflow() throws Exception {
+    ByteBuffer wrappedData = ByteBuffer.allocate(nioSslEngine.peerAppData.capacity());
+    byte[] netBytes = new byte[wrappedData.capacity() / 2];
+    Arrays.fill(netBytes, (byte) 0x1F);
+    wrappedData.put(netBytes);
+    wrappedData.flip();
+
+    // create an engine that will transfer bytes from the application buffer to the encrypted buffer
+    TestSSLEngine testEngine = new TestSSLEngine();
+    testEngine.addReturnResult(new SSLEngineResult(BUFFER_UNDERFLOW, NEED_TASK, 0, 0));
+    spyNioSslEngine.engine = testEngine;
+
+    ByteBuffer unwrappedBuffer = spyNioSslEngine.unwrap(wrappedData);
+    unwrappedBuffer.flip();
+    assertThat(unwrappedBuffer.remaining()).isEqualTo(0);
+    assertThat(wrappedData.position()).isEqualTo(netBytes.length);
+  }
+
+  @Test
+  public void unwrapWithDecryptionError() {
+    // make the application data too big to fit into the engine's encryption buffer
+    ByteBuffer wrappedData = ByteBuffer.allocate(nioSslEngine.peerAppData.capacity());
+    byte[] netBytes = new byte[wrappedData.capacity() / 2];
+    Arrays.fill(netBytes, (byte) 0x1F);
+    wrappedData.put(netBytes);
+    wrappedData.flip();
+
+    // create an engine that will transfer bytes from the application buffer to the encrypted buffer
+    TestSSLEngine testEngine = new TestSSLEngine();
+    testEngine.addReturnResult(new SSLEngineResult(CLOSED, FINISHED, 0, 0));
+    spyNioSslEngine.engine = testEngine;
+
+    assertThatThrownBy(() -> spyNioSslEngine.unwrap(wrappedData)).isInstanceOf(SSLException.class)
+        .hasMessageContaining("Error decrypting data");
+  }
+
+  @Test
+  public void ensureUnwrappedCapacity() {
+    ByteBuffer wrappedBuffer = ByteBuffer.allocate(netBufferSize);
+    int requestedCapacity = nioSslEngine.getUnwrappedBuffer(wrappedBuffer).capacity() * 2;
+    ByteBuffer unwrappedBuffer = nioSslEngine.ensureUnwrappedCapacity(requestedCapacity,
+        wrappedBuffer, Buffers.BufferType.UNTRACKED, mockStats);
+    assertThat(unwrappedBuffer.capacity()).isGreaterThanOrEqualTo(requestedCapacity);
+  }
+
+  @Test
+  public void close() throws Exception {
+    SocketChannel mockChannel = mock(SocketChannel.class);
+    Socket mockSocket = mock(Socket.class);
+    when(mockChannel.socket()).thenReturn(mockSocket);
+    when(mockSocket.isClosed()).thenReturn(false);
+
+    when(mockEngine.isOutboundDone()).thenReturn(Boolean.FALSE);
+    when(mockEngine.wrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenReturn(
+        new SSLEngineResult(CLOSED, FINISHED, 0, 0));
+    nioSslEngine.close(mockChannel);
+    assertThatThrownBy(() -> nioSslEngine.checkClosed()).isInstanceOf(IllegalStateException.class);
+    nioSslEngine.close(mockChannel);
+  }
+
+  @Test
+  public void closeWhenUnwrapError() throws Exception {
+    SocketChannel mockChannel = mock(SocketChannel.class);
+    Socket mockSocket = mock(Socket.class);
+    when(mockChannel.socket()).thenReturn(mockSocket);
+    when(mockSocket.isClosed()).thenReturn(true);
+
+    when(mockEngine.isOutboundDone()).thenReturn(Boolean.FALSE);
+    when(mockEngine.wrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenReturn(
+        new SSLEngineResult(BUFFER_OVERFLOW, FINISHED, 0, 0));
+    assertThatThrownBy(() -> nioSslEngine.close(mockChannel)).isInstanceOf(GemFireIOException.class)
+        .hasMessageContaining("exception closing SSL session")
+        .hasCauseInstanceOf(SSLException.class);
+  }
+
+  @Test
+  public void closeWhenSocketWriteError() throws Exception {
+    SocketChannel mockChannel = mock(SocketChannel.class);
+    Socket mockSocket = mock(Socket.class);
+    when(mockChannel.socket()).thenReturn(mockSocket);
+    when(mockSocket.isClosed()).thenReturn(true);
+
+    when(mockEngine.isOutboundDone()).thenReturn(Boolean.FALSE);
+    when(mockEngine.wrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenAnswer((x) -> {
+      // give the NioSslEngine something to write on its socket channel, simulating a TLS close
+      // message
+      nioSslEngine.myNetData.put("Goodbye cruel world".getBytes());
+      return new SSLEngineResult(CLOSED, FINISHED, 0, 0);
+    });
+    when(mockChannel.write(any(ByteBuffer.class))).thenThrow(new ClosedChannelException());
+    nioSslEngine.close(mockChannel);
+    verify(mockChannel, times(1)).write(any(ByteBuffer.class));
+  }
+
+
+
+  // TestSSLEngine holds a stack of SSLEngineResults and always copies the
+  // input buffer to the output buffer byte-for-byte in wrap() and unwrap() operations.
+  // We use it in some tests where we need the byte-copying behavior because it's
+  // pretty difficult & cumbersome to implement with Mockito.
+  static class TestSSLEngine extends SSLEngine {
+
+    private Stack<SSLEngineResult> returnResults = new Stack<>();
+
+    @Override
+    public SSLEngineResult wrap(ByteBuffer[] sources, int i, int i1, ByteBuffer destination) {
+      for (ByteBuffer source : sources) {
+        destination.put(source);
+      }
+      return returnResults.pop();
+    }
+
+    @Override
+    public SSLEngineResult unwrap(ByteBuffer source, ByteBuffer[] destinations, int i, int i1) {
+      SSLEngineResult sslEngineResult = returnResults.pop();
+      if (sslEngineResult.getStatus() != BUFFER_UNDERFLOW) {
+        destinations[0].put(source);
+      }
+      return sslEngineResult;
+    }
+
+    @Override
+    public Runnable getDelegatedTask() {
+      return null;
+    }
+
+    @Override
+    public void closeInbound() {}
+
+    @Override
+    public boolean isInboundDone() {
+      return false;
+    }
+
+    @Override
+    public void closeOutbound() {}
+
+    @Override
+    public boolean isOutboundDone() {
+      return false;
+    }
+
+    @Override
+    public String[] getSupportedCipherSuites() {
+      return new String[0];
+    }
+
+    @Override
+    public String[] getEnabledCipherSuites() {
+      return new String[0];
+    }
+
+    @Override
+    public void setEnabledCipherSuites(String[] strings) {}
+
+    @Override
+    public String[] getSupportedProtocols() {
+      return new String[0];
+    }
+
+    @Override
+    public String[] getEnabledProtocols() {
+      return new String[0];
+    }
+
+    @Override
+    public void setEnabledProtocols(String[] strings) {}
+
+    @Override
+    public SSLSession getSession() {
+      return null;
+    }
+
+    @Override
+    public void beginHandshake() {}
+
+    @Override
+    public SSLEngineResult.HandshakeStatus getHandshakeStatus() {
+      return null;
+    }
+
+    @Override
+    public void setUseClientMode(boolean b) {}
+
+    @Override
+    public boolean getUseClientMode() {
+      return false;
+    }
+
+    @Override
+    public void setNeedClientAuth(boolean b) {}
+
+    @Override
+    public boolean getNeedClientAuth() {
+      return false;
+    }
+
+    @Override
+    public void setWantClientAuth(boolean b) {}
+
+    @Override
+    public boolean getWantClientAuth() {
+      return false;
+    }
+
+    @Override
+    public void setEnableSessionCreation(boolean b) {}
+
+    @Override
+    public boolean getEnableSessionCreation() {
+      return false;
+    }
+
+    void addReturnResult(SSLEngineResult sslEngineResult) {
+      returnResults.add(sslEngineResult);
+    }
+  }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionJUnitTest.java
index 075b252..73cf06c 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionJUnitTest.java
@@ -20,14 +20,14 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import java.io.InputStream;
 import java.net.InetSocketAddress;
-import java.net.Socket;
+import java.nio.channels.SocketChannel;
 
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import org.apache.geode.CancelCriterion;
+import org.apache.geode.distributed.internal.DMStats;
 import org.apache.geode.distributed.internal.DistributionManager;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.distributed.internal.membership.MembershipManager;
@@ -64,23 +64,17 @@ public class ConnectionJUnitTest {
     when(conduit.getSocketId())
         .thenReturn(new InetSocketAddress(SocketCreator.getLocalHost(), 10337));
 
-    // NIO can't be mocked because SocketChannel has a final method that
-    // is used by Connection - configureBlocking
-    when(conduit.useNIO()).thenReturn(false);
-
     // mock the distribution manager and membership manager
     when(distMgr.getMembershipManager()).thenReturn(membership);
     when(conduit.getDM()).thenReturn(distMgr);
+    when(conduit.getStats()).thenReturn(mock(DMStats.class));
     when(table.getDM()).thenReturn(distMgr);
     SocketCloser closer = mock(SocketCloser.class);
     when(table.getSocketCloser()).thenReturn(closer);
 
-    InputStream instream = mock(InputStream.class);
-    when(instream.read()).thenReturn(-1);
-    Socket socket = mock(Socket.class);
-    when(socket.getInputStream()).thenReturn(instream);
+    SocketChannel channel = SocketChannel.open();
 
-    Connection conn = new Connection(table, socket);
+    Connection conn = new Connection(table, channel.socket());
     conn.setSharedUnorderedForTest();
     conn.run();
     verify(membership).suspectMember(isNull(InternalDistributedMember.class), any(String.class));
diff --git a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTest.java b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTest.java
index e7928b9..77160c8 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTest.java
@@ -38,9 +38,9 @@ public class ConnectionTest {
     boolean forceAsync = true;
     DistributionMessage mockDistributionMessage = mock(DistributionMessage.class);
 
-    mockConnection.nioWriteFully(channel, buffer, forceAsync, mockDistributionMessage);
+    mockConnection.writeFully(channel, buffer, forceAsync, mockDistributionMessage);
 
-    verify(mockConnection, times(1)).nioWriteFully(channel, buffer, forceAsync,
+    verify(mockConnection, times(1)).writeFully(channel, buffer, forceAsync,
         mockDistributionMessage);
   }
 }
diff --git a/geode-core/src/test/resources/org/apache/geode/internal/util/PluckStacksJstackGeneratedDump.txt b/geode-core/src/test/resources/org/apache/geode/internal/util/PluckStacksJstackGeneratedDump.txt
index d1d7ff8..7776fcc 100644
--- a/geode-core/src/test/resources/org/apache/geode/internal/util/PluckStacksJstackGeneratedDump.txt
+++ b/geode-core/src/test/resources/org/apache/geode/internal/util/PluckStacksJstackGeneratedDump.txt
@@ -16,7 +16,7 @@ Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.144-b01 mixed mode):
 	at sun.nio.ch.IOUtil.read(IOUtil.java:192)
 	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
 	- locked <0x0000000749c85bd8> (a java.lang.Object)
-	at org.apache.geode.internal.tcp.Connection.runNioReader(Connection.java:1810)
+	at readMessages(Connection.java:1810)
 	at org.apache.geode.internal.tcp.Connection.run(Connection.java:1690)
 	at java.lang.Thread.run(Thread.java:748)
 
@@ -31,7 +31,7 @@ Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.144-b01 mixed mode):
 	at sun.nio.ch.IOUtil.read(IOUtil.java:192)
 	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
 	- locked <0x0000000749c859a0> (a java.lang.Object)
-	at org.apache.geode.internal.tcp.Connection.runNioReader(Connection.java:1810)
+	at readMessages(Connection.java:1810)
 	at org.apache.geode.internal.tcp.Connection.run(Connection.java:1690)
 	at java.lang.Thread.run(Thread.java:748)
 
@@ -61,7 +61,7 @@ Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.144-b01 mixed mode):
 	at sun.nio.ch.IOUtil.read(IOUtil.java:192)
 	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
 	- locked <0x0000000749c85768> (a java.lang.Object)
-	at org.apache.geode.internal.tcp.Connection.runNioReader(Connection.java:1810)
+	at readMessages(Connection.java:1810)
 	at org.apache.geode.internal.tcp.Connection.run(Connection.java:1690)
 	at java.lang.Thread.run(Thread.java:748)
 
@@ -176,7 +176,7 @@ Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.144-b01 mixed mode):
 	at sun.nio.ch.IOUtil.read(IOUtil.java:192)
 	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
 	- locked <0x0000000749c84fd0> (a java.lang.Object)
-	at org.apache.geode.internal.tcp.Connection.runNioReader(Connection.java:1810)
+	at readMessages(Connection.java:1810)
 	at org.apache.geode.internal.tcp.Connection.run(Connection.java:1690)
 	at java.lang.Thread.run(Thread.java:748)
 
@@ -1080,7 +1080,7 @@ Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.144-b01 mixed mode):
 	at sun.nio.ch.IOUtil.read(IOUtil.java:192)
 	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
 	- locked <0x00000007435986d8> (a java.lang.Object)
-	at org.apache.geode.internal.tcp.Connection.runNioReader(Connection.java:1810)
+	at readMessages(Connection.java:1810)
 	at org.apache.geode.internal.tcp.Connection.run(Connection.java:1690)
 	at java.lang.Thread.run(Thread.java:748)
 
@@ -1111,7 +1111,7 @@ Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.144-b01 mixed mode):
 	at sun.nio.ch.IOUtil.read(IOUtil.java:192)
 	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
 	- locked <0x00000007435984a0> (a java.lang.Object)
-	at org.apache.geode.internal.tcp.Connection.runNioReader(Connection.java:1810)
+	at readMessages(Connection.java:1810)
 	at org.apache.geode.internal.tcp.Connection.run(Connection.java:1690)
 	at java.lang.Thread.run(Thread.java:748)
 
@@ -1389,7 +1389,7 @@ Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.144-b01 mixed mode):
 	at sun.nio.ch.IOUtil.read(IOUtil.java:192)
 	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
 	- locked <0x00000006479b3c70> (a java.lang.Object)
-	at org.apache.geode.internal.tcp.Connection.runNioReader(Connection.java:1810)
+	at readMessages(Connection.java:1810)
 	at org.apache.geode.internal.tcp.Connection.run(Connection.java:1690)
 	at java.lang.Thread.run(Thread.java:748)
 
@@ -2074,7 +2074,7 @@ Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.144-b01 mixed mode):
 	at sun.nio.ch.IOUtil.read(IOUtil.java:192)
 	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
 	- locked <0x0000000647a66b00> (a java.lang.Object)
-	at org.apache.geode.internal.tcp.Connection.runNioReader(Connection.java:1810)
+	at readMessages(Connection.java:1810)
 	at org.apache.geode.internal.tcp.Connection.run(Connection.java:1690)
 	at java.lang.Thread.run(Thread.java:748)
 
@@ -2108,7 +2108,7 @@ Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.144-b01 mixed mode):
 	at sun.nio.ch.IOUtil.read(IOUtil.java:192)
 	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
 	- locked <0x0000000647a92678> (a java.lang.Object)
-	at org.apache.geode.internal.tcp.Connection.runNioReader(Connection.java:1810)
+	at readMessages(Connection.java:1810)
 	at org.apache.geode.internal.tcp.Connection.run(Connection.java:1690)
 	at java.lang.Thread.run(Thread.java:748)
 
diff --git a/geode-dunit/src/main/resources/org/apache/geode/server.keystore b/geode-dunit/src/main/resources/org/apache/geode/server.keystore
new file mode 100644
index 0000000..8b5305f
Binary files /dev/null and b/geode-dunit/src/main/resources/org/apache/geode/server.keystore differ