You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2019/01/14 21:01:31 UTC

[geode] 02/02: GEODE-2113 Implement SSL over NIO

This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch feature/GEODE-2113e
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 4edbcbfdd291a1cb19b368af6d34a55e6d76c985
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Mon Jan 14 12:56:05 2019 -0800

    GEODE-2113 Implement SSL over NIO
    
    This commit fixes problems encountered in the previous implementation:
    
    - position tracking for MsgReader has been moved into NioSslEngine and
    NioPlainEngine because the SSL engine sometimes needs to allocate a new
    buffer and adjust the tracked read/process positions in the buffer
    
    - the same buffer expansion methods in Buffers were being used for
    read-side and write-side buffers, but buffers being used for reads need
    to be handled differently than buffers being used for writing
    
    The first commit is the original implementation.  The second commit
    fixes the problems in that commit.  If you've already reviewed the old
    code you can probably just look at the second commit.
    
    There is commented-out debugging code that I'll remove when these
    changes pass the Pull Request Stress Tests.
---
 .../geode/ClusterCommunicationsDUnitTest.java      | 420 +++++++++++++++++++++
 .../java/org/apache/geode/ClusterSSLDUnitTest.java | 249 ------------
 .../internal/net/SSLSocketIntegrationTest.java     |  29 +-
 .../membership/gms/mgr/GMSMembershipManager.java   |   7 -
 .../apache/geode/internal/cache/properties.html    |  11 -
 .../org/apache/geode/internal/net/Buffers.java     |  29 +-
 .../org/apache/geode/internal/net/NioEngine.java   |  52 ---
 .../org/apache/geode/internal/net/NioFilter.java   |  27 +-
 .../apache/geode/internal/net/NioPlainEngine.java  | 119 ++++++
 .../apache/geode/internal/net/NioSslEngine.java    | 127 +++++--
 .../apache/geode/internal/net/SocketCreator.java   |  12 -
 .../org/apache/geode/internal/tcp/Connection.java  |  65 ++--
 .../org/apache/geode/internal/tcp/MsgReader.java   | 111 ++----
 .../org/apache/geode/internal/tcp/MsgStreamer.java |   1 +
 .../org/apache/geode/internal/tcp/TCPConduit.java  |   8 +-
 .../org/apache/geode/internal/net/BuffersTest.java |   3 +-
 .../geode/internal/net/NioSslEngineTest.java       | 127 ++++++-
 17 files changed, 898 insertions(+), 499 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/ClusterCommunicationsDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/ClusterCommunicationsDUnitTest.java
