You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2020/04/24 22:45:33 UTC

[geode] 01/01: GEODE-8020: buffer corruption in SSL communications

This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch feature/GEODE-8020
in repository https://gitbox.apache.org/repos/asf/geode.git

commit a0b7880c978c23fd13da31a7161c1ffec2d9285e
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Fri Apr 24 15:40:54 2020 -0700

    GEODE-8020: buffer corruption in SSL communications
    
    Revert the change made in GEODE-6661 that had NioSslEngine use a direct
    buffer. NioSslEngine is not designed to share its buffer with a pool in
    BufferPool. Connection is also modified to use a heap buffer when
    reading encrypted SSL packets, for consistency. New tests ensure that
    these buffers are the correct type when using SSL or plain sockets.
---
 .../geode/ClusterCommunicationsDUnitTest.java      | 33 +++++++++++++++++++++-
 .../apache/geode/internal/net/NioSslEngine.java    |  4 +--
 .../org/apache/geode/internal/tcp/Connection.java  | 16 +++++++----
 .../geode/internal/net/NioSslEngineTest.java       |  5 ++++
 4 files changed, 49 insertions(+), 9 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/ClusterCommunicationsDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/ClusterCommunicationsDUnitTest.java
index 3ae79fd..09f94e5 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/ClusterCommunicationsDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/ClusterCommunicationsDUnitTest.java
@@ -43,6 +43,8 @@ import java.io.DataOutput;
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
+import java.lang.reflect.Field;
+import java.nio.Buffer;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -69,6 +71,7 @@ import org.apache.geode.distributed.Locator;
 import org.apache.geode.distributed.internal.ClusterDistributionManager;
 import org.apache.geode.distributed.internal.DMStats;
 import org.apache.geode.distributed.internal.DirectReplyProcessor;
+import org.apache.geode.distributed.internal.DistributionImpl;
 import org.apache.geode.distributed.internal.DistributionMessage;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.MessageWithReply;
@@ -76,12 +79,15 @@ import org.apache.geode.distributed.internal.OperationExecutors;
 import org.apache.geode.distributed.internal.ReplyException;
 import org.apache.geode.distributed.internal.ReplyMessage;
 import org.apache.geode.distributed.internal.SerialAckedMessage;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.distributed.internal.membership.gms.membership.GMSJoinLeave;
 import org.apache.geode.internal.AvailablePortHelper;
 import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.cache.DirectReplyMessage;
+import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.serialization.DeserializationContext;
 import org.apache.geode.internal.serialization.SerializationContext;
+import org.apache.geode.internal.tcp.Connection;
 import org.apache.geode.test.dunit.DistributedTestUtils;
 import org.apache.geode.test.dunit.Host;
 import org.apache.geode.test.dunit.VM;
@@ -143,7 +149,7 @@ public class ClusterCommunicationsDUnitTest implements Serializable {
   }
 
   @Test
-  public void createEntryAndVerifyUpdate() {
+  public void createEntryAndVerifyUpdate() throws Exception {
     int locatorPort = createLocator(getVM(0));
     for (int i = 1; i <= NUM_SERVERS; i++) {
       createCacheAndRegion(getVM(i), locatorPort);
@@ -157,6 +163,9 @@ public class ClusterCommunicationsDUnitTest implements Serializable {
     }
     for (int i = 1; i <= NUM_SERVERS; i++) {
       verifyUpdatedEntry(getVM(i));
+      if (!disableTcp) {
+        verifyBufferType(getVM(i));
+      }
     }
   }
 
@@ -245,6 +254,28 @@ public class ClusterCommunicationsDUnitTest implements Serializable {
     });
   }
 
