You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2020/04/24 22:45:33 UTC
[geode] 01/01: GEODE-8020: buffer corruption in SSL communications
This is an automated email from the ASF dual-hosted git repository.
bschuchardt pushed a commit to branch feature/GEODE-8020
in repository https://gitbox.apache.org/repos/asf/geode.git
commit a0b7880c978c23fd13da31a7161c1ffec2d9285e
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Fri Apr 24 15:40:54 2020 -0700
GEODE-8020: buffer corruption in SSL communications
Revert the change made in GEODE-6661 that caused NioSslEngine to use a
direct buffer. NioSslEngine is not designed to share its buffer with a
pool in BufferPool. Connection is also modified to use a heap buffer for
reading encrypted SSL packets, for consistency. New tests ensure that
these buffers are the correct type when using SSL or plain sockets.
---
.../geode/ClusterCommunicationsDUnitTest.java | 33 +++++++++++++++++++++-
.../apache/geode/internal/net/NioSslEngine.java | 4 +--
.../org/apache/geode/internal/tcp/Connection.java | 16 +++++++----
.../geode/internal/net/NioSslEngineTest.java | 5 ++++
4 files changed, 49 insertions(+), 9 deletions(-)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/ClusterCommunicationsDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/ClusterCommunicationsDUnitTest.java
index 3ae79fd..09f94e5 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/ClusterCommunicationsDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/ClusterCommunicationsDUnitTest.java
@@ -43,6 +43,8 @@ import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
+import java.lang.reflect.Field;
+import java.nio.Buffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -69,6 +71,7 @@ import org.apache.geode.distributed.Locator;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DMStats;
import org.apache.geode.distributed.internal.DirectReplyProcessor;
+import org.apache.geode.distributed.internal.DistributionImpl;
import org.apache.geode.distributed.internal.DistributionMessage;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.MessageWithReply;
@@ -76,12 +79,15 @@ import org.apache.geode.distributed.internal.OperationExecutors;
import org.apache.geode.distributed.internal.ReplyException;
import org.apache.geode.distributed.internal.ReplyMessage;
import org.apache.geode.distributed.internal.SerialAckedMessage;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.distributed.internal.membership.gms.membership.GMSJoinLeave;
import org.apache.geode.internal.AvailablePortHelper;
import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.cache.DirectReplyMessage;
+import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.serialization.DeserializationContext;
import org.apache.geode.internal.serialization.SerializationContext;
+import org.apache.geode.internal.tcp.Connection;
import org.apache.geode.test.dunit.DistributedTestUtils;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.VM;
@@ -143,7 +149,7 @@ public class ClusterCommunicationsDUnitTest implements Serializable {
}
@Test
- public void createEntryAndVerifyUpdate() {
+ public void createEntryAndVerifyUpdate() throws Exception {
int locatorPort = createLocator(getVM(0));
for (int i = 1; i <= NUM_SERVERS; i++) {
createCacheAndRegion(getVM(i), locatorPort);
@@ -157,6 +163,9 @@ public class ClusterCommunicationsDUnitTest implements Serializable {
}
for (int i = 1; i <= NUM_SERVERS; i++) {
verifyUpdatedEntry(getVM(i));
+ if (!disableTcp) {
+ verifyBufferType(getVM(i));
+ }
}
}
@@ -245,6 +254,28 @@ public class ClusterCommunicationsDUnitTest implements Serializable {
});
}
+ private void verifyBufferType(VM vm) throws Exception {
+ vm.invoke("verify connection type", () -> {
+ assertThat(cache).isNotNull();
+ InternalCache internalCache = (InternalCache) cache;
+ final DistributionImpl distribution =
+ (DistributionImpl) internalCache.getDistributionManager().getDistribution();
+ InternalDistributedMember locatorMember =
+ (InternalDistributedMember) distribution.getCoordinator();
+ final Connection connection =
+ distribution.getDirectChannel().getConduit().getConnection(locatorMember, false, false,
+ System.currentTimeMillis(), 15000, 0);
+ Field inputBufferField = Connection.class.getDeclaredField("inputBuffer");
+ inputBufferField.setAccessible(true);
+ Buffer inputBuffer = (Buffer) inputBufferField.get(connection);
+ // SSL connections use heap buffers for decoding efficiency. Non-SSL connections use
+ // direct buffers since they don't need to be accessed as much
+ assertThat(inputBuffer.isDirect()).isNotEqualTo(useSSL);
+ });
+ }
+
+
+
private void performCreate(VM memberVM) {
memberVM.invoke("perform create", () -> cache
.getRegion(regionName).put("testKey", "testValue"));
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
index 2d55fa3..424e53a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
@@ -74,7 +74,7 @@ public class NioSslEngine implements NioFilter {
int packetBufferSize = engine.getSession().getPacketBufferSize();
this.engine = engine;
this.bufferPool = bufferPool;
- this.myNetData = bufferPool.acquireDirectSenderBuffer(packetBufferSize);
+ this.myNetData = bufferPool.acquireNonDirectSenderBuffer(packetBufferSize);
this.peerAppData = bufferPool.acquireNonDirectReceiveBuffer(appBufferSize);
}
@@ -301,7 +301,7 @@ public class NioSslEngine implements NioFilter {
ByteBuffer buffer = wrappedBuffer;
int requiredSize = engine.getSession().getPacketBufferSize();
if (buffer == null) {
- buffer = bufferPool.acquireDirectBuffer(bufferType, requiredSize);
+ buffer = bufferPool.acquireNonDirectBuffer(bufferType, requiredSize);
} else if (buffer.capacity() < requiredSize) {
buffer = bufferPool.expandWriteBufferIfNeeded(bufferType, buffer, requiredSize);
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index 7fbf4b2..f8f6932 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -1662,8 +1662,8 @@ public class Connection implements Runnable {
} catch (IOException e) {
// "Socket closed" check needed for Solaris jdk 1.4.2_08
if (!isSocketClosed() && !"Socket closed".equalsIgnoreCase(e.getMessage())) {
- if (logger.isDebugEnabled() && !isIgnorableIOException(e)) {
- logger.debug("{} io exception for {}", p2pReaderName(), this, e);
+ if (logger.isInfoEnabled() && !isIgnorableIOException(e)) {
+ logger.info("{} io exception for {}", p2pReaderName(), this, e);
}
if (e.getMessage().contains("interrupted by a call to WSACancelBlockingCall")) {
if (logger.isDebugEnabled()) {
@@ -1720,7 +1720,7 @@ public class Connection implements Runnable {
if (inputBuffer != null) {
getBufferPool().releaseReceiveBuffer(inputBuffer);
}
- inputBuffer = getBufferPool().acquireDirectReceiveBuffer(packetBufferSize);
+ inputBuffer = getBufferPool().acquireNonDirectReceiveBuffer(packetBufferSize);
}
if (channel.socket().getReceiveBufferSize() < packetBufferSize) {
channel.socket().setReceiveBufferSize(packetBufferSize);
@@ -1763,9 +1763,13 @@ public class Connection implements Runnable {
}
msg = msg.toLowerCase();
- return msg.contains("forcibly closed")
- || msg.contains("reset by peer")
- || msg.contains("connection reset");
+
+ if (e instanceof SSLException && msg.contains("status = closed")) {
+ return true; // engine has been closed - this is normal
+ }
+
+ return (msg.contains("forcibly closed") || msg.contains("reset by peer")
+ || msg.contains("connection reset") || msg.contains("socket is closed"));
}
private static boolean validMsgType(int msgType) {
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
index ef16a21..2c9be77 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
@@ -86,6 +86,11 @@ public class NioSslEngineTest {
}
@Test
+ public void engineUsesHeapBuffers() {
+ assertThat(nioSslEngine.myNetData.isDirect()).isFalse();
+ }
+
+ @Test
public void handshake() throws Exception {
SocketChannel mockChannel = mock(SocketChannel.class);
when(mockChannel.read(any(ByteBuffer.class))).thenReturn(100, 100, 100, 0);