Posted to notifications@geode.apache.org by GitBox <gi...@apache.org> on 2020/07/09 18:18:53 UTC

[GitHub] [geode] bschuchardt opened a new pull request #5363: Reintroduce use of SSLSocket in cluster communications

bschuchardt opened a new pull request #5363:
URL: https://github.com/apache/geode/pull/5363


   Use of SSLEngine with NIO has proven to be less robust than using
   "old-io" SSLSockets.  This PR reintroduces SSLSockets into cluster
   communications and removes the use of "new IO" SSLEngine as an option.
   
   The structure of the org.apache.geode.internal.tcp package is left
   intact.  None of the old code including Connection.runOioReader() has
   been brought back.  Code differences between use of NIO and OIO are
   minimal and are mostly at the socket-read/socket-write level.  The
   MsgReader class is modified to handle buffer manipulation that was in
   the previous "old IO" implementation but was delegated to the "filter"
   implementation prior to this PR.
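
As a rough illustration of why the NIO/OIO difference shows up at the socket-read/socket-write level (this is not code from the PR, and the class name `SocketKindDemo` and the helper `usesStreams` are hypothetical): a socket obtained from `SocketChannel.open()` exposes a channel, whereas an `SSLSocket` created by the default JSSE factory has none and can only be driven through its input and output streams.

```java
import java.io.IOException;
import java.net.Socket;
import java.nio.channels.SocketChannel;
import javax.net.ssl.SSLSocket;
import javax.net.ssl.SSLSocketFactory;

public class SocketKindDemo {
  /** Returns true when the socket has no channel and must be driven through its streams. */
  static boolean usesStreams(Socket socket) {
    return socket.getChannel() == null;
  }

  public static void main(String[] args) throws IOException {
    // Both sockets stay unconnected; only their kind is inspected.
    try (SocketChannel channel = SocketChannel.open();
        Socket ssl = SSLSocketFactory.getDefault().createSocket()) {
      Socket plain = channel.socket();
      System.out.println("plain uses streams: " + usesStreams(plain));
      System.out.println("ssl is SSLSocket:   " + (ssl instanceof SSLSocket));
      System.out.println("ssl uses streams:   " + usesStreams(ssl));
    }
  }
}
```

The sketch assumes a JDK with a working default SSL context; no network connection is made.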
   
   Performance testing has shown that serialization and transmission of
   messages works best with "direct" memory buffers while the reading of
   messages with SSL functions best with "heap" memory buffers.
   DirectChannel and DirectReplyProcessor reflect the choice of "direct"
   memory buffers for writing while ClusterConnection's inputBuffer reflects
   the choice of "heap" memory buffers for reading.
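
For readers unfamiliar with the two buffer kinds, a minimal sketch follows. It is not taken from the PR: the class name `BufferKindDemo`, the helper `kind`, and the 32 KB size are illustrative assumptions. It only shows how the two allocations differ as seen through the standard `ByteBuffer` API.

```java
import java.nio.ByteBuffer;

public class BufferKindDemo {
  /** Classifies a buffer using the same terms as the description above. */
  static String kind(ByteBuffer buffer) {
    return buffer.isDirect() ? "direct" : "heap";
  }

  public static void main(String[] args) {
    // Off-heap memory, typically usable by channel I/O without an intermediate copy.
    ByteBuffer writeBuffer = ByteBuffer.allocateDirect(32 * 1024);
    // Backed by an ordinary byte[], which stream-based reads can fill directly.
    ByteBuffer readBuffer = ByteBuffer.allocate(32 * 1024);

    System.out.println("write buffer: " + kind(writeBuffer)
        + ", hasArray=" + writeBuffer.hasArray());
    System.out.println("read buffer:  " + kind(readBuffer)
        + ", hasArray=" + readBuffer.hasArray());
  }
}
```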
   
   There are a few changes to the NIO "engine" implementations that I made
   while testing performance that I decided to keep because they allow the
   use of NioPlainEngine with SSLSockets.  That could be an advantage when
   performing similar work in client/server communications.
   
   The cluster communications class has been renamed to ClusterConnection
   because I'm very tired of the client/server Connection class having the
   same name.  Every time I try to open the class with IntelliJ it
   recommends the client/server class as the first option.  No more!
   
   Thank you for submitting a contribution to Apache Geode.
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [x] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
   
   - [x] Has your PR been rebased against the latest commit within the target branch (typically `develop`)?
   
   - [x] Is your initial contribution a single, squashed commit?
   
   - [x] Does `gradlew build` run cleanly?
   
   - [ ] Have you written or updated unit tests to verify your changes?
   
   - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
   
   ### Note:
   Please ensure that once the PR is submitted, you check Concourse for build issues and
   submit an update to your PR as soon as possible. If you need help, please send an
   email to dev@geode.apache.org.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] pivotal-jbarrett commented on pull request #5363: Reintroduce use of SSLSocket in cluster communications

Posted by GitBox <gi...@apache.org>.
pivotal-jbarrett commented on pull request #5363:
URL: https://github.com/apache/geode/pull/5363#issuecomment-656342074


   Can you please post the results of running geode-benchmarks with this branch against develop?



[GitHub] [geode] bschuchardt commented on a change in pull request #5363: GEODE-8349: Reintroduce use of SSLSocket in cluster communications

Posted by GitBox <gi...@apache.org>.
bschuchardt commented on a change in pull request #5363:
URL: https://github.com/apache/geode/pull/5363#discussion_r454643105



##########
File path: geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java
##########
@@ -53,7 +53,7 @@ ByteBuffer ensureWrappedCapacity(int amount, ByteBuffer wrappedBuffer,
    * wrappedBuffer = filter.ensureWrappedCapacity(amount, wrappedBuffer, etc.);<br>
    * unwrappedBuffer = filter.readAtLeast(channel, amount, wrappedBuffer, etc.)
    */
-  ByteBuffer readAtLeast(SocketChannel channel, int amount, ByteBuffer wrappedBuffer)
+  ByteBuffer readAtLeast(int amount, ByteBuffer wrappedBuffer, Socket socket)

Review comment:
       On second thought, I'm going to undo those changes.  My first cut at a solution modified the filters to allow their use with an SSLSocket and its input stream, but I decided that it was better to stop using them altogether.





[GitHub] [geode] bschuchardt commented on a change in pull request #5363: GEODE-8349: Reintroduce use of SSLSocket in cluster communications

Posted by GitBox <gi...@apache.org>.
bschuchardt commented on a change in pull request #5363:
URL: https://github.com/apache/geode/pull/5363#discussion_r456550193



##########
File path: geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
##########
@@ -14,70 +14,65 @@
  */
 package org.apache.geode.internal.tcp;
 
+import java.io.EOFException;
 import java.io.IOException;
-import java.nio.BufferUnderflowException;
+import java.io.InputStream;
+import java.net.Socket;
 import java.nio.ByteBuffer;
 
-import org.apache.logging.log4j.Logger;
-
 import org.apache.geode.distributed.internal.DMStats;
 import org.apache.geode.distributed.internal.DistributionMessage;
 import org.apache.geode.distributed.internal.ReplyProcessor21;
 import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.net.BufferPool;
-import org.apache.geode.internal.net.NioFilter;
+import org.apache.geode.internal.net.SocketUtils;
 import org.apache.geode.internal.serialization.Version;
-import org.apache.geode.logging.internal.log4j.api.LogService;
 
 /**
  * This class is currently used for reading direct ack responses It should probably be used for all
  * of the reading done in Connection.
  *
  */
 public class MsgReader {
-  private static final Logger logger = LogService.getLogger();
-
-  protected final Connection conn;
+  protected final ClusterConnection conn;
+  private final BufferPool bufferPool;
   protected final Header header = new Header();
-  private final NioFilter ioFilter;
   private ByteBuffer peerNetData;
+  private final InputStream inputStream;
   private final ByteBufferInputStream byteBufferInputStream;
+  private int lastProcessedPosition;
+  private int lastReadPosition;

Review comment:
       Let me say a little more about these variables.  My understanding is that the point of doing this external bookkeeping on the buffer is to avoid a compaction when there is still room in the buffer to read more data.  The code in ClusterConnection that reads messages still does that compaction; we should eventually remove it and use MsgReader to read all messages.
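
The bookkeeping idea described in this comment can be sketched as follows. This is not the MsgReader implementation: only the field names `lastProcessedPosition` and `lastReadPosition` come from the diff above, while the class `PositionTrackingBuffer`, its methods, and the `byte[]`-based `append` (standing in for a socket read) are assumptions made for illustration. The buffer is compacted only when there is no room left for the incoming bytes.

```java
import java.nio.ByteBuffer;

public class PositionTrackingBuffer {
  private final ByteBuffer buffer;
  private int lastProcessedPosition; // start of bytes not yet consumed
  private int lastReadPosition; // end of bytes received so far
  private int compactions;

  PositionTrackingBuffer(int capacity) {
    buffer = ByteBuffer.allocate(capacity);
  }

  /** Stands in for a socket read: appends incoming bytes after the last read position. */
  void append(byte[] incoming) {
    if (buffer.capacity() - lastReadPosition < incoming.length) {
      // No room at the tail, so slide the unconsumed bytes to the front.
      buffer.limit(lastReadPosition);
      buffer.position(lastProcessedPosition);
      buffer.compact();
      lastReadPosition = buffer.position();
      lastProcessedPosition = 0;
      compactions++;
    }
    buffer.limit(buffer.capacity());
    buffer.position(lastReadPosition);
    buffer.put(incoming); // throws BufferOverflowException if it still does not fit
    lastReadPosition = buffer.position();
  }

  /** Returns the next n unconsumed bytes, or null if fewer than n have arrived. */
  byte[] take(int n) {
    if (lastReadPosition - lastProcessedPosition < n) {
      return null;
    }
    byte[] out = new byte[n];
    buffer.limit(lastReadPosition);
    buffer.position(lastProcessedPosition);
    buffer.get(out);
    lastProcessedPosition += n;
    return out;
  }

  int compactions() {
    return compactions;
  }

  public static void main(String[] args) {
    PositionTrackingBuffer b = new PositionTrackingBuffer(8);
    b.append(new byte[] {1, 2, 3});
    b.take(2);
    b.append(new byte[] {4, 5, 6}); // still fits at the tail
    System.out.println("compactions so far: " + b.compactions());
    b.append(new byte[] {7, 8, 9}); // tail is full, so the buffer is compacted first
    System.out.println("compactions now: " + b.compactions());
  }
}
```

A reader that compacts after every message would instead copy the leftover bytes each time, even when the tail of the buffer could have held the next read.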





[GitHub] [geode] Bill commented on a change in pull request #5363: GEODE-8349: Reintroduce use of SSLSocket in cluster communications

Posted by GitBox <gi...@apache.org>.
Bill commented on a change in pull request #5363:
URL: https://github.com/apache/geode/pull/5363#discussion_r460299361



##########
File path: geode-core/src/main/java/org/apache/geode/internal/net/SocketUtils.java
##########
@@ -89,6 +89,10 @@ public static boolean close(final ServerSocket serverSocket) {
    * and buffer.remaining is also zero the limit is changed to be buffer.capacity
    * before reading.
    *
+   * @param socket the socket to read from
+   * @param inputBuffer the buffer into which data should be stored
+   * @param socketInputStream the socket's inputStream, included as a parameter so it can be a
+   *        buffered stream, if desired

Review comment:
       thanks





[GitHub] [geode] bschuchardt closed pull request #5363: GEODE-8349: Reintroduce use of SSLSocket in cluster communications

Posted by GitBox <gi...@apache.org>.
bschuchardt closed pull request #5363:
URL: https://github.com/apache/geode/pull/5363


   



[GitHub] [geode] lgtm-com[bot] commented on pull request #5363: GEODE-8349: Reintroduce use of SSLSocket in cluster communications

Posted by GitBox <gi...@apache.org>.
lgtm-com[bot] commented on pull request #5363:
URL: https://github.com/apache/geode/pull/5363#issuecomment-658455639


   This pull request **introduces 1 alert** and **fixes 1** when merging ced3c7a3fc32cfa1ba209215d0105a43a0246161 into 1ebd307ae1ae87c53f0ff6b8f0c83ab2feda93cd - [view on LGTM.com](https://lgtm.com/projects/g/apache/geode/rev/pr-5ee74036f8700de46d755c33bc1a74a1d0d44345)
   
   **new alerts:**
   
   * 1 for Potential input resource leak
   
   **fixed alerts:**
   
   * 1 for Potential input resource leak



[GitHub] [geode] bschuchardt edited a comment on pull request #5363: GEODE-8349: Reintroduce use of SSLSocket in cluster communications

Posted by GitBox <gi...@apache.org>.
bschuchardt edited a comment on pull request #5363:
URL: https://github.com/apache/geode/pull/5363#issuecomment-656401747


   ```
    org.apache.geode.benchmark.tests.PartitionedGetBenchmark
                 average ops/second  Baseline:    331221.55  Test:    342201.08  Difference:   +3.3%
          ops/second standard error  Baseline:       521.34  Test:       555.60  Difference:   +6.6%
      ops/second standard deviation  Baseline:      9014.73  Test:      9607.16  Difference:   +6.6%
         YS 99th percentile latency  Baseline:     20071.00  Test:     20071.00  Difference:   +0.0%
                     median latency  Baseline:    362239.00  Test:    361471.00  Difference:   -0.2%
            90th percentile latency  Baseline:   7036927.00  Test:   6995967.00  Difference:   -0.6%
            99th percentile latency  Baseline:  24428543.00  Test:  23085055.00  Difference:   -5.5%
          99.9th percentile latency  Baseline:  34963455.00  Test:  33259519.00  Difference:   -4.9%
                    average latency  Baseline:   2171050.56  Test:   2101421.48  Difference:   -3.2%
         latency standard deviation  Baseline:   5334288.43  Test:   5076754.60  Difference:   -4.8%
             latency standard error  Baseline:       535.20  Test:       501.11  Difference:   -6.4%
   
   
   org.apache.geode.benchmark.tests.PartitionedPutBenchmark
                 average ops/second  Baseline:    178773.53  Test:    182095.99  Difference:   +1.9%
          ops/second standard error  Baseline:       599.58  Test:       835.58  Difference:  +39.4%
      ops/second standard deviation  Baseline:     10367.72  Test:     14448.49  Difference:  +39.4%
         YS 99th percentile latency  Baseline:     20083.00  Test:     20082.80  Difference:   -0.0%
                     median latency  Baseline:    765951.00  Test:    767999.00  Difference:   +0.3%
            90th percentile latency  Baseline:  13459455.00  Test:  13107199.00  Difference:   -2.6%
            99th percentile latency  Baseline:  28983295.00  Test:  27787263.00  Difference:   -4.1%
          99.9th percentile latency  Baseline: 107151359.00  Test: 109379583.00  Difference:   +2.1%
                    average latency  Baseline:   4024484.55  Test:   3951012.79  Difference:   -1.8%
         latency standard deviation  Baseline:   9229899.71  Test:   8986494.06  Difference:   -2.6%
             latency standard error  Baseline:      1260.59  Test:      1216.02  Difference:   -3.5%
   
   
   org.apache.geode.benchmark.tests.ReplicatedGetBenchmark
                 average ops/second  Baseline:    339091.57  Test:    351035.30  Difference:   +3.5%
          ops/second standard error  Baseline:       450.59  Test:       371.17  Difference:  -17.6%
      ops/second standard deviation  Baseline:      7791.42  Test:      6418.07  Difference:  -17.6%
         YS 99th percentile latency  Baseline:     20071.00  Test:     20071.33  Difference:   +0.0%
                     median latency  Baseline:    355839.00  Test:    357375.00  Difference:   +0.4%
            90th percentile latency  Baseline:   6524927.00  Test:   5992447.00  Difference:   -8.2%
            99th percentile latency  Baseline:  24526847.00  Test:  23691263.00  Difference:   -3.4%
          99.9th percentile latency  Baseline:  34013183.00  Test:  32948223.00  Difference:   -3.1%
                    average latency  Baseline:   2120692.40  Test:   2048843.90  Difference:   -3.4%
         latency standard deviation  Baseline:   5263600.14  Test:   5054339.01  Difference:   -4.0%
             latency standard error  Baseline:       521.94  Test:       492.61  Difference:   -5.6%
   
   
   org.apache.geode.benchmark.tests.ReplicatedPutBenchmark
                 average ops/second  Baseline:    192443.22  Test:    194403.34  Difference:   +1.0%
          ops/second standard error  Baseline:       726.17  Test:       788.60  Difference:   +8.6%
      ops/second standard deviation  Baseline:     12556.67  Test:     13636.14  Difference:   +8.6%
         YS 99th percentile latency  Baseline:     20060.00  Test:     20072.00  Difference:   +0.1%
                     median latency  Baseline:    809471.00  Test:    781823.00  Difference:   -3.4%
            90th percentile latency  Baseline:  10690559.00  Test:  11599871.00  Difference:   +8.5%
            99th percentile latency  Baseline:  23724031.00  Test:  24379391.00  Difference:   +2.8%
          99.9th percentile latency  Baseline: 106692607.00  Test: 110166015.00  Difference:   +3.3%
                    average latency  Baseline:   3739364.07  Test:   3701351.14  Difference:   -1.0%
         latency standard deviation  Baseline:   8255312.29  Test:   8420063.75  Difference:   +2.0%
             latency standard error  Baseline:      1086.76  Test:      1102.91  Difference:   +1.5%
   ```
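
The Difference column is consistent with the relative change of Test against Baseline. The sketch below recomputes two rows under that assumption; the class `BenchmarkDiff` and the method `difference` are hypothetical and are not part of geode-benchmarks. Note that a positive difference is an improvement for ops/second but a regression for latency.

```java
import java.util.Locale;

public class BenchmarkDiff {
  /** Relative change of test versus baseline, formatted like the report's Difference column. */
  static String difference(double baseline, double test) {
    double percent = (test - baseline) / baseline * 100.0;
    return String.format(Locale.ROOT, "%+.1f%%", percent);
  }

  public static void main(String[] args) {
    // PartitionedGetBenchmark, average ops/second
    System.out.println(difference(331221.55, 342201.08));
    // ReplicatedGetBenchmark, ops/second standard error
    System.out.println(difference(450.59, 371.17));
  }
}
```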



[GitHub] [geode] bschuchardt commented on a change in pull request #5363: Reintroduce use of SSLSocket in cluster communications

Posted by GitBox <gi...@apache.org>.
bschuchardt commented on a change in pull request #5363:
URL: https://github.com/apache/geode/pull/5363#discussion_r452464617



##########
File path: geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java
##########
@@ -253,7 +253,7 @@ ByteBuffer expandWriteBufferIfNeeded(BufferType type, ByteBuffer existing,
     return newBuffer;
   }
 
-  ByteBuffer acquireDirectBuffer(BufferPool.BufferType type, int capacity) {
+  public ByteBuffer acquireDirectBuffer(BufferPool.BufferType type, int capacity) {

Review comment:
       it's now accessed by the org.apache.geode.internal.tcp package






[GitHub] [geode] Bill commented on a change in pull request #5363: GEODE-8349: Reintroduce use of SSLSocket in cluster communications

Posted by GitBox <gi...@apache.org>.
Bill commented on a change in pull request #5363:
URL: https://github.com/apache/geode/pull/5363#discussion_r454742632



##########
File path: geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java
##########
@@ -53,7 +53,7 @@ ByteBuffer ensureWrappedCapacity(int amount, ByteBuffer wrappedBuffer,
    * wrappedBuffer = filter.ensureWrappedCapacity(amount, wrappedBuffer, etc.);<br>
    * unwrappedBuffer = filter.readAtLeast(channel, amount, wrappedBuffer, etc.)
    */
-  ByteBuffer readAtLeast(SocketChannel channel, int amount, ByteBuffer wrappedBuffer)
+  ByteBuffer readAtLeast(int amount, ByteBuffer wrappedBuffer, Socket socket)

Review comment:
       Ah, I failed to understand that both of these things are true at once:
   
   "This PR … removes the use of "new IO" SSLEngine as an option."
   
   and
   
   "There are a few changes to the NIO "engine" implementations that I made
   while testing performance that I decided to keep because they allow the
   use of NioPlainEngine with SSLSockets. That could be an advantage when
   performing similar work in client/server communications"
   
   If I understand correctly, the implication of the second bit is that we didn't remove SSLEngine from the codebase; we only eliminated it in the P2P messaging path.





[GitHub] [geode] bschuchardt commented on a change in pull request #5363: GEODE-8349: Reintroduce use of SSLSocket in cluster communications

Posted by GitBox <gi...@apache.org>.
bschuchardt commented on a change in pull request #5363:
URL: https://github.com/apache/geode/pull/5363#discussion_r454617785



##########
File path: geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java
##########
@@ -53,7 +53,7 @@ ByteBuffer ensureWrappedCapacity(int amount, ByteBuffer wrappedBuffer,
    * wrappedBuffer = filter.ensureWrappedCapacity(amount, wrappedBuffer, etc.);<br>
    * unwrappedBuffer = filter.readAtLeast(channel, amount, wrappedBuffer, etc.)
    */
-  ByteBuffer readAtLeast(SocketChannel channel, int amount, ByteBuffer wrappedBuffer)
+  ByteBuffer readAtLeast(int amount, ByteBuffer wrappedBuffer, Socket socket)

Review comment:
       that's explained in my initial comment





[GitHub] [geode] bschuchardt commented on a change in pull request #5363: GEODE-8349: Reintroduce use of SSLSocket in cluster communications

Posted by GitBox <gi...@apache.org>.
bschuchardt commented on a change in pull request #5363:
URL: https://github.com/apache/geode/pull/5363#discussion_r454629296



##########
File path: geode-core/src/main/java/org/apache/geode/internal/net/SocketUtils.java
##########
@@ -73,4 +82,49 @@ public static boolean close(final ServerSocket serverSocket) {
     return true;
   }
 
+  /**
+   * Read data from the given socket into the given ByteBuffer. If NIO is supported
+   * we use Channel.read(ByteBuffer). If not we use byte arrays to read available
+   * bytes or buffer.remaining() bytes, whichever is smaller. If buffer.limit is zero
+   * and buffer.remaining is also zero the limit is changed to be buffer.capacity
+   * before reading.
+   *
+   * @return the number of bytes read, which may be -1 for EOF
+   */
+  public static int readFromSocket(Socket socket, ByteBuffer inputBuffer,
+      InputStream socketInputStream) throws IOException {
+    int amountRead;
+    inputBuffer.limit(inputBuffer.capacity());
+    if (socket instanceof SSLSocket) {
+      amountRead = readFromStream(socketInputStream, inputBuffer);
+    } else {
+      amountRead = socket.getChannel().read(inputBuffer);
+    }
+    return amountRead;
+  }
+
+  private static int readFromStream(InputStream stream, ByteBuffer inputBuffer) throws IOException {
+    int amountRead;
+    // if bytes are available we read that number of bytes. Otherwise we do a blocking read
+    // of buffer.remaining() bytes
+    int amountToRead = inputBuffer.remaining();
+    // stream.available() > 0 ? Math.min(stream.available(), inputBuffer.remaining())
+    // : inputBuffer.remaining();
+    if (inputBuffer.hasArray()) {
+      amountRead = stream.read(inputBuffer.array(),
+          inputBuffer.arrayOffset() + inputBuffer.position(), amountToRead);
+      if (amountRead > 0) {
+        inputBuffer.position(inputBuffer.position() + amountRead);
+      }
+    } else {

Review comment:
       HeapByteBuffer has an array.  DirectByteBuffer does not have an array.
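
To make the heap/direct distinction concrete, here is a self-contained sketch of a stream-to-buffer read that handles both cases. The heap branch mirrors the hunk above. The direct branch is cut off in the quoted diff, so what follows for it is an assumption about one reasonable approach (a temporary byte[]), not the PR's code. The class name `StreamToBuffer` is hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

public class StreamToBuffer {
  /** Reads up to buffer.remaining() bytes; returns the count, or -1 at end of stream. */
  static int readFromStream(InputStream stream, ByteBuffer buffer) throws IOException {
    int amountToRead = buffer.remaining();
    int amountRead;
    if (buffer.hasArray()) {
      // Heap buffer: read straight into the backing array, then advance the position.
      amountRead = stream.read(buffer.array(),
          buffer.arrayOffset() + buffer.position(), amountToRead);
      if (amountRead > 0) {
        buffer.position(buffer.position() + amountRead);
      }
    } else {
      // Direct buffer: there is no backing array, so go through a temporary byte[].
      byte[] temp = new byte[amountToRead];
      amountRead = stream.read(temp);
      if (amountRead > 0) {
        buffer.put(temp, 0, amountRead);
      }
    }
    return amountRead;
  }

  public static void main(String[] args) throws IOException {
    byte[] data = {10, 20, 30, 40};
    ByteBuffer heap = ByteBuffer.allocate(16);
    ByteBuffer direct = ByteBuffer.allocateDirect(16);
    System.out.println(readFromStream(new ByteArrayInputStream(data), heap));
    System.out.println(readFromStream(new ByteArrayInputStream(data), direct));
  }
}
```

The extra copy in the direct branch is one plausible reason a heap buffer would suit stream-based reads better, in line with the performance note in the PR description.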





[GitHub] [geode] lgtm-com[bot] commented on pull request #5363: GEODE-8349: Reintroduce use of SSLSocket in cluster communications

Posted by GitBox <gi...@apache.org>.
lgtm-com[bot] commented on pull request #5363:
URL: https://github.com/apache/geode/pull/5363#issuecomment-656369983


   This pull request **introduces 1 alert** and **fixes 1** when merging 7f84de83311e0df3061b1ae4e3f2d9160b8df364 into 62ee81fa428a80470083d1a304d7704b15658d2c - [view on LGTM.com](https://lgtm.com/projects/g/apache/geode/rev/pr-c38d1d44d923ee99f0c2d149525d016d1ef02129)
   
   **new alerts:**
   
   * 1 for Potential input resource leak
   
   **fixed alerts:**
   
   * 1 for Potential input resource leak



[GitHub] [geode] echobravopapa commented on a change in pull request #5363: Reintroduce use of SSLSocket in cluster communications

Posted by GitBox <gi...@apache.org>.
echobravopapa commented on a change in pull request #5363:
URL: https://github.com/apache/geode/pull/5363#discussion_r452410523



##########
File path: geode-core/src/main/java/org/apache/geode/internal/tcp/ClusterConnection.java
##########
@@ -94,11 +96,14 @@
  *
  * @since GemFire 2.0
  */
-public class Connection implements Runnable {
+public class ClusterConnection implements Runnable {

Review comment:
       +1 on the rename

##########
File path: geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java
##########
@@ -17,24 +17,31 @@
 
 import java.io.EOFException;

Review comment:
       Should these classes be renamed, since we are using OIO now?

##########
File path: geode-core/src/main/java/org/apache/geode/internal/tcp/ClusterConnection.java
##########
@@ -2909,13 +2950,13 @@ private boolean readMessageHeader(ByteBuffer peerDataBuffer) throws IOException
       readerShuttingDown = true;
       requestClose(String.format("Unknown P2P message type: %s",
           nioMessageTypeInteger));
-      return true;
+      return false;

Review comment:
       not obvious why this bool is flipped...

##########
File path: geode-core/src/main/java/org/apache/geode/internal/tcp/ClusterConnection.java
##########
@@ -2588,23 +2596,44 @@ void writeFully(SocketChannel channel, ByteBuffer buffer, boolean forceAsync,
           }
           // fall through
         }
-        ByteBuffer wrappedBuffer = ioFilter.wrap(buffer);
-        while (wrappedBuffer.remaining() > 0) {
+        while (buffer.remaining() > 0) {
           int amtWritten = 0;
           long start = stats.startSocketWrite(true);
           try {
-            amtWritten = channel.write(wrappedBuffer);
+            if (socket instanceof SSLSocket) {
+              OutputStream output = socket.getOutputStream();
+              if (buffer.hasArray()) {
+                output.write(buffer.array(), buffer.arrayOffset(),
+                    buffer.limit() - buffer.position());
+                buffer.position(buffer.limit());
+              } else {
+                // socket output streams are FileOutputStreams and have a writeable Channel.
+                // This code merely fetches that channel and writes to it.
+                // Channels.newChannel(output).write(buffer);

Review comment:
       dead code...
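
For reference, a minimal standalone sketch of writing a buffer's remaining bytes to an `OutputStream` is shown below. It is not the PR's code; the class `BufferToStream` and the method `writeRemaining` are hypothetical. In the heap case the offset passed to `write` is `arrayOffset() + position()`, which coincides with the `arrayOffset()` used in the hunk above only when the buffer's position is zero. The direct case copies through a temporary array.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;

public class BufferToStream {
  /** Writes the bytes between position and limit to the stream and consumes them. */
  static void writeRemaining(ByteBuffer buffer, OutputStream output) throws IOException {
    if (buffer.hasArray()) {
      // The first unwritten byte sits at arrayOffset() + position() in the backing array.
      output.write(buffer.array(), buffer.arrayOffset() + buffer.position(),
          buffer.remaining());
      buffer.position(buffer.limit());
    } else {
      // Direct buffer: copy the remaining bytes out before handing them to the stream.
      byte[] temp = new byte[buffer.remaining()];
      buffer.get(temp);
      output.write(temp);
    }
  }

  public static void main(String[] args) throws IOException {
    ByteBuffer buffer = ByteBuffer.wrap(new byte[] {1, 2, 3, 4, 5});
    buffer.position(2); // pretend the first two bytes were already sent
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    writeRemaining(buffer, out);
    System.out.println(Arrays.toString(out.toByteArray()));
  }
}
```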

##########
File path: geode-core/src/main/java/org/apache/geode/internal/tcp/ClusterConnection.java
##########
@@ -1142,31 +1155,47 @@ private Connection(ConnectionTable t, boolean preserveOrder, InternalDistributed
 
     InetSocketAddress addr =
         new InetSocketAddress(remoteID.getInetAddress(), remoteID.getDirectChannelPort());
-    SocketChannel channel = SocketChannel.open();
-    owner.addConnectingSocket(channel.socket(), addr.getAddress());
-
-    try {
-      channel.socket().setTcpNoDelay(true);
-      channel.socket().setKeepAlive(SocketCreator.ENABLE_TCP_KEEP_ALIVE);
 
+    int connectTime = getP2PConnectTimeout(conduit.getDM().getConfig());
+    boolean useSSL = getConduit().useSSL();
+    if (useSSL) {
+      // int socketBufferSize = -1;
+      int socketBufferSize =
+          sharedResource ? SMALL_BUFFER_SIZE : this.owner.getConduit().tcpBufferSize;
+      socket = getConduit().getSocketCreator().forAdvancedUse().connect(
+          new HostAndPort(remoteID.getHostName(), remoteID.getDirectChannelPort()),
+          0, null, false, socketBufferSize, true);
+      setSocketBufferSize(this.socket, false, socketBufferSize, true);
+    } else {
+      SocketChannel channel = SocketChannel.open();
+      socket = channel.socket();
       // If conserve-sockets is false, the socket can be used for receiving responses, so set the
       // receive buffer accordingly.
       if (!sharedResource) {
-        setReceiveBufferSize(channel.socket(), owner.getConduit().tcpBufferSize);
+        setReceiveBufferSize(socket, owner.getConduit().tcpBufferSize);
       } else {
-        setReceiveBufferSize(channel.socket(), SMALL_BUFFER_SIZE); // make small since only
+        setReceiveBufferSize(socket, SMALL_BUFFER_SIZE); // make small since only
         // receive ack messages
       }
-      setSendBufferSize(channel.socket());
-      channel.configureBlocking(true);
+    }
+    owner.addConnectingSocket(socket, addr.getAddress());
+
+    try {
+      socket.setTcpNoDelay(true);
+      socket.setKeepAlive(SocketCreator.ENABLE_TCP_KEEP_ALIVE);
 
-      int connectTime = getP2PConnectTimeout(conduit.getDM().getConfig());
+      setSendBufferSize(socket);
+      if (!useSSL) {
+        socket.getChannel().configureBlocking(true);
+      }
 
       try {
 
-        channel.socket().connect(addr, connectTime);
-
-        createIoFilter(channel, true);
+        if (!useSSL) {
+          // haven't connected yet
+          socket.connect(addr, connectTime);
+        }
+        configureInputStream(socket, true);
 
       } catch (NullPointerException e) {
         // jdk 1.7 sometimes throws an NPE here

Review comment:
       out of scope, but kinda reads like tech debt...

##########
File path: geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java
##########
@@ -253,7 +253,7 @@ ByteBuffer expandWriteBufferIfNeeded(BufferType type, ByteBuffer existing,
     return newBuffer;
   }
 
-  ByteBuffer acquireDirectBuffer(BufferPool.BufferType type, int capacity) {
+  public ByteBuffer acquireDirectBuffer(BufferPool.BufferType type, int capacity) {

Review comment:
       what required the addition of `public`?

##########
File path: geode-core/src/main/java/org/apache/geode/internal/tcp/ClusterConnection.java
##########
@@ -2722,9 +2753,10 @@ public void readAck(final DirectReplyProcessor processor)
    * deserialized and passed to TCPConduit for further processing
    */
   private void processInputBuffer() throws ConnectionException, IOException {
+    // BRUCE: simplify this

Review comment:
       do we need a story for this TODO?

##########
File path: geode-core/src/main/java/org/apache/geode/internal/tcp/ClusterConnection.java
##########
@@ -140,7 +145,7 @@
   private final ConnectionTable owner;
 
   private final TCPConduit conduit;
-  private NioFilter ioFilter;
+  // private NioFilter ioFilter;

Review comment:
       dead code...

##########
File path: geode-core/src/main/java/org/apache/geode/internal/tcp/ClusterConnection.java
##########
@@ -1142,31 +1155,47 @@ private Connection(ConnectionTable t, boolean preserveOrder, InternalDistributed
 
     InetSocketAddress addr =
         new InetSocketAddress(remoteID.getInetAddress(), remoteID.getDirectChannelPort());
-    SocketChannel channel = SocketChannel.open();
-    owner.addConnectingSocket(channel.socket(), addr.getAddress());
-
-    try {
-      channel.socket().setTcpNoDelay(true);
-      channel.socket().setKeepAlive(SocketCreator.ENABLE_TCP_KEEP_ALIVE);
 
+    int connectTime = getP2PConnectTimeout(conduit.getDM().getConfig());
+    boolean useSSL = getConduit().useSSL();
+    if (useSSL) {
+      // int socketBufferSize = -1;

Review comment:
       dead code...

##########
File path: geode-core/src/main/java/org/apache/geode/internal/tcp/ClusterConnection.java
##########
@@ -2722,9 +2753,10 @@ public void readAck(final DirectReplyProcessor processor)
    * deserialized and passed to TCPConduit for further processing
    */
   private void processInputBuffer() throws ConnectionException, IOException {
+    // BRUCE: simplify this

Review comment:
       or a git-hook ;)

##########
File path: geode-core/src/main/java/org/apache/geode/internal/tcp/ClusterConnection.java
##########
@@ -3006,7 +3047,7 @@ private void readMessage(ByteBuffer peerDataBuffer) {
       } catch (IOException ex) {
         // ignored
       }
-    } else /* (nioMessageType == END_CHUNKED_MSG_TYPE) */ {
+    } else /* (messageType == END_CHUNKED_MSG_TYPE) */ {

Review comment:
       looks dead, maybe there was `else if` once upon a time...

##########
File path: geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
##########
@@ -202,6 +202,8 @@
 
   private final Stopper stopper = new Stopper();
 
+  private boolean enableTLSOverNIO = true; // Boolean.getBoolean("geode.enable-tls-nio");
+

Review comment:
       dead code hanging out...

##########
File path: geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
##########
@@ -14,70 +14,66 @@
  */
 package org.apache.geode.internal.tcp;
 
+import java.io.EOFException;
 import java.io.IOException;
-import java.nio.BufferUnderflowException;
+import java.io.InputStream;
+import java.net.Socket;
 import java.nio.ByteBuffer;
 
-import org.apache.logging.log4j.Logger;
-
 import org.apache.geode.distributed.internal.DMStats;
 import org.apache.geode.distributed.internal.DistributionMessage;
 import org.apache.geode.distributed.internal.ReplyProcessor21;
 import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.net.BufferPool;
-import org.apache.geode.internal.net.NioFilter;
+import org.apache.geode.internal.net.SocketUtils;
 import org.apache.geode.internal.serialization.Version;
-import org.apache.geode.logging.internal.log4j.api.LogService;
 
 /**
  * This class is currently used for reading direct ack responses It should probably be used for all
  * of the reading done in Connection.
  *
  */
 public class MsgReader {
-  private static final Logger logger = LogService.getLogger();
-
-  protected final Connection conn;
+  protected final ClusterConnection conn;
+  private final BufferPool bufferPool;
   protected final Header header = new Header();
-  private final NioFilter ioFilter;
   private ByteBuffer peerNetData;
+  private final InputStream inputStream;
   private final ByteBufferInputStream byteBufferInputStream;
+  private int lastProcessedPosition;
+  private int lastReadPosition;
 
 
-
-  MsgReader(Connection conn, NioFilter nioFilter, Version version) {
+  MsgReader(ClusterConnection conn, BufferPool bufferPool, InputStream inputStream,
+      Version version) {
+    this.bufferPool = bufferPool;
     this.conn = conn;
-    this.ioFilter = nioFilter;
+    this.inputStream = inputStream;
     this.byteBufferInputStream =
         version == null ? new ByteBufferInputStream() : new VersionedByteBufferInputStream(version);
   }
 
   Header readHeader() throws IOException {
-    ByteBuffer unwrappedBuffer = readAtLeast(Connection.MSG_HEADER_BYTES);
+    ByteBuffer buffer = readAtLeast(ClusterConnection.MSG_HEADER_BYTES);
 
-    Assert.assertTrue(unwrappedBuffer.remaining() >= Connection.MSG_HEADER_BYTES);
-
-    try {
-      int nioMessageLength = unwrappedBuffer.getInt();
-      /* nioMessageVersion = */
-      Connection.calcHdrVersion(nioMessageLength);
-      nioMessageLength = Connection.calcMsgByteSize(nioMessageLength);
-      byte nioMessageType = unwrappedBuffer.get();
-      short nioMsgId = unwrappedBuffer.getShort();
-
-      boolean directAck = (nioMessageType & Connection.DIRECT_ACK_BIT) != 0;
-      if (directAck) {
-        nioMessageType &= ~Connection.DIRECT_ACK_BIT; // clear the ack bit
-      }
+    Assert.assertTrue(buffer.remaining() >= ClusterConnection.MSG_HEADER_BYTES);
 
-      header.setFields(nioMessageLength, nioMessageType, nioMsgId);
+    int messageLength = buffer.getInt();
+    /* nioMessageVersion = */

Review comment:
       dead?
   

##########
File path: geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
##########
@@ -945,6 +962,10 @@ public boolean useSSL() {
     return useSSL;
   }
 
+  public boolean useDirectReceiveBuffers() {
+    return !useSSL();
+  }
+

Review comment:
       previous question might be resolved following this accessor... for ssl/tls we use NIO and !ssl we use the directbuffers, cool and with 6 you get eggroll
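
A small sketch of how an accessor like this might drive buffer allocation, assuming (as the PR description says) that heap buffers are preferred for reading when SSL is in use and direct buffers otherwise. `ReceiveBufferSketch` and `allocateReceiveBuffer` are hypothetical names, not Geode APIs.

```java
import java.nio.ByteBuffer;

// Hypothetical helper for illustration only; not part of Geode.
final class ReceiveBufferSketch {

  /** Returns a heap buffer when SSL is in use and a direct buffer otherwise. */
  static ByteBuffer allocateReceiveBuffer(boolean useSSL, int capacity) {
    // Mirrors the accessor in the hunk above: direct receive buffers only without SSL.
    boolean useDirectReceiveBuffers = !useSSL;
    return useDirectReceiveBuffers
        ? ByteBuffer.allocateDirect(capacity)
        : ByteBuffer.allocate(capacity);
  }
}
```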




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] bschuchardt commented on a change in pull request #5363: GEODE-8349: Reintroduce use of SSLSocket in cluster communications

Posted by GitBox <gi...@apache.org>.
bschuchardt commented on a change in pull request #5363:
URL: https://github.com/apache/geode/pull/5363#discussion_r454627647



##########
File path: geode-core/src/main/java/org/apache/geode/internal/net/SocketUtils.java
##########
@@ -73,4 +82,49 @@ public static boolean close(final ServerSocket serverSocket) {
     return true;
   }
 
+  /**
+   * Read data from the given socket into the given ByteBuffer. If NIO is supported
+   * we use Channel.read(ByteBuffer). If not we use byte arrays to read available
+   * bytes or buffer.remaining() bytes, whichever is smaller. If buffer.limit is zero
+   * and buffer.remaining is also zero the limit is changed to be buffer.capacity
+   * before reading.
+   *
+   * @return the number of bytes read, which may be -1 for EOF
+   */

Review comment:
       ok
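
The contract described in that Javadoc can be sketched roughly as follows. This is an illustration under stated assumptions, not the code in the PR: `SocketReadSketch` and `readFromStream` are hypothetical names, "NIO is supported" is taken to mean that `socket.getChannel()` is non-null, and the stream path requests at least one byte so that the call blocks for data rather than returning immediately when `available()` is zero.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

// Hypothetical sketch for illustration only; not the SocketUtils code from the PR.
final class SocketReadSketch {

  /** Reads into the buffer via the socket's channel if it has one, else via the stream. */
  static int readFromSocket(Socket socket, ByteBuffer buffer, InputStream in)
      throws IOException {
    if (buffer.limit() == 0) {
      // An empty, zero-limit buffer is opened up to its full capacity before reading.
      buffer.limit(buffer.capacity());
    }
    SocketChannel channel = socket.getChannel();
    if (channel != null) {
      return channel.read(buffer);
    }
    return readFromStream(in, buffer);
  }

  /** Stream path: reads the smaller of available() and remaining(), but at least one byte. */
  static int readFromStream(InputStream in, ByteBuffer buffer) throws IOException {
    if (!buffer.hasRemaining()) {
      return 0;
    }
    int wanted = Math.min(buffer.remaining(), Math.max(1, in.available()));
    if (buffer.hasArray()) {
      // Heap buffer: read straight into the backing array and advance the position.
      int n = in.read(buffer.array(), buffer.arrayOffset() + buffer.position(), wanted);
      if (n > 0) {
        buffer.position(buffer.position() + n);
      }
      return n;
    }
    // Direct buffer: stage the bytes in a temporary array, then copy them in.
    byte[] chunk = new byte[wanted];
    int n = in.read(chunk, 0, wanted);
    if (n > 0) {
      buffer.put(chunk, 0, n);
    }
    return n; // -1 signals EOF, as in the Javadoc
  }
}
```

Passing the stream in as a parameter lets a test substitute a `ByteArrayInputStream` for a real socket stream.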







[GitHub] [geode] lgtm-com[bot] commented on pull request #5363: GEODE-8349: Reintroduce use of SSLSocket in cluster communications

Posted by GitBox <gi...@apache.org>.
lgtm-com[bot] commented on pull request #5363:
URL: https://github.com/apache/geode/pull/5363#issuecomment-665952869


   This pull request **introduces 1 alert** and **fixes 1** when merging 35b4964da7bd41068d358f97d05454783a856f0c into 4cea862bbbb1f535e48d2879317af18878998c32 - [view on LGTM.com](https://lgtm.com/projects/g/apache/geode/rev/pr-cd1937067861d788385408ef0bea7904fe9ef3cb)
   
   **new alerts:**
   
   * 1 for Potential input resource leak
   
   **fixed alerts:**
   
   * 1 for Potential input resource leak





[GitHub] [geode] bschuchardt commented on a change in pull request #5363: Reintroduce use of SSLSocket in cluster communications

Posted by GitBox <gi...@apache.org>.
bschuchardt commented on a change in pull request #5363:
URL: https://github.com/apache/geode/pull/5363#discussion_r452466413



##########
File path: geode-core/src/main/java/org/apache/geode/internal/tcp/ClusterConnection.java
##########
@@ -2909,13 +2950,13 @@ private boolean readMessageHeader(ByteBuffer peerDataBuffer) throws IOException
       readerShuttingDown = true;
       requestClose(String.format("Unknown P2P message type: %s",
           nioMessageTypeInteger));
-      return true;
+      return false;

Review comment:
       The old meaning didn't make a lot of sense.  Returning _false_ when the header is successfully read?  No.  That's why I changed it.







[GitHub] [geode] bschuchardt commented on a change in pull request #5363: GEODE-8349: Reintroduce use of SSLSocket in cluster communications

Posted by GitBox <gi...@apache.org>.
bschuchardt commented on a change in pull request #5363:
URL: https://github.com/apache/geode/pull/5363#discussion_r456522212



##########
File path: geode-core/src/test/java/org/apache/geode/internal/net/SocketUtilsJUnitTest.java
##########
@@ -88,4 +95,86 @@ public void testCloseServerSocketThrowsIOException() throws IOException {
   public void testCloseServerSocketWithNull() {
     assertThat(SocketUtils.close((ServerSocket) null)).isTrue();
   }
+
+  @Test
+  public void readFromSocketWithHeapBuffer() throws IOException {
+    Socket socket = mock(Socket.class);
+    SocketChannel channel = mock(SocketChannel.class);
+    when(socket.getChannel()).thenReturn(channel);
+    final ByteBuffer buffer = ByteBuffer.allocate(100); // heap buffer
+    byte[] bytes = new byte[100];
+    InputStream stream = new ByteArrayInputStream(bytes);
+    when(channel.read(buffer)).thenAnswer((answer) -> {
+      buffer.put(bytes);
+      return buffer.position();
+    });
+    assertThat(buffer.hasArray()).isTrue();
+    SocketUtils.readFromSocket(socket, buffer, stream);
+    // the channel was used to read the bytes
+    verify(channel, times(1)).read(buffer);
+    // the buffer was filled
+    assertThat(buffer.position()).isEqualTo(bytes.length);
+    // the stream was not used
+    assertThat(stream.available()).isEqualTo(bytes.length);
+  }
+
+
+  @Test
+  public void readFromSocketWithDirectBuffer() throws IOException {
+    Socket socket = mock(Socket.class);
+    SocketChannel channel = mock(SocketChannel.class);
+    when(socket.getChannel()).thenReturn(channel);
+    final ByteBuffer buffer = ByteBuffer.allocateDirect(100); // non-heap buffer

Review comment:
       I don't like to factor out code in tests too much.  It leads to tests that are difficult to understand.  Short tests like this should be easy to read and stand on their own.







[GitHub] [geode] pivotal-jbarrett commented on pull request #5363: GEODE-8349: Reintroduce use of SSLSocket in cluster communications

Posted by GitBox <gi...@apache.org>.
pivotal-jbarrett commented on pull request #5363:
URL: https://github.com/apache/geode/pull/5363#issuecomment-656765432


   In response to the benchmark results: the values are within the margin of error for these benchmarks. The get benchmarks would involve few, if any, P2P operations. The put benchmarks would have one P2P op per client/server put op and are likely bottlenecked by that P2P operation. There are currently no benchmarks that measure P2P operations in isolation.





[GitHub] [geode] Bill commented on a change in pull request #5363: GEODE-8349: Reintroduce use of SSLSocket in cluster communications

Posted by GitBox <gi...@apache.org>.
Bill commented on a change in pull request #5363:
URL: https://github.com/apache/geode/pull/5363#discussion_r454743792



##########
File path: geode-core/src/main/java/org/apache/geode/internal/tcp/ClusterConnection.java
##########
@@ -1142,31 +1154,46 @@ private Connection(ConnectionTable t, boolean preserveOrder, InternalDistributed
 
     InetSocketAddress addr =
         new InetSocketAddress(remoteID.getInetAddress(), remoteID.getDirectChannelPort());
-    SocketChannel channel = SocketChannel.open();
-    owner.addConnectingSocket(channel.socket(), addr.getAddress());
-
-    try {
-      channel.socket().setTcpNoDelay(true);
-      channel.socket().setKeepAlive(SocketCreator.ENABLE_TCP_KEEP_ALIVE);
 
+    int connectTime = getP2PConnectTimeout(conduit.getDM().getConfig());
+    boolean useSSL = getConduit().useSSL();
+    if (useSSL) {
+      int socketBufferSize =
+          sharedResource ? SMALL_BUFFER_SIZE : this.owner.getConduit().tcpBufferSize;
+      socket = getConduit().getSocketCreator().forAdvancedUse().connect(
+          new HostAndPort(remoteID.getHostName(), remoteID.getDirectChannelPort()),
+          0, null, false, socketBufferSize, true);
+      setSocketBufferSize(this.socket, false, socketBufferSize, true);
+    } else {
+      SocketChannel channel = SocketChannel.open();
+      socket = channel.socket();
       // If conserve-sockets is false, the socket can be used for receiving responses, so set the
       // receive buffer accordingly.
       if (!sharedResource) {
-        setReceiveBufferSize(channel.socket(), owner.getConduit().tcpBufferSize);
+        setReceiveBufferSize(socket, owner.getConduit().tcpBufferSize);
       } else {
-        setReceiveBufferSize(channel.socket(), SMALL_BUFFER_SIZE); // make small since only
+        setReceiveBufferSize(socket, SMALL_BUFFER_SIZE); // make small since only
         // receive ack messages
       }
-      setSendBufferSize(channel.socket());
-      channel.configureBlocking(true);
+    }
+    owner.addConnectingSocket(socket, addr.getAddress());
+
+    try {
+      socket.setTcpNoDelay(true);
+      socket.setKeepAlive(SocketCreator.ENABLE_TCP_KEEP_ALIVE);
 
-      int connectTime = getP2PConnectTimeout(conduit.getDM().getConfig());
+      setSendBufferSize(socket);
+      if (!useSSL) {
+        socket.getChannel().configureBlocking(true);
+      }
 
       try {
 
-        channel.socket().connect(addr, connectTime);
-
-        createIoFilter(channel, true);
+        if (!useSSL) {
+          // haven't connected yet
+          socket.connect(addr, connectTime);
+        }
+        configureInputStream(socket, true);

Review comment:
       In `configureInputStream(Socket socket, boolean clientSocket)` is `clientSocket` a misnomer then? Does that really mean `receiverSocket`?
   
   Remind me: are we exploiting full-duplex communication over the socket in `configureInputStream(Socket…)` or are we only using one direction?







[GitHub] [geode] bschuchardt commented on a change in pull request #5363: GEODE-8349: Reintroduce use of SSLSocket in cluster communications

Posted by GitBox <gi...@apache.org>.
bschuchardt commented on a change in pull request #5363:
URL: https://github.com/apache/geode/pull/5363#discussion_r454638055



##########
File path: geode-core/src/main/java/org/apache/geode/internal/tcp/ClusterConnection.java
##########
@@ -1142,31 +1154,46 @@ private Connection(ConnectionTable t, boolean preserveOrder, InternalDistributed
 
     InetSocketAddress addr =
         new InetSocketAddress(remoteID.getInetAddress(), remoteID.getDirectChannelPort());
-    SocketChannel channel = SocketChannel.open();
-    owner.addConnectingSocket(channel.socket(), addr.getAddress());
-
-    try {
-      channel.socket().setTcpNoDelay(true);
-      channel.socket().setKeepAlive(SocketCreator.ENABLE_TCP_KEEP_ALIVE);
 
+    int connectTime = getP2PConnectTimeout(conduit.getDM().getConfig());
+    boolean useSSL = getConduit().useSSL();
+    if (useSSL) {
+      int socketBufferSize =
+          sharedResource ? SMALL_BUFFER_SIZE : this.owner.getConduit().tcpBufferSize;
+      socket = getConduit().getSocketCreator().forAdvancedUse().connect(
+          new HostAndPort(remoteID.getHostName(), remoteID.getDirectChannelPort()),
+          0, null, false, socketBufferSize, true);
+      setSocketBufferSize(this.socket, false, socketBufferSize, true);
+    } else {
+      SocketChannel channel = SocketChannel.open();
+      socket = channel.socket();
       // If conserve-sockets is false, the socket can be used for receiving responses, so set the
       // receive buffer accordingly.
       if (!sharedResource) {
-        setReceiveBufferSize(channel.socket(), owner.getConduit().tcpBufferSize);
+        setReceiveBufferSize(socket, owner.getConduit().tcpBufferSize);
       } else {
-        setReceiveBufferSize(channel.socket(), SMALL_BUFFER_SIZE); // make small since only
+        setReceiveBufferSize(socket, SMALL_BUFFER_SIZE); // make small since only
         // receive ack messages
       }
-      setSendBufferSize(channel.socket());
-      channel.configureBlocking(true);
+    }
+    owner.addConnectingSocket(socket, addr.getAddress());
+
+    try {
+      socket.setTcpNoDelay(true);
+      socket.setKeepAlive(SocketCreator.ENABLE_TCP_KEEP_ALIVE);
 
-      int connectTime = getP2PConnectTimeout(conduit.getDM().getConfig());
+      setSendBufferSize(socket);
+      if (!useSSL) {
+        socket.getChannel().configureBlocking(true);
+      }
 
       try {
 
-        channel.socket().connect(addr, connectTime);
-
-        createIoFilter(channel, true);
+        if (!useSSL) {
+          // haven't connected yet
+          socket.connect(addr, connectTime);
+        }
+        configureInputStream(socket, true);

Review comment:
       The TLS handshake doesn't imply a connect().  The socket is already connected at this point but hasn't performed the TLS handshake.  This method is used both for "sender" connections and "receiver" connections.  In the latter case the socket is already connected so it wouldn't make sense to move the non-TLS connect into this method.







[GitHub] [geode] bschuchardt commented on a change in pull request #5363: GEODE-8349: Reintroduce use of SSLSocket in cluster communications

Posted by GitBox <gi...@apache.org>.
bschuchardt commented on a change in pull request #5363:
URL: https://github.com/apache/geode/pull/5363#discussion_r456519045



##########
File path: geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
##########
@@ -14,70 +14,65 @@
  */
 package org.apache.geode.internal.tcp;
 
+import java.io.EOFException;
 import java.io.IOException;
-import java.nio.BufferUnderflowException;
+import java.io.InputStream;
+import java.net.Socket;
 import java.nio.ByteBuffer;
 
-import org.apache.logging.log4j.Logger;
-
 import org.apache.geode.distributed.internal.DMStats;
 import org.apache.geode.distributed.internal.DistributionMessage;
 import org.apache.geode.distributed.internal.ReplyProcessor21;
 import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.net.BufferPool;
-import org.apache.geode.internal.net.NioFilter;
+import org.apache.geode.internal.net.SocketUtils;
 import org.apache.geode.internal.serialization.Version;
-import org.apache.geode.logging.internal.log4j.api.LogService;
 
 /**
  * This class is currently used for reading direct ack responses It should probably be used for all
  * of the reading done in Connection.
  *
  */
 public class MsgReader {
-  private static final Logger logger = LogService.getLogger();
-
-  protected final Connection conn;
+  protected final ClusterConnection conn;
+  private final BufferPool bufferPool;
   protected final Header header = new Header();
-  private final NioFilter ioFilter;
   private ByteBuffer peerNetData;
+  private final InputStream inputStream;
   private final ByteBufferInputStream byteBufferInputStream;
+  private int lastProcessedPosition;
+  private int lastReadPosition;

Review comment:
       I'm not going to change that logic.  It's what used to be in the MsgReader subclasses before I moved it to the NioFilter classes.  This PR just restores that logic to MsgReader.







[GitHub] [geode] bschuchardt commented on a change in pull request #5363: GEODE-8349: Reintroduce use of SSLSocket in cluster communications

Posted by GitBox <gi...@apache.org>.
bschuchardt commented on a change in pull request #5363:
URL: https://github.com/apache/geode/pull/5363#discussion_r456523975



##########
File path: geode-core/src/main/java/org/apache/geode/internal/net/SocketUtils.java
##########
@@ -89,6 +89,10 @@ public static boolean close(final ServerSocket serverSocket) {
    * and buffer.remaining is also zero the limit is changed to be buffer.capacity
    * before reading.
    *
+   * @param socket the socket to read from
+   * @param inputBuffer the buffer into which data should be stored
+   * @param socketInputStream the socket's inputStream, included as a parameter so it can be a
+   *        buffered stream, if desired

Review comment:
       The earlier old-IO code base wrapped the socket's input stream in a BufferedInputStream, on recommendation from StackOverflow and other sources.  I did not find that it improved performance, so I removed the wrapper, but I left this parameter in place for flexibility since this is a "Util" class.  The parameter also makes the methods easier to test.







[GitHub] [geode] Bill commented on a change in pull request #5363: GEODE-8349: Reintroduce use of SSLSocket in cluster communications

Posted by GitBox <gi...@apache.org>.
Bill commented on a change in pull request #5363:
URL: https://github.com/apache/geode/pull/5363#discussion_r454743792



##########
File path: geode-core/src/main/java/org/apache/geode/internal/tcp/ClusterConnection.java
##########
@@ -1142,31 +1154,46 @@ private Connection(ConnectionTable t, boolean preserveOrder, InternalDistributed
 
     InetSocketAddress addr =
         new InetSocketAddress(remoteID.getInetAddress(), remoteID.getDirectChannelPort());
-    SocketChannel channel = SocketChannel.open();
-    owner.addConnectingSocket(channel.socket(), addr.getAddress());
-
-    try {
-      channel.socket().setTcpNoDelay(true);
-      channel.socket().setKeepAlive(SocketCreator.ENABLE_TCP_KEEP_ALIVE);
 
+    int connectTime = getP2PConnectTimeout(conduit.getDM().getConfig());
+    boolean useSSL = getConduit().useSSL();
+    if (useSSL) {
+      int socketBufferSize =
+          sharedResource ? SMALL_BUFFER_SIZE : this.owner.getConduit().tcpBufferSize;
+      socket = getConduit().getSocketCreator().forAdvancedUse().connect(
+          new HostAndPort(remoteID.getHostName(), remoteID.getDirectChannelPort()),
+          0, null, false, socketBufferSize, true);
+      setSocketBufferSize(this.socket, false, socketBufferSize, true);
+    } else {
+      SocketChannel channel = SocketChannel.open();
+      socket = channel.socket();
       // If conserve-sockets is false, the socket can be used for receiving responses, so set the
       // receive buffer accordingly.
       if (!sharedResource) {
-        setReceiveBufferSize(channel.socket(), owner.getConduit().tcpBufferSize);
+        setReceiveBufferSize(socket, owner.getConduit().tcpBufferSize);
       } else {
-        setReceiveBufferSize(channel.socket(), SMALL_BUFFER_SIZE); // make small since only
+        setReceiveBufferSize(socket, SMALL_BUFFER_SIZE); // make small since only
         // receive ack messages
       }
-      setSendBufferSize(channel.socket());
-      channel.configureBlocking(true);
+    }
+    owner.addConnectingSocket(socket, addr.getAddress());
+
+    try {
+      socket.setTcpNoDelay(true);
+      socket.setKeepAlive(SocketCreator.ENABLE_TCP_KEEP_ALIVE);
 
-      int connectTime = getP2PConnectTimeout(conduit.getDM().getConfig());
+      setSendBufferSize(socket);
+      if (!useSSL) {
+        socket.getChannel().configureBlocking(true);
+      }
 
       try {
 
-        channel.socket().connect(addr, connectTime);
-
-        createIoFilter(channel, true);
+        if (!useSSL) {
+          // haven't connected yet
+          socket.connect(addr, connectTime);
+        }
+        configureInputStream(socket, true);

Review comment:
       In `configureInputStream(Socket socket, boolean clientSocket)` is `clientSocket` a misnomer then? Does that really mean `receiverSocket`?
   
   Remind me: are these sockets full-duplex or half-duplex?







[GitHub] [geode] bschuchardt commented on a change in pull request #5363: Reintroduce use of SSLSocket in cluster communications

Posted by GitBox <gi...@apache.org>.
bschuchardt commented on a change in pull request #5363:
URL: https://github.com/apache/geode/pull/5363#discussion_r452464382



##########
File path: geode-core/src/main/java/org/apache/geode/internal/tcp/ClusterConnection.java
##########
@@ -140,7 +145,7 @@
   private final ConnectionTable owner;
 
   private final TCPConduit conduit;
-  private NioFilter ioFilter;
+  // private NioFilter ioFilter;

Review comment:
       I'll remove that







[GitHub] [geode] lgtm-com[bot] commented on pull request #5363: GEODE-8349: Reintroduce use of SSLSocket in cluster communications

Posted by GitBox <gi...@apache.org>.
lgtm-com[bot] commented on pull request #5363:
URL: https://github.com/apache/geode/pull/5363#issuecomment-676750568


   This pull request **introduces 1 alert** and **fixes 1** when merging 51c68d95ffdb37432b184926b97bf666be9638b0 into 6f12a360d82f0de9259557af2bca34cd84e4b5f4 - [view on LGTM.com](https://lgtm.com/projects/g/apache/geode/rev/pr-40a73393a6a1bd9b3d330f5a2a6e8ebfd5e3dac9)
   
   **new alerts:**
   
   * 1 for Potential input resource leak
   
   **fixed alerts:**
   
   * 1 for Potential input resource leak





[GitHub] [geode] lgtm-com[bot] commented on pull request #5363: GEODE-8349: Reintroduce use of SSLSocket in cluster communications

Posted by GitBox <gi...@apache.org>.
lgtm-com[bot] commented on pull request #5363:
URL: https://github.com/apache/geode/pull/5363#issuecomment-672209899


   This pull request **introduces 1 alert** and **fixes 1** when merging 5b573099845881a70be91e7e3c037cdcc6b9779b into 21e26094e5178d5af1ef554448da943e07352469 - [view on LGTM.com](https://lgtm.com/projects/g/apache/geode/rev/pr-609b7eb745dec2e7d2fa59a09e2da2bab51ed9ca)
   
   **new alerts:**
   
   * 1 for Potential input resource leak
   
   **fixed alerts:**
   
   * 1 for Potential input resource leak





[GitHub] [geode] bschuchardt commented on a change in pull request #5363: GEODE-8349: Reintroduce use of SSLSocket in cluster communications

Posted by GitBox <gi...@apache.org>.
bschuchardt commented on a change in pull request #5363:
URL: https://github.com/apache/geode/pull/5363#discussion_r456519338



##########
File path: geode-core/src/main/java/org/apache/geode/internal/net/SocketUtils.java
##########
@@ -73,4 +82,49 @@ public static boolean close(final ServerSocket serverSocket) {
     return true;
   }
 
+  /**
+   * Read data from the given socket into the given ByteBuffer. If NIO is supported
+   * we use Channel.read(ByteBuffer). If not we use byte arrays to read available
+   * bytes or buffer.remaining() bytes, whichever is smaller. If buffer.limit is zero
+   * and buffer.remaining is also zero the limit is changed to be buffer.capacity
+   * before reading.
+   *
+   * @return the number of bytes read, which may be -1 for EOF
+   */
+  public static int readFromSocket(Socket socket, ByteBuffer inputBuffer,
+      InputStream socketInputStream) throws IOException {
+    int amountRead;
+    inputBuffer.limit(inputBuffer.capacity());
+    if (socket instanceof SSLSocket) {
+      amountRead = readFromStream(socketInputStream, inputBuffer);
+    } else {
+      amountRead = socket.getChannel().read(inputBuffer);
+    }
+    return amountRead;
+  }
+
+  private static int readFromStream(InputStream stream, ByteBuffer inputBuffer) throws IOException {
+    int amountRead;
+    // if bytes are available we read that number of bytes. Otherwise we do a blocking read
+    // of buffer.remaining() bytes
+    int amountToRead = inputBuffer.remaining();
+    // stream.available() > 0 ? Math.min(stream.available(), inputBuffer.remaining())
+    // : inputBuffer.remaining();
+    if (inputBuffer.hasArray()) {
+      amountRead = stream.read(inputBuffer.array(),
+          inputBuffer.arrayOffset() + inputBuffer.position(), amountToRead);
+      if (amountRead > 0) {
+        inputBuffer.position(inputBuffer.position() + amountRead);
+      }
+    } else {

Review comment:
       I've added tests for this, so I'm resolving this thread.
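
       The quoted hunk stops at the `else` branch, which is the case where the buffer has no accessible backing array (for example a direct buffer). A minimal sketch of how one stream read can serve both kinds of buffer is below. The class name `StreamToBufferSketch` and the temporary-array copy are assumptions made for illustration; this is not the code from the PR.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

public class StreamToBufferSketch {

  /**
   * Reads up to buffer.remaining() bytes from the stream into the buffer.
   * Returns the number of bytes read, or -1 at end of stream.
   */
  public static int readFromStream(InputStream stream, ByteBuffer buffer) throws IOException {
    int amountToRead = buffer.remaining();
    if (amountToRead == 0) {
      return 0; // nothing can be stored, so do not touch the stream
    }
    int amountRead;
    if (buffer.hasArray()) {
      // Heap buffer: read straight into the backing array, then advance the position.
      amountRead = stream.read(buffer.array(),
          buffer.arrayOffset() + buffer.position(), amountToRead);
      if (amountRead > 0) {
        buffer.position(buffer.position() + amountRead);
      }
    } else {
      // No accessible backing array (e.g. a direct buffer): copy through a temporary array.
      byte[] temp = new byte[amountToRead];
      amountRead = stream.read(temp, 0, amountToRead);
      if (amountRead > 0) {
        buffer.put(temp, 0, amountRead);
      }
    }
    return amountRead;
  }

  public static void main(String[] args) throws IOException {
    byte[] bytes = new byte[100];
    ByteBuffer heap = ByteBuffer.allocate(100);
    ByteBuffer direct = ByteBuffer.allocateDirect(100);
    readFromStream(new ByteArrayInputStream(bytes), heap);
    readFromStream(new ByteArrayInputStream(bytes), direct);
    System.out.println(heap.position() + " " + direct.position());
  }
}
```

       The extra copy in the second branch is one plausible reason a heap buffer would suit the SSLSocket read path better, though that is an inference rather than something measured here.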







[GitHub] [geode] lgtm-com[bot] commented on pull request #5363: Reintroduce use of SSLSocket in cluster communications

Posted by GitBox <gi...@apache.org>.
lgtm-com[bot] commented on pull request #5363:
URL: https://github.com/apache/geode/pull/5363#issuecomment-656296945


   This pull request **introduces 1 alert** and **fixes 1** when merging 8fbf9f724a2e60eb590c8df34e0df8b52790ee8e into fae1c94187d1b354c8705e8d4dc46842a12a9bcd - [view on LGTM.com](https://lgtm.com/projects/g/apache/geode/rev/pr-9df07f232649afab4d230f47096e37c2eab3f224)
   
   **new alerts:**
   
   * 1 for Potential input resource leak
   
   **fixed alerts:**
   
   * 1 for Potential input resource leak





[GitHub] [geode] Bill commented on a change in pull request #5363: GEODE-8349: Reintroduce use of SSLSocket in cluster communications

Posted by GitBox <gi...@apache.org>.
Bill commented on a change in pull request #5363:
URL: https://github.com/apache/geode/pull/5363#discussion_r460299130



##########
File path: geode-core/src/test/java/org/apache/geode/internal/net/SocketUtilsJUnitTest.java
##########
@@ -88,4 +95,86 @@ public void testCloseServerSocketThrowsIOException() throws IOException {
   public void testCloseServerSocketWithNull() {
     assertThat(SocketUtils.close((ServerSocket) null)).isTrue();
   }
+
+  @Test
+  public void readFromSocketWithHeapBuffer() throws IOException {
+    Socket socket = mock(Socket.class);
+    SocketChannel channel = mock(SocketChannel.class);
+    when(socket.getChannel()).thenReturn(channel);
+    final ByteBuffer buffer = ByteBuffer.allocate(100); // heap buffer
+    byte[] bytes = new byte[100];
+    InputStream stream = new ByteArrayInputStream(bytes);
+    when(channel.read(buffer)).thenAnswer((answer) -> {
+      buffer.put(bytes);
+      return buffer.position();
+    });
+    assertThat(buffer.hasArray()).isTrue();
+    SocketUtils.readFromSocket(socket, buffer, stream);
+    // the channel was used to read the bytes
+    verify(channel, times(1)).read(buffer);
+    // the buffer was filled
+    assertThat(buffer.position()).isEqualTo(bytes.length);
+    // the stream was not used
+    assertThat(stream.available()).isEqualTo(bytes.length);
+  }
+
+
+  @Test
+  public void readFromSocketWithDirectBuffer() throws IOException {
+    Socket socket = mock(Socket.class);
+    SocketChannel channel = mock(SocketChannel.class);
+    when(socket.getChannel()).thenReturn(channel);
+    final ByteBuffer buffer = ByteBuffer.allocateDirect(100); // non-heap buffer

Review comment:
       fair.







[GitHub] [geode] bschuchardt commented on a change in pull request #5363: Reintroduce use of SSLSocket in cluster communications

Posted by GitBox <gi...@apache.org>.
bschuchardt commented on a change in pull request #5363:
URL: https://github.com/apache/geode/pull/5363#discussion_r452467812



##########
File path: geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
##########
@@ -202,6 +202,8 @@
 
   private final Stopper stopper = new Stopper();
 
+  private boolean enableTLSOverNIO = true; // Boolean.getBoolean("geode.enable-tls-nio");
+

Review comment:
       ooh - thanks







[GitHub] [geode] bschuchardt commented on a change in pull request #5363: GEODE-8349: Reintroduce use of SSLSocket in cluster communications

Posted by GitBox <gi...@apache.org>.
bschuchardt commented on a change in pull request #5363:
URL: https://github.com/apache/geode/pull/5363#discussion_r454618295



##########
File path: geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
##########
@@ -14,70 +14,65 @@
  */
 package org.apache.geode.internal.tcp;
 
+import java.io.EOFException;
 import java.io.IOException;
-import java.nio.BufferUnderflowException;
+import java.io.InputStream;
+import java.net.Socket;
 import java.nio.ByteBuffer;
 
-import org.apache.logging.log4j.Logger;
-
 import org.apache.geode.distributed.internal.DMStats;
 import org.apache.geode.distributed.internal.DistributionMessage;
 import org.apache.geode.distributed.internal.ReplyProcessor21;
 import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.net.BufferPool;
-import org.apache.geode.internal.net.NioFilter;
+import org.apache.geode.internal.net.SocketUtils;
 import org.apache.geode.internal.serialization.Version;
-import org.apache.geode.logging.internal.log4j.api.LogService;
 
 /**
  * This class is currently used for reading direct ack responses It should probably be used for all
  * of the reading done in Connection.
  *
  */
 public class MsgReader {
-  private static final Logger logger = LogService.getLogger();
-
-  protected final Connection conn;
+  protected final ClusterConnection conn;
+  private final BufferPool bufferPool;
   protected final Header header = new Header();
-  private final NioFilter ioFilter;
   private ByteBuffer peerNetData;
+  private final InputStream inputStream;
   private final ByteBufferInputStream byteBufferInputStream;
+  private int lastProcessedPosition;

Review comment:
       It is the position of the last processed byte in the buffer and lets us know which byte should be processed next.
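
       To illustrate the idea of tracking a "read" position separately from a "processed" position in one buffer, here is a small sketch. The class `TwoPositionBufferSketch` and its methods are hypothetical and are not how `MsgReader` is actually written; only the two field names are borrowed from the diff.

```java
import java.nio.ByteBuffer;

public class TwoPositionBufferSketch {
  private final ByteBuffer buffer;
  private int lastReadPosition; // end of the bytes received so far
  private int lastProcessedPosition; // end of the bytes already handed to the consumer

  public TwoPositionBufferSketch(int capacity) {
    buffer = ByteBuffer.allocate(capacity);
  }

  /** Appends newly received bytes after the last read position. */
  public void append(byte[] data) {
    buffer.limit(buffer.capacity());
    buffer.position(lastReadPosition);
    buffer.put(data); // throws BufferOverflowException if the data does not fit
    lastReadPosition = buffer.position();
  }

  /** Number of received bytes that have not been processed yet. */
  public int available() {
    return lastReadPosition - lastProcessedPosition;
  }

  /** Returns the next `amount` unprocessed bytes and advances the processed position. */
  public byte[] consume(int amount) {
    if (amount > available()) {
      throw new IllegalStateException("not enough data: " + available());
    }
    buffer.limit(lastReadPosition);
    buffer.position(lastProcessedPosition);
    byte[] result = new byte[amount];
    buffer.get(result);
    lastProcessedPosition = buffer.position();
    return result;
  }
}
```

       With two positions, a read can deliver more bytes than the current message needs, and the surplus stays in place for the next message without compacting the buffer after every read.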







[GitHub] [geode] Bill commented on a change in pull request #5363: GEODE-8349: Reintroduce use of SSLSocket in cluster communications

Posted by GitBox <gi...@apache.org>.
Bill commented on a change in pull request #5363:
URL: https://github.com/apache/geode/pull/5363#discussion_r460298986



##########
File path: geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
##########
@@ -14,70 +14,65 @@
  */
 package org.apache.geode.internal.tcp;
 
+import java.io.EOFException;
 import java.io.IOException;
-import java.nio.BufferUnderflowException;
+import java.io.InputStream;
+import java.net.Socket;
 import java.nio.ByteBuffer;
 
-import org.apache.logging.log4j.Logger;
-
 import org.apache.geode.distributed.internal.DMStats;
 import org.apache.geode.distributed.internal.DistributionMessage;
 import org.apache.geode.distributed.internal.ReplyProcessor21;
 import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.net.BufferPool;
-import org.apache.geode.internal.net.NioFilter;
+import org.apache.geode.internal.net.SocketUtils;
 import org.apache.geode.internal.serialization.Version;
-import org.apache.geode.logging.internal.log4j.api.LogService;
 
 /**
  * This class is currently used for reading direct ack responses It should probably be used for all
  * of the reading done in Connection.
  *
  */
 public class MsgReader {
-  private static final Logger logger = LogService.getLogger();
-
-  protected final Connection conn;
+  protected final ClusterConnection conn;
+  private final BufferPool bufferPool;
   protected final Header header = new Header();
-  private final NioFilter ioFilter;
   private ByteBuffer peerNetData;
+  private final InputStream inputStream;
   private final ByteBufferInputStream byteBufferInputStream;
+  private int lastProcessedPosition;
+  private int lastReadPosition;

Review comment:
       thanks for that clarification @bschuchardt. I thought maybe this code was old (and therefore vetted) but it wasn't obvious from the PR.







[GitHub] [geode] bschuchardt commented on a change in pull request #5363: GEODE-8349: Reintroduce use of SSLSocket in cluster communications

Posted by GitBox <gi...@apache.org>.
bschuchardt commented on a change in pull request #5363:
URL: https://github.com/apache/geode/pull/5363#discussion_r454631595



##########
File path: geode-core/src/main/java/org/apache/geode/internal/tcp/ClusterConnection.java
##########
@@ -1142,31 +1154,46 @@ private Connection(ConnectionTable t, boolean preserveOrder, InternalDistributed
 
     InetSocketAddress addr =
         new InetSocketAddress(remoteID.getInetAddress(), remoteID.getDirectChannelPort());
-    SocketChannel channel = SocketChannel.open();
-    owner.addConnectingSocket(channel.socket(), addr.getAddress());
-
-    try {
-      channel.socket().setTcpNoDelay(true);
-      channel.socket().setKeepAlive(SocketCreator.ENABLE_TCP_KEEP_ALIVE);
 
+    int connectTime = getP2PConnectTimeout(conduit.getDM().getConfig());
+    boolean useSSL = getConduit().useSSL();
+    if (useSSL) {
+      int socketBufferSize =
+          sharedResource ? SMALL_BUFFER_SIZE : this.owner.getConduit().tcpBufferSize;
+      socket = getConduit().getSocketCreator().forAdvancedUse().connect(
+          new HostAndPort(remoteID.getHostName(), remoteID.getDirectChannelPort()),
+          0, null, false, socketBufferSize, true);
+      setSocketBufferSize(this.socket, false, socketBufferSize, true);

Review comment:
       I'll add a comment there.  Performance degrades if you change that buffer size on an SSLSocket, but we still want to set the local buffer-size variable for message streamers.







[GitHub] [geode] bschuchardt commented on a change in pull request #5363: GEODE-8349: Reintroduce use of SSLSocket in cluster communications

Posted by GitBox <gi...@apache.org>.
bschuchardt commented on a change in pull request #5363:
URL: https://github.com/apache/geode/pull/5363#discussion_r454628933



##########
File path: geode-core/src/main/java/org/apache/geode/internal/net/SocketUtils.java
##########
@@ -73,4 +82,49 @@ public static boolean close(final ServerSocket serverSocket) {
     return true;
   }
 
+  /**
+   * Read data from the given socket into the given ByteBuffer. If NIO is supported
+   * we use Channel.read(ByteBuffer). If not we use byte arrays to read available
+   * bytes or buffer.remaining() bytes, whichever is smaller. If buffer.limit is zero
+   * and buffer.remaining is also zero the limit is changed to be buffer.capacity
+   * before reading.
+   *
+   * @return the number of bytes read, which may be -1 for EOF
+   */
+  public static int readFromSocket(Socket socket, ByteBuffer inputBuffer,
+      InputStream socketInputStream) throws IOException {
+    int amountRead;
+    inputBuffer.limit(inputBuffer.capacity());
+    if (socket instanceof SSLSocket) {
+      amountRead = readFromStream(socketInputStream, inputBuffer);
+    } else {
+      amountRead = socket.getChannel().read(inputBuffer);
+    }

Review comment:
       yes, performance is worse for non-TLS if the stream is used







[GitHub] [geode] bschuchardt commented on a change in pull request #5363: GEODE-8349: Reintroduce use of SSLSocket in cluster communications

Posted by GitBox <gi...@apache.org>.
bschuchardt commented on a change in pull request #5363:
URL: https://github.com/apache/geode/pull/5363#discussion_r452919200



##########
File path: geode-core/src/main/java/org/apache/geode/internal/tcp/ClusterConnection.java
##########
@@ -3006,7 +3047,7 @@ private void readMessage(ByteBuffer peerDataBuffer) {
       } catch (IOException ex) {
         // ignored
       }
-    } else /* (nioMessageType == END_CHUNKED_MSG_TYPE) */ {
+    } else /* (messageType == END_CHUNKED_MSG_TYPE) */ {

Review comment:
       I think that's just an informative comment.  It sets the context for understanding that block of code, so I don't think it should be removed.







[GitHub] [geode] Bill commented on a change in pull request #5363: GEODE-8349: Reintroduce use of SSLSocket in cluster communications

Posted by GitBox <gi...@apache.org>.
Bill commented on a change in pull request #5363:
URL: https://github.com/apache/geode/pull/5363#discussion_r454733308



##########
File path: geode-core/src/test/java/org/apache/geode/internal/net/SocketUtilsJUnitTest.java
##########
@@ -88,4 +95,86 @@ public void testCloseServerSocketThrowsIOException() throws IOException {
   public void testCloseServerSocketWithNull() {
     assertThat(SocketUtils.close((ServerSocket) null)).isTrue();
   }
+
+  @Test
+  public void readFromSocketWithHeapBuffer() throws IOException {
+    Socket socket = mock(Socket.class);
+    SocketChannel channel = mock(SocketChannel.class);
+    when(socket.getChannel()).thenReturn(channel);
+    final ByteBuffer buffer = ByteBuffer.allocate(100); // heap buffer
+    byte[] bytes = new byte[100];
+    InputStream stream = new ByteArrayInputStream(bytes);
+    when(channel.read(buffer)).thenAnswer((answer) -> {
+      buffer.put(bytes);
+      return buffer.position();
+    });
+    assertThat(buffer.hasArray()).isTrue();
+    SocketUtils.readFromSocket(socket, buffer, stream);
+    // the channel was used to read the bytes
+    verify(channel, times(1)).read(buffer);
+    // the buffer was filled
+    assertThat(buffer.position()).isEqualTo(bytes.length);
+    // the stream was not used
+    assertThat(stream.available()).isEqualTo(bytes.length);
+  }
+
+
+  @Test
+  public void readFromSocketWithDirectBuffer() throws IOException {
+    Socket socket = mock(Socket.class);
+    SocketChannel channel = mock(SocketChannel.class);
+    when(socket.getChannel()).thenReturn(channel);
+    final ByteBuffer buffer = ByteBuffer.allocateDirect(100); // non-heap buffer

Review comment:
       Great to see this increased code coverage!
   
   From scanning this method and the previous one I think the only two differences are 1) the allocation statement and 2) the verification of `buffer.hasArray()`. It would aid maintenance, I think, if the parts that are identical were factored out.

##########
File path: geode-core/src/test/java/org/apache/geode/internal/net/SocketUtilsJUnitTest.java
##########
@@ -88,4 +95,86 @@ public void testCloseServerSocketThrowsIOException() throws IOException {
   public void testCloseServerSocketWithNull() {
     assertThat(SocketUtils.close((ServerSocket) null)).isTrue();
   }
+
+  @Test
+  public void readFromSocketWithHeapBuffer() throws IOException {
+    Socket socket = mock(Socket.class);
+    SocketChannel channel = mock(SocketChannel.class);
+    when(socket.getChannel()).thenReturn(channel);
+    final ByteBuffer buffer = ByteBuffer.allocate(100); // heap buffer
+    byte[] bytes = new byte[100];
+    InputStream stream = new ByteArrayInputStream(bytes);
+    when(channel.read(buffer)).thenAnswer((answer) -> {
+      buffer.put(bytes);
+      return buffer.position();
+    });
+    assertThat(buffer.hasArray()).isTrue();
+    SocketUtils.readFromSocket(socket, buffer, stream);
+    // the channel was used to read the bytes
+    verify(channel, times(1)).read(buffer);
+    // the buffer was filled
+    assertThat(buffer.position()).isEqualTo(bytes.length);
+    // the stream was not used
+    assertThat(stream.available()).isEqualTo(bytes.length);
+  }
+
+
+  @Test
+  public void readFromSocketWithDirectBuffer() throws IOException {
+    Socket socket = mock(Socket.class);
+    SocketChannel channel = mock(SocketChannel.class);
+    when(socket.getChannel()).thenReturn(channel);
+    final ByteBuffer buffer = ByteBuffer.allocateDirect(100); // non-heap buffer
+    byte[] bytes = new byte[100];
+    InputStream stream = new ByteArrayInputStream(bytes);
+    when(channel.read(buffer)).thenAnswer((answer) -> {
+      buffer.put(bytes);
+      return buffer.position();
+    });
+    assertThat(buffer.hasArray()).isFalse();
+    SocketUtils.readFromSocket(socket, buffer, stream);
+    // the channel was used to read the bytes
+    verify(channel, times(1)).read(buffer);
+    // the buffer was filled
+    assertThat(buffer.position()).isEqualTo(bytes.length);
+    // the stream was not used
+    assertThat(stream.available()).isEqualTo(bytes.length);
+  }
+
+  @Test
+  public void readFromSSLSocketWithHeapBuffer() throws IOException {
+    Socket socket = mock(SSLSocket.class);
+    SocketChannel channel = mock(SocketChannel.class);
+    when(socket.getChannel()).thenReturn(channel);
+    final ByteBuffer buffer = ByteBuffer.allocate(100); // heap buffer
+    byte[] bytes = new byte[100];
+    InputStream stream = new ByteArrayInputStream(bytes);
+    assertThat(stream.available()).isEqualTo(bytes.length);
+    SocketUtils.readFromSocket(socket, buffer, stream);
+    // the channel was not used to read the bytes
+    verify(channel, times(0)).read(buffer);
+    // the buffer was filled
+    assertThat(buffer.position()).isEqualTo(bytes.length);
+    // the stream was used to read the bytes
+    assertThat(stream.available()).isZero();
+  }
+
+
+  @Test
+  public void readFromSSLSocketWithDirectBuffer() throws IOException {
+    Socket socket = mock(SSLSocket.class);
+    SocketChannel channel = mock(SocketChannel.class);
+    when(socket.getChannel()).thenReturn(channel);
+    final ByteBuffer buffer = ByteBuffer.allocateDirect(100); // non-heap buffer
+    byte[] bytes = new byte[100];
+    InputStream stream = new ByteArrayInputStream(bytes);
+    assertThat(stream.available()).isEqualTo(bytes.length);
+    SocketUtils.readFromSocket(socket, buffer, stream);
+    // the channel was not used
+    verify(channel, times(0)).read(buffer);
+    // the buffer was filled
+    assertThat(buffer.position()).isEqualTo(bytes.length);
+    // the stream was used to fill the buffer
+    assertThat(stream.available()).isZero();

Review comment:
       I recommend factoring common code out of these two test methods. In fact all four methods have a common shape.

##########
File path: geode-core/src/main/java/org/apache/geode/internal/net/SocketUtils.java
##########
@@ -89,6 +89,10 @@ public static boolean close(final ServerSocket serverSocket) {
    * and buffer.remaining is also zero the limit is changed to be buffer.capacity
    * before reading.
    *
+   * @param socket the socket to read from
+   * @param inputBuffer the buffer into which data should be stored
+   * @param socketInputStream the socket's inputStream, included as a parameter so it can be a
+   *        buffered stream, if desired

Review comment:
       "if desired" by whom—by what part of the code? The comment implies the caller controls whether or not the stream is used, perhaps by making it `null` or not. Reality is this parameter is dependent on the kind of object referenced by the `socket` parameter.
   
   Any time I see a dependency between parameters like this, especially when it comes with a switch-on-type, it makes me wonder if responsibility has been mis-assigned somewhere. Is there already some object that could hold the stream? If so, could we delegate this behavior to that object?
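
   A rough sketch of what that delegation could look like: the type check happens once, when the reader object is created, and every later read goes through one method. The names `SocketReaderSketch`, `BufferFiller`, `ChannelFiller`, `StreamFiller` and `forSocket` are all hypothetical and do not exist in Geode; only `java.net`, `java.nio` and `javax.net.ssl` types are real.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import javax.net.ssl.SSLSocket;

public class SocketReaderSketch {

  /** Hypothetical abstraction: one object owns the read strategy for a connection. */
  public interface BufferFiller {
    /** Returns the number of bytes stored in the buffer, or -1 at end of stream. */
    int fill(ByteBuffer buffer) throws IOException;
  }

  /** Reads through an NIO channel (the non-TLS path). */
  public static final class ChannelFiller implements BufferFiller {
    private final ReadableByteChannel channel;

    public ChannelFiller(ReadableByteChannel channel) {
      this.channel = channel;
    }

    @Override
    public int fill(ByteBuffer buffer) throws IOException {
      return channel.read(buffer);
    }
  }

  /** Reads through an InputStream (the SSLSocket path). */
  public static final class StreamFiller implements BufferFiller {
    private final InputStream stream;

    public StreamFiller(InputStream stream) {
      this.stream = stream;
    }

    @Override
    public int fill(ByteBuffer buffer) throws IOException {
      // Copy through a temporary array so heap and direct buffers are handled alike.
      byte[] temp = new byte[buffer.remaining()];
      int amountRead = stream.read(temp, 0, temp.length);
      if (amountRead > 0) {
        buffer.put(temp, 0, amountRead);
      }
      return amountRead;
    }
  }

  /** The switch-on-type happens once here instead of on every read. */
  public static BufferFiller forSocket(Socket socket) throws IOException {
    if (socket instanceof SSLSocket) {
      return new StreamFiller(socket.getInputStream());
    }
    // getChannel() is null unless the socket was created from a SocketChannel.
    return new ChannelFiller(socket.getChannel());
  }
}
```

   A caller would then hold a single `BufferFiller` and would no longer need to pass a socket and a stream that must agree with each other.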







[GitHub] [geode] bschuchardt commented on a change in pull request #5363: Reintroduce use of SSLSocket in cluster communications

Posted by GitBox <gi...@apache.org>.
bschuchardt commented on a change in pull request #5363:
URL: https://github.com/apache/geode/pull/5363#discussion_r452467410



##########
File path: geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
##########
@@ -14,70 +14,66 @@
  */
 package org.apache.geode.internal.tcp;
 
+import java.io.EOFException;
 import java.io.IOException;
-import java.nio.BufferUnderflowException;
+import java.io.InputStream;
+import java.net.Socket;
 import java.nio.ByteBuffer;
 
-import org.apache.logging.log4j.Logger;
-
 import org.apache.geode.distributed.internal.DMStats;
 import org.apache.geode.distributed.internal.DistributionMessage;
 import org.apache.geode.distributed.internal.ReplyProcessor21;
 import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.net.BufferPool;
-import org.apache.geode.internal.net.NioFilter;
+import org.apache.geode.internal.net.SocketUtils;
 import org.apache.geode.internal.serialization.Version;
-import org.apache.geode.logging.internal.log4j.api.LogService;
 
 /**
  * This class is currently used for reading direct ack responses It should probably be used for all
  * of the reading done in Connection.
  *
  */
 public class MsgReader {
-  private static final Logger logger = LogService.getLogger();
-
-  protected final Connection conn;
+  protected final ClusterConnection conn;
+  private final BufferPool bufferPool;
   protected final Header header = new Header();
-  private final NioFilter ioFilter;
   private ByteBuffer peerNetData;
+  private final InputStream inputStream;
   private final ByteBufferInputStream byteBufferInputStream;
+  private int lastProcessedPosition;
+  private int lastReadPosition;
 
 
-
-  MsgReader(Connection conn, NioFilter nioFilter, Version version) {
+  MsgReader(ClusterConnection conn, BufferPool bufferPool, InputStream inputStream,
+      Version version) {
+    this.bufferPool = bufferPool;
     this.conn = conn;
-    this.ioFilter = nioFilter;
+    this.inputStream = inputStream;
     this.byteBufferInputStream =
         version == null ? new ByteBufferInputStream() : new VersionedByteBufferInputStream(version);
   }
 
   Header readHeader() throws IOException {
-    ByteBuffer unwrappedBuffer = readAtLeast(Connection.MSG_HEADER_BYTES);
+    ByteBuffer buffer = readAtLeast(ClusterConnection.MSG_HEADER_BYTES);
 
-    Assert.assertTrue(unwrappedBuffer.remaining() >= Connection.MSG_HEADER_BYTES);
-
-    try {
-      int nioMessageLength = unwrappedBuffer.getInt();
-      /* nioMessageVersion = */
-      Connection.calcHdrVersion(nioMessageLength);
-      nioMessageLength = Connection.calcMsgByteSize(nioMessageLength);
-      byte nioMessageType = unwrappedBuffer.get();
-      short nioMsgId = unwrappedBuffer.getShort();
-
-      boolean directAck = (nioMessageType & Connection.DIRECT_ACK_BIT) != 0;
-      if (directAck) {
-        nioMessageType &= ~Connection.DIRECT_ACK_BIT; // clear the ack bit
-      }
+    Assert.assertTrue(buffer.remaining() >= ClusterConnection.MSG_HEADER_BYTES);
 
-      header.setFields(nioMessageLength, nioMessageType, nioMsgId);
+    int messageLength = buffer.getInt();
+    /* nioMessageVersion = */

Review comment:
       hmm, that was already dead code.  I'll remove that but I think I should rename the calcMsgVersion method







[GitHub] [geode] Bill commented on a change in pull request #5363: GEODE-8349: Reintroduce use of SSLSocket in cluster communications

Posted by GitBox <gi...@apache.org>.
Bill commented on a change in pull request #5363:
URL: https://github.com/apache/geode/pull/5363#discussion_r454564779



##########
File path: geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
##########
@@ -47,7 +48,10 @@
 /**
  * NioSslEngine uses an SSLEngine to bind SSL logic to a data source. This class is not thread
  * safe. Its use should be confined to one thread or should be protected by external
- * synchronization.
+ * synchronization.<br>
+ * While some NioSslEngine methods take a Socket as a parameter the given socket must hold
+ * a NIO Channel for i/o operations. If this is not the case these methods will likely throw
+ * a NullPointerException when they attempt to access and use the channel.

Review comment:
       thanks for the tip

##########
File path: geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
##########
@@ -14,70 +14,65 @@
  */
 package org.apache.geode.internal.tcp;
 
+import java.io.EOFException;
 import java.io.IOException;
-import java.nio.BufferUnderflowException;
+import java.io.InputStream;
+import java.net.Socket;
 import java.nio.ByteBuffer;
 
-import org.apache.logging.log4j.Logger;
-
 import org.apache.geode.distributed.internal.DMStats;
 import org.apache.geode.distributed.internal.DistributionMessage;
 import org.apache.geode.distributed.internal.ReplyProcessor21;
 import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.net.BufferPool;
-import org.apache.geode.internal.net.NioFilter;
+import org.apache.geode.internal.net.SocketUtils;
 import org.apache.geode.internal.serialization.Version;
-import org.apache.geode.logging.internal.log4j.api.LogService;
 
 /**
  * This class is currently used for reading direct ack responses It should probably be used for all
  * of the reading done in Connection.
  *
  */
 public class MsgReader {
-  private static final Logger logger = LogService.getLogger();
-
-  protected final Connection conn;
+  protected final ClusterConnection conn;
+  private final BufferPool bufferPool;
   protected final Header header = new Header();
-  private final NioFilter ioFilter;
   private ByteBuffer peerNetData;
+  private final InputStream inputStream;
   private final ByteBufferInputStream byteBufferInputStream;
+  private int lastProcessedPosition;
+  private int lastReadPosition;

Review comment:
       ditto

##########
File path: geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java
##########
@@ -53,7 +53,7 @@ ByteBuffer ensureWrappedCapacity(int amount, ByteBuffer wrappedBuffer,
    * wrappedBuffer = filter.ensureWrappedCapacity(amount, wrappedBuffer, etc.);<br>
    * unwrappedBuffer = filter.readAtLeast(channel, amount, wrappedBuffer, etc.)
    */
-  ByteBuffer readAtLeast(SocketChannel channel, int amount, ByteBuffer wrappedBuffer)
+  ByteBuffer readAtLeast(int amount, ByteBuffer wrappedBuffer, Socket socket)

Review comment:
       This PR eliminates the use of `NioFilter` (interface) and its two implementations, `NioPlainEngine` and `NioSslEngine`, from Geode. Why were changes necessary to these classes and their tests?

##########
File path: geode-core/src/main/java/org/apache/geode/internal/tcp/ClusterConnection.java
##########
@@ -1142,31 +1154,46 @@ private Connection(ConnectionTable t, boolean preserveOrder, InternalDistributed
 
     InetSocketAddress addr =
         new InetSocketAddress(remoteID.getInetAddress(), remoteID.getDirectChannelPort());
-    SocketChannel channel = SocketChannel.open();
-    owner.addConnectingSocket(channel.socket(), addr.getAddress());
-
-    try {
-      channel.socket().setTcpNoDelay(true);
-      channel.socket().setKeepAlive(SocketCreator.ENABLE_TCP_KEEP_ALIVE);
 
+    int connectTime = getP2PConnectTimeout(conduit.getDM().getConfig());
+    boolean useSSL = getConduit().useSSL();
+    if (useSSL) {
+      int socketBufferSize =
+          sharedResource ? SMALL_BUFFER_SIZE : this.owner.getConduit().tcpBufferSize;
+      socket = getConduit().getSocketCreator().forAdvancedUse().connect(
+          new HostAndPort(remoteID.getHostName(), remoteID.getDirectChannelPort()),
+          0, null, false, socketBufferSize, true);
+      setSocketBufferSize(this.socket, false, socketBufferSize, true);

Review comment:
       how do we know `alreadySetInSocket` should be `true` in this invocation?

##########
File path: geode-core/src/main/java/org/apache/geode/internal/net/SocketUtils.java
##########
@@ -73,4 +82,49 @@ public static boolean close(final ServerSocket serverSocket) {
     return true;
   }
 
+  /**
+   * Read data from the given socket into the given ByteBuffer. If NIO is supported
+   * we use Channel.read(ByteBuffer). If not we use byte arrays to read available
+   * bytes or buffer.remaining() bytes, whichever is smaller. If buffer.limit is zero
+   * and buffer.remaining is also zero the limit is changed to be buffer.capacity
+   * before reading.
+   *
+   * @return the number of bytes read, which may be -1 for EOF
+   */
+  public static int readFromSocket(Socket socket, ByteBuffer inputBuffer,
+      InputStream socketInputStream) throws IOException {
+    int amountRead;
+    inputBuffer.limit(inputBuffer.capacity());
+    if (socket instanceof SSLSocket) {
+      amountRead = readFromStream(socketInputStream, inputBuffer);
+    } else {
+      amountRead = socket.getChannel().read(inputBuffer);
+    }
+    return amountRead;
+  }
+
+  private static int readFromStream(InputStream stream, ByteBuffer inputBuffer) throws IOException {
+    int amountRead;
+    // if bytes are available we read that number of bytes. Otherwise we do a blocking read
+    // of buffer.remaining() bytes
+    int amountToRead = inputBuffer.remaining();
+    // stream.available() > 0 ? Math.min(stream.available(), inputBuffer.remaining())
+    // : inputBuffer.remaining();
+    if (inputBuffer.hasArray()) {
+      amountRead = stream.read(inputBuffer.array(),
+          inputBuffer.arrayOffset() + inputBuffer.position(), amountToRead);
+      if (amountRead > 0) {
+        inputBuffer.position(inputBuffer.position() + amountRead);
+      }
+    } else {

Review comment:
       under what conditions would `inputBuffer.hasArray()` be `true` vs `false`?
   
   what test coverage do we have of these conditions? (I just confirmed that the package-level unit tests for `org.apache.geode.internal.net` don't hit this method at all)
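
For background on the `hasArray()` question, here is a minimal standalone sketch using only `java.nio.ByteBuffer`. It is not Geode code; the class name `HasArrayDemo` and the `describe` helper are made up for illustration. A heap buffer from `ByteBuffer.allocate` exposes its backing array, a read-only view does not, and a direct buffer typically does not either. The slice case shows why the diff adds `arrayOffset()` to `position()` when indexing into the array.

```java
import java.nio.ByteBuffer;

public class HasArrayDemo {
  /** Hypothetical helper: reports whether a buffer exposes an accessible backing array. */
  static String describe(ByteBuffer buffer) {
    if (buffer.hasArray()) {
      return "array-backed, arrayOffset=" + buffer.arrayOffset();
    }
    return buffer.isDirect() ? "direct, no accessible array" : "no accessible array";
  }

  public static void main(String[] args) {
    ByteBuffer heap = ByteBuffer.allocate(16);
    // A read-only view hides the array even though the bytes live on the heap.
    ByteBuffer readOnly = heap.asReadOnlyBuffer();
    // Direct buffers normally have no accessible array (implementation dependent).
    ByteBuffer direct = ByteBuffer.allocateDirect(16);
    // A slice shares the parent's array but starts at a non-zero offset.
    heap.position(4);
    ByteBuffer slice = heap.slice();
    heap.position(0);

    System.out.println("heap:     " + describe(heap));
    System.out.println("readOnly: " + describe(readOnly));
    System.out.println("direct:   " + describe(direct));
    System.out.println("slice:    " + describe(slice));
  }
}
```

Given the PR description's choice of heap buffers for reading, the array-backed branch is presumably the common path, but that is an inference rather than something the diff states.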

##########
File path: geode-core/src/main/java/org/apache/geode/internal/tcp/ClusterConnection.java
##########
@@ -1142,31 +1154,46 @@ private Connection(ConnectionTable t, boolean preserveOrder, InternalDistributed
 
     InetSocketAddress addr =
         new InetSocketAddress(remoteID.getInetAddress(), remoteID.getDirectChannelPort());
-    SocketChannel channel = SocketChannel.open();
-    owner.addConnectingSocket(channel.socket(), addr.getAddress());
-
-    try {
-      channel.socket().setTcpNoDelay(true);
-      channel.socket().setKeepAlive(SocketCreator.ENABLE_TCP_KEEP_ALIVE);
 
+    int connectTime = getP2PConnectTimeout(conduit.getDM().getConfig());
+    boolean useSSL = getConduit().useSSL();
+    if (useSSL) {
+      int socketBufferSize =
+          sharedResource ? SMALL_BUFFER_SIZE : this.owner.getConduit().tcpBufferSize;
+      socket = getConduit().getSocketCreator().forAdvancedUse().connect(
+          new HostAndPort(remoteID.getHostName(), remoteID.getDirectChannelPort()),
+          0, null, false, socketBufferSize, true);
+      setSocketBufferSize(this.socket, false, socketBufferSize, true);
+    } else {
+      SocketChannel channel = SocketChannel.open();
+      socket = channel.socket();
       // If conserve-sockets is false, the socket can be used for receiving responses, so set the
       // receive buffer accordingly.
       if (!sharedResource) {
-        setReceiveBufferSize(channel.socket(), owner.getConduit().tcpBufferSize);
+        setReceiveBufferSize(socket, owner.getConduit().tcpBufferSize);
       } else {
-        setReceiveBufferSize(channel.socket(), SMALL_BUFFER_SIZE); // make small since only
+        setReceiveBufferSize(socket, SMALL_BUFFER_SIZE); // make small since only
         // receive ack messages
       }
-      setSendBufferSize(channel.socket());
-      channel.configureBlocking(true);
+    }
+    owner.addConnectingSocket(socket, addr.getAddress());
+
+    try {
+      socket.setTcpNoDelay(true);
+      socket.setKeepAlive(SocketCreator.ENABLE_TCP_KEEP_ALIVE);
 
-      int connectTime = getP2PConnectTimeout(conduit.getDM().getConfig());
+      setSendBufferSize(socket);
+      if (!useSSL) {
+        socket.getChannel().configureBlocking(true);
+      }
 
       try {
 
-        channel.socket().connect(addr, connectTime);
-
-        createIoFilter(channel, true);
+        if (!useSSL) {
+          // haven't connected yet
+          socket.connect(addr, connectTime);
+        }
+        configureInputStream(socket, true);

Review comment:
       my read of `configureInputStream()` is that it initializes the `inputStream` field and does the TLS handshake if one is needed. That implies that a connect call happens in that case.
   
   inasmuch as `configureInputStream()` causes connect for the TLS case, would it make sense for it to also handle the connect for the non-TLS case too? just thinking of ways to reduce the number of branches on `getConduit().useSSL()`
   
   ugh but now I see `configureInputStream()` only initiates the TLS handshake if `!clientSocket`. Why does it only initiate the handshake for "server" sockets? 

##########
File path: geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
##########
@@ -14,70 +14,65 @@
  */
 package org.apache.geode.internal.tcp;
 
+import java.io.EOFException;
 import java.io.IOException;
-import java.nio.BufferUnderflowException;
+import java.io.InputStream;
+import java.net.Socket;
 import java.nio.ByteBuffer;
 
-import org.apache.logging.log4j.Logger;
-
 import org.apache.geode.distributed.internal.DMStats;
 import org.apache.geode.distributed.internal.DistributionMessage;
 import org.apache.geode.distributed.internal.ReplyProcessor21;
 import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.net.BufferPool;
-import org.apache.geode.internal.net.NioFilter;
+import org.apache.geode.internal.net.SocketUtils;
 import org.apache.geode.internal.serialization.Version;
-import org.apache.geode.logging.internal.log4j.api.LogService;
 
 /**
  * This class is currently used for reading direct ack responses It should probably be used for all
  * of the reading done in Connection.
  *
  */
 public class MsgReader {
-  private static final Logger logger = LogService.getLogger();
-
-  protected final Connection conn;
+  protected final ClusterConnection conn;
+  private final BufferPool bufferPool;
   protected final Header header = new Header();
-  private final NioFilter ioFilter;
   private ByteBuffer peerNetData;
+  private final InputStream inputStream;
   private final ByteBufferInputStream byteBufferInputStream;
+  private int lastProcessedPosition;

Review comment:
       what does this variable mean? what are its invariants?

##########
File path: geode-core/src/main/java/org/apache/geode/internal/net/SocketUtils.java
##########
@@ -73,4 +82,49 @@ public static boolean close(final ServerSocket serverSocket) {
     return true;
   }
 
+  /**
+   * Read data from the given socket into the given ByteBuffer. If NIO is supported
+   * we use Channel.read(ByteBuffer). If not we use byte arrays to read available
+   * bytes or buffer.remaining() bytes, whichever is smaller. If buffer.limit is zero
+   * and buffer.remaining is also zero the limit is changed to be buffer.capacity
+   * before reading.
+   *
+   * @return the number of bytes read, which may be -1 for EOF
+   */

Review comment:
       Update JavaDoc comment to explain the role of `socketInputStream`. 
   
   It appears that this input stream is an input stream constructed from the socket over in `ClusterConnection.configureInputStream()`

##########
File path: geode-core/src/main/java/org/apache/geode/internal/net/SocketUtils.java
##########
@@ -73,4 +82,49 @@ public static boolean close(final ServerSocket serverSocket) {
     return true;
   }
 
+  /**
+   * Read data from the given socket into the given ByteBuffer. If NIO is supported
+   * we use Channel.read(ByteBuffer). If not we use byte arrays to read available
+   * bytes or buffer.remaining() bytes, whichever is smaller. If buffer.limit is zero
+   * and buffer.remaining is also zero the limit is changed to be buffer.capacity
+   * before reading.
+   *
+   * @return the number of bytes read, which may be -1 for EOF
+   */
+  public static int readFromSocket(Socket socket, ByteBuffer inputBuffer,
+      InputStream socketInputStream) throws IOException {
+    int amountRead;
+    inputBuffer.limit(inputBuffer.capacity());
+    if (socket instanceof SSLSocket) {
+      amountRead = readFromStream(socketInputStream, inputBuffer);
+    } else {
+      amountRead = socket.getChannel().read(inputBuffer);
+    }

Review comment:
       are we avoiding use of the `InputStream` in the non-TLS case, as a performance enhancement?
   
   would this method work correctly if the conditional was replaced by:
   
   ```java
          amountRead = readFromStream(socketInputStream, inputBuffer); 
   ```
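
To make the question concrete, here is a hedged sketch of a stream-only read that copes with both array-backed and non-array buffers. It is not the PR's implementation: the `else` branch of `readFromStream` is cut off in the diff above, so the temporary-array fallback here is an assumption, and `StreamToBufferSketch`, `readInto` and `demo` are hypothetical names. It says nothing about the performance side of the question.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;

public class StreamToBufferSketch {
  /** One blocking read of up to buffer.remaining() bytes; returns -1 at end of stream. */
  static int readInto(InputStream stream, ByteBuffer buffer) throws IOException {
    int amountToRead = buffer.remaining();
    if (amountToRead == 0) {
      return 0;
    }
    if (buffer.hasArray()) {
      // Read straight into the backing array, then advance the position by hand.
      int amountRead = stream.read(buffer.array(),
          buffer.arrayOffset() + buffer.position(), amountToRead);
      if (amountRead > 0) {
        buffer.position(buffer.position() + amountRead);
      }
      return amountRead;
    }
    // No accessible array (for example a direct buffer): go through a temporary array.
    byte[] temp = new byte[amountToRead];
    int amountRead = stream.read(temp, 0, amountToRead);
    if (amountRead > 0) {
      buffer.put(temp, 0, amountRead);
    }
    return amountRead;
  }

  /** Reads a 5-byte stream twice; returns "firstRead,position,secondRead". */
  static String demo(boolean direct) {
    ByteBuffer buffer = direct ? ByteBuffer.allocateDirect(8) : ByteBuffer.allocate(8);
    InputStream stream = new ByteArrayInputStream(new byte[] {1, 2, 3, 4, 5});
    try {
      int first = readInto(stream, buffer);
      int position = buffer.position();
      int second = readInto(stream, buffer);
      return first + "," + position + "," + second;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println("heap:   " + demo(false));
    System.out.println("direct: " + demo(true));
  }
}
```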




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] bschuchardt commented on pull request #5363: GEODE-8349: Reintroduce use of SSLSocket in cluster communications

Posted by GitBox <gi...@apache.org>.
bschuchardt commented on pull request #5363:
URL: https://github.com/apache/geode/pull/5363#issuecomment-658425247


   I've written some geode-benchmark tests to vet this PR.  The tests add a new ClusterTopology to the benchmarks so that there are more servers and workload operations are distributed among the servers.  I wrote a replicated region `get` benchmark but since all data was local it was pretty meaningless and so I removed it.
   
   ```
   org.apache.geode.benchmark.tests.PartitionedGetBenchmark
                 average ops/second  Baseline:    415568.87  Test:    421152.13  Difference:   +1.3%
          ops/second standard error  Baseline:      1137.52  Test:      1202.93  Difference:   +5.7%
      ops/second standard deviation  Baseline:     34087.75  Test:     36047.68  Difference:   +5.7%
         YS 99th percentile latency  Baseline:     20007.00  Test:     20007.33  Difference:   +0.0%
                     median latency  Baseline:      3619.00  Test:      3539.00  Difference:   -2.2%
            90th percentile latency  Baseline:    822271.00  Test:    823295.00  Difference:   +0.1%
            99th percentile latency  Baseline:  41058303.00  Test:  41746431.00  Difference:   +1.7%
          99.9th percentile latency  Baseline:  96010239.00  Test:  95289343.00  Difference:   -0.8%
                    average latency  Baseline:   1732279.46  Test:   1708445.36  Difference:   -1.4%
         latency standard deviation  Baseline:   8040574.12  Test:   8054977.95  Difference:   +0.2%
             latency standard error  Baseline:       416.09  Test:       413.94  Difference:   -0.5%
   org.apache.geode.benchmark.tests.PartitionedPutBenchmark
                 average ops/second  Baseline:     65766.06  Test:     68103.19  Difference:   +3.6%
          ops/second standard error  Baseline:       280.40  Test:       337.96  Difference:  +20.5%
      ops/second standard deviation  Baseline:      8379.10  Test:     10093.67  Difference:  +20.5%
         YS 99th percentile latency  Baseline:     20080.54  Test:     20080.31  Difference:   -0.0%
                     median latency  Baseline:    628735.00  Test:    647679.00  Difference:   +3.0%
            90th percentile latency  Baseline:  31834111.00  Test:  29949951.00  Difference:   -5.9%
            99th percentile latency  Baseline: 173539327.00  Test: 176160767.00  Difference:   +1.5%
          99.9th percentile latency  Baseline: 251789311.00  Test: 271581183.00  Difference:   +7.9%
                    average latency  Baseline:  11008644.65  Test:  10651543.96  Difference:   -3.2%
         latency standard deviation  Baseline:  31209391.24  Test:  30946703.39  Difference:   -0.8%
             latency standard error  Baseline:      4070.65  Test:      3970.15  Difference:   -2.5%
   org.apache.geode.benchmark.tests.ReplicatedPutBenchmark
                 average ops/second  Baseline:     36802.56  Test:     36020.12  Difference:   -2.1%
          ops/second standard error  Baseline:       579.41  Test:       638.69  Difference:  +10.2%
      ops/second standard deviation  Baseline:     16842.76  Test:     18599.04  Difference:  +10.4%
         YS 99th percentile latency  Baseline:     20085.94  Test:     20085.82  Difference:   -0.0%
                     median latency  Baseline:    563711.00  Test:    641535.00  Difference:  +13.8%
            90th percentile latency  Baseline:  32653311.00  Test:  29753343.00  Difference:   -8.9%
            99th percentile latency  Baseline: 507772927.00  Test: 596639743.00  Difference:  +17.5%
          99.9th percentile latency  Baseline: 758644735.00  Test: 882376703.00  Difference:  +16.3%
                    average latency  Baseline:  20738470.30  Test:  20930653.55  Difference:   +0.9%
         latency standard deviation  Baseline:  82082668.05  Test:  90329383.31  Difference:  +10.0%
             latency standard error  Baseline:     14695.56  Test:     16252.34  Difference:  +10.6%
   
   ```
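
For readers checking the numbers, the Difference column appears to be the relative change of Test against Baseline, rounded to one decimal place. The sketch below reproduces a few rows under that assumption; the formula is inferred from the figures above rather than taken from geode-benchmark's source, and `BenchmarkDifference` is a made-up name.

```java
import java.util.Locale;

public class BenchmarkDifference {
  /** Relative change of test versus baseline, formatted like the report's Difference column. */
  static String difference(double baseline, double test) {
    double percent = (test - baseline) / baseline * 100.0;
    return String.format(Locale.ROOT, "%+.1f%%", percent);
  }

  public static void main(String[] args) {
    // PartitionedGetBenchmark, average ops/second
    System.out.println(difference(415568.87, 421152.13));
    // PartitionedGetBenchmark, median latency
    System.out.println(difference(3619.00, 3539.00));
    // PartitionedPutBenchmark, average ops/second
    System.out.println(difference(65766.06, 68103.19));
  }
}
```

Note that a positive difference is an improvement for ops/second rows but a regression for latency rows.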





[GitHub] [geode] bschuchardt commented on a change in pull request #5363: Reintroduce use of SSLSocket in cluster communications

Posted by GitBox <gi...@apache.org>.
bschuchardt commented on a change in pull request #5363:
URL: https://github.com/apache/geode/pull/5363#discussion_r452465798



##########
File path: geode-core/src/main/java/org/apache/geode/internal/tcp/ClusterConnection.java
##########
@@ -1142,31 +1155,47 @@ private Connection(ConnectionTable t, boolean preserveOrder, InternalDistributed
 
     InetSocketAddress addr =
         new InetSocketAddress(remoteID.getInetAddress(), remoteID.getDirectChannelPort());
-    SocketChannel channel = SocketChannel.open();
-    owner.addConnectingSocket(channel.socket(), addr.getAddress());
-
-    try {
-      channel.socket().setTcpNoDelay(true);
-      channel.socket().setKeepAlive(SocketCreator.ENABLE_TCP_KEEP_ALIVE);
 
+    int connectTime = getP2PConnectTimeout(conduit.getDM().getConfig());
+    boolean useSSL = getConduit().useSSL();
+    if (useSSL) {
+      // int socketBufferSize = -1;

Review comment:
       I'm removing that







[GitHub] [geode] bschuchardt commented on a change in pull request #5363: Reintroduce use of SSLSocket in cluster communications

Posted by GitBox <gi...@apache.org>.
bschuchardt commented on a change in pull request #5363:
URL: https://github.com/apache/geode/pull/5363#discussion_r452465458



##########
File path: geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java
##########
@@ -17,24 +17,31 @@
 
 import java.io.EOFException;

Review comment:
       Those classes are no longer used by the tcp package, but will be used by client/server.  I don't think they should be renamed.







[GitHub] [geode] Bill commented on a change in pull request #5363: GEODE-8349: Reintroduce use of SSLSocket in cluster communications

Posted by GitBox <gi...@apache.org>.
Bill commented on a change in pull request #5363:
URL: https://github.com/apache/geode/pull/5363#discussion_r454741200



##########
File path: geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
##########
@@ -14,70 +14,65 @@
  */
 package org.apache.geode.internal.tcp;
 
+import java.io.EOFException;
 import java.io.IOException;
-import java.nio.BufferUnderflowException;
+import java.io.InputStream;
+import java.net.Socket;
 import java.nio.ByteBuffer;
 
-import org.apache.logging.log4j.Logger;
-
 import org.apache.geode.distributed.internal.DMStats;
 import org.apache.geode.distributed.internal.DistributionMessage;
 import org.apache.geode.distributed.internal.ReplyProcessor21;
 import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.net.BufferPool;
-import org.apache.geode.internal.net.NioFilter;
+import org.apache.geode.internal.net.SocketUtils;
 import org.apache.geode.internal.serialization.Version;
-import org.apache.geode.logging.internal.log4j.api.LogService;
 
 /**
  * This class is currently used for reading direct ack responses It should probably be used for all
  * of the reading done in Connection.
  *
  */
 public class MsgReader {
-  private static final Logger logger = LogService.getLogger();
-
-  protected final Connection conn;
+  protected final ClusterConnection conn;
+  private final BufferPool bufferPool;
   protected final Header header = new Header();
-  private final NioFilter ioFilter;
   private ByteBuffer peerNetData;
+  private final InputStream inputStream;
   private final ByteBufferInputStream byteBufferInputStream;
+  private int lastProcessedPosition;
+  private int lastReadPosition;

Review comment:
       My comment came out of frustration, perhaps born of laziness. I'm trying to decide whether or not I think the new code in `MsgReader` is correct—by inspection. That's very hard for me.
   
   Earlier in the day I went and read the NIO `Buffer` code since I needed to understand that code to understand these changes. That class has [Jon Bentley](https://www.goodreads.com/book/show/52084.Programming_Pearls)-level comments detailing the purpose of each field along with the field invariants. The documentation of those invariants is a huge aid in visual inspection. 
   
   https://docs.oracle.com/javase/8/docs/api/java/nio/Buffer.html
   
   So with this in mind:
   
   1. my hunch is that `Buffer` already has all the state we need to solve this problem—so I am suspicious of these added fields
   2. it feels like we're doing a lot of poking into the `Buffer` object to control its details—setting limit and position explicitly rather than putting and getting and flipping
   
   If the extra fields are really needed then it would help to see a block comment at the point of declaration, tying together these variables with their invariants. Rather than saying things like "keep track of", it would be helpful to know the relationships between these two variables and between them and the various state variables inside the `Buffer`.
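
As an illustration of the "putting and getting and flipping" style mentioned in point 2, here is a minimal sketch that hands out fixed-size chunks using only the buffer's own position and limit. It is not a proposed patch for `MsgReader`; `FlipCompactSketch`, `receive` and `take` are hypothetical names, and `receive` stands in for a socket read.

```java
import java.nio.ByteBuffer;

public class FlipCompactSketch {
  // Between calls the buffer stays in "fill" mode:
  // position = end of valid data, limit = capacity.
  private final ByteBuffer buffer = ByteBuffer.allocate(64);

  /** Simulates bytes arriving from the network. */
  void receive(byte[] data) {
    buffer.put(data);
  }

  /** Returns the next `bytes` bytes, or null if fewer than that are buffered. */
  byte[] take(int bytes) {
    buffer.flip(); // drain mode: position = 0, limit = end of valid data
    byte[] out = null;
    if (buffer.remaining() >= bytes) {
      out = new byte[bytes];
      buffer.get(out);
    }
    buffer.compact(); // slide unconsumed bytes to the front, back to fill mode
    return out;
  }

  public static void main(String[] args) {
    FlipCompactSketch reader = new FlipCompactSketch();
    reader.receive(new byte[] {1, 2, 3});
    System.out.println(java.util.Arrays.toString(reader.take(2)));
    System.out.println(java.util.Arrays.toString(reader.take(2)));
    reader.receive(new byte[] {4});
    System.out.println(java.util.Arrays.toString(reader.take(2)));
  }
}
```

One trade-off: `compact()` copies the leftover bytes on every call, which explicit cursor fields avoid. Whether that cost matters here is not something this thread establishes.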
   
   







[GitHub] [geode] bschuchardt commented on a change in pull request #5363: GEODE-8349: Reintroduce use of SSLSocket in cluster communications

Posted by GitBox <gi...@apache.org>.
bschuchardt commented on a change in pull request #5363:
URL: https://github.com/apache/geode/pull/5363#discussion_r454624371



##########
File path: geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
##########
@@ -14,70 +14,65 @@
  */
 package org.apache.geode.internal.tcp;
 
+import java.io.EOFException;
 import java.io.IOException;
-import java.nio.BufferUnderflowException;
+import java.io.InputStream;
+import java.net.Socket;
 import java.nio.ByteBuffer;
 
-import org.apache.logging.log4j.Logger;
-
 import org.apache.geode.distributed.internal.DMStats;
 import org.apache.geode.distributed.internal.DistributionMessage;
 import org.apache.geode.distributed.internal.ReplyProcessor21;
 import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.net.BufferPool;
-import org.apache.geode.internal.net.NioFilter;
+import org.apache.geode.internal.net.SocketUtils;
 import org.apache.geode.internal.serialization.Version;
-import org.apache.geode.logging.internal.log4j.api.LogService;
 
 /**
  * This class is currently used for reading direct ack responses It should probably be used for all
  * of the reading done in Connection.
  *
  */
 public class MsgReader {
-  private static final Logger logger = LogService.getLogger();
-
-  protected final Connection conn;
+  protected final ClusterConnection conn;
+  private final BufferPool bufferPool;
   protected final Header header = new Header();
-  private final NioFilter ioFilter;
   private ByteBuffer peerNetData;
+  private final InputStream inputStream;
   private final ByteBufferInputStream byteBufferInputStream;
+  private int lastProcessedPosition;
+  private int lastReadPosition;

Review comment:
       Both of those variables are state used in readAtLeast()
   ```
       // keep track of how much of the buffer contains valid data with lastReadPosition
       lastReadPosition = buffer.position();
   
       // set up the buffer for reading and keep track of how much has been consumed with
       // lastProcessedPosition
       buffer.limit(lastProcessedPosition + bytes);
       buffer.position(lastProcessedPosition);
       lastProcessedPosition += bytes;
   ```
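
Read on its own, the snippet above suggests the following relationship between the two fields. The sketch is one interpretation with the invariants written out. It is an assumption about intent, not the actual `MsgReader` code, and `TwoCursorSketch`, `receive` and `window` are hypothetical names.

```java
import java.nio.ByteBuffer;

public class TwoCursorSketch {
  // Assumed invariant: 0 <= lastProcessedPosition <= lastReadPosition <= buffer.capacity()
  // Bytes in [lastProcessedPosition, lastReadPosition) are valid but not yet consumed.
  private final ByteBuffer buffer = ByteBuffer.allocate(64);
  private int lastProcessedPosition; // index of the first byte not yet handed out
  private int lastReadPosition;      // index one past the last valid byte

  /** Simulates a socket read appending data after the last valid byte. */
  void receive(byte[] data) {
    buffer.limit(buffer.capacity());
    buffer.position(lastReadPosition);
    buffer.put(data);
    lastReadPosition = buffer.position();
  }

  /** Exposes the next `bytes` bytes through position/limit, or returns null if not available. */
  ByteBuffer window(int bytes) {
    if (lastReadPosition - lastProcessedPosition < bytes) {
      return null;
    }
    buffer.limit(lastProcessedPosition + bytes);
    buffer.position(lastProcessedPosition);
    lastProcessedPosition += bytes;
    return buffer;
  }

  public static void main(String[] args) {
    TwoCursorSketch reader = new TwoCursorSketch();
    reader.receive(new byte[] {1, 2, 3});
    ByteBuffer first = reader.window(2);
    System.out.println(first.remaining() + " bytes, first = " + first.get());
    System.out.println(reader.window(2) == null ? "need more data" : "unexpected");
    reader.receive(new byte[] {4});
    System.out.println("next = " + reader.window(2).get());
  }
}
```

Nothing is copied when a chunk is handed out. The sketch never reclaims consumed space, so a real reader would also need a step that resets or compacts the buffer.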







[GitHub] [geode] bschuchardt commented on a change in pull request #5363: GEODE-8349: Reintroduce use of SSLSocket in cluster communications

Posted by GitBox <gi...@apache.org>.
bschuchardt commented on a change in pull request #5363:
URL: https://github.com/apache/geode/pull/5363#discussion_r456520985



##########
File path: geode-core/src/main/java/org/apache/geode/internal/tcp/ClusterConnection.java
##########
@@ -1142,31 +1154,46 @@ private Connection(ConnectionTable t, boolean preserveOrder, InternalDistributed
 
     InetSocketAddress addr =
         new InetSocketAddress(remoteID.getInetAddress(), remoteID.getDirectChannelPort());
-    SocketChannel channel = SocketChannel.open();
-    owner.addConnectingSocket(channel.socket(), addr.getAddress());
-
-    try {
-      channel.socket().setTcpNoDelay(true);
-      channel.socket().setKeepAlive(SocketCreator.ENABLE_TCP_KEEP_ALIVE);
 
+    int connectTime = getP2PConnectTimeout(conduit.getDM().getConfig());
+    boolean useSSL = getConduit().useSSL();
+    if (useSSL) {
+      int socketBufferSize =
+          sharedResource ? SMALL_BUFFER_SIZE : this.owner.getConduit().tcpBufferSize;
+      socket = getConduit().getSocketCreator().forAdvancedUse().connect(
+          new HostAndPort(remoteID.getHostName(), remoteID.getDirectChannelPort()),
+          0, null, false, socketBufferSize, true);
+      setSocketBufferSize(this.socket, false, socketBufferSize, true);
+    } else {
+      SocketChannel channel = SocketChannel.open();
+      socket = channel.socket();
       // If conserve-sockets is false, the socket can be used for receiving responses, so set the
       // receive buffer accordingly.
       if (!sharedResource) {
-        setReceiveBufferSize(channel.socket(), owner.getConduit().tcpBufferSize);
+        setReceiveBufferSize(socket, owner.getConduit().tcpBufferSize);
       } else {
-        setReceiveBufferSize(channel.socket(), SMALL_BUFFER_SIZE); // make small since only
+        setReceiveBufferSize(socket, SMALL_BUFFER_SIZE); // make small since only
         // receive ack messages
       }
-      setSendBufferSize(channel.socket());
-      channel.configureBlocking(true);
+    }
+    owner.addConnectingSocket(socket, addr.getAddress());
+
+    try {
+      socket.setTcpNoDelay(true);
+      socket.setKeepAlive(SocketCreator.ENABLE_TCP_KEEP_ALIVE);
 
-      int connectTime = getP2PConnectTimeout(conduit.getDM().getConfig());
+      setSendBufferSize(socket);
+      if (!useSSL) {
+        socket.getChannel().configureBlocking(true);
+      }
 
       try {
 
-        channel.socket().connect(addr, connectTime);
-
-        createIoFilter(channel, true);
+        if (!useSSL) {
+          // haven't connected yet
+          socket.connect(addr, connectTime);
+        }
+        configureInputStream(socket, true);

Review comment:
       "client" socket is a term from TLS, where one side must be the "client" and the other side is the "server".
   The sockets in this class may be one-way or may be bidirectional.  "Shared" connections (those shared between threads) are one-way.  Thread-owned connections are bidirectional.  A P2P "reader" thread reads from the socket and DirectReplyProcessor writes responses.
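
To illustrate the TLS terminology only: in JSSE the role is chosen per socket with `SSLSocket.setUseClientMode`, independently of which direction application data later flows. The sketch below uses an unconnected socket from the JDK's default factory, so no network traffic or handshake takes place. `TlsRoleSketch`, `assignRole` and `demo` are hypothetical names, and this is not how `ClusterConnection.configureInputStream()` is written.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import javax.net.ssl.SSLSocket;
import javax.net.ssl.SSLSocketFactory;

public class TlsRoleSketch {
  /** The side that initiated the TCP connection takes the TLS "client" role. */
  static SSLSocket assignRole(SSLSocket socket, boolean initiatedConnection) {
    socket.setUseClientMode(initiatedConnection);
    return socket;
  }

  /** Creates an unconnected SSLSocket, assigns a role and reports it. */
  static boolean demo(boolean initiatedConnection) {
    try (SSLSocket socket = (SSLSocket) SSLSocketFactory.getDefault().createSocket()) {
      return assignRole(socket, initiatedConnection).getUseClientMode();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println("connecting side uses client mode: " + demo(true));
    System.out.println("accepting side uses client mode:  " + demo(false));
  }
}
```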



