You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@geode.apache.org by GitBox <gi...@apache.org> on 2020/07/09 18:45:38 UTC

[GitHub] [geode] echobravopapa commented on a change in pull request #5363: Reintroduce use of SSLSocket in cluster communications

echobravopapa commented on a change in pull request #5363:
URL: https://github.com/apache/geode/pull/5363#discussion_r452410523



##########
File path: geode-core/src/main/java/org/apache/geode/internal/tcp/ClusterConnection.java
##########
@@ -94,11 +96,14 @@
  *
  * @since GemFire 2.0
  */
-public class Connection implements Runnable {
+public class ClusterConnection implements Runnable {

Review comment:
       +1 on the rename

##########
File path: geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java
##########
@@ -17,24 +17,31 @@
 
 import java.io.EOFException;

Review comment:
       should these classes be renamed since we are using OiO now?

##########
File path: geode-core/src/main/java/org/apache/geode/internal/tcp/ClusterConnection.java
##########
@@ -2909,13 +2950,13 @@ private boolean readMessageHeader(ByteBuffer peerDataBuffer) throws IOException
       readerShuttingDown = true;
       requestClose(String.format("Unknown P2P message type: %s",
           nioMessageTypeInteger));
-      return true;
+      return false;

Review comment:
       not obvious why this bool is flipped...

##########
File path: geode-core/src/main/java/org/apache/geode/internal/tcp/ClusterConnection.java
##########
@@ -2588,23 +2596,44 @@ void writeFully(SocketChannel channel, ByteBuffer buffer, boolean forceAsync,
           }
           // fall through
         }
-        ByteBuffer wrappedBuffer = ioFilter.wrap(buffer);
-        while (wrappedBuffer.remaining() > 0) {
+        while (buffer.remaining() > 0) {
           int amtWritten = 0;
           long start = stats.startSocketWrite(true);
           try {
-            amtWritten = channel.write(wrappedBuffer);
+            if (socket instanceof SSLSocket) {
+              OutputStream output = socket.getOutputStream();
+              if (buffer.hasArray()) {
+                output.write(buffer.array(), buffer.arrayOffset(),
+                    buffer.limit() - buffer.position());
+                buffer.position(buffer.limit());
+              } else {
+                // socket output streams are FileOutputStreams and have a writeable Channel.
+                // This code merely fetches that channel and writes to it.
+                // Channels.newChannel(output).write(buffer);

Review comment:
       dead code...

##########
File path: geode-core/src/main/java/org/apache/geode/internal/tcp/ClusterConnection.java
##########
@@ -1142,31 +1155,47 @@ private Connection(ConnectionTable t, boolean preserveOrder, InternalDistributed
 
     InetSocketAddress addr =
         new InetSocketAddress(remoteID.getInetAddress(), remoteID.getDirectChannelPort());
-    SocketChannel channel = SocketChannel.open();
-    owner.addConnectingSocket(channel.socket(), addr.getAddress());
-
-    try {
-      channel.socket().setTcpNoDelay(true);
-      channel.socket().setKeepAlive(SocketCreator.ENABLE_TCP_KEEP_ALIVE);
 
+    int connectTime = getP2PConnectTimeout(conduit.getDM().getConfig());
+    boolean useSSL = getConduit().useSSL();
+    if (useSSL) {
+      // int socketBufferSize = -1;
+      int socketBufferSize =
+          sharedResource ? SMALL_BUFFER_SIZE : this.owner.getConduit().tcpBufferSize;
+      socket = getConduit().getSocketCreator().forAdvancedUse().connect(
+          new HostAndPort(remoteID.getHostName(), remoteID.getDirectChannelPort()),
+          0, null, false, socketBufferSize, true);
+      setSocketBufferSize(this.socket, false, socketBufferSize, true);
+    } else {
+      SocketChannel channel = SocketChannel.open();
+      socket = channel.socket();
       // If conserve-sockets is false, the socket can be used for receiving responses, so set the
       // receive buffer accordingly.
       if (!sharedResource) {
-        setReceiveBufferSize(channel.socket(), owner.getConduit().tcpBufferSize);
+        setReceiveBufferSize(socket, owner.getConduit().tcpBufferSize);
       } else {
-        setReceiveBufferSize(channel.socket(), SMALL_BUFFER_SIZE); // make small since only
+        setReceiveBufferSize(socket, SMALL_BUFFER_SIZE); // make small since only
         // receive ack messages
       }
-      setSendBufferSize(channel.socket());
-      channel.configureBlocking(true);
+    }
+    owner.addConnectingSocket(socket, addr.getAddress());
+
+    try {
+      socket.setTcpNoDelay(true);
+      socket.setKeepAlive(SocketCreator.ENABLE_TCP_KEEP_ALIVE);
 
-      int connectTime = getP2PConnectTimeout(conduit.getDM().getConfig());
+      setSendBufferSize(socket);
+      if (!useSSL) {
+        socket.getChannel().configureBlocking(true);
+      }
 
       try {
 
-        channel.socket().connect(addr, connectTime);
-
-        createIoFilter(channel, true);
+        if (!useSSL) {
+          // haven't connected yet
+          socket.connect(addr, connectTime);
+        }
+        configureInputStream(socket, true);
 
       } catch (NullPointerException e) {
         // jdk 1.7 sometimes throws an NPE here

Review comment:
       out of scope, but kinda reads like tech debt...

##########
File path: geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java
##########
@@ -253,7 +253,7 @@ ByteBuffer expandWriteBufferIfNeeded(BufferType type, ByteBuffer existing,
     return newBuffer;
   }
 
-  ByteBuffer acquireDirectBuffer(BufferPool.BufferType type, int capacity) {
+  public ByteBuffer acquireDirectBuffer(BufferPool.BufferType type, int capacity) {

Review comment:
       what required the addition of `public`?

##########
File path: geode-core/src/main/java/org/apache/geode/internal/tcp/ClusterConnection.java
##########
@@ -2722,9 +2753,10 @@ public void readAck(final DirectReplyProcessor processor)
    * deserialized and passed to TCPConduit for further processing
    */
   private void processInputBuffer() throws ConnectionException, IOException {
+    // BRUCE: simplify this

Review comment:
       do we need a story for this TODO?

##########
File path: geode-core/src/main/java/org/apache/geode/internal/tcp/ClusterConnection.java
##########
@@ -140,7 +145,7 @@
   private final ConnectionTable owner;
 
   private final TCPConduit conduit;
-  private NioFilter ioFilter;
+  // private NioFilter ioFilter;

Review comment:
       dead code...

##########
File path: geode-core/src/main/java/org/apache/geode/internal/tcp/ClusterConnection.java
##########
@@ -1142,31 +1155,47 @@ private Connection(ConnectionTable t, boolean preserveOrder, InternalDistributed
 
     InetSocketAddress addr =
         new InetSocketAddress(remoteID.getInetAddress(), remoteID.getDirectChannelPort());
-    SocketChannel channel = SocketChannel.open();
-    owner.addConnectingSocket(channel.socket(), addr.getAddress());
-
-    try {
-      channel.socket().setTcpNoDelay(true);
-      channel.socket().setKeepAlive(SocketCreator.ENABLE_TCP_KEEP_ALIVE);
 
+    int connectTime = getP2PConnectTimeout(conduit.getDM().getConfig());
+    boolean useSSL = getConduit().useSSL();
+    if (useSSL) {
+      // int socketBufferSize = -1;

Review comment:
       dead code...

##########
File path: geode-core/src/main/java/org/apache/geode/internal/tcp/ClusterConnection.java
##########
@@ -2722,9 +2753,10 @@ public void readAck(final DirectReplyProcessor processor)
    * deserialized and passed to TCPConduit for further processing
    */
   private void processInputBuffer() throws ConnectionException, IOException {
+    // BRUCE: simplify this

Review comment:
       or a git-hook ;)

##########
File path: geode-core/src/main/java/org/apache/geode/internal/tcp/ClusterConnection.java
##########
@@ -3006,7 +3047,7 @@ private void readMessage(ByteBuffer peerDataBuffer) {
       } catch (IOException ex) {
         // ignored
       }
-    } else /* (nioMessageType == END_CHUNKED_MSG_TYPE) */ {
+    } else /* (messageType == END_CHUNKED_MSG_TYPE) */ {

Review comment:
       looks dead, maybe there was `else if` once upon a time...

##########
File path: geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
##########
@@ -202,6 +202,8 @@
 
   private final Stopper stopper = new Stopper();
 
+  private boolean enableTLSOverNIO = true; // Boolean.getBoolean("geode.enable-tls-nio");
+

Review comment:
       dead code hanging out...

##########
File path: geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
##########
@@ -14,70 +14,66 @@
  */
 package org.apache.geode.internal.tcp;
 
+import java.io.EOFException;
 import java.io.IOException;
-import java.nio.BufferUnderflowException;
+import java.io.InputStream;
+import java.net.Socket;
 import java.nio.ByteBuffer;
 
-import org.apache.logging.log4j.Logger;
-
 import org.apache.geode.distributed.internal.DMStats;
 import org.apache.geode.distributed.internal.DistributionMessage;
 import org.apache.geode.distributed.internal.ReplyProcessor21;
 import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.net.BufferPool;
-import org.apache.geode.internal.net.NioFilter;
+import org.apache.geode.internal.net.SocketUtils;
 import org.apache.geode.internal.serialization.Version;
-import org.apache.geode.logging.internal.log4j.api.LogService;
 
 /**
  * This class is currently used for reading direct ack responses It should probably be used for all
  * of the reading done in Connection.
  *
  */
 public class MsgReader {
-  private static final Logger logger = LogService.getLogger();
-
-  protected final Connection conn;
+  protected final ClusterConnection conn;
+  private final BufferPool bufferPool;
   protected final Header header = new Header();
-  private final NioFilter ioFilter;
   private ByteBuffer peerNetData;
+  private final InputStream inputStream;
   private final ByteBufferInputStream byteBufferInputStream;
+  private int lastProcessedPosition;
+  private int lastReadPosition;
 
 
-
-  MsgReader(Connection conn, NioFilter nioFilter, Version version) {
+  MsgReader(ClusterConnection conn, BufferPool bufferPool, InputStream inputStream,
+      Version version) {
+    this.bufferPool = bufferPool;
     this.conn = conn;
-    this.ioFilter = nioFilter;
+    this.inputStream = inputStream;
     this.byteBufferInputStream =
         version == null ? new ByteBufferInputStream() : new VersionedByteBufferInputStream(version);
   }
 
   Header readHeader() throws IOException {
-    ByteBuffer unwrappedBuffer = readAtLeast(Connection.MSG_HEADER_BYTES);
+    ByteBuffer buffer = readAtLeast(ClusterConnection.MSG_HEADER_BYTES);
 
-    Assert.assertTrue(unwrappedBuffer.remaining() >= Connection.MSG_HEADER_BYTES);
-
-    try {
-      int nioMessageLength = unwrappedBuffer.getInt();
-      /* nioMessageVersion = */
-      Connection.calcHdrVersion(nioMessageLength);
-      nioMessageLength = Connection.calcMsgByteSize(nioMessageLength);
-      byte nioMessageType = unwrappedBuffer.get();
-      short nioMsgId = unwrappedBuffer.getShort();
-
-      boolean directAck = (nioMessageType & Connection.DIRECT_ACK_BIT) != 0;
-      if (directAck) {
-        nioMessageType &= ~Connection.DIRECT_ACK_BIT; // clear the ack bit
-      }
+    Assert.assertTrue(buffer.remaining() >= ClusterConnection.MSG_HEADER_BYTES);
 
-      header.setFields(nioMessageLength, nioMessageType, nioMsgId);
+    int messageLength = buffer.getInt();
+    /* nioMessageVersion = */

Review comment:
       dead?
   

##########
File path: geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
##########
@@ -945,6 +962,10 @@ public boolean useSSL() {
     return useSSL;
   }
 
+  public boolean useDirectReceiveBuffers() {
+    return !useSSL();
+  }
+

Review comment:
       previous question might be resolved following this accessor... for ssl/tls we use NIO and !ssl we use the directbuffers, cool and with 6 you get eggroll




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org