You are viewing a plain text version of this content. The canonical link (the target of "here" on the original page) was not preserved in this plain-text copy.
Posted to commits@geode.apache.org by bu...@apache.org on 2020/10/30 16:32:09 UTC

[geode] branch backport-1-13-GEODE-8652-and-friends created (now 0906b08)

This is an automated email from the ASF dual-hosted git repository.

burcham pushed a change to branch backport-1-13-GEODE-8652-and-friends
in repository https://gitbox.apache.org/repos/asf/geode.git.


      at 0906b08  GEODE-8652: NioSslEngine.close() Bypasses Locks (#5666)

This branch includes the following new commits:

     new ca51909  GEODE-8136: Move UncheckedUtils to geode-common (#5123)
     new 9a066e6  GEODE-8540: Create new DistributedBlackboard Rule (#5557)
     new 0906b08  GEODE-8652: NioSslEngine.close() Bypasses Locks (#5666)

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[geode] 01/03: GEODE-8136: Move UncheckedUtils to geode-common (#5123)

Posted by bu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

burcham pushed a commit to branch backport-1-13-GEODE-8652-and-friends
in repository https://gitbox.apache.org/repos/asf/geode.git

commit ca51909fe78be707376d9d411d709a4cea3bbdb2
Author: Kirk Lund <kl...@apache.org>
AuthorDate: Tue May 26 10:24:44 2020 -0700

    GEODE-8136: Move UncheckedUtils to geode-common (#5123)
    
    Create UncheckedUtilsTest to unit test UncheckedUtils.
    
    Extract FunctionExecution methods to new TypedFunctionService in
    geode-core.
    
    (cherry picked from commit 253d667b27423e55601e925f7a20f2fd6f0efc31)
---
 .../geode/util/internal}/UncheckedUtils.java       | 19 ++++---
 .../geode/util/internal/UncheckedUtilsTest.java    | 60 ++++++++++++++++++++++
 .../cache/PartitionedRegionSingleHopDUnitTest.java | 14 ++---
 ...istributedRegionFunctionExecutionDUnitTest.java | 59 +++++++--------------
 ...oningWithColocationAndPersistenceDUnitTest.java | 12 ++---
 .../FunctionExecutionOnLonerRegressionTest.java    | 27 ++--------
 .../client/internal/ClientMetadataService.java     |  4 +-
 .../geode/internal/cache/GemFireCacheImpl.java     | 49 ++++--------------
 .../cache/InternalCacheForClientAccess.java        |  6 +--
 .../apache/geode/internal/cache/LocalRegion.java   |  7 ++-
 .../util/TypedFunctionService.java}                | 20 +++++---
 .../tier/sockets/CacheClientProxyFactory.java      |  4 +-
 .../internal/ClusterAlertMessagingTest.java        |  6 +--
 .../apache/geode/internal/tcp/TCPConduitTest.java  |  4 +-
 14 files changed, 151 insertions(+), 140 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/util/UncheckedUtils.java b/geode-common/src/main/java/org/apache/geode/util/internal/UncheckedUtils.java
similarity index 68%
copy from geode-core/src/main/java/org/apache/geode/internal/cache/util/UncheckedUtils.java
copy to geode-common/src/main/java/org/apache/geode/util/internal/UncheckedUtils.java
index c03e990..61dbd8d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/util/UncheckedUtils.java
+++ b/geode-common/src/main/java/org/apache/geode/util/internal/UncheckedUtils.java
@@ -12,18 +12,23 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-package org.apache.geode.internal.cache.util;
-
-import org.apache.geode.cache.execute.Execution;
+package org.apache.geode.util.internal;
 
+/**
+ * Utilities for casting and working with unchecked raw types.
+ */
 @SuppressWarnings({"unchecked", "unused"})
 public class UncheckedUtils {
 
-  public static <T> T cast(Object object) {
-    return (T) object;
+  protected UncheckedUtils() {
+    // do not instantiate
   }
 
-  public static <IN, OUT, AGG> Execution<IN, OUT, AGG> cast(Execution execution) {
-    return execution;
+  /**
+   * Casts an instance of a raw type to a parameterized type. Preference should be given to
+   * converting all code from using raw types to using parameterized types when possible.
+   */
+  public static <T> T uncheckedCast(Object object) {
+    return (T) object;
   }
 }
diff --git a/geode-common/src/test/java/org/apache/geode/util/internal/UncheckedUtilsTest.java b/geode-common/src/test/java/org/apache/geode/util/internal/UncheckedUtilsTest.java
new file mode 100644
index 0000000..7c282b7
--- /dev/null
+++ b/geode-common/src/test/java/org/apache/geode/util/internal/UncheckedUtilsTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.util.internal;
+
+import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.catchThrowable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Test;
+
+@SuppressWarnings("unchecked")
+public class UncheckedUtilsTest {
+
+  @Test
+  public void uncheckedCast_rawList_empty() {
+    List rawList = new ArrayList();
+
+    List<String> value = uncheckedCast(rawList);
+
+    assertThat(value).isSameAs(rawList);
+  }
+
+  @Test
+  public void uncheckedCast_rawList_nonEmpty() {
+    List rawList = new ArrayList();
+    rawList.add("1");
+    rawList.add("2");
+
+    List<String> value = uncheckedCast(rawList);
+
+    assertThat(value).isSameAs(rawList);
+  }
+
+  @Test
+  public void uncheckedCast_rawList_wrongTypes() {
+    List rawList = new ArrayList();
+    rawList.add(1);
+    rawList.add(2);
+    List<String> wrongType = uncheckedCast(rawList);
+
+    Throwable thrown = catchThrowable(() -> wrongType.get(0));
+
+    assertThat(thrown).isInstanceOf(ClassCastException.class);
+  }
+}
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionSingleHopDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionSingleHopDUnitTest.java
index 943c503..43019bf 100755
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionSingleHopDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionSingleHopDUnitTest.java
@@ -22,7 +22,6 @@ import static org.apache.geode.cache.RegionShortcut.PARTITION;
 import static org.apache.geode.cache.RegionShortcut.PARTITION_PERSISTENT;
 import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_CLUSTER_CONFIGURATION;
 import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
-import static org.apache.geode.internal.cache.util.UncheckedUtils.cast;
 import static org.apache.geode.internal.lang.SystemPropertyHelper.GEMFIRE_PREFIX;
 import static org.apache.geode.management.ManagementService.getExistingManagementService;
 import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
@@ -32,6 +31,7 @@ import static org.apache.geode.test.dunit.VM.getController;
 import static org.apache.geode.test.dunit.VM.getVM;
 import static org.apache.geode.test.dunit.VM.getVMId;
 import static org.apache.geode.test.dunit.rules.DistributedRule.getDistributedSystemProperties;
+import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.mock;
 
@@ -76,7 +76,6 @@ import org.apache.geode.cache.client.internal.ClientPartitionAdvisor;
 import org.apache.geode.cache.client.internal.InternalClientCache;
 import org.apache.geode.cache.execute.FunctionAdapter;
 import org.apache.geode.cache.execute.FunctionContext;
-import org.apache.geode.cache.execute.FunctionService;
 import org.apache.geode.cache.execute.RegionFunctionContext;
 import org.apache.geode.cache.server.CacheServer;
 import org.apache.geode.distributed.LocatorLauncher;
@@ -84,6 +83,7 @@ import org.apache.geode.distributed.ServerLauncher;
 import org.apache.geode.distributed.internal.ServerLocation;
 import org.apache.geode.internal.cache.BucketAdvisor.ServerBucketProfile;
 import org.apache.geode.internal.cache.execute.InternalFunctionInvocationTargetException;
+import org.apache.geode.internal.cache.execute.util.TypedFunctionService;
 import org.apache.geode.management.ManagementService;
 import org.apache.geode.management.membership.MembershipEvent;
 import org.apache.geode.management.membership.UniversalMembershipListenerAdapter;
@@ -1336,22 +1336,22 @@ public class PartitionedRegionSingleHopDUnitTest implements Serializable {
   }
 
   private void executeFunctions(Region<Object, Object> region) {
-    cast(FunctionService.onRegion(region))
+    TypedFunctionService.onRegion(region)
         .withFilter(filter(0))
         .execute(new PutFunction())
         .getResult();
 
-    cast(FunctionService.onRegion(region))
+    TypedFunctionService.onRegion(region)
         .withFilter(filter(0, 1))
         .execute(new PutFunction())
         .getResult();
 
-    cast(FunctionService.onRegion(region))
+    TypedFunctionService.onRegion(region)
         .withFilter(filter(0, 1, 2, 3))
         .execute(new PutFunction())
         .getResult();
 
-    cast(FunctionService.onRegion(region))
+    TypedFunctionService.onRegion(region)
         .execute(new PutFunction())
         .getResult();
   }
@@ -1389,7 +1389,7 @@ public class PartitionedRegionSingleHopDUnitTest implements Serializable {
   }
 
   private InternalCache getInternalCache(ServerLauncher serverLauncher) {
-    return cast(serverLauncher.getCache());
+    return uncheckedCast(serverLauncher.getCache());
   }
 
   private void waitForLocalBucketsCreation() {
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/DistributedRegionFunctionExecutionDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/DistributedRegionFunctionExecutionDUnitTest.java
index b95a68b..b308b20 100755
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/DistributedRegionFunctionExecutionDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/DistributedRegionFunctionExecutionDUnitTest.java
@@ -22,13 +22,13 @@ import static org.apache.geode.distributed.ConfigurationProperties.NAME;
 import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_CLIENT_AUTHENTICATOR;
 import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_CLIENT_AUTH_INIT;
 import static org.apache.geode.distributed.ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER;
-import static org.apache.geode.internal.cache.execute.DistributedRegionFunctionExecutionDUnitTest.UncheckedUtils.cast;
 import static org.apache.geode.test.awaitility.GeodeAwaitility.getTimeout;
 import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
 import static org.apache.geode.test.dunit.VM.getController;
 import static org.apache.geode.test.dunit.VM.getVM;
 import static org.apache.geode.test.dunit.VM.toArray;
 import static org.apache.geode.util.internal.GeodeGlossary.GEMFIRE_PREFIX;
+import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.catchThrowable;
 
@@ -59,7 +59,6 @@ import org.apache.geode.cache.client.Pool;
 import org.apache.geode.cache.client.PoolFactory;
 import org.apache.geode.cache.client.PoolManager;
 import org.apache.geode.cache.client.internal.InternalClientCache;
-import org.apache.geode.cache.execute.Execution;
 import org.apache.geode.cache.execute.Function;
 import org.apache.geode.cache.execute.FunctionContext;
 import org.apache.geode.cache.execute.FunctionException;
@@ -70,6 +69,7 @@ import org.apache.geode.cache.execute.ResultCollector;
 import org.apache.geode.cache.server.CacheServer;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.execute.util.TypedFunctionService;
 import org.apache.geode.security.templates.DummyAuthenticator;
 import org.apache.geode.security.templates.UserPasswordAuthInit;
 import org.apache.geode.test.dunit.AsyncInvocation;
@@ -257,7 +257,7 @@ public class DistributedRegionFunctionExecutionDUnitTest implements Serializable
     empty.invoke(() -> populateRegion(200));
 
     AsyncInvocation executeFunctionInReplicate1 = replicate1.invokeAsync(() -> {
-      ResultCollector<String, List<String>> resultCollector = FunctionServiceCast
+      ResultCollector<String, List<String>> resultCollector = TypedFunctionService
           .<Void, String, List<String>>onRegion(getRegion())
           .withFilter(filter)
           .execute(LongRunningFunction.class.getSimpleName(), getTimeout().toMillis(),
@@ -302,7 +302,7 @@ public class DistributedRegionFunctionExecutionDUnitTest implements Serializable
 
     replicate1.invoke(() -> {
       Throwable thrown = catchThrowable(() -> {
-        FunctionServiceCast
+        TypedFunctionService
             .<Void, String, List<String>>onRegion(getRegion())
             .withFilter(filter)
             .execute(LongRunningFunction.class.getSimpleName(), 1000, MILLISECONDS);
@@ -950,7 +950,7 @@ public class DistributedRegionFunctionExecutionDUnitTest implements Serializable
     }
 
     client.invoke(() -> {
-      ResultCollector<Boolean, List<Boolean>> resultCollector = FunctionServiceCast
+      ResultCollector<Boolean, List<Boolean>> resultCollector = TypedFunctionService
           .<Boolean, Boolean, List<Boolean>>onRegion(getRegion())
           .setArguments(true)
           .execute(inlineFunction("Success", true));
@@ -1158,13 +1158,13 @@ public class DistributedRegionFunctionExecutionDUnitTest implements Serializable
   }
 
   private void executeNoResultFunction() {
-    FunctionServiceCast
+    TypedFunctionService
         .onRegion(getRegion())
         .execute(new NoResultFunction());
   }
 
   private List<Boolean> executeDistributedRegionFunction() {
-    return FunctionServiceCast
+    return TypedFunctionService
         .<Boolean, Boolean, List<Boolean>>onRegion(getRegion())
         .withFilter(filter)
         .setArguments(false)
@@ -1173,7 +1173,7 @@ public class DistributedRegionFunctionExecutionDUnitTest implements Serializable
   }
 
   private void executeThrowsRuntimeExceptionFunction() {
-    FunctionServiceCast
+    TypedFunctionService
         .<Void, Void, Void>onRegion(getRegion())
         .withFilter(filter)
         .execute(new ThrowsRuntimeExceptionFunction());
@@ -1187,7 +1187,7 @@ public class DistributedRegionFunctionExecutionDUnitTest implements Serializable
       filter.add("execKey-" + 100 + i);
     }
 
-    ResultCollector<Object, List<Object>> resultCollector = FunctionServiceCast
+    ResultCollector<Object, List<Object>> resultCollector = TypedFunctionService
         .<Boolean, Object, List<Object>>onRegion(getRegion())
         .withFilter(filter)
         .setArguments(true)
@@ -1200,7 +1200,7 @@ public class DistributedRegionFunctionExecutionDUnitTest implements Serializable
         .as("First element of " + resultCollector.getResult())
         .isInstanceOf(CustomRuntimeException.class);
 
-    resultCollector = FunctionServiceCast
+    resultCollector = TypedFunctionService
         .<Set<String>, Object, List<Object>>onRegion(getRegion())
         .withFilter(filter)
         .setArguments(filter)
@@ -1220,7 +1220,7 @@ public class DistributedRegionFunctionExecutionDUnitTest implements Serializable
 
   private void executeNoLastResultFunction() {
     Throwable thrown = catchThrowable(() -> {
-      FunctionServiceCast
+      TypedFunctionService
           .onRegion(getRegion())
           .withFilter(filter)
           .execute(new NoLastResultFunction())
@@ -1234,7 +1234,7 @@ public class DistributedRegionFunctionExecutionDUnitTest implements Serializable
   private void executeUnregisteredFunction() {
     FunctionService.unregisterFunction(new DistributedRegionFunction().getId());
 
-    FunctionServiceCast
+    TypedFunctionService
         .<Void, Boolean, List<Boolean>>onRegion(getRegion())
         .withFilter(filter)
         .execute(new DistributedRegionFunction())
@@ -1242,7 +1242,7 @@ public class DistributedRegionFunctionExecutionDUnitTest implements Serializable
   }
 
   private void executeFunctionFunctionInvocationTargetException() {
-    ResultCollector<Integer, List<Integer>> resultCollector = FunctionServiceCast
+    ResultCollector<Integer, List<Integer>> resultCollector = TypedFunctionService
         .<Boolean, Integer, List<Integer>>onRegion(getRegion())
         .setArguments(true)
         .execute(ThrowsFunctionInvocationTargetExceptionFunction.class.getSimpleName());
@@ -1253,7 +1253,7 @@ public class DistributedRegionFunctionExecutionDUnitTest implements Serializable
 
   private void executeFunctionFunctionInvocationTargetExceptionWithoutHA() {
     Throwable thrown = catchThrowable(() -> {
-      FunctionServiceCast
+      TypedFunctionService
           .<Boolean, Integer, List<Integer>>onRegion(getRegion())
           .setArguments(true)
           .execute(ThrowsFunctionInvocationTargetExceptionFunction.class.getSimpleName())
@@ -1268,7 +1268,7 @@ public class DistributedRegionFunctionExecutionDUnitTest implements Serializable
   }
 
   private void executeFunctionFunctionInvocationTargetException_ClientServer() {
-    ResultCollector<Integer, List<Integer>> resultCollector = FunctionServiceCast
+    ResultCollector<Integer, List<Integer>> resultCollector = TypedFunctionService
         .<Boolean, Integer, List<Integer>>onRegion(getRegion())
         .setArguments(true)
         .execute(ThrowsFunctionInvocationTargetExceptionFunction.class.getSimpleName());
@@ -1279,7 +1279,7 @@ public class DistributedRegionFunctionExecutionDUnitTest implements Serializable
 
   private void executeFunctionFunctionInvocationTargetException_ClientServer_WithoutHA() {
     Throwable thrown = catchThrowable(() -> {
-      FunctionServiceCast
+      TypedFunctionService
           .<Boolean, Integer, List<Integer>>onRegion(getRegion())
           .setArguments(true)
           .execute(ThrowsFunctionInvocationTargetExceptionFunction.class.getSimpleName())
@@ -1294,7 +1294,7 @@ public class DistributedRegionFunctionExecutionDUnitTest implements Serializable
   }
 
   private static <K, V> Region<K, V> getRegion() {
-    return cast(REGION.get());
+    return uncheckedCast(REGION.get());
   }
 
   private static void setRegion(Region<?, ?> region) {
@@ -1327,7 +1327,8 @@ public class DistributedRegionFunctionExecutionDUnitTest implements Serializable
     @Override
     public void execute(FunctionContext<Object> context) {
       if (context.getArguments() instanceof Set) {
-        Set<Integer> arguments = cast(context.getArguments());
+        Set<Integer> arguments =
+            uncheckedCast(context.getArguments());
         for (int i = 0; i < arguments.size(); i++) {
           context.getResultSender().sendResult(i);
         }
@@ -1557,26 +1558,4 @@ public class DistributedRegionFunctionExecutionDUnitTest implements Serializable
       return -1;
     }
   }
-
-  @SuppressWarnings("unchecked")
-  static class FunctionServiceCast {
-
-    /**
-     * Provide unchecked cast of FunctionService.onRegion.
-     */
-    static <IN, OUT, AGG> Execution<IN, OUT, AGG> onRegion(Region<?, ?> region) {
-      return FunctionService.onRegion(region);
-    }
-  }
-
-  @SuppressWarnings({"unchecked", "unused"})
-  static class UncheckedUtils {
-
-    /**
-     * Provide unchecked cast of specified Object.
-     */
-    static <T> T cast(Object object) {
-      return (T) object;
-    }
-  }
 }
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/partitioned/fixed/FixedPartitioningWithColocationAndPersistenceDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/partitioned/fixed/FixedPartitioningWithColocationAndPersistenceDUnitTest.java
index ea7c834..99db19a 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/partitioned/fixed/FixedPartitioningWithColocationAndPersistenceDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/partitioned/fixed/FixedPartitioningWithColocationAndPersistenceDUnitTest.java
@@ -36,10 +36,10 @@ import static org.apache.geode.internal.cache.partitioned.fixed.FixedPartitionin
 import static org.apache.geode.internal.cache.partitioned.fixed.FixedPartitioningWithColocationAndPersistenceDUnitTest.Quarter.Q2;
 import static org.apache.geode.internal.cache.partitioned.fixed.FixedPartitioningWithColocationAndPersistenceDUnitTest.Quarter.Q3;
 import static org.apache.geode.internal.cache.partitioned.fixed.FixedPartitioningWithColocationAndPersistenceDUnitTest.Quarter.Q4;
-import static org.apache.geode.internal.cache.util.UncheckedUtils.cast;
 import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
 import static org.apache.geode.test.dunit.VM.getVM;
 import static org.apache.geode.test.dunit.VM.getVMId;
+import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.catchThrowable;
 import static org.mockito.Mockito.mock;
@@ -1502,9 +1502,9 @@ public class FixedPartitioningWithColocationAndPersistenceDUnitTest implements S
     PartitionedRegionDataStore dataStore = partitionedRegion.getDataStore();
 
     if (dataStore != null) {
-      return cast(new LocalDataSet(partitionedRegion, dataStore.getAllLocalBucketIds()));
+      return uncheckedCast(new LocalDataSet(partitionedRegion, dataStore.getAllLocalBucketIds()));
     }
-    return cast(new LocalDataSet(partitionedRegion, emptySet()));
+    return uncheckedCast(new LocalDataSet(partitionedRegion, emptySet()));
   }
 
   private void validateQuartersData() throws ParseException {
@@ -1555,20 +1555,20 @@ public class FixedPartitioningWithColocationAndPersistenceDUnitTest implements S
       InternalDistributedMember idmForShipment = shipments.getBucketPrimary(i);
 
       // take all the keys from the shipment for each bucket
-      Set<CustomerId> customerKey = cast(customers.getBucketKeys(i));
+      Set<CustomerId> customerKey = uncheckedCast(customers.getBucketKeys(i));
       assertThat(customerKey).isNotNull();
 
       for (CustomerId customerId : customerKey) {
         assertThat(customers.get(customerId)).isNotNull();
 
-        Set<OrderId> orderKey = cast(orders.getBucketKeys(i));
+        Set<OrderId> orderKey = uncheckedCast(orders.getBucketKeys(i));
         for (OrderId orderId : orderKey) {
           assertThat(orders.get(orderId)).isNotNull();
           if (orderId.getCustomerId().equals(customerId)) {
             assertThat(idmForOrder).isEqualTo(idmForCustomer);
           }
 
-          Set<ShipmentId> shipmentKey = cast(shipments.getBucketKeys(i));
+          Set<ShipmentId> shipmentKey = uncheckedCast(shipments.getBucketKeys(i));
           for (ShipmentId shipmentId : shipmentKey) {
             assertThat(shipments.get(shipmentId)).isNotNull();
             if (shipmentId.getOrderId().equals(orderId)) {
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/execute/FunctionExecutionOnLonerRegressionTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/execute/FunctionExecutionOnLonerRegressionTest.java
index 08b75cf..d0c4cbc 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/execute/FunctionExecutionOnLonerRegressionTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/execute/FunctionExecutionOnLonerRegressionTest.java
@@ -19,7 +19,7 @@ import static org.apache.geode.cache.RegionShortcut.REPLICATE;
 import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
 import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
 import static org.apache.geode.distributed.ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER;
-import static org.apache.geode.internal.cache.execute.FunctionExecutionOnLonerRegressionTest.UncheckedUtils.cast;
+import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.util.Collection;
@@ -35,15 +35,14 @@ import org.junit.experimental.categories.Category;
 
 import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.Region;
-import org.apache.geode.cache.execute.Execution;
 import org.apache.geode.cache.execute.FunctionContext;
-import org.apache.geode.cache.execute.FunctionService;
 import org.apache.geode.cache.execute.RegionFunctionContext;
 import org.apache.geode.cache.execute.ResultCollector;
 import org.apache.geode.cache.partition.PartitionRegionHelper;
 import org.apache.geode.distributed.internal.DistributionManager;
 import org.apache.geode.distributed.internal.LonerDistributionManager;
 import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.execute.util.TypedFunctionService;
 import org.apache.geode.test.junit.categories.FunctionServiceTest;
 
 /**
@@ -88,7 +87,7 @@ public class FunctionExecutionOnLonerRegressionTest {
 
     populateRegion(region);
 
-    ResultCollector<Collection<String>, Collection<String>> resultCollector = FunctionServiceCast
+    ResultCollector<Collection<String>, Collection<String>> resultCollector = TypedFunctionService
         .<Void, Collection<String>, Collection<String>>onRegion(region)
         .withFilter(keysForGet)
         .execute(new TestFunction(DataSetSupplier.PARTITIONED));
@@ -105,7 +104,7 @@ public class FunctionExecutionOnLonerRegressionTest {
 
     populateRegion(region);
 
-    ResultCollector<Collection<String>, Collection<String>> resultCollector = FunctionServiceCast
+    ResultCollector<Collection<String>, Collection<String>> resultCollector = TypedFunctionService
         .<Void, Collection<String>, Collection<String>>onRegion(region)
         .withFilter(keysForGet)
         .execute(new TestFunction(DataSetSupplier.REPLICATE));
@@ -167,7 +166,7 @@ public class FunctionExecutionOnLonerRegressionTest {
     @Override
     public void execute(FunctionContext<String> context) {
       RegionFunctionContext regionFunctionContext = (RegionFunctionContext) context;
-      Set<String> keys = cast(regionFunctionContext.getFilter());
+      Set<String> keys = uncheckedCast(regionFunctionContext.getFilter());
       String lastKey = keys.iterator().next();
       keys.remove(lastKey);
 
@@ -185,20 +184,4 @@ public class FunctionExecutionOnLonerRegressionTest {
       return getClass().getName();
     }
   }
-
-  @SuppressWarnings({"unchecked", "WeakerAccess"})
-  private static class FunctionServiceCast {
-
-    static <IN, OUT, AGG> Execution<IN, OUT, AGG> onRegion(Region<?, ?> region) {
-      return FunctionService.onRegion(region);
-    }
-  }
-
-  @SuppressWarnings({"unchecked", "unused"})
-  static class UncheckedUtils {
-
-    static <T> T cast(Object object) {
-      return (T) object;
-    }
-  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientMetadataService.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientMetadataService.java
index 0aaa9df..8b89bb8 100755
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientMetadataService.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientMetadataService.java
@@ -14,7 +14,7 @@
  */
 package org.apache.geode.cache.client.internal;
 
-import static org.apache.geode.internal.cache.util.UncheckedUtils.cast;
+import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -230,7 +230,7 @@ public class ClientMetadataService {
 
     for (Map.Entry entry : serverToBuckets.entrySet()) {
       ServerLocation server = (ServerLocation) entry.getKey();
-      Set<Integer> buckets = cast(entry.getValue());
+      Set<Integer> buckets = uncheckedCast(entry.getValue());
       for (Integer bucket : buckets) {
         // use LinkedHashSet to maintain the order of keys
         // the keys will be iterated several times
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
index 2f6b0b4..2cd1f3c 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
@@ -33,10 +33,6 @@ import static org.apache.geode.distributed.internal.ClusterDistributionManager.L
 import static org.apache.geode.distributed.internal.DistributionConfig.DEFAULT_DURABLE_CLIENT_ID;
 import static org.apache.geode.distributed.internal.InternalDistributedSystem.getAnyInstance;
 import static org.apache.geode.internal.cache.ColocationHelper.getColocatedChildRegions;
-import static org.apache.geode.internal.cache.GemFireCacheImpl.UncheckedUtils.asDistributedMemberSet;
-import static org.apache.geode.internal.cache.GemFireCacheImpl.UncheckedUtils.createMapArray;
-import static org.apache.geode.internal.cache.GemFireCacheImpl.UncheckedUtils.uncheckedCast;
-import static org.apache.geode.internal.cache.GemFireCacheImpl.UncheckedUtils.uncheckedRegionAttributes;
 import static org.apache.geode.internal.cache.LocalRegion.setThreadInitLevelRequirement;
 import static org.apache.geode.internal.cache.PartitionedRegion.DISK_STORE_FLUSHED;
 import static org.apache.geode.internal.cache.PartitionedRegion.OFFLINE_EQUAL_PERSISTED;
@@ -44,11 +40,11 @@ import static org.apache.geode.internal.cache.PartitionedRegion.PRIMARY_BUCKETS_
 import static org.apache.geode.internal.cache.PartitionedRegionHelper.PARTITION_LOCK_SERVICE_NAME;
 import static org.apache.geode.internal.cache.control.InternalResourceManager.ResourceType.HEAP_MEMORY;
 import static org.apache.geode.internal.cache.control.InternalResourceManager.ResourceType.OFFHEAP_MEMORY;
-import static org.apache.geode.internal.cache.util.UncheckedUtils.cast;
 import static org.apache.geode.internal.logging.CoreLoggingExecutors.newThreadPoolWithFixedFeed;
 import static org.apache.geode.internal.tcp.ConnectionTable.threadWantsSharedResources;
 import static org.apache.geode.logging.internal.executors.LoggingExecutors.newFixedThreadPool;
 import static org.apache.geode.util.internal.GeodeGlossary.GEMFIRE_PREFIX;
+import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast;
 
 import java.io.BufferedReader;
 import java.io.ByteArrayInputStream;
@@ -1812,7 +1808,7 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
             && partitionedRegion.getDataPolicy() == DataPolicy.PERSISTENT_PARTITION) {
           int numBuckets = partitionedRegion.getTotalNumberOfBuckets();
           Map<InternalDistributedMember, PersistentMemberID>[] bucketMaps =
-              createMapArray(numBuckets);
+              uncheckedCast(new Map[numBuckets]);
           PartitionedRegionDataStore dataStore = partitionedRegion.getDataStore();
 
           // lock all the primary buckets
@@ -2666,18 +2662,18 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
 
   @Override
   public Set<DistributedMember> getAdminMembers() {
-    return asDistributedMemberSet(dm.getAdminMemberSet());
+    return uncheckedCast(dm.getAdminMemberSet());
   }
 
   @Override
   public Set<DistributedMember> getMembers(Region region) {
     if (region instanceof DistributedRegion) {
       DistributedRegion distributedRegion = (DistributedRegion) region;
-      return asDistributedMemberSet(distributedRegion.getDistributionAdvisor().adviseCacheOp());
+      return uncheckedCast(distributedRegion.getDistributionAdvisor().adviseCacheOp());
     }
     if (region instanceof PartitionedRegion) {
       PartitionedRegion partitionedRegion = (PartitionedRegion) region;
-      return asDistributedMemberSet(partitionedRegion.getRegionAdvisor().adviseAllPRNodes());
+      return uncheckedCast(partitionedRegion.getRegionAdvisor().adviseAllPRNodes());
     }
     return emptySet();
   }
@@ -3063,15 +3059,14 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
       system.handleResourceEvent(ResourceEvent.REGION_CREATE, region);
     }
 
-    return cast(region);
+    return uncheckedCast(region);
   }
 
   @Override
   public <K, V> RegionAttributes<K, V> invokeRegionBefore(InternalRegion parent, String name,
       RegionAttributes<K, V> attrs, InternalRegionArguments internalRegionArgs) {
     for (RegionListener listener : regionListeners) {
-      attrs =
-          uncheckedRegionAttributes(listener.beforeCreate(parent, name, attrs, internalRegionArgs));
+      attrs = uncheckedCast(listener.beforeCreate(parent, name, attrs, internalRegionArgs));
     }
     return attrs;
   }
@@ -3187,7 +3182,7 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
 
   @Override
   public <K, V> Region<K, V> getRegionByPath(String path) {
-    return cast(getInternalRegionByPath(path));
+    return uncheckedCast(getInternalRegionByPath(path));
   }
 
   @Override
@@ -3244,7 +3239,7 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
         stopper.checkCancelInProgress(null);
         return null;
       }
-      return cast(result);
+      return uncheckedCast(result);
     }
 
     String[] pathParts = parsePath(path);
@@ -3268,7 +3263,7 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
       logger.debug("GemFireCache.getRegion, calling getSubregion on rootRegion({}): {}",
           pathParts[0], pathParts[1]);
     }
-    return cast(rootRegion.getSubregion(pathParts[1], returnDestroyedRegion));
+    return uncheckedCast(rootRegion.getSubregion(pathParts[1], returnDestroyedRegion));
   }
 
   @Override
@@ -4058,7 +4053,7 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
 
   @Override
   public <K, V> RegionAttributes<K, V> getRegionAttributes(String id) {
-    return GemFireCacheImpl.UncheckedUtils.<K, V>uncheckedCast(namedRegionAttributes).get(id);
+    return uncheckedCast(namedRegionAttributes.get(id));
   }
 
   @Override
@@ -5176,28 +5171,6 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
     }
   }
 
-  @SuppressWarnings("unchecked")
-  static class UncheckedUtils {
-
-    static Map<InternalDistributedMember, PersistentMemberID>[] createMapArray(int size) {
-      return new Map[size];
-    }
-
-    static Set<DistributedMember> asDistributedMemberSet(
-        Set<InternalDistributedMember> internalDistributedMembers) {
-      return (Set) internalDistributedMembers;
-    }
-
-    static <K, V> RegionAttributes<K, V> uncheckedRegionAttributes(RegionAttributes region) {
-      return region;
-    }
-
-    static <K, V> Map<String, RegionAttributes<K, V>> uncheckedCast(
-        Map<String, RegionAttributes<?, ?>> namedRegionAttributes) {
-      return (Map) namedRegionAttributes;
-    }
-  }
-
   @FunctionalInterface
   @VisibleForTesting
   interface TXManagerImplFactory {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCacheForClientAccess.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCacheForClientAccess.java
index 10635fe..dbbb98b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCacheForClientAccess.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCacheForClientAccess.java
@@ -16,7 +16,7 @@
  */
 package org.apache.geode.internal.cache;
 
-import static org.apache.geode.internal.cache.util.UncheckedUtils.cast;
+import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast;
 
 import java.io.File;
 import java.io.IOException;
@@ -162,7 +162,7 @@ public class InternalCacheForClientAccess implements InternalCache {
   public <K, V> Region<K, V> getRegion(String path, boolean returnDestroyedRegion) {
     Region result = delegate.getRegion(path, returnDestroyedRegion);
     checkForInternalRegion(result);
-    return cast(result);
+    return uncheckedCast(result);
   }
 
   @Override
@@ -176,7 +176,7 @@ public class InternalCacheForClientAccess implements InternalCache {
   public <K, V> Region<K, V> getRegionByPath(String path) {
     InternalRegion result = delegate.getInternalRegionByPath(path);
     checkForInternalRegion(result);
-    return cast(result);
+    return uncheckedCast(result);
   }
 
   @Override
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index 849187b..098115b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -17,9 +17,9 @@ package org.apache.geode.internal.cache;
 import static org.apache.geode.internal.cache.LocalRegion.InitializationLevel.AFTER_INITIAL_IMAGE;
 import static org.apache.geode.internal.cache.LocalRegion.InitializationLevel.ANY_INIT;
 import static org.apache.geode.internal.cache.LocalRegion.InitializationLevel.BEFORE_INITIAL_IMAGE;
-import static org.apache.geode.internal.cache.util.UncheckedUtils.cast;
 import static org.apache.geode.internal.lang.SystemUtils.getLineSeparator;
 import static org.apache.geode.internal.offheap.annotations.OffHeapIdentifier.ENTRY_EVENT_NEW_VALUE;
+import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast;
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
@@ -8943,7 +8943,10 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
         txState.getRealDeal(null, this);
       }
       try {
-        proxyResult = getServerProxy().putAll(cast(map), eventId, !event.isGenerateCallbacks(),
+        proxyResult = getServerProxy().putAll(
+            uncheckedCast(map),
+            eventId,
+            !event.isGenerateCallbacks(),
             event.getCallbackArgument());
         if (isDebugEnabled) {
           logger.debug("PutAll received response from server: {}", proxyResult);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/util/UncheckedUtils.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/util/TypedFunctionService.java
similarity index 61%
rename from geode-core/src/main/java/org/apache/geode/internal/cache/util/UncheckedUtils.java
rename to geode-core/src/main/java/org/apache/geode/internal/cache/execute/util/TypedFunctionService.java
index c03e990..3f21439 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/util/UncheckedUtils.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/util/TypedFunctionService.java
@@ -12,18 +12,26 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-package org.apache.geode.internal.cache.util;
+package org.apache.geode.internal.cache.execute.util;
 
+import org.apache.geode.cache.Region;
 import org.apache.geode.cache.execute.Execution;
+import org.apache.geode.cache.execute.FunctionService;
 
+/**
+ * Utilities for casting and working around raw types in the {@link FunctionService} API.
+ */
 @SuppressWarnings({"unchecked", "unused"})
-public class UncheckedUtils {
+public class TypedFunctionService {
 
-  public static <T> T cast(Object object) {
-    return (T) object;
+  protected TypedFunctionService() {
+    // do not instantiate
   }
 
-  public static <IN, OUT, AGG> Execution<IN, OUT, AGG> cast(Execution execution) {
-    return execution;
+  /**
+   * Adds parameterized type support to {@link FunctionService#onRegion(Region)}.
+   */
+  public static <IN, OUT, AGG> Execution<IN, OUT, AGG> onRegion(Region<?, ?> region) {
+    return FunctionService.onRegion(region);
   }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyFactory.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyFactory.java
index a3def49..ae26aa7 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyFactory.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyFactory.java
@@ -14,7 +14,7 @@
  */
 package org.apache.geode.internal.cache.tier.sockets;
 
-import static org.apache.geode.internal.cache.util.UncheckedUtils.cast;
+import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast;
 
 import java.lang.reflect.InvocationTargetException;
 import java.net.Socket;
@@ -55,7 +55,7 @@ public class CacheClientProxyFactory {
     }
     try {
       Class<InternalCacheClientProxyFactory> proxyClass =
-          cast(ClassPathLoader.getLatest().forName(proxyClassName));
+          uncheckedCast(ClassPathLoader.getLatest().forName(proxyClassName));
       return proxyClass.getConstructor().newInstance();
     } catch (ClassNotFoundException | NoSuchMethodException | InstantiationException
         | IllegalAccessException | InvocationTargetException e) {
diff --git a/geode-core/src/test/java/org/apache/geode/alerting/internal/ClusterAlertMessagingTest.java b/geode-core/src/test/java/org/apache/geode/alerting/internal/ClusterAlertMessagingTest.java
index f7bc8d8..eadf54b 100644
--- a/geode-core/src/test/java/org/apache/geode/alerting/internal/ClusterAlertMessagingTest.java
+++ b/geode-core/src/test/java/org/apache/geode/alerting/internal/ClusterAlertMessagingTest.java
@@ -14,7 +14,7 @@
  */
 package org.apache.geode.alerting.internal;
 
-import static org.apache.geode.internal.cache.util.UncheckedUtils.cast;
+import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.catchThrowable;
 import static org.mockito.ArgumentMatchers.anyLong;
@@ -141,7 +141,7 @@ public class ClusterAlertMessagingTest {
   public void sendAlertLogsWarning_ifAlertingIOExceptionIsCaught() {
     ExecutorService executor = currentThreadExecutorService();
     ClusterDistributionManager distributionManager = mock(ClusterDistributionManager.class);
-    Consumer<AlertingIOException> alertingIOExceptionLogger = cast(mock(Consumer.class));
+    Consumer<AlertingIOException> alertingIOExceptionLogger = uncheckedCast(mock(Consumer.class));
     ClusterAlertMessaging clusterAlertMessaging =
         spyClusterAlertMessaging(distributionManager, executor, alertingIOExceptionLogger);
     doThrow(new AlertingIOException(new IOException("Cannot form connection to alert listener")))
@@ -162,7 +162,7 @@ public class ClusterAlertMessagingTest {
   public void sendAlertLogsWarningOnce_ifAlertingIOExceptionIsCaught() {
     ExecutorService executor = currentThreadExecutorService();
     ClusterDistributionManager distributionManager = mock(ClusterDistributionManager.class);
-    Consumer<AlertingIOException> alertingIOExceptionLogger = cast(mock(Consumer.class));
+    Consumer<AlertingIOException> alertingIOExceptionLogger = uncheckedCast(mock(Consumer.class));
     ClusterAlertMessaging clusterAlertMessaging =
         spyClusterAlertMessaging(distributionManager, executor, alertingIOExceptionLogger);
     doThrow(new AlertingIOException(new IOException("Cannot form connection to alert listener")))
diff --git a/geode-core/src/test/java/org/apache/geode/internal/tcp/TCPConduitTest.java b/geode-core/src/test/java/org/apache/geode/internal/tcp/TCPConduitTest.java
index 0c30ce2..c0bf0a5 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/tcp/TCPConduitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/tcp/TCPConduitTest.java
@@ -16,7 +16,7 @@
  */
 package org.apache.geode.internal.tcp;
 
-import static org.apache.geode.internal.cache.util.UncheckedUtils.cast;
+import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.catchThrowable;
 import static org.mockito.Mockito.anyBoolean;
@@ -62,7 +62,7 @@ public class TCPConduitTest {
 
   @Before
   public void setUp() throws Exception {
-    membership = cast(mock(Membership.class));
+    membership = uncheckedCast(mock(Membership.class));
     directChannel = mock(DirectChannel.class);
     connectionTable = mock(ConnectionTable.class);
     socketCreator = new SocketCreator(new SSLConfig.Builder().build());


[geode] 02/03: GEODE-8540: Create new DistributedBlackboard Rule (#5557)

Posted by bu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

burcham pushed a commit to branch backport-1-13-GEODE-8652-and-friends
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 9a066e64b1722f914994fecb454a2f67f2a46148
Author: Kirk Lund <kl...@apache.org>
AuthorDate: Wed Sep 30 09:39:30 2020 -0700

    GEODE-8540: Create new DistributedBlackboard Rule (#5557)
    
    Package up DUnitBlackboard as a JUnit Rule named DistributedBlackboard.
    
    (cherry picked from commit 26cb822f2ee467545dd708ecc867cebbd2473c70)
---
 .../dunit/internal/DUnitBlackboardDUnitTest.java   |  75 +++---
 .../DistributedBlackboardDistributedTest.java      | 297 +++++++++++++++++++++
 .../InternalBlackboard.java => Blackboard.java}    |  54 ++--
 .../apache/geode/test/dunit/DUnitBlackboard.java   |  55 ++--
 .../test/dunit/internal/InternalBlackboard.java    |  33 ++-
 .../dunit/internal/InternalBlackboardImpl.java     |  59 ++--
 .../test/dunit/rules/DistributedBlackboard.java    | 138 ++++++++++
 7 files changed, 584 insertions(+), 127 deletions(-)

diff --git a/geode-dunit/src/distributedTest/java/org/apache/geode/test/dunit/internal/DUnitBlackboardDUnitTest.java b/geode-dunit/src/distributedTest/java/org/apache/geode/test/dunit/internal/DUnitBlackboardDUnitTest.java
index ae78247..5e151d7 100755
--- a/geode-dunit/src/distributedTest/java/org/apache/geode/test/dunit/internal/DUnitBlackboardDUnitTest.java
+++ b/geode-dunit/src/distributedTest/java/org/apache/geode/test/dunit/internal/DUnitBlackboardDUnitTest.java
@@ -14,83 +14,70 @@
  */
 package org.apache.geode.test.dunit.internal;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.catchThrowable;
 
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import org.junit.Test;
 
-import org.apache.geode.test.dunit.Host;
 import org.apache.geode.test.dunit.VM;
 
-
+@SuppressWarnings("serial")
 public class DUnitBlackboardDUnitTest extends JUnit4DistributedTestCase {
+
   @Test
-  public void canPassDataBetweenVMs() throws Exception {
+  public void canPassDataBetweenVMs() {
     final String MBOX = "myMailbox";
-    VM vm0 = Host.getHost(0).getVM(0);
-    VM vm1 = Host.getHost(0).getVM(1);
+    VM vm0 = getVM(0);
+    VM vm1 = getVM(1);
 
     vm0.invoke("put data in mailbox", () -> getBlackboard().setMailbox(MBOX, "testing"));
 
-    String result = (String) vm1.invoke("get data from mailbox", () -> {
-      return getBlackboard().getMailbox(MBOX);
-    });
+    String result = vm1.invoke("get data from mailbox", () -> getBlackboard().getMailbox(MBOX));
 
-    assertEquals("testing", result);
+    assertThat(result).isEqualTo("testing");
   }
 
   @Test
-  public void canSignalAnotherVM() throws Exception {
+  public void canSignalAnotherVM() {
     final String GATE = "myGate";
-    VM vm0 = Host.getHost(0).getVM(0);
-    VM vm1 = Host.getHost(0).getVM(1);
+    VM vm0 = getVM(0);
+    VM vm1 = getVM(1);
 
     vm1.invoke("wait on gate not yet signalled", () -> {
-      assertFalse(getBlackboard().isGateSignaled(GATE));
-      try {
-        getBlackboard().waitForGate(GATE, 1, TimeUnit.SECONDS);
-      } catch (TimeoutException e) {
-        // expected
-        return;
-      } catch (InterruptedException e) {
-        fail("unexpected interrupt");
-      }
-      fail("unexpected success");
+      assertThat(getBlackboard().isGateSignaled(GATE)).isFalse();
+
+      Throwable thrown = catchThrowable(() -> {
+        getBlackboard().waitForGate(GATE, 1, SECONDS);
+      });
+
+      assertThat(thrown).isInstanceOf(TimeoutException.class);
     });
 
     vm0.invoke("signal gate", () -> getBlackboard().signalGate(GATE));
 
-    vm1.invoke("wait on gate not yet signalled", () -> {
-      try {
-        getBlackboard().waitForGate(GATE, 1, TimeUnit.SECONDS);
-      } catch (TimeoutException e) {
-        fail("unexpected timeout");
-      } catch (InterruptedException e) {
-        fail("unexpected interrupt");
-      }
-      // success expected
-    });
+    vm1.invoke("wait on gate not yet signalled",
+        () -> getBlackboard().waitForGate(GATE, 1, SECONDS));
   }
 
   @Test
-  public void initBlackboardClearsEverything() throws Exception {
+  public void initBlackboardClearsEverything() {
     for (int i = 0; i < 100; i++) {
       getBlackboard().setMailbox("MBOX" + i, "value" + i);
-      assertEquals("value" + i, getBlackboard().getMailbox("MBOX" + i));
+      assertThat((Object) getBlackboard().getMailbox("MBOX" + i)).isEqualTo("value" + i);
+
       getBlackboard().signalGate("GATE" + i);
-      assertTrue(getBlackboard().isGateSignaled("GATE" + i));
+      assertThat(getBlackboard().isGateSignaled("GATE" + i)).isTrue();
     }
-    Host.getHost(0).getVM(1).invoke("clear blackboard", () -> getBlackboard().initBlackboard());
+
+    getVM(1).invoke("clear blackboard", () -> getBlackboard().initBlackboard());
 
     for (int i = 0; i < 100; i++) {
-      assertNull(getBlackboard().getMailbox("MBOX" + i));
-      assertFalse(getBlackboard().isGateSignaled("GATE" + i));
+      assertThat((Object) getBlackboard().getMailbox("MBOX" + i)).isNull();
+      assertThat(getBlackboard().isGateSignaled("GATE" + i)).isFalse();
     }
   }
 }
diff --git a/geode-dunit/src/distributedTest/java/org/apache/geode/test/dunit/rules/tests/DistributedBlackboardDistributedTest.java b/geode-dunit/src/distributedTest/java/org/apache/geode/test/dunit/rules/tests/DistributedBlackboardDistributedTest.java
new file mode 100644
index 0000000..ea3ed2e
--- /dev/null
+++ b/geode-dunit/src/distributedTest/java/org/apache/geode/test/dunit/rules/tests/DistributedBlackboardDistributedTest.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.test.dunit.rules.tests;
+
+import static java.util.Arrays.asList;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.geode.test.dunit.VM.getController;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.catchThrowable;
+
+import java.io.Serializable;
+import java.util.concurrent.TimeoutException;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.DistributedBlackboard;
+import org.apache.geode.test.dunit.rules.DistributedRule;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
+
+@SuppressWarnings({"serial", "CodeBlock2Expr"})
+public class DistributedBlackboardDistributedTest implements Serializable {
+
+  @Rule
+  public DistributedRule distributedRule = new DistributedRule();
+  @Rule
+  public DistributedBlackboard blackboard = new DistributedBlackboard();
+  @Rule
+  public SerializableTestName testName = new SerializableTestName();
+
+  @Test
+  public void canPassDataBetweenVMs() {
+    VM vm0 = getVM(0);
+    VM vm1 = getVM(1);
+
+    vm0.invoke("put data in mailbox", () -> blackboard.setMailbox(mailbox(), value()));
+
+    String result = vm1.invoke("get data from mailbox", () -> blackboard.getMailbox(mailbox()));
+
+    assertThat(result).isEqualTo(value());
+  }
+
+  @Test
+  public void canSignalAnotherVM() {
+    VM vm0 = getVM(0);
+    VM vm1 = getVM(1);
+
+    vm1.invoke("wait on gate not yet signalled", () -> {
+      assertThat(blackboard.isGateSignaled(gate())).isFalse();
+
+      Throwable thrown = catchThrowable(() -> {
+        blackboard.waitForGate(gate(), 1, SECONDS);
+      });
+
+      assertThat(thrown).isInstanceOf(TimeoutException.class);
+    });
+
+    vm0.invoke("signal gate", () -> blackboard.signalGate(gate()));
+
+    vm1.invoke("wait on gate not yet signalled", () -> blackboard.waitForGate(gate(), 1, SECONDS));
+  }
+
+  @Test
+  public void initBlackboardClearsEverything() {
+    for (int i = 0; i < 100; i++) {
+      blackboard.setMailbox(mailbox(i), value(i));
+      assertThat((Object) blackboard.getMailbox(mailbox(i))).isEqualTo(value(i));
+
+      blackboard.signalGate(gate(i));
+      assertThat(blackboard.isGateSignaled(gate(i))).isTrue();
+    }
+
+    getVM(1).invoke("clear blackboard", () -> blackboard.initBlackboard());
+
+    for (int i = 0; i < 100; i++) {
+      assertThat((Object) blackboard.getMailbox(mailbox(i))).isNull();
+      assertThat(blackboard.isGateSignaled(gate(i))).isFalse();
+    }
+  }
+
+  @Test
+  public void getMailbox_returnsValueFromSameVM() {
+    getVM(0).invoke(() -> blackboard.setMailbox(mailbox(), value()));
+
+    getVM(0).invoke(() -> {
+      assertThat((Object) blackboard.getMailbox(mailbox())).isEqualTo(value());
+    });
+  }
+
+  @Test
+  public void getMailbox_returnsValueFromOtherVM() {
+    getVM(0).invoke(() -> blackboard.setMailbox(mailbox(), value()));
+
+    getVM(1).invoke(() -> {
+      assertThat((Object) blackboard.getMailbox(mailbox())).isEqualTo(value());
+    });
+  }
+
+  @Test
+  public void setMailbox_overwrites_valueFromSameVM() {
+    getVM(0).invoke(() -> blackboard.setMailbox(mailbox(), value(1)));
+    getVM(0).invoke(() -> blackboard.setMailbox(mailbox(), value(2)));
+
+    getVM(0).invoke(() -> {
+      assertThat((Object) blackboard.getMailbox(mailbox())).isEqualTo(value(2));
+    });
+  }
+
+  @Test
+  public void setMailbox_overwrites_valueFromOtherVM() {
+    getVM(0).invoke(() -> blackboard.setMailbox(mailbox(), value(1)));
+    getVM(1).invoke(() -> blackboard.setMailbox(mailbox(), value(2)));
+
+    getVM(2).invoke(() -> {
+      assertThat((Object) blackboard.getMailbox(mailbox())).isEqualTo(value(2));
+    });
+  }
+
+  @Test
+  public void getMailbox_returnsValueFromSameVM_afterBouncingVM() {
+    getVM(0).invoke(() -> blackboard.setMailbox(mailbox(), value()));
+
+    getVM(0).bounceForcibly();
+
+    getVM(0).invoke(() -> {
+      assertThat((Object) blackboard.getMailbox(mailbox())).isEqualTo(value());
+    });
+  }
+
+  @Test
+  public void getMailbox_returnsValueFromOtherVM_afterBouncingVM() {
+    getVM(0).invoke(() -> blackboard.setMailbox(mailbox(), value()));
+
+    getVM(0).bounceForcibly();
+
+    getVM(1).invoke(() -> {
+      assertThat((Object) blackboard.getMailbox(mailbox())).isEqualTo(value());
+    });
+  }
+
+  @Test
+  public void setMailbox_overwrites_valueFromSameVM_afterBouncingVM() {
+    getVM(0).invoke(() -> blackboard.setMailbox(mailbox(), value(1)));
+    getVM(0).invoke(() -> blackboard.setMailbox(mailbox(), value(2)));
+
+    getVM(0).bounceForcibly();
+
+    getVM(0).invoke(() -> {
+      assertThat((Object) blackboard.getMailbox(mailbox(1))).isEqualTo(value(2));
+    });
+  }
+
+  @Test
+  public void setMailbox_overwrites_valueFromOtherVM_afterBouncingFirstVM() {
+    getVM(0).invoke(() -> blackboard.setMailbox(mailbox(), value(1)));
+    getVM(1).invoke(() -> blackboard.setMailbox(mailbox(), value(2)));
+
+    getVM(0).bounceForcibly();
+
+    getVM(2).invoke(() -> {
+      assertThat((Object) blackboard.getMailbox(mailbox())).isEqualTo(value(2));
+    });
+  }
+
+  @Test
+  public void setMailbox_overwrites_valueFromOtherVM_afterBouncingSecondVM() {
+    getVM(0).invoke(() -> blackboard.setMailbox(mailbox(), value(1)));
+    getVM(1).invoke(() -> blackboard.setMailbox(mailbox(), value(2)));
+
+    getVM(1).bounceForcibly();
+
+    getVM(2).invoke(() -> {
+      assertThat((Object) blackboard.getMailbox(mailbox())).isEqualTo(value(2));
+    });
+  }
+
+  @Test
+  public void setMailbox_overwrites_valueFromOtherVM_afterBouncingBothVMs() {
+    getVM(0).invoke(() -> blackboard.setMailbox(mailbox(), value(1)));
+    getVM(1).invoke(() -> blackboard.setMailbox(mailbox(), value(2)));
+
+    getVM(0).bounceForcibly();
+    getVM(1).bounceForcibly();
+
+    getVM(2).invoke(() -> {
+      assertThat((Object) blackboard.getMailbox(mailbox())).isEqualTo(value(2));
+    });
+  }
+
+  @Test
+  public void getMailbox_returnsValueFromSameVM_afterBouncingEveryVM() {
+    getVM(0).invoke(() -> blackboard.setMailbox(mailbox(), value()));
+
+    getVM(0).bounceForcibly();
+    getVM(1).bounceForcibly();
+    getVM(2).bounceForcibly();
+    getVM(3).bounceForcibly();
+
+    getVM(0).invoke(() -> {
+      assertThat((Object) blackboard.getMailbox(mailbox())).isEqualTo(value());
+    });
+  }
+
+  @Test
+  public void getMailbox_returnsValueFromOtherVM_afterBouncingEveryVM() {
+    getVM(0).invoke(() -> blackboard.setMailbox(mailbox(), value()));
+
+    getVM(0).bounceForcibly();
+    getVM(1).bounceForcibly();
+    getVM(2).bounceForcibly();
+    getVM(3).bounceForcibly();
+
+    getVM(1).invoke(() -> {
+      assertThat((Object) blackboard.getMailbox(mailbox())).isEqualTo(value());
+    });
+  }
+
+  @Test
+  public void getMailbox_returnsValueFromControllerVM_afterBouncingEveryVM() {
+    blackboard.setMailbox(mailbox(), value());
+
+    getVM(0).bounceForcibly();
+    getVM(1).bounceForcibly();
+    getVM(2).bounceForcibly();
+    getVM(3).bounceForcibly();
+
+    getVM(3).invoke(() -> {
+      assertThat((Object) blackboard.getMailbox(mailbox())).isEqualTo(value());
+    });
+  }
+
+  @Test
+  public void getMailbox_returnsValueInControllerVM_afterBouncingEveryVM() {
+    blackboard.setMailbox(mailbox(), value());
+
+    getVM(0).bounceForcibly();
+    getVM(1).bounceForcibly();
+    getVM(2).bounceForcibly();
+    getVM(3).bounceForcibly();
+
+    assertThat((Object) blackboard.getMailbox(mailbox())).isEqualTo(value());
+  }
+
+  @Test
+  public void getMailbox_returnsValueInEveryVM() {
+    blackboard.setMailbox(mailbox(), value());
+
+    assertThat((Object) blackboard.getMailbox(mailbox())).isEqualTo(value());
+    for (VM vm : asList(getController(), getVM(0), getVM(1), getVM(2), getVM(3))) {
+      vm.invoke(() -> {
+        assertThat((Object) blackboard.getMailbox(mailbox())).isEqualTo(value());
+      });
+    }
+  }
+
+  private String mailbox() {
+    return value("mailbox", 1);
+  }
+
+  private String value() {
+    return value("value", 1);
+  }
+
+  private String gate() {
+    return value("gate", 1);
+  }
+
+  private String mailbox(int count) {
+    return value("mailbox", count);
+  }
+
+  private String value(int count) {
+    return value("value", count);
+  }
+
+  private String gate(int count) {
+    return value("gate", count);
+  }
+
+  private String value(String prefix, int count) {
+    return prefix + "-" + testName.getMethodName() + "-" + count;
+  }
+}
diff --git a/geode-dunit/src/main/java/org/apache/geode/test/dunit/internal/InternalBlackboard.java b/geode-dunit/src/main/java/org/apache/geode/test/dunit/Blackboard.java
old mode 100755
new mode 100644
similarity index 51%
copy from geode-dunit/src/main/java/org/apache/geode/test/dunit/internal/InternalBlackboard.java
copy to geode-dunit/src/main/java/org/apache/geode/test/dunit/Blackboard.java
index 24abf4f..2b15ebd
--- a/geode-dunit/src/main/java/org/apache/geode/test/dunit/internal/InternalBlackboard.java
+++ b/geode-dunit/src/main/java/org/apache/geode/test/dunit/Blackboard.java
@@ -12,66 +12,68 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-package org.apache.geode.test.dunit.internal;
+package org.apache.geode.test.dunit;
 
-import java.io.Serializable;
-import java.rmi.Remote;
-import java.rmi.RemoteException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+
 /**
- * DUnitBlackboard provides mailboxes and synchronization gateways for distributed unit tests.
+ * Blackboard provides mailboxes and synchronization gateways for distributed tests.
+ *
  * <p>
  * Tests may use the blackboard to pass objects and status between JVMs with mailboxes instead of
  * using static variables in classes. The caveat being that the objects will be serialized using
  * Java serialization.
+ *
  * <p>
- * Gates may be used to synchronize operations between unit test JVMs. Combined with Awaitility
- * these can be used to test for conditions being met, actions having happened, etc.
+ * Gates may be used to synchronize operations between distributed test JVMs. Combined with
+ * Awaitility these can be used to test for conditions being met, actions having happened, etc.
+ *
  * <p>
  * Look for references to the given methods in your IDE for examples.
  */
-public interface InternalBlackboard extends Remote, Serializable {
+public interface Blackboard {
+
   /**
-   * resets the blackboard
+   * Resets the blackboard.
    */
-  void initBlackboard() throws RemoteException;
+  void initBlackboard();
 
   /**
-   * signals a boolean gate
+   * Signals a boolean gate.
    */
-  void signalGate(String gateName) throws RemoteException;
+  void signalGate(String gateName);
 
   /**
-   * wait for a gate to be signaled
+   * Waits at most {@link GeodeAwaitility#getTimeout()} for a gate to be signaled.
    */
-  void waitForGate(String gateName, long timeout, TimeUnit units)
-      throws RemoteException, TimeoutException, InterruptedException;
+  void waitForGate(String gateName) throws TimeoutException, InterruptedException;
 
   /**
-   * clears a gate
+   * Waits at most the specified timeout for a gate to be signaled.
    */
-  void clearGate(String gateName) throws RemoteException;
+  void waitForGate(String gateName, long timeout, TimeUnit units)
+      throws TimeoutException, InterruptedException;
 
   /**
-   * test to see if a gate has been signeled
+   * Clears a gate.
    */
-  boolean isGateSignaled(String gateName) throws RemoteException;
+  void clearGate(String gateName);
 
   /**
-   * put an object into a mailbox slot. The object must be java-serializable
+   * Checks to see if a gate has been signaled.
    */
-  void setMailbox(String boxName, Object value) throws RemoteException;
+  boolean isGateSignaled(String gateName);
 
   /**
-   * retrieve an object from a mailbox slot
+   * Puts an object into a mailbox slot. The object must be java-serializable.
    */
-  <T> T getMailbox(String boxName) throws RemoteException;
+  <T> void setMailbox(String boxName, T value);
 
   /**
-   * ping the blackboard to make sure it's there
+   * Retrieves an object from a mailbox slot.
    */
-  void ping() throws RemoteException;
-
+  <T> T getMailbox(String boxName);
 }
diff --git a/geode-dunit/src/main/java/org/apache/geode/test/dunit/DUnitBlackboard.java b/geode-dunit/src/main/java/org/apache/geode/test/dunit/DUnitBlackboard.java
index d87b99d..1ff5a0a 100755
--- a/geode-dunit/src/main/java/org/apache/geode/test/dunit/DUnitBlackboard.java
+++ b/geode-dunit/src/main/java/org/apache/geode/test/dunit/DUnitBlackboard.java
@@ -14,6 +14,9 @@
  */
 package org.apache.geode.test.dunit;
 
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.getTimeout;
+
 import java.rmi.RemoteException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -22,7 +25,7 @@ import org.apache.geode.test.dunit.internal.InternalBlackboard;
 import org.apache.geode.test.dunit.internal.InternalBlackboardImpl;
 
 /**
- * DUnitBlackboard provides mailboxes and synchronization gateways for distributed unit tests.
+ * DUnitBlackboard provides mailboxes and synchronization gateways for distributed tests.
  *
  * <p>
  * Tests may use the blackboard to pass objects and status between JVMs with mailboxes instead of
@@ -36,17 +39,19 @@ import org.apache.geode.test.dunit.internal.InternalBlackboardImpl;
  * <p>
  * Look for references to the given methods in your IDE for examples.
  */
-public class DUnitBlackboard {
+public class DUnitBlackboard implements Blackboard {
 
-  private InternalBlackboard blackboard;
+  private final InternalBlackboard blackboard;
 
   public DUnitBlackboard() {
-    blackboard = InternalBlackboardImpl.getInstance();
+    this(InternalBlackboardImpl.getInstance());
+  }
+
+  public DUnitBlackboard(InternalBlackboard blackboard) {
+    this.blackboard = blackboard;
   }
 
-  /**
-   * resets the blackboard
-   */
+  @Override
   public void initBlackboard() {
     try {
       blackboard.initBlackboard();
@@ -55,11 +60,8 @@ public class DUnitBlackboard {
     }
   }
 
-  /**
-   * signals a boolean gate
-   */
+  @Override
   public void signalGate(String gateName) {
-    // System.out.println(Thread.currentThread().getName()+": signaling gate " + gateName);
     try {
       blackboard.signalGate(gateName);
     } catch (RemoteException e) {
@@ -67,12 +69,15 @@ public class DUnitBlackboard {
     }
   }
 
-  /**
-   * wait for a gate to be signaled
-   */
+  @Override
+  public void waitForGate(String gateName) throws TimeoutException, InterruptedException {
+    // Duration.toMinutes() truncates: a sub-minute timeout would become a zero-length wait.
+    waitForGate(gateName, getTimeout().toMillis(), TimeUnit.MILLISECONDS);
+  }
+
+  @Override
   public void waitForGate(String gateName, long timeout, TimeUnit units)
       throws TimeoutException, InterruptedException {
-    // System.out.println(Thread.currentThread().getName()+": waiting for gate " + gateName);
     try {
       blackboard.waitForGate(gateName, timeout, units);
     } catch (RemoteException e) {
@@ -80,9 +85,7 @@ public class DUnitBlackboard {
     }
   }
 
-  /**
-   * clear a gate
-   */
+  @Override
   public void clearGate(String gateName) {
     try {
       blackboard.clearGate(gateName);
@@ -91,9 +94,7 @@ public class DUnitBlackboard {
     }
   }
 
-  /**
-   * test to see if a gate has been signeled
-   */
+  @Override
   public boolean isGateSignaled(String gateName) {
     try {
       return blackboard.isGateSignaled(gateName);
@@ -102,9 +103,7 @@ public class DUnitBlackboard {
     }
   }
 
-  /**
-   * put an object into a mailbox slot. The object must be java-serializable
-   */
+  @Override
   public void setMailbox(String boxName, Object value) {
     try {
       blackboard.setMailbox(boxName, value);
@@ -113,9 +112,7 @@ public class DUnitBlackboard {
     }
   }
 
-  /**
-   * retrieve an object from a mailbox slot
-   */
+  @Override
   public <T> T getMailbox(String boxName) {
     try {
       return blackboard.getMailbox(boxName);
@@ -123,4 +120,8 @@ public class DUnitBlackboard {
       throw new RuntimeException("remote call failed", e);
     }
   }
+
+  public InternalBlackboard internal() {
+    return blackboard;
+  }
 }
diff --git a/geode-dunit/src/main/java/org/apache/geode/test/dunit/internal/InternalBlackboard.java b/geode-dunit/src/main/java/org/apache/geode/test/dunit/internal/InternalBlackboard.java
index 24abf4f..222a301 100755
--- a/geode-dunit/src/main/java/org/apache/geode/test/dunit/internal/InternalBlackboard.java
+++ b/geode-dunit/src/main/java/org/apache/geode/test/dunit/internal/InternalBlackboard.java
@@ -17,61 +17,70 @@ package org.apache.geode.test.dunit.internal;
 import java.io.Serializable;
 import java.rmi.Remote;
 import java.rmi.RemoteException;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 /**
- * DUnitBlackboard provides mailboxes and synchronization gateways for distributed unit tests.
+ * InternalBlackboard provides mailboxes and synchronization gateways for distributed tests.
+ *
  * <p>
  * Tests may use the blackboard to pass objects and status between JVMs with mailboxes instead of
  * using static variables in classes. The caveat being that the objects will be serialized using
  * Java serialization.
+ *
  * <p>
  * Gates may be used to synchronize operations between unit test JVMs. Combined with Awaitility
  * these can be used to test for conditions being met, actions having happened, etc.
- * <p>
- * Look for references to the given methods in your IDE for examples.
  */
 public interface InternalBlackboard extends Remote, Serializable {
+
   /**
-   * resets the blackboard
+   * Resets the blackboard.
    */
   void initBlackboard() throws RemoteException;
 
   /**
-   * signals a boolean gate
+   * Signals a boolean gate.
    */
   void signalGate(String gateName) throws RemoteException;
 
   /**
-   * wait for a gate to be signaled
+   * Waits for a gate to be signaled.
    */
   void waitForGate(String gateName, long timeout, TimeUnit units)
       throws RemoteException, TimeoutException, InterruptedException;
 
   /**
-   * clears a gate
+   * Clears a gate.
    */
   void clearGate(String gateName) throws RemoteException;
 
   /**
-   * test to see if a gate has been signeled
+   * Checks to see if a gate has been signaled.
    */
   boolean isGateSignaled(String gateName) throws RemoteException;
 
   /**
-   * put an object into a mailbox slot. The object must be java-serializable
+   * Puts an object into a mailbox slot. The object must be java-serializable.
    */
-  void setMailbox(String boxName, Object value) throws RemoteException;
+  <T> void setMailbox(String boxName, T value) throws RemoteException;
 
   /**
-   * retrieve an object from a mailbox slot
+   * Retrieves an object from a mailbox slot.
    */
   <T> T getMailbox(String boxName) throws RemoteException;
 
   /**
-   * ping the blackboard to make sure it's there
+   * Pings the blackboard to make sure it's there.
    */
   void ping() throws RemoteException;
 
+  Map<String, Boolean> gates() throws RemoteException;
+
+  Map<String, Serializable> mailboxes() throws RemoteException;
+
+  void putGates(Map<String, Boolean> gates) throws RemoteException;
+
+  void putMailboxes(Map<String, Serializable> mailboxes) throws RemoteException;
 }
diff --git a/geode-dunit/src/main/java/org/apache/geode/test/dunit/internal/InternalBlackboardImpl.java b/geode-dunit/src/main/java/org/apache/geode/test/dunit/internal/InternalBlackboardImpl.java
index bbed22e..e24a5a0 100755
--- a/geode-dunit/src/main/java/org/apache/geode/test/dunit/internal/InternalBlackboardImpl.java
+++ b/geode-dunit/src/main/java/org/apache/geode/test/dunit/internal/InternalBlackboardImpl.java
@@ -14,6 +14,12 @@
  */
 package org.apache.geode.test.dunit.internal;
 
+import static java.util.Collections.unmodifiableMap;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast;
+
+import java.io.Serializable;
+import java.net.MalformedURLException;
 import java.rmi.AlreadyBoundException;
 import java.rmi.Naming;
 import java.rmi.NotBoundException;
@@ -24,26 +30,24 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-
 public class InternalBlackboardImpl extends UnicastRemoteObject implements InternalBlackboard {
-  public static InternalBlackboard blackboard;
 
-  private Map<String, Boolean> gates = new ConcurrentHashMap<>();
-
-  private Map<String, Object> mailboxes = new ConcurrentHashMap();
+  private static InternalBlackboard blackboard;
 
+  private final Map<String, Boolean> gates = new ConcurrentHashMap<>();
+  private final Map<String, Serializable> mailboxes = new ConcurrentHashMap<>();
 
   /**
    * Zero-arg constructor for remote method invocations.
    */
   public InternalBlackboardImpl() throws RemoteException {
-    super();
+    // nothing
   }
 
   /**
    * Creates a singleton event listeners blackboard.
    */
-  public static InternalBlackboard getInstance() {
+  public static synchronized InternalBlackboard getInstance() {
     if (blackboard == null) {
       try {
         initialize();
@@ -56,11 +60,12 @@ public class InternalBlackboardImpl extends UnicastRemoteObject implements Inter
     return blackboard;
   }
 
-  private static synchronized void initialize() throws Exception {
+  private static synchronized void initialize()
+      throws AlreadyBoundException, MalformedURLException, RemoteException {
     if (blackboard == null) {
       System.out.println(
           DUnitLauncher.RMI_PORT_PARAM + "=" + System.getProperty(DUnitLauncher.RMI_PORT_PARAM));
-      int namingPort = Integer.getInteger(DUnitLauncher.RMI_PORT_PARAM).intValue();
+      int namingPort = Integer.getInteger(DUnitLauncher.RMI_PORT_PARAM);
       String name = "//localhost:" + namingPort + "/" + "InternalBlackboard";
       try {
         blackboard = (InternalBlackboard) Naming.lookup(name);
@@ -74,8 +79,8 @@ public class InternalBlackboardImpl extends UnicastRemoteObject implements Inter
 
   @Override
   public void initBlackboard() throws RemoteException {
-    this.gates.clear();
-    this.mailboxes.clear();
+    gates.clear();
+    mailboxes.clear();
   }
 
   @Override
@@ -90,8 +95,8 @@ public class InternalBlackboardImpl extends UnicastRemoteObject implements Inter
 
   @Override
   public void waitForGate(final String gateName, final long timeout, final TimeUnit units)
-      throws RemoteException, TimeoutException, InterruptedException {
-    long giveupTime = System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(timeout, units);
+      throws InterruptedException, RemoteException, TimeoutException {
+    long giveupTime = System.currentTimeMillis() + MILLISECONDS.convert(timeout, units);
     while (System.currentTimeMillis() < giveupTime) {
       Boolean gate = gates.get(gateName);
       if (gate != null && gate) {
@@ -105,17 +110,17 @@ public class InternalBlackboardImpl extends UnicastRemoteObject implements Inter
   @Override
   public boolean isGateSignaled(final String gateName) {
     Boolean gate = gates.get(gateName);
-    return (gate != null && gate);
+    return gate != null && gate;
   }
 
   @Override
-  public void setMailbox(String boxName, Object value) {
-    mailboxes.put(boxName, value);
+  public <T> void setMailbox(String boxName, T value) {
+    mailboxes.put(boxName, (Serializable) value);
   }
 
   @Override
-  public Object getMailbox(String boxName) {
-    return mailboxes.get(boxName);
+  public <T> T getMailbox(String boxName) {
+    return uncheckedCast(mailboxes.get(boxName));
   }
 
   @Override
@@ -123,5 +128,23 @@ public class InternalBlackboardImpl extends UnicastRemoteObject implements Inter
     // no-op
   }
 
+  @Override
+  public Map<String, Boolean> gates() {
+    return unmodifiableMap(gates);
+  }
 
+  @Override
+  public Map<String, Serializable> mailboxes() {
+    return unmodifiableMap(mailboxes);
+  }
+
+  @Override
+  public void putGates(Map<String, Boolean> gates) {
+    this.gates.putAll(gates);
+  }
+
+  @Override
+  public void putMailboxes(Map<String, Serializable> mailboxes) {
+    this.mailboxes.putAll(mailboxes);
+  }
 }
diff --git a/geode-dunit/src/main/java/org/apache/geode/test/dunit/rules/DistributedBlackboard.java b/geode-dunit/src/main/java/org/apache/geode/test/dunit/rules/DistributedBlackboard.java
new file mode 100644
index 0000000..8161b6d
--- /dev/null
+++ b/geode-dunit/src/main/java/org/apache/geode/test/dunit/rules/DistributedBlackboard.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.test.dunit.rules;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.geode.test.dunit.Blackboard;
+import org.apache.geode.test.dunit.DUnitBlackboard;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.internal.InternalBlackboard;
+import org.apache.geode.test.dunit.internal.InternalBlackboardImpl;
+
+/**
+ * DistributedBlackboard provides mailboxes and synchronization gateways for distributed tests.
+ *
+ * <p>
+ * Tests may use the blackboard to pass objects and status between JVMs with mailboxes instead of
+ * using static variables in classes. The caveat being that the objects will be serialized using
+ * Java serialization.
+ *
+ * <p>
+ * Gates may be used to synchronize operations between distributed test JVMs. Combined with
+ * Awaitility these can be used to test for conditions being met, actions having happened, etc.
+ *
+ * <p>
+ * Look for references to the given methods in your IDE for examples.
+ */
+@SuppressWarnings({"serial", "unused"})
+public class DistributedBlackboard extends AbstractDistributedRule implements Blackboard {
+
+  private static final AtomicReference<DUnitBlackboard> BLACKBOARD = new AtomicReference<>();
+  private static final AtomicReference<InternalBlackboard> INTERNAL = new AtomicReference<>();
+
+  private final Map<Integer, Map<String, Boolean>> keepGates = new ConcurrentHashMap<>();
+  private final Map<Integer, Map<String, Serializable>> keepMailboxes = new ConcurrentHashMap<>();
+
+  @Override
+  protected void before() {
+    invoker().invokeInEveryVMAndController(() -> invokeBefore());
+  }
+
+  @Override
+  protected void after() throws Throwable {
+    invoker().invokeInEveryVMAndController(() -> invokeAfter());
+  }
+
+  @Override
+  protected void afterCreateVM(VM vm) {
+    vm.invoke(() -> invokeBefore());
+  }
+
+  @Override
+  protected void beforeBounceVM(VM vm) {
+    keepGates.put(vm.getId(), vm.invoke(() -> INTERNAL.get().gates()));
+    keepMailboxes.put(vm.getId(), vm.invoke(() -> INTERNAL.get().mailboxes()));
+  }
+
+  @Override
+  protected void afterBounceVM(VM vm) {
+    Map<String, Boolean> keepGatesForVM = keepGates.remove(vm.getId());
+    Map<String, Serializable> keepMailboxesForVM = keepMailboxes.remove(vm.getId());
+
+    vm.invoke(() -> {
+      invokeBefore();
+      INTERNAL.get().putGates(keepGatesForVM);
+      INTERNAL.get().putMailboxes(keepMailboxesForVM);
+    });
+  }
+
+  private void invokeBefore() {
+    InternalBlackboard internalBlackboard = InternalBlackboardImpl.getInstance();
+    INTERNAL.set(internalBlackboard);
+    BLACKBOARD.set(new DUnitBlackboard(internalBlackboard));
+  }
+
+  private void invokeAfter() {
+    BLACKBOARD.set(null);
+    INTERNAL.set(null);
+  }
+
+  @Override
+  public void initBlackboard() {
+    BLACKBOARD.get().initBlackboard();
+  }
+
+  @Override
+  public void signalGate(String gateName) {
+    BLACKBOARD.get().signalGate(gateName);
+  }
+
+  @Override
+  public void waitForGate(String gateName) throws TimeoutException, InterruptedException {
+    BLACKBOARD.get().waitForGate(gateName);
+  }
+
+  @Override
+  public void waitForGate(String gateName, long timeout, TimeUnit units)
+      throws TimeoutException, InterruptedException {
+    BLACKBOARD.get().waitForGate(gateName, timeout, units);
+  }
+
+  @Override
+  public void clearGate(String gateName) {
+    BLACKBOARD.get().clearGate(gateName);
+  }
+
+  @Override
+  public boolean isGateSignaled(String gateName) {
+    return BLACKBOARD.get().isGateSignaled(gateName);
+  }
+
+  @Override
+  public <T> void setMailbox(String boxName, T value) {
+    BLACKBOARD.get().setMailbox(boxName, value);
+  }
+
+  @Override
+  public <T> T getMailbox(String boxName) {
+    return BLACKBOARD.get().getMailbox(boxName);
+  }
+}


[geode] 03/03: GEODE-8652: NioSslEngine.close() Bypasses Locks (#5666)

Posted by bu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

burcham pushed a commit to branch backport-1-13-GEODE-8652-and-friends
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 0906b08f8f77d5e22d4fd155205129c864a3a85a
Author: Bill Burcham <bi...@gmail.com>
AuthorDate: Thu Oct 29 16:38:25 2020 -0700

    GEODE-8652: NioSslEngine.close() Bypasses Locks (#5666)
    
    - NioSslEngine.close() proceeds even if readers (or writers) are
      operating on its ByteBuffers, allowing Connection.close() to close
      its socket and proceed.
    
    - NioSslEngine.close() needed a lock only on the output buffer, so
      we split what was a single lock into two. Also instead of using
      synchronized we use a ReentrantLock so we can
      call tryLock() and time out if needed in NioSslEngine.close().
    
    - Since readers/writers may hold locks on these input/output buffers
      when NioSslEngine.close() is called, a reference count is maintained
      and the buffers are returned to the pool only when the last user
      is done.
    
    - To manage the locking and reference counting a new AutoCloseable
      ByteBufferSharing interface is introduced with a trivial
      implementation: ByteBufferSharingNoOp and a real implementation:
      ByteBufferSharingImpl.
    
    Co-authored-by: Bill Burcham <bi...@gmail.com>
    Co-authored-by: Darrel Schneider <ds...@pivotal.io>
    Co-authored-by: Ernie Burghardt <bu...@vmware.com>
    (cherry picked from commit 08e9e9673d0ed05555a3d74c6d16e706817cab09)
---
 .../tcp/ConnectionCloseSSLTLSDUnitTest.java        | 238 +++++++++++++
 .../org/apache/geode/internal/tcp/server.keystore  | Bin 0 -> 1256 bytes
 ...LSocketHostNameVerificationIntegrationTest.java |   4 +-
 .../internal/net/SSLSocketIntegrationTest.java     |  57 +--
 .../apache/geode/codeAnalysis/excludedClasses.txt  |   1 +
 .../geode/internal/net/ByteBufferSharing.java      |  55 +++
 .../geode/internal/net/ByteBufferSharingImpl.java  | 148 ++++++++
 .../geode/internal/net/ByteBufferSharingNoOp.java  |  52 +++
 .../org/apache/geode/internal/net/NioFilter.java   |  69 ++--
 .../apache/geode/internal/net/NioPlainEngine.java  |  27 +-
 .../apache/geode/internal/net/NioSslEngine.java    | 353 ++++++++++---------
 .../org/apache/geode/internal/tcp/Connection.java  |  34 +-
 .../org/apache/geode/internal/tcp/MsgReader.java   |  15 +-
 .../internal/net/ByteBufferSharingImplTest.java    | 163 +++++++++
 .../geode/internal/net/NioPlainEngineTest.java     |  47 ++-
 .../geode/internal/net/NioSslEngineTest.java       | 392 +++++++++++----------
 16 files changed, 1195 insertions(+), 460 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/ConnectionCloseSSLTLSDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/ConnectionCloseSSLTLSDUnitTest.java
new file mode 100644
index 0000000..77fe9bf
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/ConnectionCloseSSLTLSDUnitTest.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.tcp;
+
+import static org.apache.geode.distributed.ConfigurationProperties.CONSERVE_SOCKETS;
+import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_CLUSTER_CONFIGURATION;
+import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
+import static org.apache.geode.distributed.ConfigurationProperties.NAME;
+import static org.apache.geode.distributed.ConfigurationProperties.SOCKET_BUFFER_SIZE;
+import static org.apache.geode.distributed.ConfigurationProperties.SOCKET_LEASE_TIME;
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_ENABLED_COMPONENTS;
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_KEYSTORE;
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_KEYSTORE_PASSWORD;
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_PROTOCOLS;
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_REQUIRE_AUTHENTICATION;
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_TRUSTSTORE;
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_TRUSTSTORE_PASSWORD;
+import static org.apache.geode.distributed.ConfigurationProperties.USE_CLUSTER_CONFIGURATION;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.apache.geode.test.util.ResourceUtils.createTempFileFromResource;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Fail.fail;
+
+import java.io.File;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.Properties;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.logging.log4j.Logger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.distributed.DistributedSystemDisconnectedException;
+import org.apache.geode.distributed.Locator;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DistributionMessage;
+import org.apache.geode.distributed.internal.DistributionMessageObserver;
+import org.apache.geode.distributed.internal.membership.gms.membership.GMSJoinLeave;
+import org.apache.geode.internal.cache.UpdateOperation.UpdateMessage;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.DistributedBlackboard;
+import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
+import org.apache.geode.test.dunit.rules.DistributedRule;
+
+/**
+ * It would be nice if this test didn't need to use the cache since the test's purpose is to test
+ * that the {@link Connection} class can be closed while readers and writers hold locks on its
+ * internal TLS {@link ByteBuffer}s.
+ * <p>
+ * But this test does use the cache (region) because it enables us to use existing cache messaging
+ * and to use the DistributionMessageObserver (observer) hooks.
+ * <p>
+ * See also ClusterCommunicationsDUnitTest.
+ */
+public class ConnectionCloseSSLTLSDUnitTest implements Serializable {
+
+  private static final int SMALL_BUFFER_SIZE = 8000;
+  private static final String UPDATE_ENTERED_GATE = "connectionCloseDUnitTest.regionUpdateEntered";
+  private static final String SUSPEND_UPDATE_GATE = "connectionCloseDUnitTest.suspendRegionUpdate";
+  private static final String regionName = "connectionCloseDUnitTestRegion";
+  private static final Logger logger = LogService.getLogger();
+
+  private static Cache cache;
+
+  @Rule
+  public DistributedRule distributedRule =
+      DistributedRule.builder().withVMCount(3).build();
+
+  @Rule
+  public DistributedBlackboard blackboard = new DistributedBlackboard();
+
+  @Rule
+  public DistributedRestoreSystemProperties restoreSystemProperties =
+      new DistributedRestoreSystemProperties();
+
+  private VM locator;
+  private VM sender;
+  private VM receiver;
+
+  @Before
+  public void before() {
+    locator = getVM(0);
+    sender = getVM(1);
+    receiver = getVM(2);
+  }
+
+  @After
+  public void after() {
+    receiver.invoke(() -> {
+      DistributionMessageObserver.setInstance(null);
+    });
+  }
+
+  @Test
+  public void connectionWithHungReaderIsCloseableAndUnhangsReader()
+      throws InterruptedException, TimeoutException {
+
+    blackboard.clearGate(UPDATE_ENTERED_GATE);
+    blackboard.clearGate(SUSPEND_UPDATE_GATE);
+
+    final int locatorPort = createLocator(locator);
+    createCacheAndRegion(sender, locatorPort);
+    createCacheAndRegion(receiver, locatorPort);
+
+    receiver
+        .invoke("set up DistributionMessageObserver to 'hang' sender's put (on receiver)",
+            () -> {
+              final DistributionMessageObserver observer =
+                  new DistributionMessageObserver() {
+
+                    @Override
+                    public void beforeProcessMessage(final ClusterDistributionManager dm,
+                        final DistributionMessage message) {
+                      guardMessageProcessingHook(message, () -> {
+                        try {
+                          blackboard.signalGate(UPDATE_ENTERED_GATE);
+                          blackboard.waitForGate(SUSPEND_UPDATE_GATE);
+                        } catch (TimeoutException | InterruptedException e) {
+                          fail("message observus interruptus", e); // keep the cause for diagnosis
+                        }
+                        logger.info("BGB: got before process message: " + message);
+                      });
+                    }
+                  };
+              DistributionMessageObserver.setInstance(observer);
+            });
+
+    final AsyncInvocation<Object> putInvocation = sender.invokeAsync("try a put", () -> {
+      final Region<Object, Object> region = cache.getRegion(regionName);
+      // test is going to close the cache while we are waiting for our ack
+      assertThatThrownBy(() -> {
+        region.put("hello", "world");
+      }).isInstanceOf(DistributedSystemDisconnectedException.class);
+    });
+
+    // wait until our message observer is blocked
+    blackboard.waitForGate(UPDATE_ENTERED_GATE);
+
+    // at this point our put() is blocked waiting for a direct ack
+    assertThat(putInvocation.isAlive()).as("put is waiting for remote region to ack").isTrue();
+
+    /*
+     * Now close the cache. The point of calling it is to test that we don't block while trying
+     * to close connections. Cache.close() calls DistributedSystem.disconnect() which in turn
+     * closes all the connections (and their sockets.) We want the sockets to close because that'll
+     * cause our hung put() to see a DistributedSystemDisconnectedException.
+     */
+    sender.invoke("", () -> cache.close());
+
+    // wait for put task to complete: with an exception, that is!
+    putInvocation.get();
+
+    // un-stick our message observer
+    blackboard.signalGate(SUSPEND_UPDATE_GATE);
+  }
+
+  private void guardMessageProcessingHook(final DistributionMessage message,
+      final Runnable runnable) {
+    if (message instanceof UpdateMessage) {
+      final UpdateMessage updateMessage = (UpdateMessage) message;
+      if (updateMessage.getRegionPath().equals("/" + regionName)) {
+        runnable.run();
+      }
+    }
+  }
+
+  private int createLocator(VM memberVM) {
+    return memberVM.invoke("create locator", () -> {
+      // if you need to debug SSL communications use this property:
+      // System.setProperty("javax.net.debug", "all");
+      System.setProperty(GMSJoinLeave.BYPASS_DISCOVERY_PROPERTY, "true");
+      return Locator.startLocatorAndDS(0, new File(""), getDistributedSystemProperties())
+          .getPort();
+    });
+  }
+
+  private void createCacheAndRegion(VM memberVM, int locatorPort) {
+    memberVM.invoke("start cache and create region", () -> {
+      cache = createCache(locatorPort);
+      cache.createRegionFactory(RegionShortcut.REPLICATE).create(regionName);
+    });
+  }
+
+  private Cache createCache(int locatorPort) {
+    // if you need to debug SSL communications use this property:
+    // System.setProperty("javax.net.debug", "all");
+    Properties properties = getDistributedSystemProperties();
+    properties.setProperty(LOCATORS, "localhost[" + locatorPort + "]");
+    return new CacheFactory(properties).create();
+  }
+
+  private Properties getDistributedSystemProperties() {
+    Properties properties = new Properties();
+    properties.setProperty(ENABLE_CLUSTER_CONFIGURATION, "false");
+    properties.setProperty(USE_CLUSTER_CONFIGURATION, "false");
+    properties.setProperty(NAME, "vm" + VM.getCurrentVMNum());
+    properties.setProperty(CONSERVE_SOCKETS, "false"); // we are testing direct ack
+    properties.setProperty(SOCKET_LEASE_TIME, "10000");
+    properties.setProperty(SOCKET_BUFFER_SIZE, "" + SMALL_BUFFER_SIZE);
+
+    properties.setProperty(SSL_ENABLED_COMPONENTS, "cluster,locator");
+    properties
+        .setProperty(SSL_KEYSTORE, createTempFileFromResource(getClass(), "server.keystore")
+            .getAbsolutePath());
+    properties.setProperty(SSL_TRUSTSTORE,
+        createTempFileFromResource(getClass(), "server.keystore")
+            .getAbsolutePath());
+    properties.setProperty(SSL_PROTOCOLS, "TLSv1.2");
+    properties.setProperty(SSL_KEYSTORE_PASSWORD, "password");
+    properties.setProperty(SSL_TRUSTSTORE_PASSWORD, "password");
+    properties.setProperty(SSL_REQUIRE_AUTHENTICATION, "true");
+    return properties;
+  }
+
+}
diff --git a/geode-core/src/distributedTest/resources/org/apache/geode/internal/tcp/server.keystore b/geode-core/src/distributedTest/resources/org/apache/geode/internal/tcp/server.keystore
new file mode 100644
index 0000000..8b5305f
Binary files /dev/null and b/geode-core/src/distributedTest/resources/org/apache/geode/internal/tcp/server.keystore differ
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketHostNameVerificationIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketHostNameVerificationIntegrationTest.java
index dc7df44..a70f3b1 100755
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketHostNameVerificationIntegrationTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketHostNameVerificationIntegrationTest.java
@@ -215,7 +215,9 @@ public class SSLSocketHostNameVerificationIntegrationTest {
           final NioSslEngine nioSslEngine = engine;
           engine.close(socket.getChannel());
           assertThatThrownBy(() -> {
-            nioSslEngine.unwrap(ByteBuffer.wrap(new byte[0]));
+            try (final ByteBufferSharing unused =
+                nioSslEngine.unwrap(ByteBuffer.wrap(new byte[0]))) {
+            }
           })
               .isInstanceOf(IOException.class);
         }
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java
index 19eab4f..add6b9a 100755
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java
@@ -256,11 +256,13 @@ public class SSLSocketIntegrationTest {
     ByteBuffer buffer = bbos.getContentBuffer();
     System.out.println(
         "client buffer position is " + buffer.position() + " and limit is " + buffer.limit());
-    ByteBuffer wrappedBuffer = engine.wrap(buffer);
-    System.out.println("client wrapped buffer position is " + wrappedBuffer.position()
-        + " and limit is " + wrappedBuffer.limit());
-    int bytesWritten = clientChannel.write(wrappedBuffer);
-    System.out.println("client bytes written is " + bytesWritten);
+    try (final ByteBufferSharing outputSharing = engine.wrap(buffer)) {
+      ByteBuffer wrappedBuffer = outputSharing.getBuffer();
+      System.out.println("client wrapped buffer position is " + wrappedBuffer.position()
+          + " and limit is " + wrappedBuffer.limit());
+      int bytesWritten = clientChannel.write(wrappedBuffer);
+      System.out.println("client bytes written is " + bytesWritten);
+    }
   }
 
   private Thread startServerNIO(final ServerSocket serverSocket, int timeoutMillis)
@@ -299,7 +301,9 @@ public class SSLSocketIntegrationTest {
           final NioSslEngine nioSslEngine = engine;
           engine.close(socket.getChannel());
           assertThatThrownBy(() -> {
-            nioSslEngine.unwrap(ByteBuffer.wrap(new byte[0]));
+            try (final ByteBufferSharing unused =
+                nioSslEngine.unwrap(ByteBuffer.wrap(new byte[0]))) {
+            }
           })
               .isInstanceOf(IOException.class);
         }
@@ -313,24 +317,35 @@ public class SSLSocketIntegrationTest {
   private void readMessageFromNIOSSLClient(Socket socket, ByteBuffer buffer, NioSslEngine engine)
       throws IOException {
 
-    ByteBuffer unwrapped = engine.getUnwrappedBuffer(buffer);
-    // if we already have unencrypted data skip unwrapping
-    if (unwrapped.position() == 0) {
-      int bytesRead;
-      // if we already have encrypted data skip reading from the socket
-      if (buffer.position() == 0) {
-        bytesRead = socket.getChannel().read(buffer);
-        buffer.flip();
+    try (final ByteBufferSharing sharedBuffer = engine.getUnwrappedBuffer()) {
+      final ByteBuffer unwrapped = sharedBuffer.getBuffer();
+      // if we already have unencrypted data skip unwrapping
+      if (unwrapped.position() == 0) {
+        int bytesRead;
+        // if we already have encrypted data skip reading from the socket
+        if (buffer.position() == 0) {
+          bytesRead = socket.getChannel().read(buffer);
+          buffer.flip();
+        } else {
+          bytesRead = buffer.remaining();
+        }
+        System.out.println("server bytes read is " + bytesRead + ": buffer position is "
+            + buffer.position() + " and limit is " + buffer.limit());
+        try (final ByteBufferSharing sharedBuffer2 = engine.unwrap(buffer)) {
+          final ByteBuffer unwrapped2 = sharedBuffer2.getBuffer();
+
+          unwrapped2.flip();
+          System.out.println("server unwrapped buffer position is " + unwrapped2.position()
+              + " and limit is " + unwrapped2.limit());
+          finishReadMessageFromNIOSSLClient(unwrapped2);
+        }
       } else {
-        bytesRead = buffer.remaining();
+        finishReadMessageFromNIOSSLClient(unwrapped);
       }
-      System.out.println("server bytes read is " + bytesRead + ": buffer position is "
-          + buffer.position() + " and limit is " + buffer.limit());
-      unwrapped = engine.unwrap(buffer);
-      unwrapped.flip();
-      System.out.println("server unwrapped buffer position is " + unwrapped.position()
-          + " and limit is " + unwrapped.limit());
     }
+  }
+
+  private void finishReadMessageFromNIOSSLClient(final ByteBuffer unwrapped) throws IOException {
     ByteBufferInputStream bbis = new ByteBufferInputStream(unwrapped);
     DataInputStream dis = new DataInputStream(bbis);
     String welcome = dis.readUTF();
diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
index a46d5fc..33f43c3 100644
--- a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
+++ b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
@@ -104,3 +104,4 @@ org/apache/geode/cache/query/internal/xml/ElementType
 org/apache/geode/cache/query/internal/xml/ElementType$1
 org/apache/geode/cache/query/internal/xml/ElementType$2
 org/apache/geode/cache/query/internal/xml/ElementType$3
+org/apache/geode/internal/net/ByteBufferSharingImpl$OpenAttemptTimedOut
\ No newline at end of file
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharing.java b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharing.java
new file mode 100644
index 0000000..cdfa897
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharing.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.net;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+
+/**
+ * When a {@link ByteBufferSharing} is acquired in a try-with-resources the buffer is available (for
+ * reading and modification) within the scope of that try block.
+ *
+ * Implementations may release a managed ByteBuffer back to its pool after the last reference is
+ * dropped.
+ */
+public interface ByteBufferSharing extends AutoCloseable {
+
+  /**
+   * Call this method only within a try-with-resources in which this {@link ByteBufferSharing} was
+   * acquired. Retain the reference only within the scope of that try-with-resources.
+   *
+   * @return the buffer: manipulable only within the scope of the try-with-resources
+   * @throws IOException if the buffer is no longer accessible
+   */
+  ByteBuffer getBuffer() throws IOException;
+
+  /**
+   * Expand the buffer if needed. This may return a different object so be sure to pay attention to
+   * the return value if you need access to the potentially-expanded buffer. Subsequent calls to
+   * {@link #getBuffer()} will return that new buffer too.
+   *
+   * @param newCapacity the capacity the caller needs the buffer to have
+   * @return the same buffer or a different (bigger) buffer
+   * @throws IOException if the buffer is no longer accessible
+   */
+  ByteBuffer expandWriteBufferIfNeeded(final int newCapacity) throws IOException;
+
+  /**
+   * Overrides {@link AutoCloseable#close()} without a throws clause since we don't need one.
+   */
+  @Override
+  void close();
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingImpl.java b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingImpl.java
new file mode 100644
index 0000000..e9a941e
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingImpl.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.net;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.geode.annotations.VisibleForTesting;
+import org.apache.geode.internal.net.BufferPool.BufferType;
+
+/**
+ * An {@link AutoCloseable} meant to be acquired in a try-with-resources statement. The resource (a
+ * {@link ByteBuffer}) is available (for reading and modification) in the scope of the
+ * try-with-resources.
+ */
+class ByteBufferSharingImpl implements ByteBufferSharing {
+
+  static class OpenAttemptTimedOut extends Exception {
+  }
+
+  private final Lock lock;
+  private final AtomicBoolean isClosed;
+  // mutable because in general our ByteBuffer may need to be resized (grown or compacted)
+  private ByteBuffer buffer;
+  private final BufferType bufferType;
+  private final AtomicInteger counter;
+  private final BufferPool bufferPool;
+
+  /**
+   * This constructor is for use only by the owner of the shared resource (a {@link ByteBuffer}).
+   *
+   * A resource owner must invoke {@link #open()} once for each reference that escapes (is passed
+   * to an external object or is returned to an external caller).
+   *
+   * This constructor acquires no lock. The reference count will be 1 after this constructor
+   * completes.
+   */
+  ByteBufferSharingImpl(final ByteBuffer buffer, final BufferType bufferType,
+      final BufferPool bufferPool) {
+    this.buffer = buffer;
+    this.bufferType = bufferType;
+    this.bufferPool = bufferPool;
+    lock = new ReentrantLock();
+    counter = new AtomicInteger(1);
+    isClosed = new AtomicBoolean(false);
+  }
+
+  /**
+   * The destructor. Called by the resource owner to undo the work of the constructor.
+   */
+  void destruct() {
+    if (isClosed.compareAndSet(false, true)) {
+      dropReference();
+    }
+  }
+
+  /**
+   * This method is for use only by the owner of the shared resource. It's used for handing out
+   * references to the shared resource. So it does reference counting and also acquires a lock.
+   *
+   * Resource owners call this method as the last thing before returning a reference to the caller.
+   * That caller binds that reference to a variable in a try-with-resources statement and relies on
+   * the AutoCloseable protocol to invoke {@link #close()} on the object at the end of the block.
+   */
+  ByteBufferSharing open() {
+    lock.lock();
+    addReference();
+    return this;
+  }
+
+  /**
+   * Like {@link #open()} but throws {@link OpenAttemptTimedOut} on timeout or interruption.
+   */
+  ByteBufferSharing open(final long time, final TimeUnit unit) throws OpenAttemptTimedOut {
+    try {
+      if (!lock.tryLock(time, unit)) {
+        throw new OpenAttemptTimedOut();
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new OpenAttemptTimedOut();
+    }
+    addReference();
+    return this;
+  }
+
+  @Override
+  public ByteBuffer getBuffer() throws IOException {
+    if (isClosed.get()) {
+      throw new IOException("NioSslEngine has been closed");
+    } else {
+      return buffer;
+    }
+  }
+
+  @Override
+  public ByteBuffer expandWriteBufferIfNeeded(final int newCapacity) throws IOException {
+    return buffer = bufferPool.expandWriteBufferIfNeeded(bufferType, getBuffer(), newCapacity);
+  }
+
+  @Override
+  public void close() {
+    /*
+     * We are counting on our ReentrantLock throwing IllegalMonitorStateException if the
+     * current thread does not hold the lock. In that case dropReference() will not be
+     * called. This prevents ill-behaved clients (clients that call close() too many times)
+     * from corrupting our reference count.
+     */
+    lock.unlock();
+    dropReference();
+  }
+
+  private int addReference() {
+    return counter.incrementAndGet();
+  }
+
+  private int dropReference() {
+    final int usages = counter.decrementAndGet();
+    if (usages == 0) {
+      bufferPool.releaseBuffer(bufferType, buffer);
+    }
+    return usages;
+  }
+
+  @VisibleForTesting
+  public void setBufferForTestingOnly(final ByteBuffer newBufferForTesting) {
+    buffer = newBufferForTesting;
+  }
+
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingNoOp.java b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingNoOp.java
new file mode 100644
index 0000000..bd707e3
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingNoOp.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.net;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * An {@link AutoCloseable} meant to be acquired in a try-with-resources statement. The resource (a
+ * {@link ByteBuffer}) is available (for reading and modification) in the scope of the
+ * try-with-resources.
+ *
+ * This implementation is a "no-op". It performs no actual locking and no reference counting. It's
+ * meant for use with the {@link NioPlainEngine} only, since that engine keeps no buffers and so
+ * needs no reference counting on buffers, nor any synchronization around access to buffers.
+ * {@link #close()} does nothing and {@link #expandWriteBufferIfNeeded(int)} always throws
+ * {@link UnsupportedOperationException}. See also {@link ByteBufferSharingImpl}.
+ */
+class ByteBufferSharingNoOp implements ByteBufferSharing {
+
+  private final ByteBuffer buffer;
+
+  ByteBufferSharingNoOp(final ByteBuffer buffer) {
+    this.buffer = buffer;
+  }
+
+  @Override
+  public ByteBuffer getBuffer() {
+    return buffer;
+  }
+
+  @Override
+  public ByteBuffer expandWriteBufferIfNeeded(final int newCapacity) throws IOException {
+    throw new UnsupportedOperationException("Can't expand buffer when using NioPlainEngine");
+  }
+
+  @Override
+  public void close() {}
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java
index 9c437ad..eb53f0e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java
@@ -19,47 +19,53 @@ import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
 
 /**
- * Prior to transmitting a buffer or processing a received buffer
- * a NioFilter should be called to wrap (transmit) or unwrap (received)
- * the buffer in case SSL is being used.<br>
- * Implementations of this class may not be thread-safe in regard to
- * the buffers their methods return. These may be internal state that,
- * if used concurrently by multiple threads could cause corruption.
- * Appropriate external synchronization must be used in order to provide
- * thread-safety. Do this by invoking getSynchObject() and synchronizing on
- * the returned object while using the buffer.
+ * Prior to transmitting a buffer or processing a received buffer a NioFilter should be called to
+ * wrap (transmit) or unwrap (received) the buffer in case SSL is being used.<br>
+ * Implementations of this class may not be thread-safe in regard to the buffers their methods
+ * return. These may be internal state that, if used concurrently by multiple threads, could cause
+ * corruption. Access to such buffers is therefore mediated by the {@link ByteBufferSharing}
+ * objects these methods return: acquire one in a try-with-resources statement and use its buffer
+ * only within that block.
  */
 public interface NioFilter {
 
   /**
    * wrap bytes for transmission to another process
+   *
+   * Be sure to call close() on the returned {@link ByteBufferSharing}. The best way to do that is
+   * to call this method in a try-with-resources statement.
    */
-  ByteBuffer wrap(ByteBuffer buffer) throws IOException;
+  ByteBufferSharing wrap(ByteBuffer buffer) throws IOException;
 
   /**
-   * unwrap bytes received from another process. The unwrapped
-   * buffer should be flipped before reading. When done reading invoke
-   * doneReading() to reset for future read ops
+   * unwrap bytes received from another process. The unwrapped buffer should be flipped before
+   * reading. When done reading invoke doneReading() to reset for future read ops
+   *
+   * Be sure to call close() on the returned {@link ByteBufferSharing}. The best way to do that is
+   * to call this method in a try-with-resources statement.
    */
-  ByteBuffer unwrap(ByteBuffer wrappedBuffer) throws IOException;
+  ByteBufferSharing unwrap(ByteBuffer wrappedBuffer) throws IOException;
 
   /**
-   * ensure that the wrapped buffer has enough room to read the given amount of data.
-   * This must be invoked before readAtLeast. A new buffer may be returned by this method.
+   * ensure that the wrapped buffer has enough room to read the given amount of data. This must be
+   * invoked before readAtLeast. A new buffer may be returned by this method.
    */
   ByteBuffer ensureWrappedCapacity(int amount, ByteBuffer wrappedBuffer,
       BufferPool.BufferType bufferType);
 
   /**
-   * read at least the indicated amount of bytes from the given
-   * socket. The buffer position will be ready for reading
-   * the data when this method returns. Note: you must invoke ensureWrappedCapacity
-   * with the given amount prior to each invocation of this method.
+   * read at least the indicated amount of bytes from the given socket. The buffer position will be
+   * ready for reading the data when this method returns. Note: you must invoke
+   * ensureWrappedCapacity with the given amount prior to each invocation of this method.
    * <br>
    * wrappedBuffer = filter.ensureWrappedCapacity(amount, wrappedBuffer, etc.);<br>
-   * unwrappedBuffer = filter.readAtLeast(channel, amount, wrappedBuffer, etc.)
+   * sharing = filter.readAtLeast(channel, amount, wrappedBuffer);<br>
+   * unwrappedBuffer = sharing.getBuffer()
+   *
+   * Be sure to call close() on the returned {@link ByteBufferSharing}. The best way to do that is
+   * to call this method in a try-with-resources statement.
    */
-  ByteBuffer readAtLeast(SocketChannel channel, int amount, ByteBuffer wrappedBuffer)
+  ByteBufferSharing readAtLeast(SocketChannel channel, int amount, ByteBuffer wrappedBuffer)
       throws IOException;
 
   /**
@@ -81,28 +87,19 @@ public interface NioFilter {
     }
   }
 
-  default boolean isClosed() {
-    return false;
-  }
-
   /**
    * invoke this method when you are done using the NioFilter
-   *
    */
   default void close(SocketChannel socketChannel) {
     // nothing by default
   }
 
   /**
-   * returns the unwrapped byte buffer associated with the given wrapped buffer.
+   * Returns the sharing object for the {@link NioFilter}'s unwrapped buffer, if one exists.
+   *
+   * Be sure to call close() on the returned {@link ByteBufferSharing}. The best way to do that is
+   * to call this method in a try-with-resources statement.
    */
-  ByteBuffer getUnwrappedBuffer(ByteBuffer wrappedBuffer);
+  ByteBufferSharing getUnwrappedBuffer();
 
-  /**
-   * returns an object to be used in synchronizing on the use of buffers returned by
-   * a NioFilter.
-   */
-  default Object getSynchObject() {
-    return this;
-  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java
index 3ebce38..8b5df96 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
 
+import org.apache.geode.annotations.internal.MakeImmutable;
 import org.apache.geode.internal.Assert;
 
 /**
@@ -27,6 +28,12 @@ import org.apache.geode.internal.Assert;
  * secure communications.
  */
 public class NioPlainEngine implements NioFilter {
+
+  // this variable requires the MakeImmutable annotation but the buffer is empty and
+  // not really modifiable
+  @MakeImmutable
+  private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
+
   private final BufferPool bufferPool;
 
   int lastReadPosition;
@@ -38,14 +45,14 @@ public class NioPlainEngine implements NioFilter {
   }
 
   @Override
-  public ByteBuffer wrap(ByteBuffer buffer) {
-    return buffer;
+  public ByteBufferSharing wrap(ByteBuffer buffer) {
+    return shareBuffer(buffer);
   }
 
   @Override
-  public ByteBuffer unwrap(ByteBuffer wrappedBuffer) {
+  public ByteBufferSharing unwrap(ByteBuffer wrappedBuffer) {
     wrappedBuffer.position(wrappedBuffer.limit());
-    return wrappedBuffer;
+    return shareBuffer(wrappedBuffer);
   }
 
   @Override
@@ -82,7 +89,7 @@ public class NioPlainEngine implements NioFilter {
   }
 
   @Override
-  public ByteBuffer readAtLeast(SocketChannel channel, int bytes, ByteBuffer wrappedBuffer)
+  public ByteBufferSharing readAtLeast(SocketChannel channel, int bytes, ByteBuffer wrappedBuffer)
       throws IOException {
     ByteBuffer buffer = wrappedBuffer;
 
@@ -108,7 +115,7 @@ public class NioPlainEngine implements NioFilter {
     buffer.position(lastProcessedPosition);
     lastProcessedPosition += bytes;
 
-    return buffer;
+    return shareBuffer(buffer);
   }
 
   public void doneReading(ByteBuffer unwrappedBuffer) {
@@ -121,8 +128,12 @@ public class NioPlainEngine implements NioFilter {
   }
 
   @Override
-  public ByteBuffer getUnwrappedBuffer(ByteBuffer wrappedBuffer) {
-    return wrappedBuffer;
+  public ByteBufferSharing getUnwrappedBuffer() {
+    return shareBuffer(EMPTY_BUFFER);
+  }
+
+  private ByteBufferSharingNoOp shareBuffer(final ByteBuffer wrappedBuffer) {
+    return new ByteBufferSharingNoOp(wrappedBuffer);
   }
 
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
index 6f32501..7e642ce 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
@@ -40,24 +40,19 @@ import javax.net.ssl.SSLSession;
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.GemFireIOException;
-import org.apache.geode.annotations.internal.MakeImmutable;
+import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.internal.net.BufferPool.BufferType;
+import org.apache.geode.internal.net.ByteBufferSharingImpl.OpenAttemptTimedOut;
 import org.apache.geode.logging.internal.log4j.api.LogService;
 
 
 /**
- * NioSslEngine uses an SSLEngine to bind SSL logic to a data source. This class is not thread
- * safe. Its use should be confined to one thread or should be protected by external
- * synchronization.
+ * NioSslEngine uses an SSLEngine to bind SSL logic to a data source. This class is not thread safe.
+ * Its use should be confined to one thread or should be protected by external synchronization.
  */
 public class NioSslEngine implements NioFilter {
   private static final Logger logger = LogService.getLogger();
 
-  // this variable requires the MakeImmutable annotation but the buffer is empty and
-  // not really modifiable
-  @MakeImmutable
-  private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
-
   private final BufferPool bufferPool;
 
   private boolean closed;
@@ -65,23 +60,28 @@ public class NioSslEngine implements NioFilter {
   SSLEngine engine;
 
   /**
-   * myNetData holds bytes wrapped by the SSLEngine
+   * holds bytes wrapped by the SSLEngine; a.k.a. myNetData
    */
-  ByteBuffer myNetData;
+  private final ByteBufferSharingImpl outputSharing;
 
   /**
-   * peerAppData holds the last unwrapped data from a peer
+   * holds the last unwrapped data from a peer; a.k.a. peerAppData
    */
-  ByteBuffer peerAppData;
+  private final ByteBufferSharingImpl inputSharing;
 
   NioSslEngine(SSLEngine engine, BufferPool bufferPool) {
     SSLSession session = engine.getSession();
     int appBufferSize = session.getApplicationBufferSize();
     int packetBufferSize = engine.getSession().getPacketBufferSize();
+    closed = false;
     this.engine = engine;
     this.bufferPool = bufferPool;
-    this.myNetData = bufferPool.acquireDirectSenderBuffer(packetBufferSize);
-    this.peerAppData = bufferPool.acquireNonDirectReceiveBuffer(appBufferSize);
+    outputSharing =
+        new ByteBufferSharingImpl(bufferPool.acquireDirectSenderBuffer(packetBufferSize),
+            TRACKED_SENDER, bufferPool);
+    inputSharing =
+        new ByteBufferSharingImpl(bufferPool.acquireNonDirectReceiveBuffer(appBufferSize),
+            TRACKED_RECEIVER, bufferPool);
   }
 
   /**
@@ -135,57 +135,65 @@ public class NioSslEngine implements NioFilter {
 
       switch (status) {
         case NEED_UNWRAP:
-          // Receive handshaking data from peer
-          int dataRead = socketChannel.read(handshakeBuffer);
-
-          // Process incoming handshaking data
-          handshakeBuffer.flip();
-          engineResult = engine.unwrap(handshakeBuffer, peerAppData);
-          handshakeBuffer.compact();
-          status = engineResult.getHandshakeStatus();
-
-          // if we're not finished, there's nothing to process and no data was read let's hang out
-          // for a little
-          if (peerAppData.remaining() == 0 && dataRead == 0 && status == NEED_UNWRAP) {
-            Thread.sleep(10);
-          }
+          try (final ByteBufferSharing inputSharing = shareInputBuffer()) {
+            final ByteBuffer peerAppData = inputSharing.getBuffer();
+
+            // Receive handshaking data from peer
+            int dataRead = socketChannel.read(handshakeBuffer);
+
+            // Process incoming handshaking data
+            handshakeBuffer.flip();
+
+
+            engineResult = engine.unwrap(handshakeBuffer, peerAppData);
+            handshakeBuffer.compact();
+            status = engineResult.getHandshakeStatus();
+
+            // if we're not finished, there's nothing to process and no data was read let's hang out
+            // for a little
+            if (peerAppData.remaining() == 0 && dataRead == 0 && status == NEED_UNWRAP) {
+              Thread.sleep(10);
+            }
 
-          if (engineResult.getStatus() == BUFFER_OVERFLOW) {
-            peerAppData =
-                expandWriteBuffer(TRACKED_RECEIVER, peerAppData, peerAppData.capacity() * 2);
+            if (engineResult.getStatus() == BUFFER_OVERFLOW) {
+              inputSharing.expandWriteBufferIfNeeded(peerAppData.capacity() * 2);
+            }
+            break;
           }
-          break;
 
         case NEED_WRAP:
-          // Empty the local network packet buffer.
-          myNetData.clear();
-
-          // Generate handshaking data
-          engineResult = engine.wrap(myAppData, myNetData);
-          status = engineResult.getHandshakeStatus();
-
-          // Check status
-          switch (engineResult.getStatus()) {
-            case BUFFER_OVERFLOW:
-              myNetData =
-                  expandWriteBuffer(TRACKED_SENDER, myNetData,
-                      myNetData.capacity() * 2);
-              break;
-            case OK:
-              myNetData.flip();
-              // Send the handshaking data to peer
-              while (myNetData.hasRemaining()) {
-                socketChannel.write(myNetData);
-              }
-              break;
-            case CLOSED:
-              break;
-            default:
-              logger.info("handshake terminated with illegal state due to {}", status);
-              throw new IllegalStateException(
-                  "Unknown SSLEngineResult status: " + engineResult.getStatus());
+          try (final ByteBufferSharing outputSharing = shareOutputBuffer()) {
+            final ByteBuffer myNetData = outputSharing.getBuffer();
+
+            // Empty the local network packet buffer.
+            myNetData.clear();
+
+            // Generate handshaking data
+            engineResult = engine.wrap(myAppData, myNetData);
+            status = engineResult.getHandshakeStatus();
+
+            // Check status
+            switch (engineResult.getStatus()) {
+              case BUFFER_OVERFLOW:
+                // no need to assign return value because we will never reference it
+                outputSharing.expandWriteBufferIfNeeded(myNetData.capacity() * 2);
+                break;
+              case OK:
+                myNetData.flip();
+                // Send the handshaking data to peer
+                while (myNetData.hasRemaining()) {
+                  socketChannel.write(myNetData);
+                }
+                break;
+              case CLOSED:
+                break;
+              default:
+                logger.info("handshake terminated with illegal state due to {}", status);
+                throw new IllegalStateException(
+                    "Unknown SSLEngineResult status: " + engineResult.getStatus());
+            }
+            break;
           }
-          break;
         case NEED_TASK:
           // Handle blocking tasks
           handleBlockingTasks();
@@ -213,17 +221,6 @@ public class NioSslEngine implements NioFilter {
     return true;
   }
 
-  ByteBuffer expandWriteBuffer(BufferType type, ByteBuffer existing,
-      int desiredCapacity) {
-    return bufferPool.expandWriteBufferIfNeeded(type, existing, desiredCapacity);
-  }
-
-  synchronized void checkClosed() throws IOException {
-    if (closed) {
-      throw new IOException("NioSslEngine has been closed");
-    }
-  }
-
   void handleBlockingTasks() {
     Runnable task;
     while ((task = engine.getDelegatedTask()) != null) {
@@ -233,72 +230,77 @@ public class NioSslEngine implements NioFilter {
   }
 
   @Override
-  public synchronized ByteBuffer wrap(ByteBuffer appData) throws IOException {
-    checkClosed();
+  public ByteBufferSharing wrap(ByteBuffer appData) throws IOException {
+    try (final ByteBufferSharing outputSharing = shareOutputBuffer()) {
 
-    myNetData.clear();
+      ByteBuffer myNetData = outputSharing.getBuffer();
 
-    while (appData.hasRemaining()) {
-      // ensure we have lots of capacity since encrypted data might
-      // be larger than the app data
-      int remaining = myNetData.capacity() - myNetData.position();
+      myNetData.clear();
 
-      if (remaining < (appData.remaining() * 2)) {
-        int newCapacity = expandedCapacity(appData, myNetData);
-        myNetData = expandWriteBuffer(TRACKED_SENDER, myNetData, newCapacity);
-      }
+      while (appData.hasRemaining()) {
+        // ensure we have lots of capacity since encrypted data might
+        // be larger than the app data
+        int remaining = myNetData.capacity() - myNetData.position();
 
-      SSLEngineResult wrapResult = engine.wrap(appData, myNetData);
+        if (remaining < (appData.remaining() * 2)) {
+          int newCapacity = expandedCapacity(appData, myNetData);
+          myNetData = outputSharing.expandWriteBufferIfNeeded(newCapacity);
+        }
 
-      if (wrapResult.getHandshakeStatus() == NEED_TASK) {
-        handleBlockingTasks();
-      }
+        SSLEngineResult wrapResult = engine.wrap(appData, myNetData);
 
-      if (wrapResult.getStatus() != OK) {
-        throw new SSLException("Error encrypting data: " + wrapResult);
+        if (wrapResult.getHandshakeStatus() == NEED_TASK) {
+          handleBlockingTasks();
+        }
+
+        if (wrapResult.getStatus() != OK) {
+          throw new SSLException("Error encrypting data: " + wrapResult);
+        }
       }
-    }
 
-    myNetData.flip();
+      myNetData.flip();
 
-    return myNetData;
+      return shareOutputBuffer();
+    }
   }
 
   @Override
-  public synchronized ByteBuffer unwrap(ByteBuffer wrappedBuffer) throws IOException {
-    checkClosed();
-
-    // note that we do not clear peerAppData as it may hold a partial
-    // message. TcpConduit, for instance, uses message chunking to
-    // transmit large payloads and we may have read a partial chunk
-    // during the previous unwrap
-
-    peerAppData.limit(peerAppData.capacity());
-    while (wrappedBuffer.hasRemaining()) {
-      SSLEngineResult unwrapResult = engine.unwrap(wrappedBuffer, peerAppData);
-      switch (unwrapResult.getStatus()) {
-        case BUFFER_OVERFLOW:
-          // buffer overflow expand and try again - double the available decryption space
-          int newCapacity =
-              (peerAppData.capacity() - peerAppData.position()) * 2 + peerAppData.position();
-          newCapacity = Math.max(newCapacity, peerAppData.capacity() / 2 * 3);
-          peerAppData =
-              bufferPool.expandWriteBufferIfNeeded(TRACKED_RECEIVER, peerAppData, newCapacity);
-          peerAppData.limit(peerAppData.capacity());
-          break;
-        case BUFFER_UNDERFLOW:
-          // partial data - need to read more. When this happens the SSLEngine will not have
-          // changed the buffer position
-          wrappedBuffer.compact();
-          return peerAppData;
-        case OK:
-          break;
-        default:
-          throw new SSLException("Error decrypting data: " + unwrapResult);
+  public ByteBufferSharing unwrap(ByteBuffer wrappedBuffer) throws IOException {
+    try (final ByteBufferSharing inputSharing = shareInputBuffer()) {
+
+      ByteBuffer peerAppData = inputSharing.getBuffer();
+
+      // note that we do not clear peerAppData as it may hold a partial
+      // message. TcpConduit, for instance, uses message chunking to
+      // transmit large payloads and we may have read a partial chunk
+      // during the previous unwrap
+
+      peerAppData.limit(peerAppData.capacity());
+      while (wrappedBuffer.hasRemaining()) {
+        SSLEngineResult unwrapResult = engine.unwrap(wrappedBuffer, peerAppData);
+        switch (unwrapResult.getStatus()) {
+          case BUFFER_OVERFLOW:
+            // buffer overflow expand and try again - double the available decryption space
+            int newCapacity =
+                (peerAppData.capacity() - peerAppData.position()) * 2 + peerAppData.position();
+            newCapacity = Math.max(newCapacity, peerAppData.capacity() / 2 * 3);
+            peerAppData = inputSharing.expandWriteBufferIfNeeded(newCapacity);
+            peerAppData.limit(peerAppData.capacity());
+            break;
+          case BUFFER_UNDERFLOW:
+            // partial data - need to read more. When this happens the SSLEngine will not have
+            // changed the buffer position
+            wrappedBuffer.compact();
+            return shareInputBuffer();
+          case OK:
+            break;
+          default:
+            throw new SSLException("Error decrypting data: " + unwrapResult);
+        }
       }
+      wrappedBuffer.clear();
+      return shareInputBuffer();
     }
-    wrappedBuffer.clear();
-    return peerAppData;
   }
 
   @Override
@@ -315,50 +317,45 @@ public class NioSslEngine implements NioFilter {
   }
 
   @Override
-  public ByteBuffer readAtLeast(SocketChannel channel, int bytes,
+  public ByteBufferSharing readAtLeast(SocketChannel channel, int bytes,
       ByteBuffer wrappedBuffer) throws IOException {
-    if (peerAppData.capacity() > bytes) {
-      // we already have a buffer that's big enough
-      if (peerAppData.capacity() - peerAppData.position() < bytes) {
-        peerAppData.compact();
-        peerAppData.flip();
-      }
-    }
+    try (final ByteBufferSharing inputSharing = shareInputBuffer()) {
 
-    while (peerAppData.remaining() < bytes) {
-      wrappedBuffer.limit(wrappedBuffer.capacity());
-      int amountRead = channel.read(wrappedBuffer);
-      if (amountRead < 0) {
-        throw new EOFException();
+      ByteBuffer peerAppData = inputSharing.getBuffer();
+
+      if (peerAppData.capacity() > bytes) {
+        // we already have a buffer that's big enough
+        if (peerAppData.capacity() - peerAppData.position() < bytes) {
+          peerAppData.compact();
+          peerAppData.flip();
+        }
       }
-      if (amountRead > 0) {
-        wrappedBuffer.flip();
-        // prep the decoded buffer for writing
-        peerAppData.compact();
-        peerAppData = unwrap(wrappedBuffer);
-        // done writing to the decoded buffer - prep it for reading again
-        peerAppData.flip();
+
+      while (peerAppData.remaining() < bytes) {
+        wrappedBuffer.limit(wrappedBuffer.capacity());
+        int amountRead = channel.read(wrappedBuffer);
+        if (amountRead < 0) {
+          throw new EOFException();
+        }
+        if (amountRead > 0) {
+          wrappedBuffer.flip();
+          // prep the decoded buffer for writing
+          peerAppData.compact();
+          try (final ByteBufferSharing inputSharing2 = unwrap(wrappedBuffer)) {
+            // done writing to the decoded buffer - prep it for reading again
+            final ByteBuffer peerAppDataNew = inputSharing2.getBuffer();
+            peerAppDataNew.flip();
+            peerAppData = peerAppDataNew; // loop needs new reference!
+          }
+        }
       }
+      return shareInputBuffer();
     }
-    return peerAppData;
   }
 
   @Override
-  public ByteBuffer getUnwrappedBuffer(ByteBuffer wrappedBuffer) {
-    return peerAppData;
-  }
-
-  /**
-   * ensures that the unwrapped buffer associated with the given wrapped buffer has
-   * sufficient capacity for the given amount of bytes. This may compact the
-   * buffer or it may return a new buffer.
-   */
-  public ByteBuffer ensureUnwrappedCapacity(int amount) {
-    // for TTLS the app-data buffers do not need to be tracked direct-buffers since we
-    // do not use them for I/O operations
-    peerAppData =
-        bufferPool.expandReadBufferIfNeeded(TRACKED_RECEIVER, peerAppData, amount);
-    return peerAppData;
+  public ByteBufferSharing getUnwrappedBuffer() {
+    return shareInputBuffer();
   }
 
   @Override
@@ -369,16 +366,14 @@ public class NioSslEngine implements NioFilter {
   }
 
   @Override
-  public synchronized boolean isClosed() {
-    return closed;
-  }
-
-  @Override
   public synchronized void close(SocketChannel socketChannel) {
     if (closed) {
       return;
     }
-    try {
+    closed = true;
+    inputSharing.destruct();
+    try (final ByteBufferSharing outputSharing = shareOutputBuffer(1, TimeUnit.MINUTES)) {
+      final ByteBuffer myNetData = outputSharing.getBuffer();
 
       if (!engine.isOutboundDone()) {
         ByteBuffer empty = ByteBuffer.wrap(new byte[0]);
@@ -405,14 +400,13 @@ public class NioSslEngine implements NioFilter {
       // we can't send a close message if the channel is closed
     } catch (IOException e) {
       throw new GemFireIOException("exception closing SSL session", e);
+    } catch (final OpenAttemptTimedOut _unused) {
+      logger.info(String.format("Couldn't get output lock in time, eliding TLS close message"));
+      if (!engine.isOutboundDone()) {
+        engine.closeOutbound();
+      }
     } finally {
-      ByteBuffer netData = myNetData;
-      ByteBuffer appData = peerAppData;
-      myNetData = null;
-      peerAppData = EMPTY_BUFFER;
-      bufferPool.releaseBuffer(TRACKED_SENDER, netData);
-      bufferPool.releaseBuffer(TRACKED_RECEIVER, appData);
-      this.closed = true;
+      outputSharing.destruct();
     }
   }
 
@@ -421,4 +415,17 @@ public class NioSslEngine implements NioFilter {
         targetBuffer.capacity() * 2);
   }
 
+  @VisibleForTesting
+  public ByteBufferSharing shareOutputBuffer() {
+    return outputSharing.open();
+  }
+
+  private ByteBufferSharing shareOutputBuffer(final long time, final TimeUnit unit)
+      throws OpenAttemptTimedOut {
+    return outputSharing.open(time, unit);
+  }
+
+  public ByteBufferSharing shareInputBuffer() {
+    return inputSharing.open();
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index 9292727..c6619d1 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -78,6 +78,7 @@ import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.SystemTimer;
 import org.apache.geode.internal.SystemTimer.SystemTimerTask;
 import org.apache.geode.internal.net.BufferPool;
+import org.apache.geode.internal.net.ByteBufferSharing;
 import org.apache.geode.internal.net.NioFilter;
 import org.apache.geode.internal.net.NioPlainEngine;
 import org.apache.geode.internal.net.SocketCreator;
@@ -800,11 +801,12 @@ public class Connection implements Runnable {
   @VisibleForTesting
   void clearSSLInputBuffer() {
     if (getConduit().useSSL() && ioFilter != null) {
-      synchronized (ioFilter.getSynchObject()) {
-        if (!ioFilter.isClosed()) {
-          // clear out any remaining handshake bytes
-          ByteBuffer buffer = ioFilter.getUnwrappedBuffer(inputBuffer);
-          buffer.position(0).limit(0);
+      try (final ByteBufferSharing sharedBuffer = ioFilter.getUnwrappedBuffer()) {
+        // clear out any remaining handshake bytes
+        try {
+          sharedBuffer.getBuffer().position(0).limit(0);
+        } catch (IOException e) {
+          // means the NioFilter was already closed
         }
       }
     }
@@ -2450,8 +2452,9 @@ public class Connection implements Runnable {
         long queueTimeoutTarget = now + asyncQueueTimeout;
         channel.configureBlocking(false);
         try {
-          synchronized (ioFilter.getSynchObject()) {
-            ByteBuffer wrappedBuffer = ioFilter.wrap(buffer);
+          try (final ByteBufferSharing outputSharing = ioFilter.wrap(buffer)) {
+            final ByteBuffer wrappedBuffer = outputSharing.getBuffer();
+
             int waitTime = 1;
             do {
               owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
@@ -2604,9 +2607,9 @@ public class Connection implements Runnable {
           }
           // fall through
         }
-        // synchronize on the ioFilter while using its network buffer
-        synchronized (ioFilter.getSynchObject()) {
-          ByteBuffer wrappedBuffer = ioFilter.wrap(buffer);
+        try (final ByteBufferSharing outputSharing = ioFilter.wrap(buffer)) {
+          final ByteBuffer wrappedBuffer = outputSharing.getBuffer();
+
           while (wrappedBuffer.remaining() > 0) {
             int amtWritten = 0;
             long start = stats.startSocketWrite(true);
@@ -2658,10 +2661,12 @@ public class Connection implements Runnable {
     final Version version = getRemoteVersion();
     try {
       msgReader = new MsgReader(this, ioFilter, version);
+
       ReplyMessage msg;
       int len;
 
-      synchronized (ioFilter.getSynchObject()) {
+      // (we have to lock here to protect between reading header and message body)
+      try (final ByteBufferSharing _unused = ioFilter.getUnwrappedBuffer()) {
         Header header = msgReader.readHeader();
 
         if (header.getMessageType() == NORMAL_MSG_TYPE) {
@@ -2678,7 +2683,7 @@ public class Connection implements Runnable {
           releaseMsgDestreamer(header.getMessageId(), destreamer);
           len = destreamer.size();
         }
-      } // sync
+      }
       // I'd really just like to call dispatchMessage here. However,
       // that call goes through a bunch of checks that knock about
       // 10% of the performance. Since this direct-ack stuff is all
@@ -2745,8 +2750,9 @@ public class Connection implements Runnable {
   private void processInputBuffer() throws ConnectionException, IOException {
     inputBuffer.flip();
 
-    synchronized (ioFilter.getSynchObject()) {
-      ByteBuffer peerDataBuffer = ioFilter.unwrap(inputBuffer);
+    try (final ByteBufferSharing sharedBuffer = ioFilter.unwrap(inputBuffer)) {
+      final ByteBuffer peerDataBuffer = sharedBuffer.getBuffer();
+
       peerDataBuffer.flip();
 
       boolean done = false;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
index 396ece2..503e48b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
@@ -26,6 +26,7 @@ import org.apache.geode.distributed.internal.ReplyProcessor21;
 import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.net.BufferPool;
+import org.apache.geode.internal.net.ByteBufferSharing;
 import org.apache.geode.internal.net.NioFilter;
 import org.apache.geode.internal.serialization.Version;
 import org.apache.geode.logging.internal.log4j.api.LogService;
@@ -54,8 +55,8 @@ public class MsgReader {
   }
 
   Header readHeader() throws IOException {
-    synchronized (ioFilter.getSynchObject()) {
-      ByteBuffer unwrappedBuffer = readAtLeast(Connection.MSG_HEADER_BYTES);
+    try (final ByteBufferSharing sharedBuffer = readAtLeast(Connection.MSG_HEADER_BYTES)) {
+      ByteBuffer unwrappedBuffer = sharedBuffer.getBuffer();
 
       Assert.assertTrue(unwrappedBuffer.remaining() >= Connection.MSG_HEADER_BYTES);
 
@@ -89,8 +90,8 @@ public class MsgReader {
    */
   DistributionMessage readMessage(Header header)
       throws IOException, ClassNotFoundException {
-    synchronized (ioFilter.getSynchObject()) {
-      ByteBuffer nioInputBuffer = readAtLeast(header.messageLength);
+    try (final ByteBufferSharing sharedBuffer = readAtLeast(header.messageLength)) {
+      ByteBuffer nioInputBuffer = sharedBuffer.getBuffer();
       Assert.assertTrue(nioInputBuffer.remaining() >= header.messageLength);
       this.getStats().incMessagesBeingReceived(true, header.messageLength);
       long startSer = this.getStats().startMsgDeserialization();
@@ -112,8 +113,8 @@ public class MsgReader {
 
   void readChunk(Header header, MsgDestreamer md)
       throws IOException {
-    synchronized (ioFilter.getSynchObject()) {
-      ByteBuffer unwrappedBuffer = readAtLeast(header.messageLength);
+    try (final ByteBufferSharing sharedBuffer = readAtLeast(header.messageLength)) {
+      ByteBuffer unwrappedBuffer = sharedBuffer.getBuffer();
       this.getStats().incMessagesBeingReceived(md.size() == 0, header.messageLength);
       md.addChunk(unwrappedBuffer, header.messageLength);
       // show that the bytes have been consumed by adjusting the buffer's position
@@ -123,7 +124,7 @@ public class MsgReader {
 
 
 
-  private ByteBuffer readAtLeast(int bytes) throws IOException {
+  private ByteBufferSharing readAtLeast(int bytes) throws IOException {
     peerNetData = ioFilter.ensureWrappedCapacity(bytes, peerNetData,
         BufferPool.BufferType.TRACKED_RECEIVER);
     return ioFilter.readAtLeast(conn.getSocket().getChannel(), bytes, peerNetData);
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferSharingImplTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferSharingImplTest.java
new file mode 100644
index 0000000..bb5a75f
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferSharingImplTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.net;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class ByteBufferSharingImplTest {
+
+  private ByteBufferSharingImpl sharing;
+  private BufferPool poolMock;
+  private CountDownLatch clientHasOpenedResource;
+  private CountDownLatch clientMayComplete;
+
+  @Before
+  public void before() {
+    poolMock = mock(BufferPool.class);
+    sharing =
+        new ByteBufferSharingImpl(mock(ByteBuffer.class), BufferPool.BufferType.TRACKED_SENDER,
+            poolMock);
+    clientHasOpenedResource = new CountDownLatch(1);
+    clientMayComplete = new CountDownLatch(1);
+  }
+
+  @Test
+  public void balancedCloseOwnerIsLastReferenceHolder() throws InterruptedException {
+    resourceOwnerIsLastReferenceHolder("client with balanced close calls", () -> {
+      try (final ByteBufferSharing _unused = sharing.open()) {
+      }
+    });
+  }
+
+  @Test
+  public void extraCloseOwnerIsLastReferenceHolder() throws InterruptedException {
+    resourceOwnerIsLastReferenceHolder("client with extra close calls", () -> {
+      final ByteBufferSharing sharing2 = sharing.open();
+      sharing2.close();
+      verify(poolMock, times(0)).releaseBuffer(any(), any());
+      assertThatThrownBy(() -> sharing2.close()).isInstanceOf(IllegalMonitorStateException.class);
+      verify(poolMock, times(0)).releaseBuffer(any(), any());
+    });
+  }
+
+  @Test
+  public void balancedCloseClientIsLastReferenceHolder() throws InterruptedException {
+    clientIsLastReferenceHolder("client with balanced close calls", () -> {
+      try (final ByteBufferSharing _unused = sharing.open()) {
+        clientHasOpenedResource.countDown();
+        blockClient();
+      }
+    });
+  }
+
+  @Test
+  public void extraCloseClientIsLastReferenceHolder() throws InterruptedException {
+    clientIsLastReferenceHolder("client with extra close calls", () -> {
+      final ByteBufferSharing sharing2 = sharing.open();
+      clientHasOpenedResource.countDown();
+      blockClient();
+      sharing2.close();
+      verify(poolMock, times(1)).releaseBuffer(any(), any());
+      assertThatThrownBy(() -> sharing2.close()).isInstanceOf(IllegalMonitorStateException.class);
+      System.out.println("here");
+    });
+  }
+
+  @Test
+  public void extraCloseDoesNotPrematurelyReturnBufferToPool() {
+    final ByteBufferSharing sharing2 = sharing.open();
+    sharing2.close();
+    assertThatThrownBy(() -> sharing2.close()).isInstanceOf(IllegalMonitorStateException.class);
+    verify(poolMock, times(0)).releaseBuffer(any(), any());
+    sharing.destruct();
+    verify(poolMock, times(1)).releaseBuffer(any(), any());
+  }
+
+  @Test
+  public void extraCloseDoesNotDecrementRefCount() {
+    final ByteBufferSharing sharing2 = sharing.open();
+    sharing2.close();
+    assertThatThrownBy(() -> sharing2.close()).isInstanceOf(IllegalMonitorStateException.class);
+    final ByteBufferSharing sharing3 = this.sharing.open();
+    sharing.destruct();
+    verify(poolMock, times(0)).releaseBuffer(any(), any());
+  }
+
+  private void resourceOwnerIsLastReferenceHolder(final String name, final Runnable client)
+      throws InterruptedException {
+    /*
+     * Thread.currentThread() is playing the role of the (ByteBuffer) resource owner
+     */
+
+    /*
+     * clientThread thread is playing the role of the client (of the resource owner)
+     */
+    final Thread clientThread = new Thread(client, name);
+    clientThread.start();
+    clientThread.join();
+
+    verify(poolMock, times(0)).releaseBuffer(any(), any());
+
+    sharing.destruct();
+
+    verify(poolMock, times(1)).releaseBuffer(any(), any());
+  }
+
+  private void clientIsLastReferenceHolder(final String name, final Runnable client)
+      throws InterruptedException {
+    /*
+     * Thread.currentThread() is playing the role of the (ByteBuffer) resource owner
+     */
+
+    /*
+     * clientThread thread is playing the role of the client (of the resource owner)
+     */
+    final Thread clientThread = new Thread(client, name);
+    clientThread.start();
+
+    clientHasOpenedResource.await();
+
+    sharing.destruct();
+
+    verify(poolMock, times(0)).releaseBuffer(any(), any());
+
+    clientMayComplete.countDown(); // let client finish
+
+    clientThread.join();
+
+    verify(poolMock, times(1)).releaseBuffer(any(), any());
+  }
+
+  private void blockClient() {
+    try {
+      clientMayComplete.await();
+    } catch (InterruptedException e) {
+      fail("test client thread interrupted: " + e);
+    }
+  }
+
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java
index 3d394fb..7ab838c 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java
@@ -50,7 +50,8 @@ public class NioPlainEngineTest {
   public void unwrap() {
     ByteBuffer buffer = ByteBuffer.allocate(100);
     buffer.position(0).limit(buffer.capacity());
-    nioEngine.unwrap(buffer);
+    try (final ByteBufferSharing unused = nioEngine.unwrap(buffer)) {
+    }
     assertThat(buffer.position()).isEqualTo(buffer.limit());
   }
 
@@ -116,23 +117,29 @@ public class NioPlainEngineTest {
 
     nioEngine.lastReadPosition = 10;
 
-    ByteBuffer data = nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer);
-    verify(mockChannel, times(3)).read(isA(ByteBuffer.class));
-    assertThat(data.position()).isEqualTo(0);
-    assertThat(data.limit()).isEqualTo(amountToRead);
-    assertThat(nioEngine.lastReadPosition).isEqualTo(individualRead * 3 + preexistingBytes);
-    assertThat(nioEngine.lastProcessedPosition).isEqualTo(amountToRead);
-
-    data = nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer);
-    verify(mockChannel, times(5)).read(any(ByteBuffer.class));
-    // at end of last readAtLeast data
-    assertThat(data.position()).isEqualTo(amountToRead);
-    // we read amountToRead bytes
-    assertThat(data.limit()).isEqualTo(amountToRead * 2);
-    // we did 2 more reads from the network
-    assertThat(nioEngine.lastReadPosition).isEqualTo(individualRead * 5 + preexistingBytes);
-    // the next read will start at the end of consumed data
-    assertThat(nioEngine.lastProcessedPosition).isEqualTo(amountToRead * 2);
+    try (final ByteBufferSharing sharedBuffer =
+        nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer)) {
+      ByteBuffer data = sharedBuffer.getBuffer();
+      verify(mockChannel, times(3)).read(isA(ByteBuffer.class));
+      assertThat(data.position()).isEqualTo(0);
+      assertThat(data.limit()).isEqualTo(amountToRead);
+      assertThat(nioEngine.lastReadPosition).isEqualTo(individualRead * 3 + preexistingBytes);
+      assertThat(nioEngine.lastProcessedPosition).isEqualTo(amountToRead);
+    }
+
+    try (final ByteBufferSharing sharedBuffer =
+        nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer)) {
+      final ByteBuffer data = sharedBuffer.getBuffer();
+      verify(mockChannel, times(5)).read(any(ByteBuffer.class));
+      // at end of last readAtLeast data
+      assertThat(data.position()).isEqualTo(amountToRead);
+      // we read amountToRead bytes
+      assertThat(data.limit()).isEqualTo(amountToRead * 2);
+      // we did 2 more reads from the network
+      assertThat(nioEngine.lastReadPosition).isEqualTo(individualRead * 5 + preexistingBytes);
+      // the next read will start at the end of consumed data
+      assertThat(nioEngine.lastProcessedPosition).isEqualTo(amountToRead * 2);
+    }
 
   }
 
@@ -147,7 +154,9 @@ public class NioPlainEngineTest {
 
     nioEngine.lastReadPosition = 10;
 
-    nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer);
+    try (final ByteBufferSharing unused =
+        nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer)) {
+    }
   }
 
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
index ee4aaa3..e9b01cf 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
@@ -69,6 +69,7 @@ public class NioSslEngineTest {
   private DMStats mockStats;
   private NioSslEngine nioSslEngine;
   private NioSslEngine spyNioSslEngine;
+  private BufferPool spyBufferPool;
 
   @Before
   public void setUp() throws Exception {
@@ -81,13 +82,17 @@ public class NioSslEngineTest {
 
     mockStats = mock(DMStats.class);
 
-    nioSslEngine = new NioSslEngine(mockEngine, new BufferPool(mockStats));
+    final BufferPool bufferPool = new BufferPool(mockStats);
+    spyBufferPool = spy(bufferPool);
+    nioSslEngine = new NioSslEngine(mockEngine, spyBufferPool);
     spyNioSslEngine = spy(nioSslEngine);
   }
 
   @Test
-  public void engineUsesDirectBuffers() {
-    assertThat(nioSslEngine.myNetData.isDirect()).isTrue();
+  public void engineUsesDirectBuffers() throws IOException {
+    try (final ByteBufferSharing outputSharing = nioSslEngine.shareOutputBuffer()) {
+      assertThat(outputSharing.getBuffer().isDirect()).isTrue();
+    }
   }
 
   @Test
@@ -119,7 +124,7 @@ public class NioSslEngineTest {
     verify(mockEngine, atLeast(2)).getHandshakeStatus();
     verify(mockEngine, times(3)).wrap(any(ByteBuffer.class), any(ByteBuffer.class));
     verify(mockEngine, times(3)).unwrap(any(ByteBuffer.class), any(ByteBuffer.class));
-    verify(spyNioSslEngine, times(2)).expandWriteBuffer(any(BufferPool.BufferType.class),
+    verify(spyBufferPool, times(2)).expandWriteBufferIfNeeded(any(BufferPool.BufferType.class),
         any(ByteBuffer.class), any(Integer.class));
     verify(spyNioSslEngine, times(1)).handleBlockingTasks();
     verify(mockChannel, times(3)).read(any(ByteBuffer.class));
@@ -183,148 +188,148 @@ public class NioSslEngineTest {
         .hasMessageContaining("SSL Handshake terminated with status");
   }
 
-
-  @Test
-  public void checkClosed() throws Exception {
-    nioSslEngine.checkClosed();
-  }
-
-  @Test(expected = IOException.class)
-  public void checkClosedThrows() throws Exception {
-    when(mockEngine.wrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenReturn(
-        new SSLEngineResult(CLOSED, FINISHED, 0, 100));
-    nioSslEngine.close(mock(SocketChannel.class));
-    nioSslEngine.checkClosed();
-  }
-
-  @Test
-  public void synchObjectIsSelf() {
-    // for thread-safety the synchronization object given to outside entities
-    // must be the the engine itself. This allows external manipulation or
-    // use of the engine's buffers to be protected in the same way as its synchronized
-    // methods
-    assertThat(nioSslEngine.getSynchObject()).isSameAs(nioSslEngine);
-  }
-
   @Test
   public void wrap() throws Exception {
-    // make the application data too big to fit into the engine's encryption buffer
-    ByteBuffer appData = ByteBuffer.allocate(nioSslEngine.myNetData.capacity() + 100);
-    byte[] appBytes = new byte[appData.capacity()];
-    Arrays.fill(appBytes, (byte) 0x1F);
-    appData.put(appBytes);
-    appData.flip();
-
-    // create an engine that will transfer bytes from the application buffer to the encrypted buffer
-    TestSSLEngine testEngine = new TestSSLEngine();
-    testEngine.addReturnResult(
-        new SSLEngineResult(OK, NEED_TASK, appData.remaining(), appData.remaining()));
-    spyNioSslEngine.engine = testEngine;
-
-    ByteBuffer wrappedBuffer = spyNioSslEngine.wrap(appData);
-
-    verify(spyNioSslEngine, times(1)).expandWriteBuffer(any(BufferPool.BufferType.class),
-        any(ByteBuffer.class), any(Integer.class));
-    appData.flip();
-    assertThat(wrappedBuffer).isEqualTo(appData);
-    verify(spyNioSslEngine, times(1)).handleBlockingTasks();
+    try (final ByteBufferSharing outputSharing = nioSslEngine.shareOutputBuffer()) {
+
+      // make the application data too big to fit into the engine's encryption buffer
+      ByteBuffer appData =
+          ByteBuffer.allocate(outputSharing.getBuffer().capacity() + 100);
+      byte[] appBytes = new byte[appData.capacity()];
+      Arrays.fill(appBytes, (byte) 0x1F);
+      appData.put(appBytes);
+      appData.flip();
+
+      // create an engine that will transfer bytes from the application buffer to the encrypted
+      // buffer
+      TestSSLEngine testEngine = new TestSSLEngine();
+      testEngine.addReturnResult(
+          new SSLEngineResult(OK, NEED_TASK, appData.remaining(), appData.remaining()));
+      spyNioSslEngine.engine = testEngine;
+
+      try (final ByteBufferSharing outputSharing2 = spyNioSslEngine.wrap(appData)) {
+        ByteBuffer wrappedBuffer = outputSharing2.getBuffer();
+
+        verify(spyBufferPool, times(1)).expandWriteBufferIfNeeded(any(BufferPool.BufferType.class),
+            any(ByteBuffer.class), any(Integer.class));
+        appData.flip();
+        assertThat(wrappedBuffer).isEqualTo(appData);
+      }
+      verify(spyNioSslEngine, times(1)).handleBlockingTasks();
+    }
   }
 
   @Test
-  public void wrapFails() {
-    // make the application data too big to fit into the engine's encryption buffer
-    ByteBuffer appData = ByteBuffer.allocate(nioSslEngine.myNetData.capacity() + 100);
-    byte[] appBytes = new byte[appData.capacity()];
-    Arrays.fill(appBytes, (byte) 0x1F);
-    appData.put(appBytes);
-    appData.flip();
-
-    // create an engine that will transfer bytes from the application buffer to the encrypted buffer
-    TestSSLEngine testEngine = new TestSSLEngine();
-    testEngine.addReturnResult(
-        new SSLEngineResult(CLOSED, NEED_TASK, appData.remaining(), appData.remaining()));
-    spyNioSslEngine.engine = testEngine;
-
-    assertThatThrownBy(() -> spyNioSslEngine.wrap(appData)).isInstanceOf(SSLException.class)
-        .hasMessageContaining("Error encrypting data");
+  public void wrapFails() throws IOException {
+    try (final ByteBufferSharing outputSharing = nioSslEngine.shareOutputBuffer()) {
+      // make the application data too big to fit into the engine's encryption buffer
+      ByteBuffer appData =
+          ByteBuffer.allocate(outputSharing.getBuffer().capacity() + 100);
+      byte[] appBytes = new byte[appData.capacity()];
+      Arrays.fill(appBytes, (byte) 0x1F);
+      appData.put(appBytes);
+      appData.flip();
+
+      // create an engine that will transfer bytes from the application buffer to the encrypted
+      // buffer
+      TestSSLEngine testEngine = new TestSSLEngine();
+      testEngine.addReturnResult(
+          new SSLEngineResult(CLOSED, NEED_TASK, appData.remaining(), appData.remaining()));
+      spyNioSslEngine.engine = testEngine;
+
+      assertThatThrownBy(() -> spyNioSslEngine.wrap(appData)).isInstanceOf(SSLException.class)
+          .hasMessageContaining("Error encrypting data");
+    }
   }
 
   @Test
   public void unwrapWithBufferOverflow() throws Exception {
-    // make the application data too big to fit into the engine's encryption buffer
-    int originalPeerAppDataCapacity = nioSslEngine.peerAppData.capacity();
-    int originalPeerAppDataPosition = originalPeerAppDataCapacity / 2;
-    nioSslEngine.peerAppData.position(originalPeerAppDataPosition);
-    ByteBuffer wrappedData = ByteBuffer.allocate(originalPeerAppDataCapacity + 100);
-    byte[] netBytes = new byte[wrappedData.capacity()];
-    Arrays.fill(netBytes, (byte) 0x1F);
-    wrappedData.put(netBytes);
-    wrappedData.flip();
-
-    // create an engine that will transfer bytes from the application buffer to the encrypted buffer
-    TestSSLEngine testEngine = new TestSSLEngine();
-    spyNioSslEngine.engine = testEngine;
-
-    testEngine.addReturnResult(
-        new SSLEngineResult(BUFFER_OVERFLOW, NEED_UNWRAP, 0, 0), // results in 30,000 byte buffer
-        new SSLEngineResult(BUFFER_OVERFLOW, NEED_UNWRAP, 0, 0), // 50,000 bytes
-        new SSLEngineResult(BUFFER_OVERFLOW, NEED_UNWRAP, 0, 0), // 90,000 bytes
-        new SSLEngineResult(OK, FINISHED, netBytes.length, netBytes.length));
-
-    int expectedCapacity = 2 * originalPeerAppDataCapacity - originalPeerAppDataPosition;
-    expectedCapacity =
-        2 * (expectedCapacity - originalPeerAppDataPosition) + originalPeerAppDataPosition;
-    expectedCapacity =
-        2 * (expectedCapacity - originalPeerAppDataPosition) + originalPeerAppDataPosition;
-    ByteBuffer unwrappedBuffer = spyNioSslEngine.unwrap(wrappedData);
-    unwrappedBuffer.flip();
-    assertThat(unwrappedBuffer.capacity()).isEqualTo(expectedCapacity);
+    try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
+      // make the application data too big to fit into the engine's encryption buffer
+      final ByteBuffer peerAppData = inputSharing.getBuffer();
+
+      int originalPeerAppDataCapacity = peerAppData.capacity();
+      int originalPeerAppDataPosition = originalPeerAppDataCapacity / 2;
+      peerAppData.position(originalPeerAppDataPosition);
+      ByteBuffer wrappedData = ByteBuffer.allocate(originalPeerAppDataCapacity + 100);
+      byte[] netBytes = new byte[wrappedData.capacity()];
+      Arrays.fill(netBytes, (byte) 0x1F);
+      wrappedData.put(netBytes);
+      wrappedData.flip();
+
+      // create an engine that will transfer bytes from the application buffer to the encrypted
+      // buffer
+      TestSSLEngine testEngine = new TestSSLEngine();
+      spyNioSslEngine.engine = testEngine;
+
+      testEngine.addReturnResult(
+          new SSLEngineResult(BUFFER_OVERFLOW, NEED_UNWRAP, 0, 0), // results in 30,000 byte buffer
+          new SSLEngineResult(BUFFER_OVERFLOW, NEED_UNWRAP, 0, 0), // 50,000 bytes
+          new SSLEngineResult(BUFFER_OVERFLOW, NEED_UNWRAP, 0, 0), // 90,000 bytes
+          new SSLEngineResult(OK, FINISHED, netBytes.length, netBytes.length));
+
+      int expectedCapacity = 2 * originalPeerAppDataCapacity - originalPeerAppDataPosition;
+      expectedCapacity =
+          2 * (expectedCapacity - originalPeerAppDataPosition) + originalPeerAppDataPosition;
+      expectedCapacity =
+          2 * (expectedCapacity - originalPeerAppDataPosition) + originalPeerAppDataPosition;
+      try (final ByteBufferSharing sharedBuffer = spyNioSslEngine.unwrap(wrappedData)) {
+        ByteBuffer unwrappedBuffer = sharedBuffer.getBuffer();
+        unwrappedBuffer.flip();
+        assertThat(unwrappedBuffer.capacity()).isEqualTo(expectedCapacity);
+      }
+    }
   }
 
 
   @Test
   public void unwrapWithBufferUnderflow() throws Exception {
-    ByteBuffer wrappedData = ByteBuffer.allocate(nioSslEngine.peerAppData.capacity());
-    byte[] netBytes = new byte[wrappedData.capacity() / 2];
-    Arrays.fill(netBytes, (byte) 0x1F);
-    wrappedData.put(netBytes);
-    wrappedData.flip();
-
-    // create an engine that will transfer bytes from the application buffer to the encrypted buffer
-    TestSSLEngine testEngine = new TestSSLEngine();
-    testEngine.addReturnResult(new SSLEngineResult(BUFFER_UNDERFLOW, NEED_TASK, 0, 0));
-    spyNioSslEngine.engine = testEngine;
-
-    ByteBuffer unwrappedBuffer = spyNioSslEngine.unwrap(wrappedData);
-    unwrappedBuffer.flip();
-    assertThat(unwrappedBuffer.remaining()).isEqualTo(0);
-    assertThat(wrappedData.position()).isEqualTo(netBytes.length);
-  }
-
-  @Test
-  public void unwrapWithDecryptionError() {
-    // make the application data too big to fit into the engine's encryption buffer
-    ByteBuffer wrappedData = ByteBuffer.allocate(nioSslEngine.peerAppData.capacity());
-    byte[] netBytes = new byte[wrappedData.capacity() / 2];
-    Arrays.fill(netBytes, (byte) 0x1F);
-    wrappedData.put(netBytes);
-    wrappedData.flip();
-
-    // create an engine that will transfer bytes from the application buffer to the encrypted buffer
-    TestSSLEngine testEngine = new TestSSLEngine();
-    testEngine.addReturnResult(new SSLEngineResult(CLOSED, FINISHED, 0, 0));
-    spyNioSslEngine.engine = testEngine;
-
-    assertThatThrownBy(() -> spyNioSslEngine.unwrap(wrappedData)).isInstanceOf(SSLException.class)
-        .hasMessageContaining("Error decrypting data");
+    try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
+      ByteBuffer wrappedData =
+          ByteBuffer.allocate(inputSharing.getBuffer().capacity());
+      byte[] netBytes = new byte[wrappedData.capacity() / 2];
+      Arrays.fill(netBytes, (byte) 0x1F);
+      wrappedData.put(netBytes);
+      wrappedData.flip();
+
+      // create an engine that will transfer bytes from the application buffer to the encrypted
+      // buffer
+      TestSSLEngine testEngine = new TestSSLEngine();
+      testEngine.addReturnResult(new SSLEngineResult(BUFFER_UNDERFLOW, NEED_TASK, 0, 0));
+      spyNioSslEngine.engine = testEngine;
+
+      try (final ByteBufferSharing sharedBuffer = spyNioSslEngine.unwrap(wrappedData)) {
+        ByteBuffer unwrappedBuffer = sharedBuffer.getBuffer();
+        unwrappedBuffer.flip();
+        assertThat(unwrappedBuffer.remaining()).isEqualTo(0);
+      }
+      assertThat(wrappedData.position()).isEqualTo(netBytes.length);
+    }
   }
 
   @Test
-  public void ensureUnwrappedCapacity() {
-    ByteBuffer wrappedBuffer = ByteBuffer.allocate(netBufferSize);
-    int requestedCapacity = nioSslEngine.getUnwrappedBuffer(wrappedBuffer).capacity() * 2;
-    ByteBuffer unwrappedBuffer = nioSslEngine.ensureUnwrappedCapacity(requestedCapacity);
-    assertThat(unwrappedBuffer.capacity()).isGreaterThanOrEqualTo(requestedCapacity);
+  public void unwrapWithDecryptionError() throws IOException {
+    try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
+      // make the application data too big to fit into the engine's encryption buffer
+      ByteBuffer wrappedData =
+          ByteBuffer.allocate(inputSharing.getBuffer().capacity());
+      byte[] netBytes = new byte[wrappedData.capacity() / 2];
+      Arrays.fill(netBytes, (byte) 0x1F);
+      wrappedData.put(netBytes);
+      wrappedData.flip();
+
+      // create an engine that will transfer bytes from the application buffer to the encrypted
+      // buffer
+      TestSSLEngine testEngine = new TestSSLEngine();
+      testEngine.addReturnResult(new SSLEngineResult(CLOSED, FINISHED, 0, 0));
+      spyNioSslEngine.engine = testEngine;
+
+      assertThatThrownBy(() -> {
+        try (final ByteBufferSharing unused = spyNioSslEngine.unwrap(wrappedData)) {
+        }
+      }).isInstanceOf(SSLException.class)
+          .hasMessageContaining("Error decrypting data");
+    }
   }
 
   @Test
@@ -338,7 +343,11 @@ public class NioSslEngineTest {
     when(mockEngine.wrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenReturn(
         new SSLEngineResult(CLOSED, FINISHED, 0, 0));
     nioSslEngine.close(mockChannel);
-    assertThatThrownBy(() -> nioSslEngine.checkClosed()).isInstanceOf(IOException.class)
+    assertThatThrownBy(() -> nioSslEngine.shareOutputBuffer().getBuffer())
+        .isInstanceOf(IOException.class)
+        .hasMessageContaining("NioSslEngine has been closed");
+    assertThatThrownBy(() -> nioSslEngine.shareInputBuffer().getBuffer())
+        .isInstanceOf(IOException.class)
         .hasMessageContaining("NioSslEngine has been closed");
     nioSslEngine.close(mockChannel);
   }
@@ -367,10 +376,12 @@ public class NioSslEngineTest {
 
     when(mockEngine.isOutboundDone()).thenReturn(Boolean.FALSE);
     when(mockEngine.wrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenAnswer((x) -> {
-      // give the NioSslEngine something to write on its socket channel, simulating a TLS close
-      // message
-      nioSslEngine.myNetData.put("Goodbye cruel world".getBytes());
-      return new SSLEngineResult(CLOSED, FINISHED, 0, 0);
+      try (final ByteBufferSharing outputSharing = nioSslEngine.shareOutputBuffer()) {
+        // give the NioSslEngine something to write on its socket channel, simulating a TLS close
+        // message
+        outputSharing.getBuffer().put("Goodbye cruel world".getBytes());
+        return new SSLEngineResult(CLOSED, FINISHED, 0, 0);
+      }
     });
     when(mockChannel.write(any(ByteBuffer.class))).thenThrow(new ClosedChannelException());
     nioSslEngine.close(mockChannel);
@@ -401,37 +412,42 @@ public class NioSslEngineTest {
     ByteBuffer wrappedBuffer = ByteBuffer.allocate(1000);
     SocketChannel mockChannel = mock(SocketChannel.class);
 
-    // force a compaction by making the decoded buffer appear near to being full
-    ByteBuffer unwrappedBuffer = nioSslEngine.peerAppData;
-    unwrappedBuffer.position(unwrappedBuffer.capacity() - individualRead);
-    unwrappedBuffer.limit(unwrappedBuffer.position() + preexistingBytes);
-
-    // simulate some socket reads
-    when(mockChannel.read(any(ByteBuffer.class))).thenAnswer(new Answer<Integer>() {
-      @Override
-      public Integer answer(InvocationOnMock invocation) throws Throwable {
-        ByteBuffer buffer = invocation.getArgument(0);
-        buffer.position(buffer.position() + individualRead);
-        return individualRead;
+    try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
+      // force a compaction by making the decoded buffer appear near to being full
+      ByteBuffer unwrappedBuffer = inputSharing.getBuffer();
+      unwrappedBuffer.position(unwrappedBuffer.capacity() - individualRead);
+      unwrappedBuffer.limit(unwrappedBuffer.position() + preexistingBytes);
+
+      // simulate some socket reads
+      when(mockChannel.read(any(ByteBuffer.class))).thenAnswer(new Answer<Integer>() {
+        @Override
+        public Integer answer(InvocationOnMock invocation) throws Throwable {
+          ByteBuffer buffer = invocation.getArgument(0);
+          buffer.position(buffer.position() + individualRead);
+          return individualRead;
+        }
+      });
+
+      TestSSLEngine testSSLEngine = new TestSSLEngine();
+      testSSLEngine.addReturnResult(new SSLEngineResult(OK, NEED_UNWRAP, 0, 0));
+      nioSslEngine.engine = testSSLEngine;
+
+      try (final ByteBufferSharing sharedBuffer =
+          nioSslEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer)) {
+        ByteBuffer data = sharedBuffer.getBuffer();
+        verify(mockChannel, times(3)).read(isA(ByteBuffer.class));
+        assertThat(data.position()).isEqualTo(0);
+        assertThat(data.limit()).isEqualTo(individualRead * 3 + preexistingBytes);
       }
-    });
-
-    TestSSLEngine testSSLEngine = new TestSSLEngine();
-    testSSLEngine.addReturnResult(new SSLEngineResult(OK, NEED_UNWRAP, 0, 0));
-    nioSslEngine.engine = testSSLEngine;
-
-    ByteBuffer data = nioSslEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer);
-    verify(mockChannel, times(3)).read(isA(ByteBuffer.class));
-    assertThat(data.position()).isEqualTo(0);
-    assertThat(data.limit()).isEqualTo(individualRead * 3 + preexistingBytes);
+    }
   }
 
 
   /**
-   * This tests the case where a message header has been read and part of a message has been
-   * read, but the decoded buffer is too small to hold all of the message. In this case
-   * the readAtLeast method will have to expand the capacity of the decoded buffer and return
-   * the new, expanded, buffer as the method result.
+   * This tests the case where a message header has been read and part of a message has been read,
+   * but the decoded buffer is too small to hold all of the message. In this case the readAtLeast
+   * method will have to expand the capacity of the decoded buffer and return the new, expanded,
+   * buffer as the method result.
    */
   @Test
   public void readAtLeastUsingSmallAppBuffer() throws Exception {
@@ -445,7 +461,11 @@ public class NioSslEngineTest {
     int initialUnwrappedBufferSize = 100;
     ByteBuffer unwrappedBuffer = ByteBuffer.allocate(initialUnwrappedBufferSize);
     unwrappedBuffer.position(7).limit(preexistingBytes + 7); // 7 bytes of message header - ignored
-    nioSslEngine.peerAppData = unwrappedBuffer;
+
+    try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
+      final ByteBufferSharingImpl inputSharingImpl = (ByteBufferSharingImpl) inputSharing;
+      inputSharingImpl.setBufferForTestingOnly(unwrappedBuffer);
+    }
 
     // simulate some socket reads
     when(mockChannel.read(any(ByteBuffer.class))).thenAnswer(new Answer<Integer>() {
@@ -465,22 +485,26 @@ public class NioSslEngineTest {
         new SSLEngineResult(OK, NEED_UNWRAP, 0, 0)); // 130 + 60 bytes = 190
     nioSslEngine.engine = testSSLEngine;
 
-    ByteBuffer data = nioSslEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer);
-    verify(mockChannel, times(3)).read(isA(ByteBuffer.class));
-    assertThat(data.position()).isEqualTo(0);
-    assertThat(data.limit()).isEqualTo(individualRead * 3 + preexistingBytes);
-    // The initial available space in the unwrapped buffer should have doubled
-    int initialFreeSpace = initialUnwrappedBufferSize - preexistingBytes;
-    assertThat(nioSslEngine.peerAppData.capacity())
-        .isEqualTo(2 * initialFreeSpace + preexistingBytes);
+    try (final ByteBufferSharing sharedBuffer =
+        nioSslEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer)) {
+      ByteBuffer data = sharedBuffer.getBuffer();
+      verify(mockChannel, times(3)).read(isA(ByteBuffer.class));
+      assertThat(data.position()).isEqualTo(0);
+      assertThat(data.limit()).isEqualTo(individualRead * 3 + preexistingBytes);
+      // The initial available space in the unwrapped buffer should have doubled
+      int initialFreeSpace = initialUnwrappedBufferSize - preexistingBytes;
+      try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
+        assertThat(inputSharing.getBuffer().capacity())
+            .isEqualTo(2 * initialFreeSpace + preexistingBytes);
+      }
+    }
   }
 
 
   /**
-   * This tests the case where a message header has been read and part of a message has been
-   * read, but the decoded buffer is too small to hold all of the message. In this case
-   * the buffer is completely full and should only take one overflow response to resolve
-   * the problem.
+   * This tests the case where a message header has been read and part of a message has been read,
+   * but the decoded buffer is too small to hold all of the message. In this case the buffer is
+   * completely full and should only take one overflow response to resolve the problem.
    */
   @Test
   public void readAtLeastUsingSmallAppBufferAtWriteLimit() throws Exception {
@@ -495,7 +519,10 @@ public class NioSslEngineTest {
     // force buffer expansion by making a small decoded buffer appear near to being full
     ByteBuffer unwrappedBuffer = ByteBuffer.allocate(initialUnwrappedBufferSize);
     unwrappedBuffer.position(7).limit(preexistingBytes + 7); // 7 bytes of message header - ignored
-    nioSslEngine.peerAppData = unwrappedBuffer;
+    try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
+      final ByteBufferSharingImpl inputSharingImpl = (ByteBufferSharingImpl) inputSharing;
+      inputSharingImpl.setBufferForTestingOnly(unwrappedBuffer);
+    }
 
     // simulate some socket reads
     when(mockChannel.read(any(ByteBuffer.class))).thenAnswer(new Answer<Integer>() {
@@ -515,11 +542,14 @@ public class NioSslEngineTest {
         new SSLEngineResult(OK, NEED_UNWRAP, 0, 0));
     nioSslEngine.engine = testSSLEngine;
 
-    ByteBuffer data = nioSslEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer);
-    verify(mockChannel, times(1)).read(isA(ByteBuffer.class));
-    assertThat(data.position()).isEqualTo(0);
-    assertThat(data.limit())
-        .isEqualTo(individualRead * testSSLEngine.getNumberOfUnwraps() + preexistingBytes);
+    try (final ByteBufferSharing sharedBuffer =
+        nioSslEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer)) {
+      ByteBuffer data = sharedBuffer.getBuffer();
+      verify(mockChannel, times(1)).read(isA(ByteBuffer.class));
+      assertThat(data.position()).isEqualTo(0);
+      assertThat(data.limit())
+          .isEqualTo(individualRead * testSSLEngine.getNumberOfUnwraps() + preexistingBytes);
+    }
   }
 
 
@@ -657,8 +687,8 @@ public class NioSslEngineTest {
     }
 
     /**
-     * add an engine operation result to be returned by wrap or unwrap.
-     * Like Mockito's thenReturn(), the last return result will repeat forever
+     * add an engine operation result to be returned by wrap or unwrap. Like Mockito's thenReturn(),
+     * the last return result will repeat forever
      */
     void addReturnResult(SSLEngineResult... sslEngineResult) {
       for (SSLEngineResult result : sslEngineResult) {