Posted to notifications@geode.apache.org by GitBox <gi...@apache.org> on 2020/07/14 19:11:27 UTC

[GitHub] [geode] Bill commented on a change in pull request #5363: GEODE-8349: Reintroduce use of SSLSocket in cluster communications

Bill commented on a change in pull request #5363:
URL: https://github.com/apache/geode/pull/5363#discussion_r454564779



##########
File path: geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
##########
@@ -47,7 +48,10 @@
 /**
  * NioSslEngine uses an SSLEngine to bind SSL logic to a data source. This class is not thread
  * safe. Its use should be confined to one thread or should be protected by external
- * synchronization.
+ * synchronization.<br>
+ * While some NioSslEngine methods take a Socket as a parameter the given socket must hold
+ * a NIO Channel for i/o operations. If this is not the case these methods will likely throw
+ * a NullPointerException when they attempt to access and use the channel.

Review comment:
       thanks for the tip

##########
File path: geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
##########
@@ -14,70 +14,65 @@
  */
 package org.apache.geode.internal.tcp;
 
+import java.io.EOFException;
 import java.io.IOException;
-import java.nio.BufferUnderflowException;
+import java.io.InputStream;
+import java.net.Socket;
 import java.nio.ByteBuffer;
 
-import org.apache.logging.log4j.Logger;
-
 import org.apache.geode.distributed.internal.DMStats;
 import org.apache.geode.distributed.internal.DistributionMessage;
 import org.apache.geode.distributed.internal.ReplyProcessor21;
 import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.net.BufferPool;
-import org.apache.geode.internal.net.NioFilter;
+import org.apache.geode.internal.net.SocketUtils;
 import org.apache.geode.internal.serialization.Version;
-import org.apache.geode.logging.internal.log4j.api.LogService;
 
 /**
  * This class is currently used for reading direct ack responses It should probably be used for all
  * of the reading done in Connection.
  *
  */
 public class MsgReader {
-  private static final Logger logger = LogService.getLogger();
-
-  protected final Connection conn;
+  protected final ClusterConnection conn;
+  private final BufferPool bufferPool;
   protected final Header header = new Header();
-  private final NioFilter ioFilter;
   private ByteBuffer peerNetData;
+  private final InputStream inputStream;
   private final ByteBufferInputStream byteBufferInputStream;
+  private int lastProcessedPosition;
+  private int lastReadPosition;

Review comment:
       ditto

##########
File path: geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java
##########
@@ -53,7 +53,7 @@ ByteBuffer ensureWrappedCapacity(int amount, ByteBuffer wrappedBuffer,
    * wrappedBuffer = filter.ensureWrappedCapacity(amount, wrappedBuffer, etc.);<br>
    * unwrappedBuffer = filter.readAtLeast(channel, amount, wrappedBuffer, etc.)
    */
-  ByteBuffer readAtLeast(SocketChannel channel, int amount, ByteBuffer wrappedBuffer)
+  ByteBuffer readAtLeast(int amount, ByteBuffer wrappedBuffer, Socket socket)

Review comment:
       This PR eliminates the use of the `NioFilter` interface and its two implementations, `NioPlainEngine` and `NioSslEngine`, from Geode. Why were changes to these classes and their tests necessary?

##########
File path: geode-core/src/main/java/org/apache/geode/internal/tcp/ClusterConnection.java
##########
@@ -1142,31 +1154,46 @@ private Connection(ConnectionTable t, boolean preserveOrder, InternalDistributed
 
     InetSocketAddress addr =
         new InetSocketAddress(remoteID.getInetAddress(), remoteID.getDirectChannelPort());
-    SocketChannel channel = SocketChannel.open();
-    owner.addConnectingSocket(channel.socket(), addr.getAddress());
-
-    try {
-      channel.socket().setTcpNoDelay(true);
-      channel.socket().setKeepAlive(SocketCreator.ENABLE_TCP_KEEP_ALIVE);
 
+    int connectTime = getP2PConnectTimeout(conduit.getDM().getConfig());
+    boolean useSSL = getConduit().useSSL();
+    if (useSSL) {
+      int socketBufferSize =
+          sharedResource ? SMALL_BUFFER_SIZE : this.owner.getConduit().tcpBufferSize;
+      socket = getConduit().getSocketCreator().forAdvancedUse().connect(
+          new HostAndPort(remoteID.getHostName(), remoteID.getDirectChannelPort()),
+          0, null, false, socketBufferSize, true);
+      setSocketBufferSize(this.socket, false, socketBufferSize, true);

Review comment:
       How do we know that `alreadySetInSocket` should be `true` in this invocation?

##########
File path: geode-core/src/main/java/org/apache/geode/internal/net/SocketUtils.java
##########
@@ -73,4 +82,49 @@ public static boolean close(final ServerSocket serverSocket) {
     return true;
   }
 
+  /**
+   * Read data from the given socket into the given ByteBuffer. If NIO is supported
+   * we use Channel.read(ByteBuffer). If not we use byte arrays to read available
+   * bytes or buffer.remaining() bytes, whichever is smaller. If buffer.limit is zero
+   * and buffer.remaining is also zero the limit is changed to be buffer.capacity
+   * before reading.
+   *
+   * @return the number of bytes read, which may be -1 for EOF
+   */
+  public static int readFromSocket(Socket socket, ByteBuffer inputBuffer,
+      InputStream socketInputStream) throws IOException {
+    int amountRead;
+    inputBuffer.limit(inputBuffer.capacity());
+    if (socket instanceof SSLSocket) {
+      amountRead = readFromStream(socketInputStream, inputBuffer);
+    } else {
+      amountRead = socket.getChannel().read(inputBuffer);
+    }
+    return amountRead;
+  }
+
+  private static int readFromStream(InputStream stream, ByteBuffer inputBuffer) throws IOException {
+    int amountRead;
+    // if bytes are available we read that number of bytes. Otherwise we do a blocking read
+    // of buffer.remaining() bytes
+    int amountToRead = inputBuffer.remaining();
+    // stream.available() > 0 ? Math.min(stream.available(), inputBuffer.remaining())
+    // : inputBuffer.remaining();
+    if (inputBuffer.hasArray()) {
+      amountRead = stream.read(inputBuffer.array(),
+          inputBuffer.arrayOffset() + inputBuffer.position(), amountToRead);
+      if (amountRead > 0) {
+        inputBuffer.position(inputBuffer.position() + amountRead);
+      }
+    } else {

Review comment:
       Under what conditions would `inputBuffer.hasArray()` be `true` vs. `false`?
   
   What test coverage do we have for these conditions? (I just confirmed that the package-level unit tests for `org.apache.geode.internal.net` don't hit this method at all.)
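
For context on the two branches, here is a minimal standalone sketch. It is not the PR's code: `HasArraySketch` is a hypothetical class, and the `else` branch is an assumption, since the quoted hunk is cut off before it. Heap buffers from `ByteBuffer.allocate()` or `ByteBuffer.wrap()` report `hasArray() == true`. Read-only buffers report `false`, and direct buffers from `ByteBuffer.allocateDirect()` typically report `false` on common JVMs. A unit test could drive both branches with an in-memory stream along these lines:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

public class HasArraySketch {

  /**
   * Hypothetical stand-in for a stream-to-buffer read. Reads at most
   * buffer.remaining() bytes and advances the buffer position by the
   * number of bytes read. Returns -1 on end of stream.
   */
  public static int readFromStream(InputStream stream, ByteBuffer buffer) throws IOException {
    int amountToRead = buffer.remaining();
    int amountRead;
    if (buffer.hasArray()) {
      // Heap buffer: read straight into the backing array.
      amountRead = stream.read(buffer.array(),
          buffer.arrayOffset() + buffer.position(), amountToRead);
      if (amountRead > 0) {
        buffer.position(buffer.position() + amountRead);
      }
    } else {
      // No accessible backing array (e.g. a direct buffer): copy through a
      // temporary array. This branch is an assumption, not the PR's code.
      byte[] temp = new byte[amountToRead];
      amountRead = stream.read(temp, 0, amountToRead);
      if (amountRead > 0) {
        buffer.put(temp, 0, amountRead);
      }
    }
    return amountRead;
  }

  public static void main(String[] args) throws IOException {
    byte[] data = {1, 2, 3, 4};
    ByteBuffer heap = ByteBuffer.allocate(8);
    ByteBuffer direct = ByteBuffer.allocateDirect(8);
    System.out.println("heap.hasArray()   = " + heap.hasArray());
    System.out.println("direct.hasArray() = " + direct.hasArray());
    System.out.println("heap read   = " + readFromStream(new ByteArrayInputStream(data), heap));
    System.out.println("direct read = " + readFromStream(new ByteArrayInputStream(data), direct));
  }
}
```

Which branch runs in Geode would depend on how `BufferPool` allocates the buffer passed in, which is not visible in this hunk.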

##########
File path: geode-core/src/main/java/org/apache/geode/internal/tcp/ClusterConnection.java
##########
@@ -1142,31 +1154,46 @@ private Connection(ConnectionTable t, boolean preserveOrder, InternalDistributed
 
     InetSocketAddress addr =
         new InetSocketAddress(remoteID.getInetAddress(), remoteID.getDirectChannelPort());
-    SocketChannel channel = SocketChannel.open();
-    owner.addConnectingSocket(channel.socket(), addr.getAddress());
-
-    try {
-      channel.socket().setTcpNoDelay(true);
-      channel.socket().setKeepAlive(SocketCreator.ENABLE_TCP_KEEP_ALIVE);
 
+    int connectTime = getP2PConnectTimeout(conduit.getDM().getConfig());
+    boolean useSSL = getConduit().useSSL();
+    if (useSSL) {
+      int socketBufferSize =
+          sharedResource ? SMALL_BUFFER_SIZE : this.owner.getConduit().tcpBufferSize;
+      socket = getConduit().getSocketCreator().forAdvancedUse().connect(
+          new HostAndPort(remoteID.getHostName(), remoteID.getDirectChannelPort()),
+          0, null, false, socketBufferSize, true);
+      setSocketBufferSize(this.socket, false, socketBufferSize, true);
+    } else {
+      SocketChannel channel = SocketChannel.open();
+      socket = channel.socket();
       // If conserve-sockets is false, the socket can be used for receiving responses, so set the
       // receive buffer accordingly.
       if (!sharedResource) {
-        setReceiveBufferSize(channel.socket(), owner.getConduit().tcpBufferSize);
+        setReceiveBufferSize(socket, owner.getConduit().tcpBufferSize);
       } else {
-        setReceiveBufferSize(channel.socket(), SMALL_BUFFER_SIZE); // make small since only
+        setReceiveBufferSize(socket, SMALL_BUFFER_SIZE); // make small since only
         // receive ack messages
       }
-      setSendBufferSize(channel.socket());
-      channel.configureBlocking(true);
+    }
+    owner.addConnectingSocket(socket, addr.getAddress());
+
+    try {
+      socket.setTcpNoDelay(true);
+      socket.setKeepAlive(SocketCreator.ENABLE_TCP_KEEP_ALIVE);
 
-      int connectTime = getP2PConnectTimeout(conduit.getDM().getConfig());
+      setSendBufferSize(socket);
+      if (!useSSL) {
+        socket.getChannel().configureBlocking(true);
+      }
 
       try {
 
-        channel.socket().connect(addr, connectTime);
-
-        createIoFilter(channel, true);
+        if (!useSSL) {
+          // haven't connected yet
+          socket.connect(addr, connectTime);
+        }
+        configureInputStream(socket, true);

Review comment:
       My read of `configureInputStream()` is that it initializes the `inputStream` field and performs the TLS handshake if one is needed. That implies a connect call happens there in the TLS case.
   
   Since `configureInputStream()` causes the connect in the TLS case, would it make sense for it to handle the connect in the non-TLS case too? I'm just looking for ways to reduce the number of branches on `getConduit().useSSL()`.
   
   But now I see that `configureInputStream()` only initiates the TLS handshake if `!clientSocket`. Why does it initiate the handshake only for "server" sockets?

##########
File path: geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
##########
@@ -14,70 +14,65 @@
  */
 package org.apache.geode.internal.tcp;
 
+import java.io.EOFException;
 import java.io.IOException;
-import java.nio.BufferUnderflowException;
+import java.io.InputStream;
+import java.net.Socket;
 import java.nio.ByteBuffer;
 
-import org.apache.logging.log4j.Logger;
-
 import org.apache.geode.distributed.internal.DMStats;
 import org.apache.geode.distributed.internal.DistributionMessage;
 import org.apache.geode.distributed.internal.ReplyProcessor21;
 import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.net.BufferPool;
-import org.apache.geode.internal.net.NioFilter;
+import org.apache.geode.internal.net.SocketUtils;
 import org.apache.geode.internal.serialization.Version;
-import org.apache.geode.logging.internal.log4j.api.LogService;
 
 /**
  * This class is currently used for reading direct ack responses It should probably be used for all
  * of the reading done in Connection.
  *
  */
 public class MsgReader {
-  private static final Logger logger = LogService.getLogger();
-
-  protected final Connection conn;
+  protected final ClusterConnection conn;
+  private final BufferPool bufferPool;
   protected final Header header = new Header();
-  private final NioFilter ioFilter;
   private ByteBuffer peerNetData;
+  private final InputStream inputStream;
   private final ByteBufferInputStream byteBufferInputStream;
+  private int lastProcessedPosition;

Review comment:
       What does this variable mean? What are its invariants?

##########
File path: geode-core/src/main/java/org/apache/geode/internal/net/SocketUtils.java
##########
@@ -73,4 +82,49 @@ public static boolean close(final ServerSocket serverSocket) {
     return true;
   }
 
+  /**
+   * Read data from the given socket into the given ByteBuffer. If NIO is supported
+   * we use Channel.read(ByteBuffer). If not we use byte arrays to read available
+   * bytes or buffer.remaining() bytes, whichever is smaller. If buffer.limit is zero
+   * and buffer.remaining is also zero the limit is changed to be buffer.capacity
+   * before reading.
+   *
+   * @return the number of bytes read, which may be -1 for EOF
+   */

Review comment:
       Please update the Javadoc comment to explain the role of `socketInputStream`.
   
   It appears to be the input stream constructed from the socket in `ClusterConnection.configureInputStream()`.

##########
File path: geode-core/src/main/java/org/apache/geode/internal/net/SocketUtils.java
##########
@@ -73,4 +82,49 @@ public static boolean close(final ServerSocket serverSocket) {
     return true;
   }
 
+  /**
+   * Read data from the given socket into the given ByteBuffer. If NIO is supported
+   * we use Channel.read(ByteBuffer). If not we use byte arrays to read available
+   * bytes or buffer.remaining() bytes, whichever is smaller. If buffer.limit is zero
+   * and buffer.remaining is also zero the limit is changed to be buffer.capacity
+   * before reading.
+   *
+   * @return the number of bytes read, which may be -1 for EOF
+   */
+  public static int readFromSocket(Socket socket, ByteBuffer inputBuffer,
+      InputStream socketInputStream) throws IOException {
+    int amountRead;
+    inputBuffer.limit(inputBuffer.capacity());
+    if (socket instanceof SSLSocket) {
+      amountRead = readFromStream(socketInputStream, inputBuffer);
+    } else {
+      amountRead = socket.getChannel().read(inputBuffer);
+    }

Review comment:
       Are we avoiding the `InputStream` in the non-TLS case as a performance enhancement?
   
   Would this method work correctly if the conditional were replaced by:
   
   ```java
          amountRead = readFromStream(socketInputStream, inputBuffer); 
   ```
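
To make the question concrete, here is a small loopback sketch of reading a channel-backed (non-TLS) socket through its `InputStream`. It is not Geode code: `ChannelSocketStreamSketch` and `roundTrip` are hypothetical names, and it says nothing about performance. Per the `java.net.Socket#getInputStream()` documentation, the stream of a socket that has an associated channel delegates to that channel, and its reads throw `IllegalBlockingModeException` if the channel is in non-blocking mode, so this only applies while the channel is configured as blocking.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

public class ChannelSocketStreamSketch {

  /**
   * Sends a short message over a loopback connection and reads it back
   * through the InputStream of the accepted, channel-backed socket.
   */
  public static String roundTrip(String message) throws IOException {
    try (ServerSocketChannel server = ServerSocketChannel.open()) {
      // Port 0 lets the OS pick a free ephemeral port.
      server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
      try (SocketChannel client = SocketChannel.open(server.getLocalAddress());
          SocketChannel accepted = server.accept()) {
        client.write(ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8)));
        client.shutdownOutput(); // signal end of stream to the reader

        // Channels are blocking by default, so stream reads are permitted here.
        Socket socket = accepted.socket();
        InputStream in = socket.getInputStream();
        byte[] buf = new byte[64];
        int total = 0;
        int n;
        // Read until end of stream or until the small demo buffer is full.
        while ((n = in.read(buf, total, buf.length - total)) > 0) {
          total += n;
        }
        return new String(buf, 0, total, StandardCharsets.UTF_8);
      }
    }
  }

  public static void main(String[] args) throws IOException {
    System.out.println(roundTrip("ping"));
  }
}
```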



