Posted to reviews@spark.apache.org by "hasnain-db (via GitHub)" <gi...@apache.org> on 2023/10/26 16:34:37 UTC

[PR] [SPARK-45544][CORE] Integrate SSL support into TransportContext [spark]

hasnain-db opened a new pull request, #43541:
URL: https://github.com/apache/spark/pull/43541

   ### What changes were proposed in this pull request?
   
   This integrates SSL support into TransportContext and related modules so that the RPC SSL functionality can work when properly configured.
   
   ### Why are the changes needed?
   
   This is needed in order to support SSL for RPC connections.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No
   
   ### How was this patch tested?
   
   CI
   
   Ran the following tests:
   
   ```
   build/sbt -P yarn
   > project network-common
   > testOnly
   > project network-shuffle
   > testOnly
   > project core
   > testOnly *Ssl*
   > project yarn
   > testOnly org.apache.spark.network.yarn.SslYarnShuffleServiceWithRocksDBBackendSuite
   ```
   
   I verified that traffic was encrypted with TLS using two mechanisms:
   
   * I enabled trace-level logging for Netty and JDK SSL and saw logs confirming that TLS handshakes were happening.
   * I ran Wireshark on my machine and captured traffic while sending queries that shuffle a fixed string. Without any encryption, I could find that string in the network traffic. With this encryption enabled, the string did not show up, and the Wireshark logs confirmed that a TLS handshake was happening.
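   
   The Wireshark check above comes down to searching captured bytes for a known plaintext marker. Here is a minimal sketch of that check, assuming the capture has already been exported as raw bytes. `PlaintextLeakCheck` and `containsMarker` are hypothetical names and are not part of Spark or Wireshark:
   
   ```java
   import java.nio.charset.StandardCharsets;
   
   /** Hypothetical helper: does a known plaintext marker appear in captured bytes? */
   final class PlaintextLeakCheck {
   
     private PlaintextLeakCheck() {}
   
     /** Returns true if the UTF-8 bytes of {@code marker} occur anywhere in {@code capture}. */
     static boolean containsMarker(byte[] capture, String marker) {
       byte[] needle = marker.getBytes(StandardCharsets.UTF_8);
       if (needle.length == 0) {
         return true; // an empty marker trivially matches
       }
       // Naive O(n * m) scan; adequate for a one-off sanity check on a small capture.
       outer:
       for (int i = 0; i + needle.length <= capture.length; i++) {
         for (int j = 0; j < needle.length; j++) {
           if (capture[i + j] != needle[j]) {
             continue outer;
           }
         }
         return true;
       }
       return false;
     }
   
     public static void main(String[] args) {
       String marker = "SPARK_SSL_MARKER_42";
       byte[] plaintext = ("header|" + marker + "|trailer").getBytes(StandardCharsets.UTF_8);
       byte[] opaque = {0x17, 0x03, 0x03, 0x00, 0x20, (byte) 0x9f, 0x11, 0x42}; // stand-in for ciphertext
       System.out.println(containsMarker(plaintext, marker)); // true
       System.out.println(containsMarker(opaque, marker));    // false
     }
   }
   ```
   
   A missing marker is consistent with encryption but does not prove it on its own. The TLS handshake visible in the capture is the stronger signal.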
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45544][CORE] Integrate SSL support into TransportContext [spark]

Posted by "hasnain-db (via GitHub)" <gi...@apache.org>.
hasnain-db commented on code in PR #43541:
URL: https://github.com/apache/spark/pull/43541#discussion_r1375362260


##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ShuffleTransportContextSuite.java:
##########
@@ -90,15 +94,22 @@ private ByteBuf getDecodableMessageBuf(Message req) throws Exception {
   public void testInitializePipeline() throws IOException {
     // SPARK-43987: test that the FinalizedHandler is added to the pipeline only when configured
     for (boolean enabled : new boolean[]{true, false}) {
-      ShuffleTransportContext ctx = createShuffleTransportContext(enabled);
-      SocketChannel channel = new NioSocketChannel();
-      RpcHandler rpcHandler = mock(RpcHandler.class);
-      ctx.initializePipeline(channel, rpcHandler);
-      String handlerName = ShuffleTransportContext.FinalizedHandler.HANDLER_NAME;
-      if (enabled) {
-        Assertions.assertNotNull(channel.pipeline().get(handlerName));
-      } else {
-        Assertions.assertNull(channel.pipeline().get(handlerName));
+      for (boolean isClient: new boolean[]{true, false}) {
+        // Since the decoder is not Shareable, reset it between test runs to avoid errors since it's
+        // used both across ShuffleTransportContextSuite and SslShuffleTransportContextSuite
+        // and server/clients

Review Comment:
   My general bias was to avoid adding constraints to production code just for tests, but it makes sense to tag this as `@Sharable`, so I'll do that.





Re: [PR] [SPARK-45544][CORE] Integrate SSL support into TransportContext [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #43541:
URL: https://github.com/apache/spark/pull/43541#discussion_r1375190169


##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ShuffleTransportContextSuite.java:
##########
@@ -90,15 +94,22 @@ private ByteBuf getDecodableMessageBuf(Message req) throws Exception {
   public void testInitializePipeline() throws IOException {
     // SPARK-43987: test that the FinalizedHandler is added to the pipeline only when configured
     for (boolean enabled : new boolean[]{true, false}) {
-      ShuffleTransportContext ctx = createShuffleTransportContext(enabled);
-      SocketChannel channel = new NioSocketChannel();
-      RpcHandler rpcHandler = mock(RpcHandler.class);
-      ctx.initializePipeline(channel, rpcHandler);
-      String handlerName = ShuffleTransportContext.FinalizedHandler.HANDLER_NAME;
-      if (enabled) {
-        Assertions.assertNotNull(channel.pipeline().get(handlerName));
-      } else {
-        Assertions.assertNull(channel.pipeline().get(handlerName));
+      for (boolean isClient: new boolean[]{true, false}) {
+        // Since the decoder is not Shareable, reset it between test runs to avoid errors since it's
+        // used both across ShuffleTransportContextSuite and SslShuffleTransportContextSuite
+        // and server/clients

Review Comment:
   Why not tag it as `@Sharable`? It is stateless anyway, right?
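   
   The error quoted elsewhere in this thread comes from Netty's multiplicity check: a handler instance without `@ChannelHandler.Sharable` can be added to a pipeline only once. The toy model below sketches that rule in plain Java so that it runs without Netty. `SharableModel`, `MiniPipeline`, and the local `Sharable` annotation are hypothetical stand-ins, not Netty's classes:
   
   ```java
   import java.lang.annotation.ElementType;
   import java.lang.annotation.Retention;
   import java.lang.annotation.RetentionPolicy;
   import java.lang.annotation.Target;
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.IdentityHashMap;
   import java.util.List;
   import java.util.Set;
   
   /** Toy model of the "non-@Sharable handlers may be added only once" rule. */
   final class SharableModel {
   
     private SharableModel() {}
   
     /** Local stand-in for Netty's @ChannelHandler.Sharable marker. */
     @Retention(RetentionPolicy.RUNTIME)
     @Target(ElementType.TYPE)
     @interface Sharable {}
   
     /** Handler instances already added to any pipeline (compared by identity). */
     private static final Set<Object> ADDED = Collections.synchronizedSet(
         Collections.newSetFromMap(new IdentityHashMap<Object, Boolean>()));
   
     static final class MiniPipeline {
       private final List<Object> handlers = new ArrayList<>();
   
       MiniPipeline addLast(Object handler) {
         boolean sharable = handler.getClass().isAnnotationPresent(Sharable.class);
         // A non-sharable instance is rejected the second time it is added anywhere.
         if (!sharable && !ADDED.add(handler)) {
           throw new IllegalStateException(
               handler.getClass().getSimpleName() + " is not a @Sharable handler");
         }
         handlers.add(handler);
         return this;
       }
   
       int size() {
         return handlers.size();
       }
     }
   
     /** Keeps no per-channel state, so one instance can serve many pipelines. */
     @Sharable
     static final class StatelessDecoder {}
   
     /** Same shape, but not annotated. */
     static final class UnannotatedDecoder {}
   }
   ```
   
   Annotating a handler is only safe when it keeps no per-channel state, which is the point this comment raises.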





Re: [PR] [SPARK-45544][CORE] Integrate SSL support into TransportContext [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #43541:
URL: https://github.com/apache/spark/pull/43541#discussion_r1374225324


##########
common/network-common/src/main/java/org/apache/spark/network/TransportContext.java:
##########
@@ -189,15 +204,32 @@ public TransportChannelHandler initializePipeline(SocketChannel channel) {
    */
   public TransportChannelHandler initializePipeline(
       SocketChannel channel,
-      RpcHandler channelRpcHandler) {
+      RpcHandler channelRpcHandler,
+      boolean isClient) {
     try {
       TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
       ChannelPipeline pipeline = channel.pipeline();
+
       if (nettyLogger.getLoggingHandler() != null) {
         pipeline.addLast("loggingHandler", nettyLogger.getLoggingHandler());
       }
+
+      if (sslEncryptionEnabled()) {
+        SslHandler sslHandler;
+        try {
+          sslHandler = new SslHandler(
+            sslFactory.createSSLEngine(isClient, pipeline.channel().alloc()));
+        } catch (Exception e) {
+          throw new RuntimeException("Error creating Netty SslHandler", e);

Review Comment:
   QQ: why do we convert this to a `RuntimeException`?





Re: [PR] [SPARK-45544][CORE] Integrate SSL support into TransportContext [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #43541:
URL: https://github.com/apache/spark/pull/43541#discussion_r1375373421


##########
common/network-common/src/main/java/org/apache/spark/network/TransportContext.java:
##########
@@ -189,15 +204,32 @@ public TransportChannelHandler initializePipeline(SocketChannel channel) {
    */
   public TransportChannelHandler initializePipeline(
       SocketChannel channel,
-      RpcHandler channelRpcHandler) {
+      RpcHandler channelRpcHandler,
+      boolean isClient) {
     try {
       TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
       ChannelPipeline pipeline = channel.pipeline();
+
       if (nettyLogger.getLoggingHandler() != null) {
         pipeline.addLast("loggingHandler", nettyLogger.getLoggingHandler());
       }
+
+      if (sslEncryptionEnabled()) {
+        SslHandler sslHandler;
+        try {
+          sslHandler = new SslHandler(
+            sslFactory.createSSLEngine(isClient, pipeline.channel().alloc()));
+        } catch (Exception e) {
+          throw new RuntimeException("Error creating Netty SslHandler", e);

Review Comment:
   ~This is still `RuntimeException`~ I was looking at an outdated diff :-(





Re: [PR] [SPARK-45544][CORE] Integrate SSL support into TransportContext [spark]

Posted by "hasnain-db (via GitHub)" <gi...@apache.org>.
hasnain-db commented on code in PR #43541:
URL: https://github.com/apache/spark/pull/43541#discussion_r1374880822


##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ShuffleTransportContextSuite.java:
##########
@@ -90,15 +94,22 @@ private ByteBuf getDecodableMessageBuf(Message req) throws Exception {
   public void testInitializePipeline() throws IOException {
     // SPARK-43987: test that the FinalizedHandler is added to the pipeline only when configured
     for (boolean enabled : new boolean[]{true, false}) {
-      ShuffleTransportContext ctx = createShuffleTransportContext(enabled);
-      SocketChannel channel = new NioSocketChannel();
-      RpcHandler rpcHandler = mock(RpcHandler.class);
-      ctx.initializePipeline(channel, rpcHandler);
-      String handlerName = ShuffleTransportContext.FinalizedHandler.HANDLER_NAME;
-      if (enabled) {
-        Assertions.assertNotNull(channel.pipeline().get(handlerName));
-      } else {
-        Assertions.assertNull(channel.pipeline().get(handlerName));
+      for (boolean isClient: new boolean[]{true, false}) {
+        // Since the decoder is not Shareable, reset it between test runs to avoid errors since it's
+        // used both across ShuffleTransportContextSuite and SslShuffleTransportContextSuite
+        // and server/clients

Review Comment:
   If I don't do this, the test fails with:
   
   ```
   [error] Test org.apache.spark.network.shuffle.SslShuffleTransportContextSuite.testInitializePipeline failed: io.netty.channel.ChannelPipelineException: org.apache.spark.network.shuffle.ShuffleTransportContext$ShuffleMessageDecoder is not a @Sharable handler, so can't be added or removed multiple times., took 0.868s
   [error]     at io.netty.channel.DefaultChannelPipeline.checkMultiplicity(DefaultChannelPipeline.java:600)
   [error]     at io.netty.channel.DefaultChannelPipeline.addLast(DefaultChannelPipeline.java:202)
   [error]     at io.netty.channel.DefaultChannelPipeline.addLast(DefaultChannelPipeline.java:195)
   [error]     at org.apache.spark.network.TransportContext.initializePipeline(TransportContext.java:232)
   [error]     at org.apache.spark.network.shuffle.ShuffleTransportContext.initializePipeline(ShuffleTransportContext.java:94)
   [error]     at org.apache.spark.network.shuffle.ShuffleTransportContextSuite.testInitializePipeline(ShuffleTransportContextSuite.java:105)
   [error]     at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   [error]     at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
   [error]     at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   [error]     at java.lang.reflect.Method.invoke(Method.java:568)
   ```



##########
common/network-common/src/main/java/org/apache/spark/network/TransportContext.java:
##########
@@ -189,15 +204,32 @@ public TransportChannelHandler initializePipeline(SocketChannel channel) {
    */
   public TransportChannelHandler initializePipeline(
       SocketChannel channel,
-      RpcHandler channelRpcHandler) {
+      RpcHandler channelRpcHandler,
+      boolean isClient) {
     try {
       TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
       ChannelPipeline pipeline = channel.pipeline();
+
       if (nettyLogger.getLoggingHandler() != null) {
         pipeline.addLast("loggingHandler", nettyLogger.getLoggingHandler());
       }
+
+      if (sslEncryptionEnabled()) {
+        SslHandler sslHandler;
+        try {
+          sslHandler = new SslHandler(
+            sslFactory.createSSLEngine(isClient, pipeline.channel().alloc()));
+        } catch (Exception e) {
+          throw new RuntimeException("Error creating Netty SslHandler", e);

Review Comment:
   @HyukjinKwon I mostly wanted to rethrow this with more context. Is there a better exception type I should use, or should I just remove this catch?



##########
common/network-common/src/main/java/org/apache/spark/network/TransportContext.java:
##########
@@ -189,15 +204,32 @@ public TransportChannelHandler initializePipeline(SocketChannel channel) {
    */
   public TransportChannelHandler initializePipeline(
       SocketChannel channel,
-      RpcHandler channelRpcHandler) {
+      RpcHandler channelRpcHandler,
+      boolean isClient) {
     try {
       TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
       ChannelPipeline pipeline = channel.pipeline();
+
       if (nettyLogger.getLoggingHandler() != null) {
         pipeline.addLast("loggingHandler", nettyLogger.getLoggingHandler());
       }
+
+      if (sslEncryptionEnabled()) {
+        SslHandler sslHandler;
+        try {
+          sslHandler = new SslHandler(
+            sslFactory.createSSLEngine(isClient, pipeline.channel().alloc()));
+        } catch (Exception e) {
+          throw new RuntimeException("Error creating Netty SslHandler", e);
+        }
+        pipeline.addFirst("NettySslEncryptionHandler", sslHandler);
+        // Cannot use zero-copy with HTTPS, so we add in our ChunkedWriteHandler just before the
+        // MessageEncoder
+        pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());

Review Comment:
   @mridulm I am not sure that's correct, given that we are adding a logging handler above.
   
   My understanding is that after line 228 the order of handlers will be:
   
   * NettySslEncryptionHandler
   * LoggingHandler
   * ChunkedWriteHandler
   
   That means we log the packets after encryption (for debugging) and then chunk them.
   
   If we do `addLast` for the encryption handler, then it comes after logging (which we may want, but for now I found it helpful for debugging to keep it as is).
   
   If we do `addAfter` for the chunked write handler, then the logging handler would log chunked packets, which would make them a little harder to read when debugging.
   
   Am I missing something?
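   
   The ordering argument can be checked with a plain deque that replays the three calls from the diff, assuming the logging handler is enabled. Netty's `ChannelPipeline` documentation states that inbound events flow from head to tail and that outbound operations flow from tail to head. The model below encodes that rule by hand and does not use Netty. `PipelineOrderModel` is a hypothetical name:
   
   ```java
   import java.util.ArrayDeque;
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.Deque;
   import java.util.List;
   
   /** Replays the handler-registration calls from the diff on a plain deque. */
   final class PipelineOrderModel {
   
     private PipelineOrderModel() {}
   
     /** Handler names from head to tail, assuming the logging handler is enabled. */
     static List<String> headToTail() {
       Deque<String> pipeline = new ArrayDeque<>();
       pipeline.addLast("loggingHandler");             // registered first in the diff
       pipeline.addFirst("NettySslEncryptionHandler"); // addFirst moves ahead of logging
       pipeline.addLast("chunkedWriter");              // appended after logging
       return new ArrayList<>(pipeline);               // ArrayDeque iterates head to tail
     }
   
     /** Inbound events (reads) visit handlers from head to tail. */
     static List<String> inboundOrder() {
       return headToTail();
     }
   
     /** Outbound operations (writes) visit handlers from tail to head. */
     static List<String> outboundOrder() {
       List<String> order = headToTail();
       Collections.reverse(order);
       return order;
     }
   
     public static void main(String[] args) {
       System.out.println("inbound:  " + inboundOrder());
       // inbound:  [NettySslEncryptionHandler, loggingHandler, chunkedWriter]
       System.out.println("outbound: " + outboundOrder());
       // outbound: [chunkedWriter, loggingHandler, NettySslEncryptionHandler]
     }
   }
   ```
   
   In this model, writes reach `loggingHandler` before `NettySslEncryptionHandler` and reads reach it afterward, so the logging handler sits on the application side of the TLS layer in both directions.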



##########
common/network-common/src/test/java/org/apache/spark/network/SslChunkFetchIntegrationSuite.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.network;
+
+import java.io.File;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import com.google.common.io.Closeables;
+import org.junit.jupiter.api.BeforeAll;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.buffer.NioManagedBuffer;
+import org.apache.spark.network.client.RpcResponseCallback;
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.server.RpcHandler;
+import org.apache.spark.network.server.StreamManager;
+import org.apache.spark.network.util.TransportConf;
+import org.apache.spark.network.ssl.SslSampleConfigs;
+
+
+public class SslChunkFetchIntegrationSuite extends ChunkFetchIntegrationSuite {
+
+  @BeforeAll
+  public static void setUp() throws Exception {
+    int bufSize = 100000;
+    final ByteBuffer buf = ByteBuffer.allocate(bufSize);
+    for (int i = 0; i < bufSize; i ++) {
+      buf.put((byte) i);
+    }
+    buf.flip();
+    bufferChunk = new NioManagedBuffer(buf);
+
+    testFile = File.createTempFile("shuffle-test-file", "txt");
+    testFile.deleteOnExit();
+    RandomAccessFile fp = new RandomAccessFile(testFile, "rw");
+    boolean shouldSuppressIOException = true;
+    try {
+      byte[] fileContent = new byte[1024];
+      new Random().nextBytes(fileContent);
+      fp.write(fileContent);
+      shouldSuppressIOException = false;
+    } finally {
+      Closeables.close(fp, shouldSuppressIOException);
+    }
+
+    final TransportConf conf = new TransportConf(
+      "shuffle", SslSampleConfigs.createDefaultConfigProviderForRpcNamespace());
+    fileChunk = new FileSegmentManagedBuffer(conf, testFile, 10, testFile.length() - 25);
+
+    streamManager = new StreamManager() {
+      @Override
+      public ManagedBuffer getChunk(long streamId, int chunkIndex) {
+        assertEquals(STREAM_ID, streamId);
+        if (chunkIndex == BUFFER_CHUNK_INDEX) {
+          return new NioManagedBuffer(buf);
+        } else if (chunkIndex == FILE_CHUNK_INDEX) {
+          return new FileSegmentManagedBuffer(conf, testFile, 10, testFile.length() - 25);
+        } else {
+          throw new IllegalArgumentException("Invalid chunk index: " + chunkIndex);
+        }
+      }
+    };
+    RpcHandler handler = new RpcHandler() {
+      @Override
+      public void receive(
+          TransportClient client,
+          ByteBuffer message,
+          RpcResponseCallback callback) {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public StreamManager getStreamManager() {
+        return streamManager;
+      }
+    };
+    context = new TransportContext(conf, handler);
+    server = context.createServer();
+    clientFactory = context.createClientFactory();
+  }

Review Comment:
   That's correct. I ran into issues because it's called in a `BeforeAll` block, which didn't seem to work properly with inheritance. Let me look at what you suggested.
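   
   One common way to avoid copying a static `@BeforeAll` body into a subclass is to move the shared logic into a parameterized static helper that both suites call. Static methods in Java are hidden rather than overridden, so a parent's static setup cannot dispatch to a subclass's version. JUnit 5 also does not run a superclass `@BeforeAll` method that is hidden by one with the same signature. The following is a plain-Java sketch, with a `String` standing in for `TransportConf`. The names `BaseSuite`, `SslSuite`, `setUpWith`, and `defaultConf` are hypothetical:
   
   ```java
   /** Plain-Java model of sharing setup logic between a base suite and an SSL suite. */
   final class StaticHidingDemo {
   
     private StaticHidingDemo() {}
   
     static class BaseSuite {
       /** Stand-in for the shared fixture (e.g. the TransportConf); a String here. */
       static String conf;
   
       /** Hypothetical shared helper holding the whole setup body. */
       static void setUpWith(String confName) {
         conf = confName;
       }
   
       static String defaultConf() {
         return "plain";
       }
   
       /** Would carry @BeforeAll in a JUnit 5 suite. */
       static void setUp() {
         // Bound at compile time to BaseSuite.defaultConf(); never dispatches to a subclass.
         setUpWith(defaultConf());
       }
     }
   
     static class SslSuite extends BaseSuite {
       /** Hides, rather than overrides, BaseSuite.defaultConf(). */
       static String defaultConf() {
         return "ssl";
       }
   
       /** Reuses the helper instead of duplicating the parent's body. */
       static void setUp() {
         setUpWith(defaultConf());
       }
     }
   }
   ```
   
   `BaseSuite.setUp()` still uses the parent's `defaultConf()` even after `SslSuite` declares its own, which is the kind of inheritance surprise described in this comment.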





Re: [PR] [SPARK-45544][CORE] Integrate SSL support into TransportContext [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #43541:
URL: https://github.com/apache/spark/pull/43541#issuecomment-1784360145

   Merged to master.
   Thanks for working on this @hasnain-db !
   And thanks for the review @HyukjinKwon :-)




Re: [PR] [SPARK-45544][CORE] Integrate SSL support into TransportContext [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #43541:
URL: https://github.com/apache/spark/pull/43541#discussion_r1375374154


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleTransportContext.java:
##########
@@ -52,7 +53,7 @@
  * */
 public class ShuffleTransportContext extends TransportContext {
   private static final Logger logger = LoggerFactory.getLogger(ShuffleTransportContext.class);
-  private static final ShuffleMessageDecoder SHUFFLE_DECODER =
+  private static ShuffleMessageDecoder SHUFFLE_DECODER =

Review Comment:
   Now that we don't need to mutate it, can we revert this?





Re: [PR] [SPARK-45544][CORE] Integrate SSL support into TransportContext [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #43541:
URL: https://github.com/apache/spark/pull/43541#discussion_r1375189809


##########
common/network-common/src/main/java/org/apache/spark/network/TransportContext.java:
##########
@@ -189,15 +204,32 @@ public TransportChannelHandler initializePipeline(SocketChannel channel) {
    */
   public TransportChannelHandler initializePipeline(
       SocketChannel channel,
-      RpcHandler channelRpcHandler) {
+      RpcHandler channelRpcHandler,
+      boolean isClient) {
     try {
       TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
       ChannelPipeline pipeline = channel.pipeline();
+
       if (nettyLogger.getLoggingHandler() != null) {
         pipeline.addLast("loggingHandler", nettyLogger.getLoggingHandler());
       }
+
+      if (sslEncryptionEnabled()) {
+        SslHandler sslHandler;
+        try {
+          sslHandler = new SslHandler(
+            sslFactory.createSSLEngine(isClient, pipeline.channel().alloc()));
+        } catch (Exception e) {
+          throw new RuntimeException("Error creating Netty SslHandler", e);

Review Comment:
   If this is unexpected, you can throw `IllegalStateException`.
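   
   Here is a minimal sketch of that suggestion, assuming the engine factory declares a checked exception. `EngineFactory` and `createOrThrow` are hypothetical stand-ins for the `sslFactory.createSSLEngine(...)` call in the diff. Unlike the diff, this variant lets existing unchecked exceptions pass through unwrapped:
   
   ```java
   /** Sketch of rethrowing a checked failure as IllegalStateException with context. */
   final class SslEngineWrapping {
   
     private SslEngineWrapping() {}
   
     /** Hypothetical stand-in for a factory method that declares a checked exception. */
     interface EngineFactory {
       Object createEngine(boolean isClient) throws Exception;
     }
   
     static Object createOrThrow(EngineFactory factory, boolean isClient) {
       try {
         return factory.createEngine(isClient);
       } catch (RuntimeException e) {
         // Already unchecked: let it propagate unchanged instead of double-wrapping.
         throw e;
       } catch (Exception e) {
         // Checked failure (e.g. a bad keystore): wrap it, keeping the original as the cause.
         throw new IllegalStateException(
             "Error creating Netty SslHandler (isClient=" + isClient + ")", e);
       }
     }
   }
   ```
   
   Keeping the original exception as the cause preserves its stack trace, which was the motivation for having the catch block.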





Re: [PR] [SPARK-45544][CORE] Integrate SSL support into TransportContext [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #43541:
URL: https://github.com/apache/spark/pull/43541#discussion_r1375373743


##########
common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java:
##########
@@ -293,6 +296,28 @@ public void initChannel(SocketChannel ch) {
     } else if (cf.cause() != null) {
       throw new IOException(String.format("Failed to connect to %s", address), cf.cause());
     }
+    if (context.sslEncryptionEnabled()) {
+      final SslHandler sslHandler = cf.channel().pipeline().get(SslHandler.class);
+      Future<Channel> future = sslHandler.handshakeFuture().addListener(
+        new GenericFutureListener<Future<Channel>>() {
+          @Override
+          public void operationComplete(final Future<Channel> handshakeFuture) {
+            if (handshakeFuture.isSuccess()) {
+              logger.debug("{} successfully completed TLS handshake to ", address);
+            } else {
+              logger.info(
+                "failed to complete TLS handshake to " + address, handshakeFuture.cause());
+              cf.channel().close();
+            }
+          }
+      });
+      if (!future.await(conf.connectionTimeoutMs())) {
+        logger.info("failed to connect to " + address + " within connection timeout");

Review Comment:
   QQ: Will we have two log messages for the same failure, one here and one in `operationComplete`?





Re: [PR] [SPARK-45544][CORE] Integrate SSL support into TransportContext [spark]

Posted by "hasnain-db (via GitHub)" <gi...@apache.org>.
hasnain-db commented on PR #43541:
URL: https://github.com/apache/spark/pull/43541#issuecomment-1784443458

   @mridulm I see them all linked as child JIRAs on https://issues.apache.org/jira/browse/SPARK-44937 -- let's continue the conversation there if you do not see them?




Re: [PR] [SPARK-45544][CORE] Integrate SSL support into TransportContext [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm closed pull request #43541: [SPARK-45544][CORE] Integrate SSL support into TransportContext
URL: https://github.com/apache/spark/pull/43541




Re: [PR] [SPARK-45544][CORE] Integrate SSL support into TransportContext [spark]

Posted by "hasnain-db (via GitHub)" <gi...@apache.org>.
hasnain-db commented on PR #43541:
URL: https://github.com/apache/spark/pull/43541#issuecomment-1781477807

   cc @mridulm - this should be the final functionality PR for the SSL support (the only other remaining PR is the docs one).




Re: [PR] [SPARK-45544][CORE] Integrate SSL support into TransportContext [spark]

Posted by "hasnain-db (via GitHub)" <gi...@apache.org>.
hasnain-db commented on code in PR #43541:
URL: https://github.com/apache/spark/pull/43541#discussion_r1375466664


##########
common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java:
##########
@@ -293,6 +296,28 @@ public void initChannel(SocketChannel ch) {
     } else if (cf.cause() != null) {
       throw new IOException(String.format("Failed to connect to %s", address), cf.cause());
     }
+    if (context.sslEncryptionEnabled()) {
+      final SslHandler sslHandler = cf.channel().pipeline().get(SslHandler.class);
+      Future<Channel> future = sslHandler.handshakeFuture().addListener(
+        new GenericFutureListener<Future<Channel>>() {
+          @Override
+          public void operationComplete(final Future<Channel> handshakeFuture) {
+            if (handshakeFuture.isSuccess()) {
+              logger.debug("{} successfully completed TLS handshake to ", address);
+            } else {
+              logger.info(
+                "failed to complete TLS handshake to " + address, handshakeFuture.cause());
+              cf.channel().close();
+            }
+          }
+      });
+      if (!future.await(conf.connectionTimeoutMs())) {
+        logger.info("failed to connect to " + address + " within connection timeout");

Review Comment:
   I believe so - will remove this.
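   
   Here is a sketch of the single-log approach, with `CompletableFuture` standing in for Netty's handshake `Future` and a list standing in for the logger. The completion listener is the only place that logs. The timeout branch only throws, and throwing an `IOException` there is an assumption, since the rest of the hunk is not shown. `HandshakeLogOnce` and `awaitHandshake` are hypothetical names:
   
   ```java
   import java.io.IOException;
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.List;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;
   
   /** Sketch: report a handshake outcome exactly once, from the completion listener. */
   final class HandshakeLogOnce {
   
     /** Captured log lines; synchronized because the listener may run on another thread. */
     final List<String> log = Collections.synchronizedList(new ArrayList<>());
   
     void awaitHandshake(CompletableFuture<Void> handshake, String address, long timeoutMs)
         throws Exception {
       // The listener is the single place that logs the outcome.
       handshake.whenComplete((ignored, err) -> {
         if (err == null) {
           log.add("completed TLS handshake to " + address);
         } else {
           log.add("failed to complete TLS handshake to " + address);
         }
       });
       try {
         // Throws an ExecutionException if the handshake failed.
         handshake.get(timeoutMs, TimeUnit.MILLISECONDS);
       } catch (TimeoutException e) {
         // No log call here: if the handshake later fails, the listener reports it.
         throw new IOException("Failed to connect to " + address + " within connection timeout", e);
       }
     }
   }
   ```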





Re: [PR] [SPARK-45544][CORE] Integrate SSL support into TransportContext [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #43541:
URL: https://github.com/apache/spark/pull/43541#discussion_r1375372996


##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ShuffleTransportContextSuite.java:
##########
@@ -90,15 +94,22 @@ private ByteBuf getDecodableMessageBuf(Message req) throws Exception {
   public void testInitializePipeline() throws IOException {
     // SPARK-43987: test that the FinalizedHandler is added to the pipeline only when configured
     for (boolean enabled : new boolean[]{true, false}) {
-      ShuffleTransportContext ctx = createShuffleTransportContext(enabled);
-      SocketChannel channel = new NioSocketChannel();
-      RpcHandler rpcHandler = mock(RpcHandler.class);
-      ctx.initializePipeline(channel, rpcHandler);
-      String handlerName = ShuffleTransportContext.FinalizedHandler.HANDLER_NAME;
-      if (enabled) {
-        Assertions.assertNotNull(channel.pipeline().get(handlerName));
-      } else {
-        Assertions.assertNull(channel.pipeline().get(handlerName));
+      for (boolean isClient: new boolean[]{true, false}) {
+        // Since the decoder is not Shareable, reset it between test runs to avoid errors since it's
+        // used both across ShuffleTransportContextSuite and SslShuffleTransportContextSuite
+        // and server/clients

Review Comment:
   Completely agree, I would prefer not to modify prod code for tests (even though we do have a fair bit of `Utils.isTesting` in our code :-( ).
   In this case, though, the change is orthogonal because it makes sense on its own: it was not done before only because of an oversight during my review (`MessageDecoder` is marked `Sharable`).





Re: [PR] [SPARK-45544][CORE] Integrate SSL support into TransportContext [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #43541:
URL: https://github.com/apache/spark/pull/43541#discussion_r1375372996


##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ShuffleTransportContextSuite.java:
##########
@@ -90,15 +94,22 @@ private ByteBuf getDecodableMessageBuf(Message req) throws Exception {
   public void testInitializePipeline() throws IOException {
     // SPARK-43987: test that the FinalizedHandler is added to the pipeline only when configured
     for (boolean enabled : new boolean[]{true, false}) {
-      ShuffleTransportContext ctx = createShuffleTransportContext(enabled);
-      SocketChannel channel = new NioSocketChannel();
-      RpcHandler rpcHandler = mock(RpcHandler.class);
-      ctx.initializePipeline(channel, rpcHandler);
-      String handlerName = ShuffleTransportContext.FinalizedHandler.HANDLER_NAME;
-      if (enabled) {
-        Assertions.assertNotNull(channel.pipeline().get(handlerName));
-      } else {
-        Assertions.assertNull(channel.pipeline().get(handlerName));
+      for (boolean isClient: new boolean[]{true, false}) {
+        // Since the decoder is not Shareable, reset it between test runs to avoid errors since it's
+        // used both across ShuffleTransportContextSuite and SslShuffleTransportContextSuite
+        // and server/clients

Review Comment:
   Completely agree, I would prefer not to modify prod code for tests (even though we do have a fair bit of `Utils.isTesting` in our code :-( ).
   In this case, though, the change is orthogonal because it makes sense on its own: it was never done before because of an oversight, not for any other reason, I guess!





Re: [PR] [SPARK-45544][CORE] Integrate SSL support into TransportContext [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #43541:
URL: https://github.com/apache/spark/pull/43541#discussion_r1375373421


##########
common/network-common/src/main/java/org/apache/spark/network/TransportContext.java:
##########
@@ -189,15 +204,32 @@ public TransportChannelHandler initializePipeline(SocketChannel channel) {
    */
   public TransportChannelHandler initializePipeline(
       SocketChannel channel,
-      RpcHandler channelRpcHandler) {
+      RpcHandler channelRpcHandler,
+      boolean isClient) {
     try {
       TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
       ChannelPipeline pipeline = channel.pipeline();
+
       if (nettyLogger.getLoggingHandler() != null) {
         pipeline.addLast("loggingHandler", nettyLogger.getLoggingHandler());
       }
+
+      if (sslEncryptionEnabled()) {
+        SslHandler sslHandler;
+        try {
+          sslHandler = new SslHandler(
+            sslFactory.createSSLEngine(isClient, pipeline.channel().alloc()));
+        } catch (Exception e) {
+          throw new RuntimeException("Error creating Netty SslHandler", e);

Review Comment:
   This is still a `RuntimeException`.





Re: [PR] [SPARK-45544][CORE] Integrate SSL support into TransportContext [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #43541:
URL: https://github.com/apache/spark/pull/43541#issuecomment-1784679999

   Sigh, it does not show up if you are not logged in; I did not realize I had been logged out of JIRA.
   It does show up there, thanks.




Re: [PR] [SPARK-45544][CORE] Integrate SSL support into TransportContext [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #43541:
URL: https://github.com/apache/spark/pull/43541#discussion_r1374076018


##########
common/network-common/src/main/java/org/apache/spark/network/TransportContext.java:
##########
@@ -189,15 +204,32 @@ public TransportChannelHandler initializePipeline(SocketChannel channel) {
    */
   public TransportChannelHandler initializePipeline(
       SocketChannel channel,
-      RpcHandler channelRpcHandler) {
+      RpcHandler channelRpcHandler,
+      boolean isClient) {
     try {
       TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
       ChannelPipeline pipeline = channel.pipeline();
+
       if (nettyLogger.getLoggingHandler() != null) {
         pipeline.addLast("loggingHandler", nettyLogger.getLoggingHandler());
       }
+
+      if (sslEncryptionEnabled()) {
+        SslHandler sslHandler;
+        try {
+          sslHandler = new SslHandler(
+            sslFactory.createSSLEngine(isClient, pipeline.channel().alloc()));

Review Comment:
   nit:
   ```suggestion
               sslFactory.createSSLEngine(isClient, channel.alloc()));
   ```



##########
common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java:
##########
@@ -293,6 +296,26 @@ public void initChannel(SocketChannel ch) {
     } else if (cf.cause() != null) {
       throw new IOException(String.format("Failed to connect to %s", address), cf.cause());
     }
+    if (context.sslEncryptionEnabled()) {
+      final SslHandler sslHandler = cf.channel().pipeline().get(SslHandler.class);
+      Future<Channel> future = sslHandler.handshakeFuture().addListener(
+        new GenericFutureListener<Future<Channel>>() {
+          @Override
+          public void operationComplete(final Future<Channel> handshakeFuture) {
+            if (handshakeFuture.isSuccess()) {
+              logger.debug("{} successfully completed TLS handshake to ", address);
+            } else {
+              if (logger.isDebugEnabled()) {
+                logger.debug(
+                  "failed to complete TLS handshake to " + address,
+                  handshakeFuture.cause());
+              }
+              cf.channel().close();
+            }
+          }
+      });
+      future.await(conf.connectionTimeoutMs());

Review Comment:
   Throw an exception when `await` fails? (after closing the connection)
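A minimal sketch of what this suggestion could look like, written with JDK types only so it runs standalone: `CompletableFuture` stands in for Netty's handshake `Future<Channel>`, and `FakeChannel` and `HandshakeWaiter` are hypothetical names, not Spark or Netty classes. On timeout or handshake failure it closes the channel first and only then throws, so the caller never receives an unusable connection.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for a Netty Channel: it only tracks whether it was closed.
final class FakeChannel {
  private boolean open = true;

  void close() { open = false; }

  boolean isOpen() { return open; }
}

final class HandshakeWaiter {
  // Waits for the (simulated) TLS handshake. On timeout or failure, the channel
  // is closed before the IOException is thrown.
  static void awaitHandshake(
      CompletableFuture<Void> handshake, FakeChannel channel, String address, long timeoutMs)
      throws IOException, InterruptedException {
    try {
      handshake.get(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      channel.close();
      throw new IOException(
          "Failed to complete TLS handshake to " + address + " within " + timeoutMs + " ms", e);
    } catch (ExecutionException e) {
      channel.close();
      throw new IOException("Failed to complete TLS handshake to " + address, e.getCause());
    }
  }
}
```

With Netty itself, `Future.await(long timeoutMillis)` returns `false` on timeout instead of throwing, so the equivalent check would test that boolean (and `isSuccess()`) before closing the channel and throwing an `IOException`.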



##########
common/network-common/src/main/java/org/apache/spark/network/TransportContext.java:
##########
@@ -189,15 +204,32 @@ public TransportChannelHandler initializePipeline(SocketChannel channel) {
    */
   public TransportChannelHandler initializePipeline(
       SocketChannel channel,
-      RpcHandler channelRpcHandler) {
+      RpcHandler channelRpcHandler,
+      boolean isClient) {
     try {
       TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
       ChannelPipeline pipeline = channel.pipeline();
+
       if (nettyLogger.getLoggingHandler() != null) {
         pipeline.addLast("loggingHandler", nettyLogger.getLoggingHandler());
       }
+
+      if (sslEncryptionEnabled()) {
+        SslHandler sslHandler;
+        try {
+          sslHandler = new SslHandler(
+            sslFactory.createSSLEngine(isClient, pipeline.channel().alloc()));
+        } catch (Exception e) {
+          throw new RuntimeException("Error creating Netty SslHandler", e);
+        }
+        pipeline.addFirst("NettySslEncryptionHandler", sslHandler);
+        // Cannot use zero-copy with HTTPS, so we add in our ChunkedWriteHandler just before the
+        // MessageEncoder
+        pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());

Review Comment:
   `addFirst` and `addLast` for `sslHandler` should be equivalent at this point.
   But if we want to use `addFirst`, then perhaps ensure `ChunkedWriteHandler` is added with `addAfter` relative to `sslHandler`?
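To make the ordering question concrete, here is a toy model of handler ordering. `ToyPipeline` is hypothetical and only records handler names; it is not Netty's `ChannelPipeline`, although its `addFirst`/`addLast`/`addAfter` mirror the intent of the Netty methods with the same names. It shows that when a logging handler is already present, appending the chunked writer with `addLast` places it after the logging handler, while `addAfter` pins it immediately after the SSL handler.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of handler ordering (NOT Netty's ChannelPipeline): index 0 is the
// head, closest to the socket; the last entry is the tail, closest to the application.
final class ToyPipeline {
  private final List<String> names = new ArrayList<>();

  void addFirst(String name) { names.add(0, name); }

  void addLast(String name) { names.add(name); }

  // Inserts `name` immediately after `baseName`.
  void addAfter(String baseName, String name) {
    int i = names.indexOf(baseName);
    if (i < 0) {
      throw new IllegalArgumentException("No handler named " + baseName);
    }
    names.add(i + 1, name);
  }

  List<String> names() { return new ArrayList<>(names); }
}
```

Usage: adding `loggingHandler` with `addLast`, then the SSL handler with `addFirst`, then `chunkedWriter` with `addLast` yields `[NettySslEncryptionHandler, loggingHandler, chunkedWriter]`; replacing the last step with `addAfter("NettySslEncryptionHandler", "chunkedWriter")` yields `[NettySslEncryptionHandler, chunkedWriter, loggingHandler]`.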
   
   



##########
common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java:
##########
@@ -293,6 +296,26 @@ public void initChannel(SocketChannel ch) {
     } else if (cf.cause() != null) {
       throw new IOException(String.format("Failed to connect to %s", address), cf.cause());
     }
+    if (context.sslEncryptionEnabled()) {
+      final SslHandler sslHandler = cf.channel().pipeline().get(SslHandler.class);
+      Future<Channel> future = sslHandler.handshakeFuture().addListener(
+        new GenericFutureListener<Future<Channel>>() {
+          @Override
+          public void operationComplete(final Future<Channel> handshakeFuture) {
+            if (handshakeFuture.isSuccess()) {
+              logger.debug("{} successfully completed TLS handshake to ", address);
+            } else {
+              if (logger.isDebugEnabled()) {
+                logger.debug(

Review Comment:
   Do we want to make this `info` instead? I am assuming it won't be noisy, and when it does fail, it is something we want to know about.



##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java:
##########
@@ -86,6 +85,17 @@ public class ExternalShuffleIntegrationSuite {
     new byte[54321],
   };
 
+  private static TransportConf createTransportConf(String maxRetries, String rddEnabled) {

Review Comment:
   nit: declare the parameters with their actual types and convert them to `String` inside this method.
   
   ```suggestion
     private static TransportConf createTransportConf(int maxRetries, boolean rddEnabled) {
   ```
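A sketch of the suggested signature, assuming the helper only needs to build the config map: callers pass an `int` and a `boolean`, and the conversion to `String` happens once, inside the helper. `ShuffleTestConfigs` is a hypothetical name, and the value of `FETCH_RDD_ENABLED_KEY` is an assumed stand-in for `Constants.SHUFFLE_SERVICE_FETCH_RDD_ENABLED` that should be checked against the Spark source.

```java
import java.util.HashMap;
import java.util.Map;

final class ShuffleTestConfigs {
  // Assumed value of Constants.SHUFFLE_SERVICE_FETCH_RDD_ENABLED; verify before relying on it.
  static final String FETCH_RDD_ENABLED_KEY = "spark.shuffle.service.fetch.rdd.enabled";

  // Callers pass real types; the String conversion happens in exactly one place.
  static Map<String, String> createConfigMap(int maxRetries, boolean rddEnabled) {
    Map<String, String> config = new HashMap<>();
    config.put("spark.shuffle.io.maxRetries", Integer.toString(maxRetries));
    config.put(FETCH_RDD_ENABLED_KEY, Boolean.toString(rddEnabled));
    return config;
  }
}
```

In the suite, the resulting map would then be wrapped exactly as in the original helper: `new TransportConf("shuffle", new MapConfigProvider(config))`.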



##########
common/network-common/src/main/java/org/apache/spark/network/TransportContext.java:
##########
@@ -189,15 +204,32 @@ public TransportChannelHandler initializePipeline(SocketChannel channel) {
    */
   public TransportChannelHandler initializePipeline(
       SocketChannel channel,
-      RpcHandler channelRpcHandler) {
+      RpcHandler channelRpcHandler,
+      boolean isClient) {
     try {
       TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
       ChannelPipeline pipeline = channel.pipeline();
+

Review Comment:
   super nit:
   ```suggestion
   ```



##########
common/network-common/src/main/java/org/apache/spark/network/TransportContext.java:
##########
@@ -223,6 +255,33 @@ protected MessageToMessageDecoder<ByteBuf> getDecoder() {
     return DECODER;
   }
 
+  private SSLFactory createSslFactory() {
+    if (conf.sslRpcEnabled()) {
+      if (conf.sslRpcEnabledAndKeysAreValid()) {
+        return new SSLFactory.Builder()
+          .openSslEnabled(conf.sslRpcOpenSslEnabled())
+          .requestedProtocol(conf.sslRpcProtocol())
+          .requestedCiphers(conf.sslRpcRequestedCiphers())
+          .keyStore(conf.sslRpcKeyStore(), conf.sslRpcKeyStorePassword())
+          .privateKey(conf.sslRpcPrivateKey())
+          .keyPassword(conf.sslRpcKeyPassword())
+          .certChain(conf.sslRpcCertChain())
+          .trustStore(
+            conf.sslRpcTrustStore(),
+            conf.sslRpcTrustStorePassword(),
+            conf.sslRpcTrustStoreReloadingEnabled(),
+            conf.sslRpctrustStoreReloadIntervalMs())
+          .build();
+      } else {
+        logger.error("RPC SSL encryption enabled but keys not found!" +
+          "Please ensure the configured keys are present.");
+        throw new RuntimeException("RPC SSL encryption enabled but keys not found!");

Review Comment:
   ```suggestion
           throw new IllegalArgumentException("RPC SSL encryption enabled but keys not found!");
   ```



##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ShuffleTransportContextSuite.java:
##########
@@ -90,15 +94,22 @@ private ByteBuf getDecodableMessageBuf(Message req) throws Exception {
   public void testInitializePipeline() throws IOException {
     // SPARK-43987: test that the FinalizedHandler is added to the pipeline only when configured
     for (boolean enabled : new boolean[]{true, false}) {
-      ShuffleTransportContext ctx = createShuffleTransportContext(enabled);
-      SocketChannel channel = new NioSocketChannel();
-      RpcHandler rpcHandler = mock(RpcHandler.class);
-      ctx.initializePipeline(channel, rpcHandler);
-      String handlerName = ShuffleTransportContext.FinalizedHandler.HANDLER_NAME;
-      if (enabled) {
-        Assertions.assertNotNull(channel.pipeline().get(handlerName));
-      } else {
-        Assertions.assertNull(channel.pipeline().get(handlerName));
+      for (boolean isClient: new boolean[]{true, false}) {

Review Comment:
   super nit: `isClient` -> `client`



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleTransportContext.java:
##########
@@ -52,7 +53,8 @@
  * */
 public class ShuffleTransportContext extends TransportContext {
   private static final Logger logger = LoggerFactory.getLogger(ShuffleTransportContext.class);
-  private static final ShuffleMessageDecoder SHUFFLE_DECODER =
+  @VisibleForTesting
+  protected static ShuffleMessageDecoder SHUFFLE_DECODER =
       new ShuffleMessageDecoder(MessageDecoder.INSTANCE);

Review Comment:
   Instead of exposing the variable, add a method to reinitialize it, and annotate that method as intended for use by tests.
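A minimal sketch of the suggested shape, with hypothetical names throughout: the field stays `private`, and a dedicated static method swaps in a fresh instance. `StatefulDecoder` is a simplified stand-in for a decoder that, like a Netty handler not marked `@Sharable`, may only be added once; the real `ShuffleMessageDecoder` is not modeled here.

```java
// Hypothetical stand-in for a non-sharable decoder: it may only be attached once.
final class StatefulDecoder {
  private boolean attached = false;

  void attach() {
    if (attached) {
      throw new IllegalStateException("decoder is not sharable and is already attached");
    }
    attached = true;
  }
}

final class ContextSketch {
  // The field stays private, so tests cannot replace it with arbitrary instances.
  private static StatefulDecoder decoder = new StatefulDecoder();

  static StatefulDecoder getDecoder() {
    return decoder;
  }

  // For tests only; in Spark this would carry Guava's @VisibleForTesting annotation.
  static void resetDecoderForTesting() {
    decoder = new StatefulDecoder();
  }
}
```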



##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java:
##########
@@ -86,6 +85,17 @@ public class ExternalShuffleIntegrationSuite {
     new byte[54321],
   };
 
+  private static TransportConf createTransportConf(String maxRetries, String rddEnabled) {
+    HashMap<String, String> config = new HashMap<>();
+    config.put("spark.shuffle.io.maxRetries", maxRetries);
+    config.put(Constants.SHUFFLE_SERVICE_FETCH_RDD_ENABLED, rddEnabled);
+    return new TransportConf("shuffle", new MapConfigProvider(config));
+  }
+
+  protected TransportConf createTransportConfForFetchNoServerTest() {

Review Comment:
   It is unclear to me why this method is named this way ...



##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ShuffleTransportContextSuite.java:
##########
@@ -90,15 +94,22 @@ private ByteBuf getDecodableMessageBuf(Message req) throws Exception {
   public void testInitializePipeline() throws IOException {
     // SPARK-43987: test that the FinalizedHandler is added to the pipeline only when configured
     for (boolean enabled : new boolean[]{true, false}) {
-      ShuffleTransportContext ctx = createShuffleTransportContext(enabled);
-      SocketChannel channel = new NioSocketChannel();
-      RpcHandler rpcHandler = mock(RpcHandler.class);
-      ctx.initializePipeline(channel, rpcHandler);
-      String handlerName = ShuffleTransportContext.FinalizedHandler.HANDLER_NAME;
-      if (enabled) {
-        Assertions.assertNotNull(channel.pipeline().get(handlerName));
-      } else {
-        Assertions.assertNull(channel.pipeline().get(handlerName));
+      for (boolean isClient: new boolean[]{true, false}) {
+        // Since the decoder is not Shareable, reset it between test runs to avoid errors since it's
+        // used both across ShuffleTransportContextSuite and SslShuffleTransportContextSuite
+        // and server/clients

Review Comment:
   The decoder is not being used here (other than to configure the pipeline), so why do we need to reset it?



##########
common/network-common/src/test/java/org/apache/spark/network/SslChunkFetchIntegrationSuite.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.network;
+
+import java.io.File;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import com.google.common.io.Closeables;
+import org.junit.jupiter.api.BeforeAll;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.buffer.NioManagedBuffer;
+import org.apache.spark.network.client.RpcResponseCallback;
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.server.RpcHandler;
+import org.apache.spark.network.server.StreamManager;
+import org.apache.spark.network.util.TransportConf;
+import org.apache.spark.network.ssl.SslSampleConfigs;
+
+
+public class SslChunkFetchIntegrationSuite extends ChunkFetchIntegrationSuite {
+
+  @BeforeAll
+  public static void setUp() throws Exception {
+    int bufSize = 100000;
+    final ByteBuffer buf = ByteBuffer.allocate(bufSize);
+    for (int i = 0; i < bufSize; i ++) {
+      buf.put((byte) i);
+    }
+    buf.flip();
+    bufferChunk = new NioManagedBuffer(buf);
+
+    testFile = File.createTempFile("shuffle-test-file", "txt");
+    testFile.deleteOnExit();
+    RandomAccessFile fp = new RandomAccessFile(testFile, "rw");
+    boolean shouldSuppressIOException = true;
+    try {
+      byte[] fileContent = new byte[1024];
+      new Random().nextBytes(fileContent);
+      fp.write(fileContent);
+      shouldSuppressIOException = false;
+    } finally {
+      Closeables.close(fp, shouldSuppressIOException);
+    }
+
+    final TransportConf conf = new TransportConf(
+      "shuffle", SslSampleConfigs.createDefaultConfigProviderForRpcNamespace());
+    fileChunk = new FileSegmentManagedBuffer(conf, testFile, 10, testFile.length() - 25);
+
+    streamManager = new StreamManager() {
+      @Override
+      public ManagedBuffer getChunk(long streamId, int chunkIndex) {
+        assertEquals(STREAM_ID, streamId);
+        if (chunkIndex == BUFFER_CHUNK_INDEX) {
+          return new NioManagedBuffer(buf);
+        } else if (chunkIndex == FILE_CHUNK_INDEX) {
+          return new FileSegmentManagedBuffer(conf, testFile, 10, testFile.length() - 25);
+        } else {
+          throw new IllegalArgumentException("Invalid chunk index: " + chunkIndex);
+        }
+      }
+    };
+    RpcHandler handler = new RpcHandler() {
+      @Override
+      public void receive(
+          TransportClient client,
+          ByteBuffer message,
+          RpcResponseCallback callback) {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public StreamManager getStreamManager() {
+        return streamManager;
+      }
+    };
+    context = new TransportContext(conf, handler);
+    server = context.createServer();
+    clientFactory = context.createClientFactory();
+  }

Review Comment:
   If I am not mistaken, the only difference between this and `ChunkFetchIntegrationSuite.setUp` is `conf`, right?
   If so, instead of duplicating the method, pass `conf` to a common static method that initializes both suites.
   
   (Same comment for the other suites too.)
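A sketch of the suggested refactor under simplifying assumptions: the configuration is modeled as a plain `Map<String, String>` rather than a `TransportConf`, and `ChunkFetchFixture`, `setUpWithConf`, the suite sketch classes, and the `hypothetical.ssl.enabled` key are illustrative names only. The shared static method owns all fixture construction, so each suite's setup reduces to choosing a configuration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical shared fixture: one static method builds everything from the
// configuration the concrete suite supplies.
class ChunkFetchFixture {
  static Map<String, String> conf;
  static byte[] bufferChunk;

  static void setUpWithConf(Map<String, String> suiteConf) {
    conf = suiteConf;
    bufferChunk = new byte[100000];
    for (int i = 0; i < bufferChunk.length; i++) {
      bufferChunk[i] = (byte) i;
    }
    // The file chunk, StreamManager, RpcHandler, server, and client factory
    // would also be created here from `conf`, once for every suite.
  }
}

// Each suite's setup method then reduces to a single call.
final class PlainSuiteSketch {
  static void setUp() {
    ChunkFetchFixture.setUpWithConf(new HashMap<>());
  }
}

final class SslSuiteSketch {
  static void setUp() {
    Map<String, String> sslConf = new HashMap<>();
    sslConf.put("hypothetical.ssl.enabled", "true"); // illustrative key only
    ChunkFetchFixture.setUpWithConf(sslConf);
  }
}
```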





Re: [PR] [SPARK-45544][CORE] Integrate SSL support into TransportContext [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #43541:
URL: https://github.com/apache/spark/pull/43541#discussion_r1375190010


##########
common/network-common/src/main/java/org/apache/spark/network/TransportContext.java:
##########
@@ -189,15 +204,32 @@ public TransportChannelHandler initializePipeline(SocketChannel channel) {
    */
   public TransportChannelHandler initializePipeline(
       SocketChannel channel,
-      RpcHandler channelRpcHandler) {
+      RpcHandler channelRpcHandler,
+      boolean isClient) {
     try {
       TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
       ChannelPipeline pipeline = channel.pipeline();
+
       if (nettyLogger.getLoggingHandler() != null) {
         pipeline.addLast("loggingHandler", nettyLogger.getLoggingHandler());
       }
+
+      if (sslEncryptionEnabled()) {
+        SslHandler sslHandler;
+        try {
+          sslHandler = new SslHandler(
+            sslFactory.createSSLEngine(isClient, pipeline.channel().alloc()));
+        } catch (Exception e) {
+          throw new RuntimeException("Error creating Netty SslHandler", e);
+        }
+        pipeline.addFirst("NettySslEncryptionHandler", sslHandler);
+        // Cannot use zero-copy with HTTPS, so we add in our ChunkedWriteHandler just before the
+        // MessageEncoder
+        pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());

Review Comment:
   You are right, the order as coded makes sense.
   Thanks for clarifying !





Re: [PR] [SPARK-45544][CORE] Integrate SSL support into TransportContext [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #43541:
URL: https://github.com/apache/spark/pull/43541#issuecomment-1784357827

   The test `WorkerDecommissionSuite.'verify a running task with all workers decommissioned succeeds'` failed, but this is unrelated to the current PR.
   Will merge to master.




Re: [PR] [SPARK-45544][CORE] Integrate SSL support into TransportContext [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #43541:
URL: https://github.com/apache/spark/pull/43541#issuecomment-1784360820

   @hasnain-db, can you please link all the individual JIRAs to the original JIRA?
   That will help identify all the relevant PRs for this feature.

