You are viewing a plain-text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@geode.apache.org by ec...@apache.org on 2020/11/06 16:23:23 UTC

[geode] branch support/1.13 updated: [BACKPORT] GEODE-8681: peer-to-peer message loss due to sending connection closing with TLS enabled (#5713)

This is an automated email from the ASF dual-hosted git repository.

echobravo pushed a commit to branch support/1.13
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/support/1.13 by this push:
     new 4cedf9f  [BACKPORT] GEODE-8681: peer-to-peer message loss due to sending connection closi… (#5713)
4cedf9f is described below

commit 4cedf9ff63952bb2a200ba0f218216a4e2ec272a
Author: Ernie Burghardt <eb...@pivotal.io>
AuthorDate: Fri Nov 6 08:22:36 2020 -0800

    [BACKPORT] GEODE-8681: peer-to-peer message loss due to sending connection closi… (#5713)
    
    * GEODE-8681: peer-to-peer message loss due to sending connection closing with TLS enabled (#5699)
    
    A single socket read could pick up more than one message, and a single
    unwrap() call could decrypt multiple messages.
    Normally the SSLEngine isn't closed and unwrap() reports an OK status,
    so Connection.processInputBuffer picks up each message, one by one,
    from the buffer and dispatches it.
    But if the SSLEngine had been closed, we ignored any already-decrypted
    data sitting in the unwrapped buffer and threw an SSLException instead,
    so those messages were lost. Now unwrap() stops decrypting and returns
    the data already in the buffer, and throws an SSLException only when no
    data has been decrypted.
    
    (cherry picked from commit 7da8f9b516ac1e2525a1dfc922af7bfb8995f2c6)
    
    
    Authored-by: Bruce Schuchardt <bs...@pivotal.io>
---
 .../geode/ClusterCommunicationsDUnitTest.java      | 87 ++++++++++++++++++++--
 .../apache/geode/internal/net/NioSslEngine.java    | 12 ++-
 .../geode/internal/net/NioSslEngineTest.java       | 29 +++++++-
 3 files changed, 114 insertions(+), 14 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/ClusterCommunicationsDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/ClusterCommunicationsDUnitTest.java
index 3ae79fd..fc72cd2 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/ClusterCommunicationsDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/ClusterCommunicationsDUnitTest.java
@@ -29,6 +29,7 @@ import static org.apache.geode.distributed.ConfigurationProperties.SSL_REQUIRE_A
 import static org.apache.geode.distributed.ConfigurationProperties.SSL_TRUSTSTORE;
 import static org.apache.geode.distributed.ConfigurationProperties.SSL_TRUSTSTORE_PASSWORD;
 import static org.apache.geode.distributed.ConfigurationProperties.USE_CLUSTER_CONFIGURATION;
+import static org.apache.geode.distributed.internal.OperationExecutors.SERIAL_EXECUTOR;
 import static org.apache.geode.internal.serialization.DataSerializableFixedID.SERIAL_ACKED_MESSAGE;
 import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
 import static org.apache.geode.test.awaitility.GeodeAwaitility.getTimeout;
@@ -51,6 +52,7 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import org.jetbrains.annotations.NotNull;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
@@ -64,7 +66,10 @@ import org.junit.runners.Parameterized.UseParametersRunnerFactory;
 import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.Scope;
+import org.apache.geode.distributed.DistributedLockService;
 import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.DistributedSystem;
 import org.apache.geode.distributed.Locator;
 import org.apache.geode.distributed.internal.ClusterDistributionManager;
 import org.apache.geode.distributed.internal.DMStats;
@@ -72,7 +77,6 @@ import org.apache.geode.distributed.internal.DirectReplyProcessor;
 import org.apache.geode.distributed.internal.DistributionMessage;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.MessageWithReply;
-import org.apache.geode.distributed.internal.OperationExecutors;
 import org.apache.geode.distributed.internal.ReplyException;
 import org.apache.geode.distributed.internal.ReplyMessage;
 import org.apache.geode.distributed.internal.SerialAckedMessage;
@@ -82,6 +86,8 @@ import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.cache.DirectReplyMessage;
 import org.apache.geode.internal.serialization.DeserializationContext;
 import org.apache.geode.internal.serialization.SerializationContext;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.DUnitBlackboard;
 import org.apache.geode.test.dunit.DistributedTestUtils;
 import org.apache.geode.test.dunit.Host;
 import org.apache.geode.test.dunit.VM;
@@ -111,6 +117,7 @@ public class ClusterCommunicationsDUnitTest implements Serializable {
   private final String regionName = "clusterTestRegion";
 
   private final boolean disableTcp;
+  private boolean useDAck;
   private boolean conserveSockets;
   private boolean useSSL;
 
@@ -134,6 +141,7 @@ public class ClusterCommunicationsDUnitTest implements Serializable {
     useSSL = runConfiguration.useSSL;
     conserveSockets = runConfiguration.conserveSockets;
     disableTcp = runConfiguration.disableTcp;
+    useDAck = true;
   }
 
   @Before
@@ -143,6 +151,68 @@ public class ClusterCommunicationsDUnitTest implements Serializable {
   }
 
   @Test
+  public void applicationUseOfDLockWithNoAckCacheOps() throws Exception {
+    useDAck = false; // use no-ack scope
+    DUnitBlackboard dUnitBlackboard = new DUnitBlackboard();
+    int locatorPort = createLocator(getVM(0));
+    for (int i = 1; i <= NUM_SERVERS; i++) {
+      createCacheAndRegion(getVM(i), locatorPort);
+    }
+    performCreate(getVM(1));
+    getVM(1).invoke("initialize dlock service", () -> {
+      DistributedLockService distLockService =
+          DistributedLockService.create("testLockService", cache.getDistributedSystem());
+      distLockService.lock("myLock", 50000, 50000);
+    });
+    AsyncInvocation async = waitForTheLockAsync(dUnitBlackboard, true);
+    for (int i = 0; i < 5; i++) {
+      getVM(1).invoke("update cache and release lock in vm1", () -> {
+        DistributedLockService distLockService =
+            DistributedLockService.getServiceNamed("testLockService");
+        try {
+          DistributedSystem.setThreadsSocketPolicy(false);
+          cache.getRegion(regionName).put("myKey", "myValue");
+        } finally {
+          distLockService.unlock("myLock");
+          DistributedSystem.releaseThreadsSockets();
+        }
+      });
+      async.get(30, TimeUnit.SECONDS);
+      getVM(2).invoke("release the lock in vm2 to try again", () -> {
+        DistributedLockService distLockService =
+            DistributedLockService.getServiceNamed("testLockService");
+        distLockService.unlock("myLock");
+      });
+      getVM(1).invoke("grab the lock in vm1", () -> {
+        DistributedLockService distLockService =
+            DistributedLockService.getServiceNamed("testLockService");
+        distLockService.lock("myLock", 50000, 50000);
+      });
+      async = waitForTheLockAsync(dUnitBlackboard, false);
+    }
+  }
+
+  @NotNull
+  private AsyncInvocation waitForTheLockAsync(DUnitBlackboard dUnitBlackboard, boolean initialWait)
+      throws Exception {
+    dUnitBlackboard.clearGate("waitingForLock");
+    AsyncInvocation async = getVM(2).invokeAsync("wait for the lock", () -> {
+      DistributedLockService distLockService = initialWait
+          ? DistributedLockService.create("testLockService", cache.getDistributedSystem())
+          : DistributedLockService.getServiceNamed("testLockService");
+      final DUnitBlackboard myBlackboard = new DUnitBlackboard();
+      myBlackboard.signalGate("waitingForLock");
+      distLockService.lock("myLock", 50000, 50000);
+    });
+    dUnitBlackboard.waitForGate("waitingForLock", 30, TimeUnit.SECONDS);
+    Thread.sleep(2000);
+    if (async.isDone()) {
+      throw new Exception("async thread did not wait for the lock");
+    }
+    return async;
+  }
+
+  @Test
   public void createEntryAndVerifyUpdate() {
     int locatorPort = createLocator(getVM(0));
     for (int i = 1; i <= NUM_SERVERS; i++) {
@@ -219,7 +289,7 @@ public class ClusterCommunicationsDUnitTest implements Serializable {
       // System.setProperty("javax.net.debug", "all");
       Properties props = getDistributedSystemProperties();
       // locator must restart with the same port so that it reconnects to the server
-      await().atMost(getTimeout().toMillis(), TimeUnit.MILLISECONDS)
+      await().atMost(getTimeout().getSeconds(), TimeUnit.SECONDS)
           .until(() -> Locator.startLocatorAndDS(locatorPort, new File(""), props) != null);
       assertThat(Locator.getLocator().getDistributedSystem().getAllOtherMembers().size())
           .isGreaterThan(0);
@@ -241,7 +311,9 @@ public class ClusterCommunicationsDUnitTest implements Serializable {
   private void createCacheAndRegion(VM memberVM, int locatorPort) {
     memberVM.invoke("start cache and create region", () -> {
       cache = createCache(locatorPort);
-      cache.createRegionFactory(RegionShortcut.REPLICATE).create(regionName);
+      cache.createRegionFactory(RegionShortcut.REPLICATE)
+          .setScope(useDAck ? Scope.DISTRIBUTED_ACK : Scope.DISTRIBUTED_NO_ACK)
+          .create(regionName);
     });
   }
 
@@ -397,15 +469,14 @@ public class ClusterCommunicationsDUnitTest implements Serializable {
     }
 
     @Override
-    public void toData(DataOutput out,
-        SerializationContext context) throws IOException {
+    public void toData(DataOutput out, SerializationContext context) throws IOException {
       super.toData(out, context);
       out.writeInt(processorId);
     }
 
     @Override
-    public void fromData(DataInput in,
-        DeserializationContext context) throws IOException, ClassNotFoundException {
+    public void fromData(DataInput in, DeserializationContext context)
+        throws IOException, ClassNotFoundException {
       super.fromData(in, context);
       processorId = in.readInt();
     }
@@ -430,7 +501,7 @@ public class ClusterCommunicationsDUnitTest implements Serializable {
 
     @Override
     public int getProcessorType() {
-      return OperationExecutors.SERIAL_EXECUTOR;
+      return SERIAL_EXECUTOR;
     }
 
     @Override
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
index 6f32501..7006d53 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
@@ -274,7 +274,8 @@ public class NioSslEngine implements NioFilter {
     // during the previous unwrap
 
     peerAppData.limit(peerAppData.capacity());
-    while (wrappedBuffer.hasRemaining()) {
+    boolean stopDecryption = false;
+    while (wrappedBuffer.hasRemaining() && !stopDecryption) {
       SSLEngineResult unwrapResult = engine.unwrap(wrappedBuffer, peerAppData);
       switch (unwrapResult.getStatus()) {
         case BUFFER_OVERFLOW:
@@ -293,8 +294,13 @@ public class NioSslEngine implements NioFilter {
           return peerAppData;
         case OK:
           break;
-        default:
-          throw new SSLException("Error decrypting data: " + unwrapResult);
+        default:// if there is data in the decrypted buffer return it. Otherwise signal that we're
+          // having trouble
+          if (peerAppData.position() <= 0) {
+            throw new SSLException("Error decrypting data: " + unwrapResult);
+          }
+          stopDecryption = true;
+          break;
       }
     }
     wrappedBuffer.clear();
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
index ee4aaa3..f467777 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
@@ -152,8 +152,7 @@ public class NioSslEngineTest {
     ByteBuffer byteBuffer = ByteBuffer.allocate(netBufferSize);
 
     assertThatThrownBy(() -> spyNioSslEngine.handshake(mockChannel, 10000, byteBuffer))
-        .isInstanceOf(
-            SocketException.class)
+        .isInstanceOf(SocketException.class)
         .hasMessageContaining("handshake terminated");
   }
 
@@ -328,6 +327,29 @@ public class NioSslEngineTest {
   }
 
   @Test
+  public void unwrapWithClosedEngineButDataInDecryptedBuffer() throws IOException {
+    final ByteBuffer unwrappedBuffer = nioSslEngine.getUnwrappedBuffer(null);
+    // make the application data too big to fit into the engine's encryption buffer
+    ByteBuffer wrappedData =
+        ByteBuffer.allocate(unwrappedBuffer.capacity());
+    byte[] netBytes = new byte[wrappedData.capacity() / 2];
+    Arrays.fill(netBytes, (byte) 0x1F);
+    wrappedData.put(netBytes);
+    wrappedData.flip();
+    final int arbitraryAmountOfRealData = 31; // bytes
+    unwrappedBuffer.position(arbitraryAmountOfRealData);
+
+    // create an engine that will transfer bytes from the application buffer to the encrypted
+    // buffer
+    TestSSLEngine testEngine = new TestSSLEngine();
+    testEngine.addReturnResult(new SSLEngineResult(CLOSED, FINISHED, 0, 0));
+    spyNioSslEngine.engine = testEngine;
+
+    final ByteBuffer unwrappedBuffer2 = spyNioSslEngine.unwrap(wrappedData);
+    assertThat(unwrappedBuffer2.position()).isEqualTo(arbitraryAmountOfRealData);
+  }
+
+  @Test
   public void close() throws Exception {
     SocketChannel mockChannel = mock(SocketChannel.class);
     Socket mockSocket = mock(Socket.class);
@@ -557,7 +579,8 @@ public class NioSslEngineTest {
     public SSLEngineResult unwrap(ByteBuffer source, ByteBuffer[] destinations, int i, int i1) {
       SSLEngineResult sslEngineResult = nextResult();
       if (sslEngineResult.getStatus() != BUFFER_UNDERFLOW
-          && sslEngineResult.getStatus() != BUFFER_OVERFLOW) {
+          && sslEngineResult.getStatus() != BUFFER_OVERFLOW
+          && sslEngineResult.getStatus() != CLOSED) {
         destinations[0].put(source);
         numberOfUnwrapsPerformed++;
       }