new file mode 100644
index 0000000..d5c30c4
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/ClusterCommunicationsDUnitTest.java
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode;
+
+import static org.apache.geode.distributed.ConfigurationProperties.CONSERVE_SOCKETS;
+import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_CLUSTER_CONFIGURATION;
+import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
+import static org.apache.geode.distributed.ConfigurationProperties.NAME;
+import static org.apache.geode.distributed.ConfigurationProperties.SOCKET_BUFFER_SIZE;
+import static org.apache.geode.distributed.ConfigurationProperties.SOCKET_LEASE_TIME;
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_ENABLED_COMPONENTS;
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_KEYSTORE;
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_KEYSTORE_PASSWORD;
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_PROTOCOLS;
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_REQUIRE_AUTHENTICATION;
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_TRUSTSTORE;
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_TRUSTSTORE_PASSWORD;
+import static org.apache.geode.distributed.ConfigurationProperties.USE_CLUSTER_CONFIGURATION;
+import static org.apache.geode.internal.DataSerializableFixedID.SERIAL_ACKED_MESSAGE;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.Locator;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DirectReplyProcessor;
+import org.apache.geode.distributed.internal.DistributionMessage;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.MessageWithReply;
+import org.apache.geode.distributed.internal.ReplyException;
+import org.apache.geode.distributed.internal.ReplyMessage;
+import org.apache.geode.distributed.internal.SerialAckedMessage;
+import org.apache.geode.distributed.internal.membership.gms.membership.GMSJoinLeave;
+import org.apache.geode.internal.DSFIDFactory;
+import org.apache.geode.internal.cache.DirectReplyMessage;
+import org.apache.geode.test.dunit.Host;
+import org.apache.geode.test.dunit.Invoke;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.DistributedRule;
+import org.apache.geode.test.junit.categories.BackwardCompatibilityTest;
+import org.apache.geode.test.junit.categories.MembershipTest;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
+import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory;
+import org.apache.geode.test.version.VersionManager;
+import org.apache.geode.util.test.TestUtil;
+
+
+/**
+ * This class tests cluster tcp/ip communications both with and without SSL enabled
+ */
+@Category({MembershipTest.class, BackwardCompatibilityTest.class})
+@RunWith(Parameterized.class)
+@Parameterized.UseParametersRunnerFactory(CategoryWithParameterizedRunnerFactory.class)
+public class ClusterCommunicationsDUnitTest implements java.io.Serializable {
+
+  private boolean conserveSockets;
+  private boolean useSSL;
+
+  enum RunConfiguration {
+    SHARED_CONNECTIONS(true, false),
+    SHARED_CONNECTIONS_WITH_SSL(true, true),
+    UNSHARED_CONNECTIONS(false, false),
+    UNSHARED_CONNECTIONS_WITH_SSL(false, true);
+
+    boolean useSSL;
+    boolean conserveSockets;
+
+    RunConfiguration(boolean conserveSockets, boolean useSSL) {
+      this.useSSL = useSSL;
+      this.conserveSockets = conserveSockets;
+    }
+  }
+
+  @Parameterized.Parameters(name = "{0}")
+  public static Collection<RunConfiguration> data() {
+    return Arrays.asList(RunConfiguration.values());
+  }
+
+  private static final int NUM_SERVERS = 2;
+  private static final int SMALL_BUFFER_SIZE = 8000;
+
+  private static final long serialVersionUID = -3438183140385150550L;
+
+  private static Cache cache;
+
+  @Rule
+  public DistributedRule distributedRule =
+      DistributedRule.builder().withVMCount(NUM_SERVERS + 1).build();
+
+  @Rule
+  public final SerializableTestName testName = new SerializableTestName();
+
+  final String regionName = "clusterTestRegion";
+
+  public ClusterCommunicationsDUnitTest(RunConfiguration runConfiguration) {
+    this.useSSL = runConfiguration.useSSL;
+    this.conserveSockets = runConfiguration.conserveSockets;
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    final Boolean testWithSSL = useSSL;
+    final Boolean testWithConserveSocketsTrue = conserveSockets;
+    Invoke.invokeInEveryVM(() -> {
+      this.useSSL = testWithSSL;
+      this.conserveSockets = testWithConserveSocketsTrue;
+    });
+  }
+
+  @Test
+  public void createEntryAndVerifyUpdate() {
+    int locatorPort = createLocator(VM.getVM(0));
+    for (int i = 1; i <= NUM_SERVERS; i++) {
+      createCacheAndRegion(VM.getVM(i), locatorPort);
+    }
+    performCreate(VM.getVM(1));
+    for (int i = 1; i <= NUM_SERVERS; i++) {
+      verifyCreatedEntry(VM.getVM(i));
+    }
+    performUpdate(VM.getVM(1));
+    for (int i = 1; i <= NUM_SERVERS; i++) {
+      verifyUpdatedEntry(VM.getVM(i));
+    }
+  }
+
+  @Test
+  public void createEntryWithBigMessage() {
+    int locatorPort = createLocator(VM.getVM(0));
+    for (int i = 1; i <= NUM_SERVERS; i++) {
+      createCacheAndRegion(VM.getVM(i), locatorPort);
+    }
+    performCreateWithLargeValue(VM.getVM(1));
+    // meant to force message chunking; NOTE(review): only VM 1 (the creator) is verified
+    for (int i = 1; i <= NUM_SERVERS - 1; i++) {
+      verifyCreatedEntry(VM.getVM(i));
+    }
+  }
+
+  @Test
+  public void receiveBigResponse() {
+    Invoke.invokeInEveryVM(() -> DSFIDFactory.registerDSFID(SERIAL_ACKED_MESSAGE,
+        SerialAckedMessageWithBigReply.class));
+    try {
+      int locatorPort = createLocator(VM.getVM(0));
+      for (int i = 1; i <= NUM_SERVERS; i++) {
+        createCacheAndRegion(VM.getVM(i), locatorPort);
+      }
+      final DistributedMember vm2ID =
+          VM.getVM(2).invoke(() -> cache.getDistributedSystem().getDistributedMember());
+      VM.getVM(1).invoke("receive a large direct-reply message", () -> {
+        SerialAckedMessageWithBigReply messageWithBigReply = new SerialAckedMessageWithBigReply();
+        await().atMost(30, TimeUnit.SECONDS).until(() -> {
+          messageWithBigReply.send(Collections.<DistributedMember>singleton(vm2ID));
+          return true;
+        });
+      });
+    } finally {
+      Invoke.invokeInEveryVM(
+          () -> DSFIDFactory.registerDSFID(SERIAL_ACKED_MESSAGE, SerialAckedMessage.class));
+    }
+  }
+
+  @Test
+  public void performARollingUpgrade() {
+    List<String> testVersions = VersionManager.getInstance().getVersionsWithoutCurrent();
+    Collections.sort(testVersions);
+    String testVersion = testVersions.get(testVersions.size() - 1);
+
+    // create a cluster with the previous version of Geode
+    VM locatorVM = Host.getHost(0).getVM(testVersion, 0);
+    VM server1VM = Host.getHost(0).getVM(testVersion, 1);
+    int locatorPort = createLocator(locatorVM);
+    createCacheAndRegion(server1VM, locatorPort);
+    performCreate(VM.getVM(1));
+
+    // roll the locator to the current version
+    locatorVM.invoke("stop locator", () -> Locator.getLocator().stop());
+    locatorVM = Host.getHost(0).getVM(VersionManager.CURRENT_VERSION, 0);
+    locatorVM.invoke("roll locator to current version", () -> {
+      // if you need to debug SSL communications use this property:
+      // System.setProperty("javax.net.debug", "all");
+      Properties props = getDistributedSystemProperties();
+      // locator must restart with the same port so that it reconnects to the server
+      await().atMost(15, TimeUnit.SECONDS)
+          .until(() -> Locator.startLocatorAndDS(locatorPort, new File(""), props) != null);
+      assertThat(Locator.getLocator().getDistributedSystem().getAllOtherMembers().size())
+          .isGreaterThan(0);
+    });
+
+    // start server2 with current version
+    VM server2VM = Host.getHost(0).getVM(VersionManager.CURRENT_VERSION, 2);
+    createCacheAndRegion(server2VM, locatorPort);
+
+    // roll server1 to the current version
+    server1VM.invoke("stop server1", () -> {
+      cache.close();
+    });
+    server1VM = Host.getHost(0).getVM(VersionManager.CURRENT_VERSION, 1);
+    createCacheAndRegion(server1VM, locatorPort);
+
+
+    verifyCreatedEntry(server1VM);
+    verifyCreatedEntry(server2VM);
+  }
+
+  private void createCacheAndRegion(VM memberVM, int locatorPort) {
+    memberVM.invoke("start cache and create region", () -> {
+      cache = createCache(locatorPort);
+      cache.createRegionFactory(RegionShortcut.REPLICATE).create(regionName);
+    });
+  }
+
+
+  private void performCreate(VM memberVM) {
+    memberVM.invoke("perform create", () -> cache
+        .getRegion(regionName).put("testKey", "testValue"));
+  }
+
+  private void performUpdate(VM memberVM) {
+    memberVM.invoke("perform update", () -> cache
+        .getRegion(regionName).put("testKey", "updatedTestValue"));
+  }
+
+  private void performCreateWithLargeValue(VM memberVM) {
+    memberVM.invoke("perform create", () -> {
+      byte[] value = new byte[SMALL_BUFFER_SIZE * 20];
+      Arrays.fill(value, (byte) 1);
+      cache.getRegion(regionName).put("testKey", value);
+    });
+  }
+
+  private void verifyCreatedEntry(VM memberVM) {
+    memberVM.invoke("verify entry created", () -> Assert.assertTrue(cache
+        .getRegion(regionName).containsKey("testKey")));
+  }
+
+  private void verifyUpdatedEntry(VM memberVM) {
+    memberVM.invoke("verify entry updated", () -> Assert.assertTrue(cache
+        .getRegion(regionName).containsValue("updatedTestValue")));
+  }
+
+  private int createLocator(VM memberVM) {
+    return memberVM.invoke("create locator", () -> {
+      // if you need to debug SSL communications use this property:
+      // System.setProperty("javax.net.debug", "all");
+      System.setProperty(GMSJoinLeave.BYPASS_DISCOVERY_PROPERTY, "true");
+      try {
+        return Locator.startLocatorAndDS(0, new File(""), getDistributedSystemProperties())
+            .getPort();
+      } finally {
+        System.clearProperty(GMSJoinLeave.BYPASS_DISCOVERY_PROPERTY);
+      }
+    });
+  }
+
+  private Cache createCache(int locatorPort) {
+    // if you need to debug SSL communications use this property:
+    // System.setProperty("javax.net.debug", "all");
+    Properties properties = getDistributedSystemProperties();
+    properties.put(LOCATORS, "localhost[" + locatorPort + "]");
+    return new CacheFactory(properties).create();
+  }
+
+  public Properties getDistributedSystemProperties() {
+    Properties properties = new Properties();
+    properties.put(ENABLE_CLUSTER_CONFIGURATION, "false");
+    properties.put(USE_CLUSTER_CONFIGURATION, "false");
+    properties.put(NAME, "vm" + VM.getCurrentVMNum());
+    properties.put(CONSERVE_SOCKETS, "" + conserveSockets);
+    properties.put(SOCKET_LEASE_TIME, "10000");
+    properties.put(SOCKET_BUFFER_SIZE, "" + SMALL_BUFFER_SIZE);
+
+    if (useSSL) {
+      properties.put(SSL_ENABLED_COMPONENTS, "cluster,locator");
+      properties.put(SSL_KEYSTORE, TestUtil.getResourcePath(this.getClass(), "server.keystore"));
+      properties.put(SSL_TRUSTSTORE, TestUtil.getResourcePath(this.getClass(), "server.keystore"));
+      properties.put(SSL_PROTOCOLS, "TLSv1.2");
+      properties.put(SSL_KEYSTORE_PASSWORD, "password");
+      properties.put(SSL_TRUSTSTORE_PASSWORD, "password");
+      properties.put(SSL_REQUIRE_AUTHENTICATION, "true");
+    }
+    return properties;
+  }
+
+  /**
+   * SerialAckedMessageWithBigReply requires conserve-sockets=false and acts to send
+   * a large reply message to the sender. You must have already created a cache in the
+   * sender and receiver VMs and registered this class with the DataSerializableFixedID
+   * of SERIAL_ACKED_MESSAGE. Don't forget to reset the registration to
+   * SerialAckedMessage at the end of the test.
+   */
+  public static class SerialAckedMessageWithBigReply extends DistributionMessage
+      implements MessageWithReply,
+      DirectReplyMessage {
+    static final int DSFID = SERIAL_ACKED_MESSAGE;
+
+    private int processorId;
+    private transient ClusterDistributionManager originDm;
+    private transient DirectReplyProcessor replyProcessor;
+
+    public SerialAckedMessageWithBigReply() {
+      super();
+      InternalDistributedSystem ds = InternalDistributedSystem.getAnyInstance();
+      if (ds != null) { // this constructor is used in serialization as well as when sending to
+                        // others
+        this.originDm = (ClusterDistributionManager) ds.getDistributionManager();
+      }
+    }
+
+    public void send(Set<DistributedMember> recipients)
+        throws InterruptedException, ReplyException {
+      // create a reply processor for the recipients and remember its id, then send this
+      // message, print any recipients returned as failures, and wait for the replies
+      replyProcessor = new DirectReplyProcessor(originDm, recipients);
+      processorId = replyProcessor.getProcessorId();
+      setRecipients(recipients);
+      Set failures = originDm.putOutgoing(this);
+      if (failures != null && failures.size() > 0) {
+        for (Object failure : failures) {
+          System.err.println("Unable to send serial acked message to " + failure);
+        }
+      }
+
+      replyProcessor.waitForReplies();
+    }
+
+    @Override
+    public void toData(DataOutput out) throws IOException {
+      super.toData(out);
+      out.writeInt(processorId);
+    }
+
+    @Override
+    public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+      super.fromData(in);
+      processorId = in.readInt();
+    }
+
+    @Override
+    protected void process(ClusterDistributionManager dm) {
+      ReplyMessage reply = new ReplyMessage();
+      reply.setProcessorId(processorId);
+      reply.setRecipient(getSender());
+      byte[] returnValue = new byte[SMALL_BUFFER_SIZE * 6];
+      reply.setReturnValue(returnValue);
+      System.out.println("<" + Thread.currentThread().getName() +
+          "> sending reply with return value size "
+          + returnValue.length + " using " + getReplySender(dm));
+      getReplySender(dm).putOutgoing(reply);
+    }
+
+    @Override
+    public int getProcessorId() {
+      return processorId;
+    }
+
+    @Override
+    public int getProcessorType() {
+      return ClusterDistributionManager.SERIAL_EXECUTOR;
+    }
+
+    @Override
+    public int getDSFID() {
+      return DSFID;
+    }
+
+    @Override
+    public DirectReplyProcessor getDirectReplyProcessor() {
+      return replyProcessor;
+    }
+
+    @Override
+    public boolean supportsDirectAck() {
+      return processorId == 0;
+    }
+
+    @Override
+    public void registerProcessor() {
+      if (replyProcessor != null) {
+        this.processorId = this.replyProcessor.register();
+      }
+    }
+  }
+
+}
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/ClusterSSLDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/ClusterSSLDUnitTest.java
deleted file mode 100644
index dbca857..0000000
--- a/geode-core/src/distributedTest/java/org/apache/geode/ClusterSSLDUnitTest.java
+++ /dev/null
@@ -1,249 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode;
-
-import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_CLUSTER_CONFIGURATION;
-import static org.apache.geode.distributed.ConfigurationProperties.NAME;
-import static org.apache.geode.distributed.ConfigurationProperties.SOCKET_BUFFER_SIZE;
-import static org.apache.geode.distributed.ConfigurationProperties.SOCKET_LEASE_TIME;
-import static org.apache.geode.distributed.ConfigurationProperties.SSL_ENABLED_COMPONENTS;
-import static org.apache.geode.distributed.ConfigurationProperties.SSL_KEYSTORE;
-import static org.apache.geode.distributed.ConfigurationProperties.SSL_KEYSTORE_PASSWORD;
-import static org.apache.geode.distributed.ConfigurationProperties.SSL_PROTOCOLS;
-import static org.apache.geode.distributed.ConfigurationProperties.SSL_REQUIRE_AUTHENTICATION;
-import static org.apache.geode.distributed.ConfigurationProperties.SSL_TRUSTSTORE;
-import static org.apache.geode.distributed.ConfigurationProperties.SSL_TRUSTSTORE_PASSWORD;
-import static org.apache.geode.distributed.ConfigurationProperties.USE_CLUSTER_CONFIGURATION;
-import static org.assertj.core.api.Assertions.assertThat;
-
-import java.io.File;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.geode.cache.Cache;
-import org.apache.geode.cache.CacheFactory;
-import org.apache.geode.cache.RegionShortcut;
-import org.apache.geode.distributed.ConfigurationProperties;
-import org.apache.geode.distributed.Locator;
-import org.apache.geode.test.awaitility.GeodeAwaitility;
-import org.apache.geode.test.dunit.Host;
-import org.apache.geode.test.dunit.VM;
-import org.apache.geode.test.dunit.rules.DistributedRule;
-import org.apache.geode.test.junit.categories.BackwardCompatibilityTest;
-import org.apache.geode.test.junit.categories.MembershipTest;
-import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
-import org.apache.geode.test.version.VersionManager;
-import org.apache.geode.util.test.TestUtil;
-
-@Category({MembershipTest.class, BackwardCompatibilityTest.class})
-public class ClusterSSLDUnitTest implements java.io.Serializable {
-
-  private static final int NUM_SERVERS = 2;
-  private static final int SMALL_BUFFER_SIZE = 8000;
-
-  private static final long serialVersionUID = -3438183140385150550L;
-
-  @Rule
-  public DistributedRule distributedRule =
-      DistributedRule.builder().withVMCount(NUM_SERVERS + 1).build();
-
-  @Rule
-  public final SerializableTestName testName = new SerializableTestName();
-
-  final String regionName = testName.getMethodName() + "_Region";
-
-  private static Cache cache;
-
-  @After
-  public void teardown() throws Exception {
-    if (cache != null) {
-      try {
-        if (!cache.isClosed()) {
-          cache.close();
-        }
-      } finally {
-        cache = null;
-      }
-    }
-  }
-
-
-  @Test
-  public void createEntryWithConserveSockets() throws Exception {
-    int locatorPort = createLocator(VM.getVM(0));
-    for (int i = 1; i <= NUM_SERVERS; i++) {
-      createSSLEnabledCacheAndRegion(VM.getVM(i), locatorPort, true);
-    }
-    performCreate(VM.getVM(1));
-    for (int i = 1; i <= NUM_SERVERS; i++) {
-      verifyCreatedEntry(VM.getVM(i));
-    }
-    performUpdate(VM.getVM(1));
-    for (int i = 1; i <= NUM_SERVERS; i++) {
-      verifyUpdatedEntry(VM.getVM(i));
-    }
-  }
-
-  @Test
-  public void createEntryWithThreadOwnedSockets() throws Exception {
-    int locatorPort = createLocator(VM.getVM(0));
-    for (int i = 1; i <= NUM_SERVERS; i++) {
-      createSSLEnabledCacheAndRegion(VM.getVM(i), locatorPort, false);
-    }
-    performCreate(VM.getVM(1));
-    for (int i = 1; i <= NUM_SERVERS; i++) {
-      verifyCreatedEntry(VM.getVM(i));
-    }
-    performUpdate(VM.getVM(1));
-    for (int i = 1; i <= NUM_SERVERS; i++) {
-      verifyUpdatedEntry(VM.getVM(i));
-    }
-  }
-
-  @Test
-  public void createEntryWithThreadOwnedSocketsAndBigMessage() throws Exception {
-    int locatorPort = createLocator(VM.getVM(0));
-    for (int i = 1; i <= NUM_SERVERS; i++) {
-      createSSLEnabledCacheAndRegion(VM.getVM(i), locatorPort, false);
-    }
-    performCreateWithLargeValue(VM.getVM(1));
-    for (int i = 1; i <= NUM_SERVERS; i++) {
-      verifyCreatedEntry(VM.getVM(i));
-    }
-  }
-
-  @Test
-  public void performARollingUpgrade() throws Exception {
-    List<String> testVersions = VersionManager.getInstance().getVersionsWithoutCurrent();
-    Collections.sort(testVersions);
-    String testVersion = testVersions.get(testVersions.size() - 1);
-
-    // create a cluster with the previous version of Geode
-    VM locatorVM = Host.getHost(0).getVM(testVersion, 0);
-    VM server1VM = Host.getHost(0).getVM(testVersion, 1);
-    int locatorPort = createLocator(locatorVM);
-    createSSLEnabledCacheAndRegion(server1VM, locatorPort, true);
-    performCreate(VM.getVM(1));
-
-    // roll the locator to the current version
-    locatorVM.invoke("stop locator", () -> Locator.getLocator().stop());
-    locatorVM = Host.getHost(0).getVM(VersionManager.CURRENT_VERSION, 0);
-    locatorVM.invoke("roll locator to current version", () -> {
-      // if you need to debug SSL communications use this property:
-      // System.setProperty("javax.net.debug", "all");
-      Properties props = getDistributedSystemProperties();
-      // locator must restart with the same port so that it reconnects to the server
-      GeodeAwaitility.await().atMost(15, TimeUnit.SECONDS)
-          .until(() -> Locator.startLocatorAndDS(locatorPort, new File(""), props) != null);
-      assertThat(Locator.getLocator().getDistributedSystem().getAllOtherMembers().size())
-          .isGreaterThan(0);
-    });
-
-    // start server2 with current version
-    VM server2VM = Host.getHost(0).getVM(VersionManager.CURRENT_VERSION, 2);
-    createSSLEnabledCacheAndRegion(server2VM, locatorPort, true);
-
-    // roll server1 to the current version
-    server1VM.invoke("stop server1", () -> {
-      cache.close();
-    });
-    server1VM = Host.getHost(0).getVM(VersionManager.CURRENT_VERSION, 1);
-    createSSLEnabledCacheAndRegion(server1VM, locatorPort, true);
-
-
-    verifyCreatedEntry(server1VM);
-    verifyCreatedEntry(server2VM);
-  }
-
-  private void createSSLEnabledCacheAndRegion(VM memberVM, int locatorPort,
-      boolean conserveSockets) {
-    memberVM.invoke("start cache and create region", () -> {
-      cache = createCache(locatorPort, conserveSockets);
-      cache.createRegionFactory(RegionShortcut.REPLICATE).create(regionName);
-    });
-  }
-
-
-  private void performCreate(VM memberVM) {
-    memberVM.invoke("perform create", () -> cache
-        .getRegion(regionName).put("testKey", "testValue"));
-  }
-
-  private void performUpdate(VM memberVM) {
-    memberVM.invoke("perform update", () -> cache
-        .getRegion(regionName).put("testKey", "updatedTestValue"));
-  }
-
-  private void performCreateWithLargeValue(VM memberVM) {
-    memberVM.invoke("perform create", () -> {
-      byte[] value = new byte[SMALL_BUFFER_SIZE];
-      Arrays.fill(value, (byte) 1);
-      cache.getRegion(regionName).put("testKey", value);
-    });
-  }
-
-  private void verifyCreatedEntry(VM memberVM) {
-    memberVM.invoke("verify entry created", () -> Assert.assertTrue(cache
-        .getRegion(regionName).containsKey("testKey")));
-  }
-
-  private void verifyUpdatedEntry(VM memberVM) {
-    memberVM.invoke("verify entry updated", () -> Assert.assertTrue(cache
-        .getRegion(regionName).containsValue("updatedTestValue")));
-  }
-
-  private int createLocator(VM memberVM) {
-    return memberVM.invoke("create locator", () -> {
-      // if you need to debug SSL communications use this property:
-      // System.setProperty("javax.net.debug", "all");
-      return Locator.startLocatorAndDS(0, new File(""), getDistributedSystemProperties()).getPort();
-    });
-  }
-
-  private Cache createCache(int locatorPort, boolean conserveSockets) {
-    // if you need to debug SSL communications use this property:
-    // System.setProperty("javax.net.debug", "all");
-    Properties properties = getDistributedSystemProperties();
-    properties.put(ConfigurationProperties.LOCATORS, "localhost[" + locatorPort + "]");
-    properties.put(ConfigurationProperties.CONSERVE_SOCKETS, "" + conserveSockets);
-    return new CacheFactory(properties).create();
-  }
-
-  public Properties getDistributedSystemProperties() {
-    Properties properties = new Properties();
-    properties.put(ENABLE_CLUSTER_CONFIGURATION, "false");
-    properties.put(USE_CLUSTER_CONFIGURATION, "false");
-    properties.put(SSL_ENABLED_COMPONENTS, "cluster");
-    properties.put(SSL_KEYSTORE, TestUtil.getResourcePath(this.getClass(), "server.keystore"));
-    properties.put(SSL_TRUSTSTORE, TestUtil.getResourcePath(this.getClass(), "server.keystore"));
-    properties.put(SSL_PROTOCOLS, "TLSv1.2");
-    properties.put(SSL_KEYSTORE_PASSWORD, "password");
-    properties.put(SSL_TRUSTSTORE_PASSWORD, "password");
-    properties.put(SSL_REQUIRE_AUTHENTICATION, "true");
-    properties.put(SOCKET_LEASE_TIME, "10000");
-    properties.put(NAME, "vm" + VM.getCurrentVMNum());
-    properties.put(SOCKET_BUFFER_SIZE, "" + SMALL_BUFFER_SIZE);
-    return properties;
-  }
-}
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java
index 9ab1bb1..440f502 100755
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java
@@ -216,6 +216,7 @@ public class SSLSocketIntegrationTest {
     // transmit expected string from Client to Server
     writeMessageToNIOSSLServer(clientChannel, engine);
     writeMessageToNIOSSLServer(clientChannel, engine);
+    writeMessageToNIOSSLServer(clientChannel, engine);
     // this is the real assertion of this test
     await().until(() -> {
       return !serverThread.isAlive();
@@ -261,6 +262,7 @@ public class SSLSocketIntegrationTest {
 
         readMessageFromNIOSSLClient(socket, buffer, engine);
         readMessageFromNIOSSLClient(socket, buffer, engine);
+        readMessageFromNIOSSLClient(socket, buffer, engine);
       } catch (Throwable throwable) {
         throwable.printStackTrace(System.out);
         serverException = throwable;
@@ -282,14 +284,25 @@ public class SSLSocketIntegrationTest {
 
   private void readMessageFromNIOSSLClient(Socket socket, ByteBuffer buffer, NioSslEngine engine)
       throws IOException {
-    int bytesRead = socket.getChannel().read(buffer);
-    buffer.flip();
-    System.out.println("server bytes read is " + bytesRead + ": buffer position is "
-        + buffer.position() + " and limit is " + buffer.limit());
-    ByteBuffer unwrapped = engine.unwrap(buffer);
-    unwrapped.flip();
-    System.out.println("server unwrapped buffer position is " + unwrapped.position()
-        + " and limit is " + unwrapped.limit());
+
+    ByteBuffer unwrapped = engine.getUnwrappedBuffer(buffer);
+    // if we already have unencrypted data skip unwrapping
+    if (unwrapped.position() == 0) {
+      int bytesRead;
+      // if we already have encrypted data skip reading from the socket
+      if (buffer.position() == 0) {
+        bytesRead = socket.getChannel().read(buffer);
+        buffer.flip();
+      } else {
+        bytesRead = buffer.remaining();
+      }
+      System.out.println("server bytes read is " + bytesRead + ": buffer position is "
+          + buffer.position() + " and limit is " + buffer.limit());
+      unwrapped = engine.unwrap(buffer);
+      unwrapped.flip();
+      System.out.println("server unwrapped buffer position is " + unwrapped.position()
+          + " and limit is " + unwrapped.limit());
+    }
     ByteBufferInputStream bbis = new ByteBufferInputStream(unwrapped);
     DataInputStream dis = new DataInputStream(bbis);
     String welcome = dis.readUTF();
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
index fc01cbb..1040bb3 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
@@ -647,8 +647,6 @@ public class GMSMembershipManager implements MembershipManager, Manager {
         this.isJoining = true; // added for bug #44373
 
         // connect
-        long start = System.currentTimeMillis();
-
         boolean ok = services.getJoinLeave().join();
 
         if (!ok) {
@@ -656,11 +654,6 @@ public class GMSMembershipManager implements MembershipManager, Manager {
               + "Operation either timed out, was stopped or Locator does not exist.");
         }
 
-        long delta = System.currentTimeMillis() - start;
-
-        logger.info(LogMarker.DISTRIBUTION_MARKER, "Joined the distributed system (took  {}  ms)",
-            delta);
-
         NetView initialView = services.getJoinLeave().getView();
         latestView = new NetView(initialView, initialView.getViewId());
         listener.viewInstalled(latestView);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/properties.html b/geode-core/src/main/java/org/apache/geode/internal/cache/properties.html
index 18b3478..6bbc3a5 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/properties.html
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/properties.html
@@ -2919,17 +2919,6 @@ See org.apache.geode.internal.tcp.TCPConduit#startAcceptor(int, int, InetAddress
 TBA 
 </dd>
 
-<!-- -------------------------------------------------------  -->
-<dt><strong>p2p.useSSL</strong></dt>
-<dd>
-<em>Public:</em> false
-<p>
-<em>Boolean</em> (default is false)
-<p>
-See org.apache.geode.internal.tcp.TCPConduit#useSSL.
-<p>
-TBA 
-</dd>
 
 <!-- -------------------------------------------------------  -->
 <dt><strong>query.disableIndexes</strong></dt>
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/Buffers.java b/geode-core/src/main/java/org/apache/geode/internal/net/Buffers.java
index c9e7bcb..d1dc7b9 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/Buffers.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/Buffers.java
@@ -115,10 +115,33 @@ public class Buffers {
     releaseBuffer(bb, stats, false);
   }
 
-  static ByteBuffer expandBuffer(Buffers.BufferType type, ByteBuffer existing,
+  /**
+   * expand a buffer that's currently being read from
+   */
+  static ByteBuffer expandReadBufferIfNeeded(BufferType type, ByteBuffer existing,
+      int desiredCapacity, DMStats stats) {
+    if (existing.capacity() >= desiredCapacity) {
+      if (existing.position() > 0) {
+        existing.compact();
+        existing.flip();
+      }
+      return existing;
+    }
+    ByteBuffer newBuffer = acquireBuffer(type, desiredCapacity, stats);
+    newBuffer.clear();
+    existing.flip();
+    newBuffer.put(existing);
+    newBuffer.flip();
+    releaseBuffer(type, existing, stats);
+    return newBuffer;
+  }
+
+  /**
+   * expand a buffer that's currently being written to
+   */
+  static ByteBuffer expandWriteBufferIfNeeded(BufferType type, ByteBuffer existing,
       int desiredCapacity, DMStats stats) {
     if (existing.capacity() >= desiredCapacity) {
-      existing.compact();
       return existing;
     }
     ByteBuffer newBuffer = acquireBuffer(type, desiredCapacity, stats);
@@ -129,7 +152,7 @@ public class Buffers {
     return newBuffer;
   }
 
-  private static ByteBuffer acquireBuffer(Buffers.BufferType type, int capacity, DMStats stats) {
+  static ByteBuffer acquireBuffer(Buffers.BufferType type, int capacity, DMStats stats) {
     switch (type) {
       case UNTRACKED:
         return ByteBuffer.allocate(capacity);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioEngine.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioEngine.java
deleted file mode 100644
index 28cd3d6..0000000
--- a/geode-core/src/main/java/org/apache/geode/internal/net/NioEngine.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.internal.net;
-
-import java.nio.ByteBuffer;
-
-import org.apache.geode.distributed.internal.DMStats;
-
-/**
- * A pass-through implementation of NioFilter. Use this if you don't need
- * secure communications.
- */
-public class NioEngine implements NioFilter {
-
-  public NioEngine() {}
-
-  @Override
-  public ByteBuffer wrap(ByteBuffer buffer) {
-    return buffer;
-  }
-
-  @Override
-  public ByteBuffer unwrap(ByteBuffer wrappedBuffer) {
-    wrappedBuffer.position(wrappedBuffer.limit());
-    return wrappedBuffer;
-  }
-
-  @Override
-  public ByteBuffer getUnwrappedBuffer(ByteBuffer wrappedBuffer) {
-    return wrappedBuffer;
-  }
-
-  @Override
-  public ByteBuffer ensureUnwrappedCapacity(int amount, ByteBuffer wrappedBuffer,
-      Buffers.BufferType bufferType,
-      DMStats stats) {
-    return Buffers.expandBuffer(bufferType, wrappedBuffer, amount, stats);
-  }
-
-}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java
index bb879e5..6cb40ec 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java
@@ -40,6 +40,25 @@ public interface NioFilter {
   ByteBuffer unwrap(ByteBuffer wrappedBuffer) throws IOException;
 
   /**
+   * ensure that the wrapped buffer has enough room to read the given amount of data.
+   * This must be invoked before readAtLeast. A new buffer may be returned by this method.
+   */
+  ByteBuffer ensureWrappedCapacity(int amount, ByteBuffer wrappedBuffer,
+      Buffers.BufferType bufferType, DMStats stats);
+
+  /**
+   * read at least the indicated number of bytes from the given
+   * socket. The buffer position will be ready for reading
+   * the data when this method returns. Note: you must invoke ensureWrappedCapacity
+   * with the given amount prior to each invocation of this method.
+   * <br>
+   * wrappedBuffer = filter.ensureWrappedCapacity(amount, wrappedBuffer, etc.);<br>
+   * unwrappedBuffer = filter.readAtLeast(channel, amount, wrappedBuffer, etc.)
+   */
+  ByteBuffer readAtLeast(SocketChannel channel, int amount, ByteBuffer wrappedBuffer,
+      DMStats stats) throws IOException;
+
+  /**
    * You must invoke this when done reading from the unwrapped buffer
    */
   default void doneReading(ByteBuffer unwrappedBuffer) {
@@ -64,13 +83,5 @@ public interface NioFilter {
    */
   ByteBuffer getUnwrappedBuffer(ByteBuffer wrappedBuffer);
 
-  /**
-   * ensures that the unwrapped buffer associated with the given wrapped buffer has
-   * sufficient capacity for the given amount of bytes. This may compact the
-   * buffer or it may return a new buffer.
-   */
-  ByteBuffer ensureUnwrappedCapacity(int amount, ByteBuffer wrappedBuffer,
-      Buffers.BufferType bufferType,
-      DMStats stats);
 
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java
new file mode 100644
index 0000000..64c275e
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.net;
+
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.distributed.internal.DMStats;
+import org.apache.geode.internal.logging.LogService;
+
+/**
+ * A pass-through implementation of NioFilter. Use this if you don't need
+ * secure communications.
+ */
+public class NioPlainEngine implements NioFilter {
+  private static final Logger logger = LogService.getLogger();
+
+  private int lastReadPosition;
+  private int lastProcessedPosition;
+
+
+  public NioPlainEngine() {}
+
+  @Override
+  public ByteBuffer wrap(ByteBuffer buffer) {
+    return buffer;
+  }
+
+  @Override
+  public ByteBuffer unwrap(ByteBuffer wrappedBuffer) {
+    wrappedBuffer.position(wrappedBuffer.limit());
+    return wrappedBuffer;
+  }
+
+  @Override
+  public ByteBuffer ensureWrappedCapacity(int amount, ByteBuffer wrappedBuffer,
+      Buffers.BufferType bufferType, DMStats stats) {
+    ByteBuffer buffer = wrappedBuffer;
+
+    // logger.info("BRUCE: plain ensureWrappedCapacity({}) buffer {} position {} limit {} capacity
+    // {}", amount, Integer.toHexString(System.identityHashCode(buffer)), buffer.position(),
+    // buffer.limit(), buffer.capacity());
+
+    if (buffer.capacity() > amount) {
+      // we already have a buffer that's big enough
+      if (buffer.capacity() - lastProcessedPosition < amount) {
+        buffer.limit(lastReadPosition);
+        buffer.position(lastProcessedPosition);
+        buffer.compact();
+        lastReadPosition = buffer.position();
+        lastProcessedPosition = 0;
+      }
+    } else {
+      ByteBuffer oldBuffer = buffer;
+      oldBuffer.limit(lastReadPosition);
+      oldBuffer.position(lastProcessedPosition);
+      buffer = Buffers.acquireBuffer(bufferType, amount, stats);
+      buffer.clear();
+      buffer.put(oldBuffer);
+      lastReadPosition = buffer.position();
+      lastProcessedPosition = 0;
+    }
+    return buffer;
+  }
+
+  @Override
+  public ByteBuffer readAtLeast(SocketChannel channel, int bytes, ByteBuffer wrappedBuffer,
+      DMStats stats) throws IOException {
+    ByteBuffer buffer = wrappedBuffer;
+
+    // logger.info("BRUCE: plain readAtLeast({}) buffer {} position {} limit {} capacity {}", bytes,
+    // Integer.toHexString(System.identityHashCode(buffer)), buffer.position(), buffer.limit(),
+    // buffer.capacity());
+    // logger.info("BRUCE: lastProcessedPosition {} lastReadPosition {}", lastProcessedPosition,
+    // lastReadPosition);
+    while (lastReadPosition - lastProcessedPosition < bytes) {
+      buffer.limit(buffer.capacity());
+      buffer.position(lastReadPosition);
+      int amountRead = channel.read(buffer);
+      if (amountRead < 0) {
+        throw new EOFException();
+      }
+      lastReadPosition = buffer.position();
+      // logger.info("BRUCE: read {} bytes. buffer position is now {}", amountRead,
+      // buffer.position());
+    }
+    buffer.limit(lastProcessedPosition + bytes);
+    buffer.position(lastProcessedPosition);
+    lastProcessedPosition = buffer.limit();
+    // logger.info("BRUCE: new lastProcessedPosition {} lastReadPosition {}", lastProcessedPosition,
+    // lastReadPosition);
+    // logger.info("BRUCE: plain readAtLeast done position {} limit {}", buffer.position(),
+    // buffer.limit());
+    return buffer;
+  }
+
+  @Override
+  public ByteBuffer getUnwrappedBuffer(ByteBuffer wrappedBuffer) {
+    return wrappedBuffer;
+  }
+
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
index 32f4978..9c98fa0 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
@@ -18,10 +18,12 @@ import static javax.net.ssl.SSLEngineResult.HandshakeStatus.FINISHED;
 import static javax.net.ssl.SSLEngineResult.HandshakeStatus.NEED_TASK;
 import static javax.net.ssl.SSLEngineResult.HandshakeStatus.NEED_UNWRAP;
 import static javax.net.ssl.SSLEngineResult.Status.BUFFER_OVERFLOW;
-import static javax.net.ssl.SSLEngineResult.Status.BUFFER_UNDERFLOW;
 import static javax.net.ssl.SSLEngineResult.Status.OK;
+import static org.apache.geode.internal.net.Buffers.BufferType.TRACKED_RECEIVER;
+import static org.apache.geode.internal.net.Buffers.BufferType.TRACKED_SENDER;
 import static org.apache.geode.internal.net.Buffers.releaseBuffer;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.net.SocketException;
 import java.net.SocketTimeoutException;
@@ -53,7 +55,7 @@ public class NioSslEngine implements NioFilter {
 
   private final DMStats stats;
 
-  private boolean closed;
+  private volatile boolean closed;
 
   SSLEngine engine;
 
@@ -152,7 +154,7 @@ public class NioSslEngine implements NioFilter {
 
           if (engineResult.getStatus() == BUFFER_OVERFLOW) {
             peerAppData =
-                expandBuffer(Buffers.BufferType.UNTRACKED, peerAppData, peerAppData.capacity() * 2,
+                expandWriteBuffer(TRACKED_RECEIVER, peerAppData, peerAppData.capacity() * 2,
                     stats);
           }
           break;
@@ -169,7 +171,7 @@ public class NioSslEngine implements NioFilter {
           switch (engineResult.getStatus()) {
             case BUFFER_OVERFLOW:
               myNetData =
-                  expandBuffer(Buffers.BufferType.TRACKED_SENDER, myNetData,
+                  expandWriteBuffer(TRACKED_SENDER, myNetData,
                       myNetData.capacity() * 2, stats);
               break;
             case OK:
@@ -182,7 +184,7 @@ public class NioSslEngine implements NioFilter {
             case CLOSED:
               break;
             default:
-              logger.info("handshake terminated with illegal state(1) due to {}", status);
+              logger.info("handshake terminated with illegal state due to {}", status);
               throw new IllegalStateException(
                   "Unknown SSLEngineResult status: " + engineResult.getStatus());
           }
@@ -193,7 +195,7 @@ public class NioSslEngine implements NioFilter {
           status = engine.getHandshakeStatus();
           break;
         default:
-          logger.info("handshake terminated with illegal state(2) due to {}", status);
+          logger.info("handshake terminated with illegal state due to {}", status);
           throw new IllegalStateException("Unknown SSL Handshake state: " + status);
       }
       Thread.sleep(10);
@@ -214,9 +216,9 @@ public class NioSslEngine implements NioFilter {
     return true;
   }
 
-  ByteBuffer expandBuffer(Buffers.BufferType type, ByteBuffer existing,
+  ByteBuffer expandWriteBuffer(Buffers.BufferType type, ByteBuffer existing,
       int desiredCapacity, DMStats stats) {
-    return Buffers.expandBuffer(type, existing, desiredCapacity, stats);
+    return Buffers.expandWriteBufferIfNeeded(type, existing, desiredCapacity, stats);
   }
 
   void checkClosed() {
@@ -246,7 +248,7 @@ public class NioSslEngine implements NioFilter {
 
       if (remaining < (appData.remaining() * 2)) {
         int newCapacity = expandedCapacity(appData, myNetData);
-        myNetData = expandBuffer(Buffers.BufferType.TRACKED_SENDER, myNetData, newCapacity, stats);
+        myNetData = expandWriteBuffer(TRACKED_SENDER, myNetData, newCapacity, stats);
       }
 
       SSLEngineResult wrapResult = engine.wrap(appData, myNetData);
@@ -273,25 +275,87 @@ public class NioSslEngine implements NioFilter {
     // message. TcpConduit, for instance, uses message chunking to
     // transmit large payloads and we may have read a partial chunk
     // during the previous unwrap
+
+    // it's better to be pro-active about avoiding buffer overflows
+    expandPeerAppData(wrappedBuffer);
+    peerAppData.limit(peerAppData.capacity());
+    // logger.info("BRUCE: ssl unwrap data position={} and capacity={} wrapped buffer position={}
+    // and limit={}",
+    // peerAppData.position(), peerAppData.capacity(), wrappedBuffer.position(),
+    // wrappedBuffer.limit());
     while (wrappedBuffer.hasRemaining()) {
-      int remaining = peerAppData.capacity() - peerAppData.position();
-      if (remaining < wrappedBuffer.remaining() * 2) {
-        int newCapacity = expandedCapacity(peerAppData, wrappedBuffer);
-        peerAppData = expandBuffer(Buffers.BufferType.UNTRACKED, peerAppData, newCapacity, stats);
-      }
       SSLEngineResult unwrapResult = engine.unwrap(wrappedBuffer, peerAppData);
-      if (unwrapResult.getStatus() == BUFFER_UNDERFLOW) {
-        // partial data - need to read more. When this happens the SSLEngine will not have
-        // changed the buffer position
-        wrappedBuffer.compact();
-        return peerAppData;
+      switch (unwrapResult.getStatus()) {
+        case BUFFER_OVERFLOW:
+          expandPeerAppData(wrappedBuffer);
+          break;
+        case BUFFER_UNDERFLOW:
+          // partial data - need to read more. When this happens the SSLEngine will not have
+          // changed the buffer position
+          // logger.info("BRUCE: ssl unwrap needs more bytes from the network");
+          wrappedBuffer.compact();
+          return peerAppData;
+        case OK:
+          break;
+        default:
+          throw new SSLException("Error decrypting data: " + unwrapResult);
       }
+    }
+    wrappedBuffer.clear();
+    return peerAppData;
+  }
+
+  void expandPeerAppData(ByteBuffer wrappedBuffer) {
+    if (peerAppData.capacity() - peerAppData.position() < 2 * wrappedBuffer.remaining()) {
+      // logger.info("BRUCE: ssl unwrap is expanding the capacity of data buffer {} current capacity
+      // {} position {} limit {}.",
+      // Integer.toHexString(System.identityHashCode(peerAppData)), peerAppData.capacity(),
+      // peerAppData.position(), peerAppData.limit());
+      peerAppData =
+          Buffers.expandWriteBufferIfNeeded(TRACKED_RECEIVER, peerAppData,
+              expandedCapacity(wrappedBuffer, peerAppData), stats);
+      // logger.info("BRUCE: ssl unwrap new capacity of data buffer {} is {} position {} limit {}.",
+      // Integer.toHexString(System.identityHashCode(peerAppData)), peerAppData.capacity(),
+      // peerAppData.position(), peerAppData.limit());
+    }
+  }
 
-      if (unwrapResult.getStatus() != OK) {
-        throw new SSLException("Error decrypting data: " + unwrapResult);
+  @Override
+  public ByteBuffer ensureWrappedCapacity(int amount, ByteBuffer wrappedBuffer,
+      Buffers.BufferType bufferType, DMStats stats) {
+    return wrappedBuffer;
+  }
+
+  @Override
+  public ByteBuffer readAtLeast(SocketChannel channel, int bytes,
+      ByteBuffer wrappedBuffer, DMStats stats) throws IOException {
+
+    if (peerAppData.capacity() > bytes) {
+      // we already have a buffer that's big enough
+      if (peerAppData.capacity() - peerAppData.position() < bytes) {
+        peerAppData.compact();
+        peerAppData.flip();
+      }
+    } else {
+      peerAppData =
+          Buffers.expandReadBufferIfNeeded(TRACKED_RECEIVER, peerAppData, bytes, this.stats);
+    }
+
+    while (peerAppData.remaining() < bytes) {
+      wrappedBuffer.limit(wrappedBuffer.capacity());
+      int amountRead = channel.read(wrappedBuffer);
+      if (amountRead < 0) {
+        throw new EOFException();
+      }
+      if (amountRead > 0) {
+        wrappedBuffer.flip();
+        // prep the decoded buffer for writing
+        peerAppData.compact();
+        peerAppData = unwrap(wrappedBuffer);
+        // done writing to the decoded buffer - prep it for reading again
+        peerAppData.flip();
       }
     }
-    wrappedBuffer.clear();
     return peerAppData;
   }
 
@@ -300,11 +364,16 @@ public class NioSslEngine implements NioFilter {
     return peerAppData;
   }
 
-  @Override
-  public ByteBuffer ensureUnwrappedCapacity(int amount, ByteBuffer wrappedBuffer,
-      Buffers.BufferType bufferType,
-      DMStats stats) {
-    peerAppData = Buffers.expandBuffer(bufferType, peerAppData, amount, this.stats);
+  /**
+   * ensures that the unwrapped buffer held by this engine has
+   * sufficient capacity for the given amount of bytes. This may compact the
+   * buffer or it may return a new buffer.
+   */
+  public ByteBuffer ensureUnwrappedCapacity(int amount) {
+    // for TLS the app-data buffers do not need to be tracked direct-buffers since we
+    // do not use them for I/O operations
+    peerAppData =
+        Buffers.expandReadBufferIfNeeded(TRACKED_RECEIVER, peerAppData, amount, this.stats);
     return peerAppData;
   }
 
@@ -342,8 +411,8 @@ public class NioSslEngine implements NioFilter {
     } catch (IOException e) {
       throw new GemFireIOException("exception closing SSL session", e);
     } finally {
-      releaseBuffer(Buffers.BufferType.TRACKED_SENDER, myNetData, stats);
-      releaseBuffer(Buffers.BufferType.UNTRACKED, peerAppData, stats);
+      releaseBuffer(TRACKED_SENDER, myNetData, stats);
+      releaseBuffer(TRACKED_RECEIVER, peerAppData, stats);
       this.closed = true;
     }
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCreator.java b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCreator.java
index 3db82c7..10c8787 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCreator.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCreator.java
@@ -98,7 +98,6 @@ import org.apache.geode.internal.admin.SSLConfig;
 import org.apache.geode.internal.cache.wan.TransportFilterServerSocket;
 import org.apache.geode.internal.cache.wan.TransportFilterSocketFactory;
 import org.apache.geode.internal.logging.LogService;
-import org.apache.geode.internal.security.SecurableCommunicationChannel;
 import org.apache.geode.internal.util.ArgumentRedactor;
 import org.apache.geode.internal.util.PasswordUtil;
 
@@ -336,17 +335,6 @@ public class SocketCreator {
    */
   private void initialize() {
     try {
-      // set p2p values...
-      if (SecurableCommunicationChannel.CLUSTER
-          .equals(sslConfig.getSecuredCommunicationChannel())) {
-        if (this.sslConfig.isEnabled()) {
-          System.setProperty("p2p.useSSL", "true");
-          System.setProperty("p2p.nodirectBuffers", "true");
-        } else {
-          System.setProperty("p2p.useSSL", "false");
-        }
-      }
-
       try {
         if (this.sslConfig.isEnabled() && sslContext == null) {
           sslContext = createAndConfigureSSLContext();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index 28745b6..5dce242 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -76,8 +76,8 @@ import org.apache.geode.internal.alerting.AlertingAction;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.logging.LoggingThread;
 import org.apache.geode.internal.net.Buffers;
-import org.apache.geode.internal.net.NioEngine;
 import org.apache.geode.internal.net.NioFilter;
+import org.apache.geode.internal.net.NioPlainEngine;
 import org.apache.geode.internal.net.SocketCreator;
 import org.apache.geode.internal.tcp.MsgReader.Header;
 import org.apache.geode.internal.util.concurrent.ReentrantSemaphore;
@@ -422,16 +422,7 @@ public class Connection implements Runnable {
     setSocketBufferSize(sock, false, requestedSize);
   }
 
-  int getReceiveBufferSize() {
-    return recvBufferSize;
-  }
-
   private void setSocketBufferSize(Socket sock, boolean send, int requestedSize) {
-    setSocketBufferSize(sock, send, requestedSize, false);
-  }
-
-  private void setSocketBufferSize(Socket sock, boolean send, int requestedSize,
-      boolean alreadySetInSocket) {
     if (requestedSize > 0) {
       try {
         int currentSize = send ? sock.getSendBufferSize() : sock.getReceiveBufferSize();
@@ -441,12 +432,10 @@ public class Connection implements Runnable {
           }
           return;
         }
-        if (!alreadySetInSocket) {
-          if (send) {
-            sock.setSendBufferSize(requestedSize);
-          } else {
-            sock.setReceiveBufferSize(requestedSize);
-          }
+        if (send) {
+          sock.setSendBufferSize(requestedSize);
+        } else {
+          sock.setReceiveBufferSize(requestedSize);
         }
       } catch (SocketException ignore) {
       }
@@ -1613,6 +1602,8 @@ public class Connection implements Runnable {
     StringBuilder sb = new StringBuilder(64);
     if (this.isReceiver) {
       sb.append("P2P message reader@");
+    } else if (this.handshakeRead) {
+      sb.append("P2P message sender@");
     } else {
       sb.append("P2P handshake reader@");
     }
@@ -1724,14 +1715,16 @@ public class Connection implements Runnable {
             this.readerShuttingDown = true;
             try {
               requestClose("SocketChannel.read returned EOF");
-              requestClose(
-                  "SocketChannel.read returned EOF");
             } catch (Exception e) {
               // ignore - shutting down
             }
             return;
           }
 
+          // logger.info("BRUCE: run() read {} bytes net buffer {} position {} limit {} capacity
+          // {}", amountRead, Integer.toHexString(System.identityHashCode(inputBuffer)),
+          // inputBuffer.position(), inputBuffer.limit(), inputBuffer.capacity());
+
           processInputBuffer();
 
           if (!this.isReceiver && (this.handshakeRead || this.handshakeCancelled)) {
@@ -1817,7 +1810,7 @@ public class Connection implements Runnable {
   }
 
   private void createIoFilter(SocketChannel channel, boolean clientSocket) throws IOException {
-    if (TCPConduit.useSSL && channel != null) {
+    if (getConduit().useSSL() && channel != null) {
       InetSocketAddress address = (InetSocketAddress) channel.getRemoteAddress();
       SSLEngine engine =
           getConduit().getSocketCreator().createSSLEngine(address.getHostName(), address.getPort());
@@ -1827,19 +1820,25 @@ public class Connection implements Runnable {
         engine.setNeedClientAuth(true);
       }
 
+      int packetBufferSize = engine.getSession().getPacketBufferSize();
       if (inputBuffer == null
-          || (inputBuffer.capacity() < engine.getSession().getPacketBufferSize())) {
+          || (inputBuffer.capacity() < packetBufferSize)) {
         // TLS has a minimum input buffer size constraint
         if (inputBuffer != null) {
           Buffers.releaseReceiveBuffer(inputBuffer, getConduit().getStats());
         }
-        inputBuffer = Buffers.acquireReceiveBuffer(engine.getSession().getPacketBufferSize(),
-            getConduit().getStats());
+        inputBuffer = Buffers.acquireReceiveBuffer(packetBufferSize, getConduit().getStats());
+      }
+      if (channel.socket().getReceiveBufferSize() < packetBufferSize) {
+        channel.socket().setReceiveBufferSize(packetBufferSize);
+      }
+      if (channel.socket().getSendBufferSize() < packetBufferSize) {
+        channel.socket().setSendBufferSize(packetBufferSize);
       }
       ioFilter = getConduit().getSocketCreator().handshakeSSLSocketChannel(channel, engine,
           getConduit().idleConnectionTimeout, clientSocket, inputBuffer, getConduit().getStats());
     } else {
-      ioFilter = new NioEngine();
+      ioFilter = new NioPlainEngine();
     }
   }
 
@@ -2749,18 +2748,18 @@ public class Connection implements Runnable {
           // fall through
         }
         ByteBuffer wrappedBuffer = ioFilter.wrap(buffer);
-
-        do {
+        while (wrappedBuffer.remaining() > 0) {
           int amtWritten = 0;
           long start = stats.startSocketWrite(true);
           try {
             // this.writerThread = Thread.currentThread();
             amtWritten = channel.write(wrappedBuffer);
+            // logger.info("BRUCE: wrote {} bytes to {}", amtWritten, this.remoteAddr);
           } finally {
             stats.endSocketWrite(true, start, amtWritten, 0);
             // this.writerThread = null;
           }
-        } while (wrappedBuffer.remaining() > 0);
+        }
 
       } // synchronized
     } else {
@@ -2833,14 +2832,17 @@ public class Connection implements Runnable {
       ReplyMessage msg;
       int len;
       if (header.getMessageType() == NORMAL_MSG_TYPE) {
+        // logger.info("BRUCE: reading direct ack message");
         msg = (ReplyMessage) msgReader.readMessage(header);
         len = header.getMessageLength();
       } else {
         MsgDestreamer destreamer = obtainMsgDestreamer(header.getMessageId(), version);
         while (header.getMessageType() == CHUNKED_MSG_TYPE) {
+          // logger.info("BRUCE: reading direct ack chunk");
           msgReader.readChunk(header, destreamer);
           header = msgReader.readHeader();
         }
+        // logger.info("BRUCE: reading last direct ack chunk");
         msgReader.readChunk(header, destreamer);
         msg = (ReplyMessage) destreamer.getMessage();
         releaseMsgDestreamer(header.getMessageId(), destreamer);
@@ -2901,9 +2903,6 @@ public class Connection implements Runnable {
             getRemoteAddress());
         this.ackTimedOut = false;
       }
-      if (msgReader != null) {
-        msgReader.close();
-      }
     }
     synchronized (stateLock) {
       this.connectionState = STATE_RECEIVED_ACK;
@@ -2924,7 +2923,6 @@ public class Connection implements Runnable {
 
     while (!done && connected) {
       this.owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
-      // long startTime = DistributionStats.getStatTime();
       int remaining = peerDataBuffer.remaining();
       if (lengthSet || remaining >= MSG_HEADER_BYTES) {
         if (!lengthSet) {
@@ -2968,7 +2966,7 @@ public class Connection implements Runnable {
           peerDataBuffer.position(startPos + messageLength);
         } else {
           done = true;
-          if (TCPConduit.useSSL) {
+          if (getConduit().useSSL()) {
             ioFilter.doneReading(peerDataBuffer);
           } else {
             compactOrResizeBuffer(messageLength);
@@ -3120,6 +3118,7 @@ public class Connection implements Runnable {
 
   private void readMessage(ByteBuffer peerDataBuffer) {
     if (messageType == NORMAL_MSG_TYPE) {
+      // logger.info("BRUCE: reading normal message");
       this.owner.getConduit().getStats().incMessagesBeingReceived(true, messageLength);
       ByteBufferInputStream bbis =
           remoteVersion == null ? new ByteBufferInputStream(peerDataBuffer)
@@ -3185,6 +3184,8 @@ public class Connection implements Runnable {
             throw (CancelException) t;
           }
         }
+        // logger.info("BRUCE: buffer position {} limit {}", peerDataBuffer.position(),
+        // peerDataBuffer.limit());
         logger.fatal("Error deserializing message", t);
       } finally {
         ReplyProcessor21.clearMessageRPId();
@@ -3194,11 +3195,13 @@ public class Connection implements Runnable {
       this.owner.getConduit().getStats().incMessagesBeingReceived(md.size() == 0,
           messageLength);
       try {
+        // logger.info("BRUCE: adding message chunk of length {}", messageLength);
         md.addChunk(peerDataBuffer, messageLength);
       } catch (IOException ex) {
       }
     } else /* (nioMessageType == END_CHUNKED_MSG_TYPE) */ {
       // logger.info("END_CHUNK msgId="+nioMsgId);
+      // logger.info("BRUCE: reading last message chunk");
       MsgDestreamer md = obtainMsgDestreamer(messageId, remoteVersion);
       this.owner.getConduit().getStats().incMessagesBeingReceived(md.size() == 0,
           messageLength);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
index 5e9788d..0fc96ad 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
@@ -14,18 +14,14 @@
  */
 package org.apache.geode.internal.tcp;
 
-import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-import org.apache.logging.log4j.Logger;
-
 import org.apache.geode.distributed.internal.DMStats;
 import org.apache.geode.distributed.internal.DistributionMessage;
 import org.apache.geode.distributed.internal.ReplyProcessor21;
 import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.Version;
-import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.net.Buffers;
 import org.apache.geode.internal.net.NioFilter;
 
@@ -35,47 +31,39 @@ import org.apache.geode.internal.net.NioFilter;
  *
  */
 public class MsgReader {
-  private static final Logger logger = LogService.getLogger();
-
   protected final Connection conn;
   protected final Header header = new Header();
   private final NioFilter ioFilter;
-  private final ByteBuffer peerNetData;
-  private final ByteBufferInputStream bbis;
+  private ByteBuffer peerNetData;
+  private final ByteBufferInputStream byteBufferInputStream;
 
-  private int lastReadPosition;
-  private int lastProcessedPosition;
 
-  public MsgReader(Connection conn, NioFilter nioFilter, ByteBuffer peerNetData, Version version) {
+
+  MsgReader(Connection conn, NioFilter nioFilter, ByteBuffer peerNetData, Version version) {
     this.conn = conn;
     this.ioFilter = nioFilter;
     this.peerNetData = peerNetData;
     ByteBuffer buffer = ioFilter.getUnwrappedBuffer(peerNetData);
     buffer.position(0).limit(0);
-    this.bbis =
+    this.byteBufferInputStream =
         version == null ? new ByteBufferInputStream() : new VersionedByteBufferInputStream(version);
   }
 
-  public Header readHeader() throws IOException {
-    ByteBuffer nioInputBuffer = readAtLeast(Connection.MSG_HEADER_BYTES);
+  Header readHeader() throws IOException {
+    ByteBuffer unwrappedBuffer = readAtLeast(Connection.MSG_HEADER_BYTES);
 
-    int nioMessageLength = nioInputBuffer.getInt();
+    int nioMessageLength = unwrappedBuffer.getInt();
     /* nioMessageVersion = */ Connection.calcHdrVersion(nioMessageLength);
     nioMessageLength = Connection.calcMsgByteSize(nioMessageLength);
-    byte nioMessageType = nioInputBuffer.get();
-    short nioMsgId = nioInputBuffer.getShort();
-
-    nioInputBuffer.position(nioInputBuffer.limit());
+    byte nioMessageType = unwrappedBuffer.get();
+    short nioMsgId = unwrappedBuffer.getShort();
 
     boolean directAck = (nioMessageType & Connection.DIRECT_ACK_BIT) != 0;
     if (directAck) {
-      // logger.info("DEBUG: msg from " + getRemoteAddress() + " is direct ack" );
       nioMessageType &= ~Connection.DIRECT_ACK_BIT; // clear the ack bit
     }
 
-    header.messageLength = nioMessageLength;
-    header.messageType = nioMessageType;
-    header.messageId = nioMsgId;
+    header.setFields(nioMessageLength, nioMessageType, nioMsgId);
     return header;
   }
 
@@ -84,18 +72,15 @@ public class MsgReader {
    *
    * @return the message, or null if we only received a chunk of the message
    */
-  public DistributionMessage readMessage(Header header)
-      throws IOException, ClassNotFoundException, InterruptedException {
+  DistributionMessage readMessage(Header header)
+      throws IOException, ClassNotFoundException {
     ByteBuffer nioInputBuffer = readAtLeast(header.messageLength);
     this.getStats().incMessagesBeingReceived(true, header.messageLength);
     long startSer = this.getStats().startMsgDeserialization();
     try {
-      bbis.setBuffer(nioInputBuffer);
-      DistributionMessage msg = null;
+      byteBufferInputStream.setBuffer(nioInputBuffer);
       ReplyProcessor21.initMessageRPId();
-      // add serialization stats
-      msg = (DistributionMessage) InternalDataSerializer.readDSFID(bbis);
-      return msg;
+      return (DistributionMessage) InternalDataSerializer.readDSFID(byteBufferInputStream);
     } finally {
       this.getStats().endMsgDeserialization(startSer);
       this.getStats().decMessagesBeingReceived(header.messageLength);
@@ -103,68 +88,52 @@ public class MsgReader {
     }
   }
 
-  public void readChunk(Header header, MsgDestreamer md)
-      throws IOException, ClassNotFoundException, InterruptedException {
-    ByteBuffer nioInputBuffer = readAtLeast(header.messageLength);
+  void readChunk(Header header, MsgDestreamer md)
+      throws IOException {
+    ByteBuffer unwrappedBuffer = readAtLeast(header.messageLength);
     this.getStats().incMessagesBeingReceived(md.size() == 0, header.messageLength);
-    md.addChunk(nioInputBuffer, header.messageLength);
+    md.addChunk(unwrappedBuffer, header.messageLength);
+    // show that the bytes have been consumed by adjusting the buffer's position
+    unwrappedBuffer.position(unwrappedBuffer.position() + header.messageLength);
   }
 
-  public ByteBuffer readAtLeast(int bytes) throws IOException {
-
-    ByteBuffer unwrappedBuffer =
-        ioFilter.ensureUnwrappedCapacity(bytes, peerNetData, Buffers.BufferType.UNTRACKED,
-            getStats());
-
-    while ((lastReadPosition - lastProcessedPosition) < bytes) {
-      unwrappedBuffer.limit(unwrappedBuffer.capacity());
-      unwrappedBuffer.position(lastReadPosition);
-
-      int amountRead = conn.getSocket().getChannel().read(peerNetData);
-      if (amountRead < 0) {
-        throw new EOFException();
-      }
-      if (amountRead > 0) {
-        peerNetData.flip();
-        unwrappedBuffer = ioFilter.unwrap(peerNetData);
-        lastReadPosition = unwrappedBuffer.position();
-      }
-    }
-    unwrappedBuffer.limit(lastProcessedPosition + bytes);
-    unwrappedBuffer.position(lastProcessedPosition);
-    lastProcessedPosition = unwrappedBuffer.limit();
 
-    return unwrappedBuffer;
+
+  private ByteBuffer readAtLeast(int bytes) throws IOException {
+    peerNetData = ioFilter.ensureWrappedCapacity(bytes, peerNetData,
+        Buffers.BufferType.TRACKED_RECEIVER, getStats());
+    return ioFilter.readAtLeast(conn.getSocket().getChannel(), bytes, peerNetData, getStats());
   }
 
-  protected DMStats getStats() {
+
+
+  private DMStats getStats() {
     return conn.getConduit().getStats();
   }
 
   public static class Header {
 
-    int messageLength;
-    byte messageType;
-    short messageId;
+    private int messageLength;
+    private byte messageType;
+    private short messageId;
 
-    public Header() {}
+    public void setFields(int nioMessageLength, byte nioMessageType, short nioMsgId) {
+      messageLength = nioMessageLength;
+      messageType = nioMessageType;
+      messageId = nioMsgId;
+    }
 
-    public int getMessageLength() {
+    int getMessageLength() {
       return messageLength;
     }
 
-    public byte getMessageType() {
+    byte getMessageType() {
       return messageType;
     }
 
-    public short getMessageId() {
+    short getMessageId() {
       return messageId;
     }
-
-
   }
 
-  public void close() {}
-
-
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java
index 2fb5a34..7a589cc 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java
@@ -316,6 +316,7 @@ public class MsgStreamer extends OutputStream
         con.sendPreserialized(this.buffer,
             lastFlushForMessage && this.msg.containsRegionContentChange(), conflationMsg);
       } catch (IOException ex) {
+        // logger.info("BRUCE: message transmission threw an exception", ex);
         it.remove();
         if (this.ce == null)
           this.ce = new ConnectExceptions();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
index 178bf93..612c8fb 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
@@ -100,7 +100,7 @@ public class TCPConduit implements Runnable {
   /**
    * use javax.net.ssl.SSLServerSocketFactory?
    */
-  static boolean useSSL;
+  boolean useSSL;
 
   /**
    * The socket producer used by the cluster
@@ -123,7 +123,6 @@ public class TCPConduit implements Runnable {
   }
 
   public static void init() {
-    useSSL = Boolean.getBoolean("p2p.useSSL");
     // only use direct buffers if we are using nio
     LISTENER_CLOSE_TIMEOUT = Integer.getInteger("p2p.listenerCloseTimeout", 60000);
     // note: bug 37730 concerned this defaulting to 50
@@ -253,6 +252,7 @@ public class TCPConduit implements Runnable {
 
     this.socketCreator =
         SocketCreatorFactory.getSocketCreatorForComponent(SecurableCommunicationChannel.CLUSTER);
+    this.useSSL = socketCreator.useSSL();
 
     InetAddress addr = address;
     if (addr == null) {
@@ -967,6 +967,10 @@ public class TCPConduit implements Runnable {
     return stats;
   }
 
+  public boolean useSSL() {
+    return useSSL;
+  }
+
   protected class Stopper extends CancelCriterion {
 
     @Override
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/BuffersTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/BuffersTest.java
index f21b754..46c9dcd 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/BuffersTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/BuffersTest.java
@@ -41,7 +41,8 @@ public class BuffersTest {
 
   private void createAndVerifyNewBuffer(ByteBuffer buffer, boolean useDirectBuffer) {
     ByteBuffer newBuffer =
-        Buffers.expandBuffer(Buffers.BufferType.UNTRACKED, buffer, 500, mock(DMStats.class));
+        Buffers.expandWriteBufferIfNeeded(Buffers.BufferType.UNTRACKED, buffer, 500,
+            mock(DMStats.class));
     assertEquals(buffer.position(), newBuffer.position());
     assertEquals(500, newBuffer.capacity());
     newBuffer.flip();
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
index 7010db0..63348f8 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
@@ -26,6 +26,7 @@ import static javax.net.ssl.SSLEngineResult.Status.OK;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.isA;
 import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
@@ -38,8 +39,9 @@ import java.net.SocketException;
 import java.nio.ByteBuffer;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Stack;
+import java.util.List;
 
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLEngineResult;
@@ -50,6 +52,8 @@ import javax.net.ssl.SSLSession;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import org.apache.geode.GemFireIOException;
 import org.apache.geode.distributed.internal.DMStats;
@@ -110,7 +114,7 @@ public class NioSslEngineTest {
     verify(mockEngine, atLeast(2)).getHandshakeStatus();
     verify(mockEngine, times(3)).wrap(any(ByteBuffer.class), any(ByteBuffer.class));
     verify(mockEngine, times(3)).unwrap(any(ByteBuffer.class), any(ByteBuffer.class));
-    verify(spyNioSslEngine, times(2)).expandBuffer(any(Buffers.BufferType.class),
+    verify(spyNioSslEngine, times(2)).expandWriteBuffer(any(Buffers.BufferType.class),
         any(ByteBuffer.class), any(Integer.class), any(DMStats.class));
     verify(spyNioSslEngine, times(1)).handleBlockingTasks();
     verify(mockChannel, times(3)).read(any(ByteBuffer.class));
@@ -225,7 +229,7 @@ public class NioSslEngineTest {
 
     ByteBuffer wrappedBuffer = spyNioSslEngine.wrap(appData);
 
-    verify(spyNioSslEngine, times(1)).expandBuffer(any(Buffers.BufferType.class),
+    verify(spyNioSslEngine, times(1)).expandWriteBuffer(any(Buffers.BufferType.class),
         any(ByteBuffer.class), any(Integer.class), any(DMStats.class));
     appData.flip();
     assertThat(wrappedBuffer).isEqualTo(appData);
@@ -252,7 +256,7 @@ public class NioSslEngineTest {
   }
 
   @Test
-  public void unwrap() throws Exception {
+  public void unwrapWithBufferOverflow() throws Exception {
     // make the application data too big to fit into the engine's encryption buffer
     ByteBuffer wrappedData = ByteBuffer.allocate(nioSslEngine.peerAppData.capacity() + 100);
     byte[] netBytes = new byte[wrappedData.capacity()];
@@ -264,13 +268,14 @@ public class NioSslEngineTest {
     TestSSLEngine testEngine = new TestSSLEngine();
     spyNioSslEngine.engine = testEngine;
 
-    testEngine.addReturnResult(new SSLEngineResult(OK, FINISHED, netBytes.length, netBytes.length));
+    testEngine.addReturnResult(
+        new SSLEngineResult(BUFFER_OVERFLOW, NEED_UNWRAP, netBytes.length, netBytes.length),
+        new SSLEngineResult(OK, FINISHED, netBytes.length, netBytes.length));
 
     ByteBuffer unwrappedBuffer = spyNioSslEngine.unwrap(wrappedData);
     unwrappedBuffer.flip();
 
-    verify(spyNioSslEngine, times(1)).expandBuffer(any(Buffers.BufferType.class),
-        any(ByteBuffer.class), any(Integer.class), any(DMStats.class));
+    verify(spyNioSslEngine, times(2)).expandPeerAppData(any(ByteBuffer.class));
     assertThat(unwrappedBuffer).isEqualTo(ByteBuffer.wrap(netBytes));
   }
 
@@ -315,8 +320,7 @@ public class NioSslEngineTest {
   public void ensureUnwrappedCapacity() {
     ByteBuffer wrappedBuffer = ByteBuffer.allocate(netBufferSize);
     int requestedCapacity = nioSslEngine.getUnwrappedBuffer(wrappedBuffer).capacity() * 2;
-    ByteBuffer unwrappedBuffer = nioSslEngine.ensureUnwrappedCapacity(requestedCapacity,
-        wrappedBuffer, Buffers.BufferType.UNTRACKED, mockStats);
+    ByteBuffer unwrappedBuffer = nioSslEngine.ensureUnwrappedCapacity(requestedCapacity);
     assertThat(unwrappedBuffer.capacity()).isGreaterThanOrEqualTo(requestedCapacity);
   }
 
@@ -369,6 +373,84 @@ public class NioSslEngineTest {
     verify(mockChannel, times(1)).write(any(ByteBuffer.class));
   }
 
+  @Test
+  public void ensureWrappedCapacity() {
+    ByteBuffer buffer = ByteBuffer.allocate(10);
+    assertThat(
+        nioSslEngine.ensureWrappedCapacity(10, buffer, Buffers.BufferType.UNTRACKED, mockStats))
+            .isEqualTo(buffer);
+  }
+
+  @Test
+  public void readAtLeast() throws Exception {
+    final int amountToRead = 150;
+    final int individualRead = 60;
+    final int preexistingBytes = 10;
+    ByteBuffer wrappedBuffer = ByteBuffer.allocate(1000);
+    SocketChannel mockChannel = mock(SocketChannel.class);
+
+    // force a compaction by making the decoded buffer appear to be nearly full
+    ByteBuffer unwrappedBuffer = nioSslEngine.peerAppData;
+    unwrappedBuffer.position(unwrappedBuffer.capacity() - individualRead);
+    unwrappedBuffer.limit(unwrappedBuffer.position() + preexistingBytes);
+
+    // simulate some socket reads
+    when(mockChannel.read(any(ByteBuffer.class))).thenAnswer(new Answer<Integer>() {
+      @Override
+      public Integer answer(InvocationOnMock invocation) throws Throwable {
+        ByteBuffer buffer = invocation.getArgument(0);
+        buffer.position(buffer.position() + individualRead);
+        return individualRead;
+      }
+    });
+
+    TestSSLEngine testSSLEngine = new TestSSLEngine();
+    testSSLEngine.addReturnResult(new SSLEngineResult(OK, NEED_UNWRAP, 0, 0));
+    nioSslEngine.engine = testSSLEngine;
+
+    ByteBuffer data = nioSslEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer, mockStats);
+    verify(mockChannel, times(3)).read(isA(ByteBuffer.class));
+    assertThat(data.position()).isEqualTo(0);
+    assertThat(data.limit()).isEqualTo(individualRead * 3 + preexistingBytes);
+  }
+
+
+  @Test
+  public void readAtLeastUsingSmallAppBuffer() throws Exception {
+    final int amountToRead = 150;
+    final int individualRead = 60;
+    final int preexistingBytes = 10;
+    ByteBuffer wrappedBuffer = ByteBuffer.allocate(1000);
+    SocketChannel mockChannel = mock(SocketChannel.class);
+
+    // force buffer expansion by using a decoded buffer too small to hold all of the data read
+    ByteBuffer unwrappedBuffer = ByteBuffer.allocate(100);
+    unwrappedBuffer.position(preexistingBytes);
+    nioSslEngine.peerAppData = unwrappedBuffer;
+
+    // simulate some socket reads
+    when(mockChannel.read(any(ByteBuffer.class))).thenAnswer(new Answer<Integer>() {
+      @Override
+      public Integer answer(InvocationOnMock invocation) throws Throwable {
+        ByteBuffer buffer = invocation.getArgument(0);
+        buffer.position(buffer.position() + individualRead);
+        return individualRead;
+      }
+    });
+
+    TestSSLEngine testSSLEngine = new TestSSLEngine();
+    testSSLEngine.addReturnResult(
+        new SSLEngineResult(OK, NEED_UNWRAP, 0, 0), // 10 + 60 bytes = 70
+        new SSLEngineResult(OK, NEED_UNWRAP, 0, 0), // 70 + 60 bytes = 130
+        new SSLEngineResult(BUFFER_OVERFLOW, NEED_UNWRAP, 0, 0), // need 190 bytes capacity
+        new SSLEngineResult(OK, NEED_UNWRAP, 0, 0)); // 130 + 60 bytes = 190
+    nioSslEngine.engine = testSSLEngine;
+
+    ByteBuffer data = nioSslEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer, mockStats);
+    verify(mockChannel, times(3)).read(isA(ByteBuffer.class));
+    assertThat(data.position()).isEqualTo(0);
+    assertThat(data.limit()).isEqualTo(individualRead * 3 + preexistingBytes);
+  }
 
 
   // TestSSLEngine holds a stack of SSLEngineResults and always copies the
@@ -377,20 +459,29 @@ public class NioSslEngineTest {
   // pretty difficult & cumbersome to implement with Mockito.
   static class TestSSLEngine extends SSLEngine {
 
-    private Stack<SSLEngineResult> returnResults = new Stack<>();
+    private List<SSLEngineResult> returnResults = new ArrayList<>();
+
+    private SSLEngineResult nextResult() {
+      SSLEngineResult result = returnResults.remove(0);
+      if (returnResults.isEmpty()) {
+        returnResults.add(result);
+      }
+      return result;
+    }
 
     @Override
     public SSLEngineResult wrap(ByteBuffer[] sources, int i, int i1, ByteBuffer destination) {
       for (ByteBuffer source : sources) {
         destination.put(source);
       }
-      return returnResults.pop();
+      return nextResult();
     }
 
     @Override
     public SSLEngineResult unwrap(ByteBuffer source, ByteBuffer[] destinations, int i, int i1) {
-      SSLEngineResult sslEngineResult = returnResults.pop();
-      if (sslEngineResult.getStatus() != BUFFER_UNDERFLOW) {
+      SSLEngineResult sslEngineResult = nextResult();
+      if (sslEngineResult.getStatus() != BUFFER_UNDERFLOW
+          && sslEngineResult.getStatus() != BUFFER_OVERFLOW) {
         destinations[0].put(source);
       }
       return sslEngineResult;
@@ -488,8 +579,14 @@ public class NioSslEngineTest {
       return false;
     }
 
-    void addReturnResult(SSLEngineResult sslEngineResult) {
-      returnResults.add(sslEngineResult);
+    /**
+     * Adds one or more engine operation results to be returned, in order, by wrap or unwrap.
+     * Like Mockito's thenReturn(), the last result added is repeated forever.
+     */
+    void addReturnResult(SSLEngineResult... sslEngineResult) {
+      for (SSLEngineResult result : sslEngineResult) {
+        returnResults.add(result);
+      }
     }
   }
 }