+  private void verifyBufferType(VM vm) throws Exception {
+    vm.invoke("verify connection type", () -> {
+      assertThat(cache).isNotNull();
+      InternalCache internalCache = (InternalCache) cache;
+      final DistributionImpl distribution =
+          (DistributionImpl) internalCache.getDistributionManager().getDistribution();
+      InternalDistributedMember locatorMember =
+          (InternalDistributedMember) distribution.getCoordinator();
+      final Connection connection =
+          distribution.getDirectChannel().getConduit().getConnection(locatorMember, false, false,
+              System.currentTimeMillis(), 15000, 0);
+      Field inputBufferField = Connection.class.getDeclaredField("inputBuffer");
+      inputBufferField.setAccessible(true);
+      Buffer inputBuffer = (Buffer) inputBufferField.get(connection);
+      // SSL connections use heap buffers for decoding efficiency. Non-SSL connections use
+      // direct buffers since they don't need to be accessed as much
+      assertThat(inputBuffer.isDirect()).isNotEqualTo(useSSL);
+    });
+  }
+
+
+
   private void performCreate(VM memberVM) {
     memberVM.invoke("perform create", () -> cache
         .getRegion(regionName).put("testKey", "testValue"));
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
index 2d55fa3..424e53a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
@@ -74,7 +74,7 @@ public class NioSslEngine implements NioFilter {
     int packetBufferSize = engine.getSession().getPacketBufferSize();
     this.engine = engine;
     this.bufferPool = bufferPool;
-    this.myNetData = bufferPool.acquireDirectSenderBuffer(packetBufferSize);
+    this.myNetData = bufferPool.acquireNonDirectSenderBuffer(packetBufferSize);
     this.peerAppData = bufferPool.acquireNonDirectReceiveBuffer(appBufferSize);
   }
 
@@ -301,7 +301,7 @@ public class NioSslEngine implements NioFilter {
     ByteBuffer buffer = wrappedBuffer;
     int requiredSize = engine.getSession().getPacketBufferSize();
     if (buffer == null) {
-      buffer = bufferPool.acquireDirectBuffer(bufferType, requiredSize);
+      buffer = bufferPool.acquireNonDirectBuffer(bufferType, requiredSize);
     } else if (buffer.capacity() < requiredSize) {
       buffer = bufferPool.expandWriteBufferIfNeeded(bufferType, buffer, requiredSize);
     }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index 7fbf4b2..f8f6932 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -1662,8 +1662,8 @@ public class Connection implements Runnable {
         } catch (IOException e) {
           // "Socket closed" check needed for Solaris jdk 1.4.2_08
           if (!isSocketClosed() && !"Socket closed".equalsIgnoreCase(e.getMessage())) {
-            if (logger.isDebugEnabled() && !isIgnorableIOException(e)) {
-              logger.debug("{} io exception for {}", p2pReaderName(), this, e);
+            if (logger.isInfoEnabled() && !isIgnorableIOException(e)) {
+              logger.info("{} io exception for {}", p2pReaderName(), this, e);
             }
             if (e.getMessage().contains("interrupted by a call to WSACancelBlockingCall")) {
               if (logger.isDebugEnabled()) {
@@ -1720,7 +1720,7 @@ public class Connection implements Runnable {
         if (inputBuffer != null) {
           getBufferPool().releaseReceiveBuffer(inputBuffer);
         }
-        inputBuffer = getBufferPool().acquireDirectReceiveBuffer(packetBufferSize);
+        inputBuffer = getBufferPool().acquireNonDirectReceiveBuffer(packetBufferSize);
       }
       if (channel.socket().getReceiveBufferSize() < packetBufferSize) {
         channel.socket().setReceiveBufferSize(packetBufferSize);
@@ -1763,9 +1763,13 @@ public class Connection implements Runnable {
     }
 
     msg = msg.toLowerCase();
-    return msg.contains("forcibly closed")
-        || msg.contains("reset by peer")
-        || msg.contains("connection reset");
+
+    if (e instanceof SSLException && msg.contains("status = closed")) {
+      return true; // engine has been closed - this is normal
+    }
+
+    return (msg.contains("forcibly closed") || msg.contains("reset by peer")
+        || msg.contains("connection reset") || msg.contains("socket is closed"));
   }
 
   private static boolean validMsgType(int msgType) {
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
index ef16a21..2c9be77 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
@@ -86,6 +86,11 @@ public class NioSslEngineTest {
   }
 
   @Test
+  public void engineUsesHeapBuffers() {
+    assertThat(nioSslEngine.myNetData.isDirect()).isFalse();
+  }
+
+  @Test
   public void handshake() throws Exception {
     SocketChannel mockChannel = mock(SocketChannel.class);
     when(mockChannel.read(any(ByteBuffer.class))).thenReturn(100, 100, 100, 0);