Posted to reviews@spark.apache.org by "hasnain-db (via GitHub)" <gi...@apache.org> on 2023/10/06 05:31:58 UTC

[PR] [SPARK-45429][CORE] Add helper classes for SSL RPC communication [spark]

hasnain-db opened a new pull request, #43244:
URL: https://github.com/apache/spark/pull/43244

   ### What changes were proposed in this pull request?
   
   This PR adds helper classes for SSL RPC communication. They are needed to work around the fact that `netty`'s SSL support does not allow zero-copy transfers.
   
   These mirror the existing `MessageWithHeader` and `MessageEncoder` classes with very minor differences. The differences were just large enough that refactoring or consolidating the classes did not seem easy, and since we don't expect these classes to change much, I hope the duplication is ok.
   
   ### Why are the changes needed?
   
   These are needed to convert `ManagedBuffer`s into a form that `netty` can transfer over the network, since netty's encryption support does not support zero-copy transfers.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No
   
   ### How was this patch tested?
   
   Added unit tests
   
   ```
   build/sbt
   > project network-common
   > testOnly org.apache.spark.network.protocol.EncryptedMessageWithHeaderSuite
   ``` 
   
   The rest of the changes and integration were tested as part of https://github.com/apache/spark/pull/42685
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45429][CORE] Add helper classes for SSL RPC communication [spark]

Posted by "hasnain-db (via GitHub)" <gi...@apache.org>.
hasnain-db commented on PR #43244:
URL: https://github.com/apache/spark/pull/43244#issuecomment-1762546614

   `org.apache.spark.sql.kafka010.KafkaSourceStressSuite` is failing on this and the other outstanding PR, and seems to be unrelated to the changes here. I wonder if that is related to the new kafka version bump since it looks like it also failed on https://github.com/apache/spark/actions/runs/6497942586/job/17648192318




Re: [PR] [SPARK-45429][CORE] Add helper classes for SSL RPC communication [spark]

Posted by "hasnain-db (via GitHub)" <gi...@apache.org>.
hasnain-db commented on PR #43244:
URL: https://github.com/apache/spark/pull/43244#issuecomment-1763269430

   @mridulm I believe the only failure now is `org.apache.spark.sql.kafka010.KafkaSourceStressSuite`




Re: [PR] [SPARK-45429][CORE] Add helper classes for SSL RPC communication [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #43244:
URL: https://github.com/apache/spark/pull/43244#discussion_r1349612404


##########
common/network-common/src/main/java/org/apache/spark/network/protocol/EncryptedMessageWithHeader.java:
##########
@@ -0,0 +1,132 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.network.protocol;
+
+import javax.annotation.Nullable;
+import java.io.InputStream;
+
+import com.google.common.base.Preconditions;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.stream.ChunkedStream;
+import io.netty.handler.stream.ChunkedInput;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+
+/**
+ * A wrapper message that holds two separate pieces (a header and a body).
+ *
+ * The header must be a ByteBuf, while the body can be any InputStream or ChunkedStream
+ */
+public class EncryptedMessageWithHeader implements ChunkedInput<ByteBuf> {
+
+  @Nullable private final ManagedBuffer managedBuffer;
+  private final ByteBuf header;
+  private final int headerLength;
+  private final Object body;
+  private final long bodyLength;
+  private long totalBytesTransferred;
+
+  /**
+   * Construct a new EncryptedMessageWithHeader.
+   *
+   * @param managedBuffer the {@link ManagedBuffer} that the message body came from. This needs to
+   *                      be passed in so that the buffer can be freed when this message is
+   *                      deallocated. Ownership of the caller's reference to this buffer is
+   *                      transferred to this class, so if the caller wants to continue to use the
+   *                      ManagedBuffer in other messages then they will need to call retain() on
+   *                      it before passing it to this constructor.
+   * @param header the message header.
+   * @param body the message body.
+   * @param bodyLength the length of the message body, in bytes.
+   */
+
+  public EncryptedMessageWithHeader(
+      @Nullable ManagedBuffer managedBuffer, ByteBuf header, Object body, long bodyLength) {
+    Preconditions.checkArgument(body instanceof InputStream || body instanceof ChunkedStream,
+      "Body must be an InputStream or a ChunkedStream.");
+    this.managedBuffer = managedBuffer;
+    this.header = header;
+    this.headerLength = header.readableBytes();
+    this.body = body;
+    this.bodyLength = bodyLength;
+    this.totalBytesTransferred = 0;
+  }
+
+  @Override
+  public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
+    return readChunk(ctx.alloc());
+  }
+
+  @Override
+  public ByteBuf readChunk(ByteBufAllocator allocator) throws Exception {
+    if (isEndOfInput()) {
+      return null;
+    }
+
+    if (totalBytesTransferred < headerLength) {
+      totalBytesTransferred += headerLength;
+      return header.retain();
+    } else if (body instanceof InputStream) {
+      InputStream stream = (InputStream) body;
+      int available = stream.available();

Review Comment:
   From a code evolution point of view, I am unsure whether we need to do this:
   ```suggestion
         int available = (int) Math.min(stream.available(), length() - totalBytesTransferred);
   ```
   
   Currently, we don't require this from what I see (please correct me if I am missing something!), but things could diverge based on how `convertToNetty` and `convertToNettyForSsl` evolve.
   
   A similar concern applies to `stream.readChunk` as well.
   
   Thoughts ?
   



##########
common/network-common/src/main/java/org/apache/spark/network/protocol/EncryptedMessageWithHeader.java:
##########
@@ -0,0 +1,132 @@
+    if (totalBytesTransferred < headerLength) {
+      totalBytesTransferred += headerLength;
+      return header.retain();
+    } else if (body instanceof InputStream) {
+      InputStream stream = (InputStream) body;
+      int available = stream.available();
+      ByteBuf buffer = allocator.buffer(available);
+      int toRead = Math.min(available, buffer.writableBytes());

Review Comment:
   `available` need not return a reasonable value, and can be `0`.
   Take a look at `io.netty.handler.stream.ChunkedStream` on how to handle this (one option could also be to wrap body in `LimitedInputStream` and delegate to it)
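
One way to cope with an unhelpful `available()` is sketched below in plain `java.io`. It loosely follows the fallback idea in `ChunkedStream`: when `available()` reports zero or less, request a fixed chunk size and let `read` block instead. The names `ChunkSizeSketch`, `DEFAULT_CHUNK_SIZE`, and `NoHintStream` are hypothetical, and a `byte[]` stands in for the `ByteBuf`, so this is only an illustration under those assumptions, not the code in this PR.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;

class ChunkSizeSketch {
  // Hypothetical fallback size, used when available() gives no useful hint.
  static final int DEFAULT_CHUNK_SIZE = 8192;

  /** Picks how many bytes to request: trust available() only when it is positive. */
  static int chunkSize(InputStream in, long remaining) throws IOException {
    int available = in.available();
    int hint = available <= 0 ? DEFAULT_CHUNK_SIZE : Math.min(available, DEFAULT_CHUNK_SIZE);
    // Never request more than the bytes still owed for this message.
    return (int) Math.min(hint, remaining);
  }

  /** Reads one chunk; returns an empty array when the stream is exhausted. */
  static byte[] readChunk(InputStream in, long remaining) throws IOException {
    byte[] buf = new byte[chunkSize(in, remaining)];
    int read = in.read(buf, 0, buf.length);
    return read <= 0 ? new byte[0] : Arrays.copyOf(buf, read);
  }

  /** A stream whose available() always reports 0, which the InputStream contract permits. */
  static class NoHintStream extends ByteArrayInputStream {
    NoHintStream(byte[] data) { super(data); }
    @Override public synchronized int available() { return 0; }
  }
}
```

With this shape, a stream that never reports available bytes still makes progress, because the requested size falls back to the fixed chunk size capped by the remaining message length.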
   



##########
common/network-common/src/main/java/org/apache/spark/network/protocol/EncryptedMessageWithHeader.java:
##########
@@ -0,0 +1,132 @@
+    if (totalBytesTransferred < headerLength) {
+      totalBytesTransferred += headerLength;
+      return header.retain();
+    } else if (body instanceof InputStream) {
+      InputStream stream = (InputStream) body;
+      int available = stream.available();
+      ByteBuf buffer = allocator.buffer(available);
+      int toRead = Math.min(available, buffer.writableBytes());
+      int read = buffer.writeBytes(stream, toRead);
+      totalBytesTransferred += read;

Review Comment:
   nit: add to `totalBytesTransferred` only if `read >= 0`, and throw `EOFException` when `-1` is returned
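
A minimal sketch of that accounting, using only `java.io`: the counter is updated only after a successful read, and a `-1` result is surfaced as an `EOFException`. The class `EofAccountingSketch` and its method names are hypothetical, and a `byte[]` stands in for the `ByteBuf` used in the PR.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

class EofAccountingSketch {
  private long totalBytesTransferred = 0;

  /** Reads up to buf.length bytes and updates the counter only on a successful read. */
  int readInto(InputStream in, byte[] buf) throws IOException {
    int read = in.read(buf, 0, buf.length);
    if (read < 0) {
      // The stream ended before the expected number of bytes was produced.
      throw new EOFException("Stream ended after " + totalBytesTransferred + " bytes");
    }
    totalBytesTransferred += read;
    return read;
  }

  long transferred() { return totalBytesTransferred; }
}
```

Checking the sign before adding keeps the counter from drifting backwards by one when the stream ends early.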



##########
common/network-common/src/test/java/org/apache/spark/network/protocol/EncryptedMessageWithHeaderSuite.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.protocol;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.Random;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.handler.stream.ChunkedStream;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.buffer.NettyManagedBuffer;
+
+public class EncryptedMessageWithHeaderSuite {

Review Comment:
   Add a simple test for the third case, where body is a `ByteBuf` - to catch potential issues as and when `SslMessageEncoder` evolves





Re: [PR] [SPARK-45429][CORE] Add helper classes for SSL RPC communication [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #43244:
URL: https://github.com/apache/spark/pull/43244#discussion_r1358700597


##########
common/network-common/src/main/java/org/apache/spark/network/protocol/EncryptedMessageWithHeader.java:
##########
@@ -0,0 +1,148 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.network.protocol;
+
+import javax.annotation.Nullable;
+import java.io.EOFException;
+import java.io.InputStream;

Review Comment:
   nit: Did not notice this earlier - reorder imports, and make sure `javax` comes after `java`





Re: [PR] [SPARK-45429][CORE] Add helper classes for SSL RPC communication [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #43244:
URL: https://github.com/apache/spark/pull/43244#discussion_r1357172193


##########
common/network-common/src/main/java/org/apache/spark/network/protocol/EncryptedMessageWithHeader.java:
##########
@@ -0,0 +1,132 @@
+    if (totalBytesTransferred < headerLength) {
+      totalBytesTransferred += headerLength;
+      return header.retain();
+    } else if (body instanceof InputStream) {
+      InputStream stream = (InputStream) body;
+      int available = stream.available();

Review Comment:
   You are right, our current implementations ensure that the returned `ByteBuf` will always be within bounds.
   Could you add a precondition check there to ensure `totalBytesTransferred <= length()`? The rest should be fine.
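
The kind of check meant here can be sketched in plain Java as follows. The class `TransferBoundsSketch` and its methods are hypothetical stand-ins; in the actual class, Guava's `Preconditions.checkState` would presumably be the natural fit, since `Preconditions` is already imported there.

```java
class TransferBoundsSketch {
  private final long length;           // header length + body length
  private long totalBytesTransferred;

  TransferBoundsSketch(long headerLength, long bodyLength) {
    this.length = headerLength + bodyLength;
  }

  /** Records a chunk and fails fast if the running total exceeds the declared length. */
  void recordChunk(int chunkBytes) {
    totalBytesTransferred += chunkBytes;
    if (totalBytesTransferred > length) {
      throw new IllegalStateException(
        "Transferred " + totalBytesTransferred + " bytes, expected at most " + length);
    }
  }

  boolean isEndOfInput() { return totalBytesTransferred >= length; }
}
```

Failing fast at the point of accounting makes a future divergence between the two conversion paths show up as an exception rather than as a corrupted frame.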
   





Re: [PR] [SPARK-45429][CORE] Add helper classes for SSL RPC communication [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #43244:
URL: https://github.com/apache/spark/pull/43244#discussion_r1349612031


##########
common/network-common/src/main/java/org/apache/spark/network/protocol/EncryptedMessageWithHeader.java:
##########
@@ -0,0 +1,132 @@
+    if (totalBytesTransferred < headerLength) {
+      totalBytesTransferred += headerLength;
+      return header.retain();
+    } else if (body instanceof InputStream) {
+      InputStream stream = (InputStream) body;
+      int available = stream.available();
+      ByteBuf buffer = allocator.buffer(available);
+      int toRead = Math.min(available, buffer.writableBytes());

Review Comment:
   `available` need not return a reasonable value, and can be `0`.
   Take a look at `io.netty.handler.stream.ChunkedStream` on how to handle this (one option could also be to wrap body in `LimitedInputStream` and delegate to it, but given the stream might already be wrapped in `LimitedInputStream`, I am not super keen on it - but ymmv)
   





Re: [PR] [SPARK-45429][CORE] Add helper classes for SSL RPC communication [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #43244:
URL: https://github.com/apache/spark/pull/43244#discussion_r1353732875


##########
common/network-common/src/main/java/org/apache/spark/network/protocol/EncryptedMessageWithHeader.java:
##########
@@ -0,0 +1,132 @@
+    if (totalBytesTransferred < headerLength) {
+      totalBytesTransferred += headerLength;
+      return header.retain();
+    } else if (body instanceof InputStream) {
+      InputStream stream = (InputStream) body;
+      int available = stream.available();

Review Comment:
   A general `stream.readChunk` can return more data than expected, which is the same issue as with `available` here (where more data than expected is read).
   
   (I am away from my desktop, so I have not checked whether this is an assumption we can make: that `readChunk` will not return more than expected in our context.)
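
To make the concern concrete, here is a minimal sketch of one way a wrapper could detect a delegate that hands back more bytes than the declared length. It is not code from the PR: `ChunkSource`, `ChunkOvershootGuardSketch`, and the plain `byte[]` chunks are hypothetical stand-ins (for Netty's `ChunkedStream` and `ByteBuf`) so that the example runs with the JDK alone.

```java
import java.util.ArrayDeque;
import java.util.Queue;

final class ChunkOvershootGuardSketch {
  /** Hypothetical stand-in for a chunk-producing body; returns null when exhausted. */
  interface ChunkSource {
    byte[] nextChunk();
  }

  private final ChunkSource source;
  private final long expectedLength;
  private long transferred;

  ChunkOvershootGuardSketch(ChunkSource source, long expectedLength) {
    this.source = source;
    this.expectedLength = expectedLength;
  }

  /** Returns the next chunk, or null once the declared length has been sent. */
  byte[] readChunk() {
    if (transferred >= expectedLength) {
      return null;
    }
    byte[] chunk = source.nextChunk();
    if (chunk == null) {
      return null;
    }
    long remaining = expectedLength - transferred;
    // Fail loudly instead of silently sending more than the header promised.
    if (chunk.length > remaining) {
      throw new IllegalStateException(
          "chunk of " + chunk.length + " bytes exceeds remaining " + remaining);
    }
    transferred += chunk.length;
    return chunk;
  }

  public static void main(String[] args) {
    Queue<byte[]> chunks = new ArrayDeque<>();
    chunks.add(new byte[3]);
    chunks.add(new byte[2]);
    ChunkOvershootGuardSketch guard = new ChunkOvershootGuardSketch(chunks::poll, 5);
    byte[] chunk;
    while ((chunk = guard.readChunk()) != null) {
      System.out.println("chunk of " + chunk.length + " bytes");
    }
  }
}
```

Whether such a guard is needed depends on whether the delegate already bounds its own output, which is the open question in this thread.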





Re: [PR] [SPARK-45429][CORE] Add helper classes for SSL RPC communication [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #43244:
URL: https://github.com/apache/spark/pull/43244#issuecomment-1763282672

   There was a failure in `KafkaSourceStressSuite`, which is unrelated to this PR.
   Merged to master.
   
   Thanks for working on this @hasnain-db !




Re: [PR] [SPARK-45429][CORE] Add helper classes for SSL RPC communication [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #43244:
URL: https://github.com/apache/spark/pull/43244#discussion_r1357744469


##########
common/network-common/src/main/java/org/apache/spark/network/protocol/EncryptedMessageWithHeader.java:
##########
@@ -0,0 +1,132 @@
+  @Override
+  public ByteBuf readChunk(ByteBufAllocator allocator) throws Exception {
+    if (isEndOfInput()) {
+      return null;
+    }
+
+    if (totalBytesTransferred < headerLength) {
+      totalBytesTransferred += headerLength;
+      return header.retain();
+    } else if (body instanceof InputStream) {
+      InputStream stream = (InputStream) body;
+      int available = stream.available();

Review Comment:
   To clarify, we don't need this for the `InputStream` path, given the way we are already computing how much needs to be read; it is needed only for the `readBuf` path.





Re: [PR] [SPARK-45429][CORE] Add helper classes for SSL RPC communication [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #43244:
URL: https://github.com/apache/spark/pull/43244#discussion_r1349612404


##########
common/network-common/src/main/java/org/apache/spark/network/protocol/EncryptedMessageWithHeader.java:
##########
@@ -0,0 +1,132 @@
+  @Override
+  public ByteBuf readChunk(ByteBufAllocator allocator) throws Exception {
+    if (isEndOfInput()) {
+      return null;
+    }
+
+    if (totalBytesTransferred < headerLength) {
+      totalBytesTransferred += headerLength;
+      return header.retain();
+    } else if (body instanceof InputStream) {
+      InputStream stream = (InputStream) body;
+      int available = stream.available();

Review Comment:
   From a code-evolution point of view, I am unsure whether we need to do this?
   ```suggestion
         int available = (int) Math.min(stream.available(), length() - totalBytesTransferred);
   ```
   
   Currently, we don't require this modification from what I see (please correct me if I am missing something!), but things could diverge based on how `convertToNetty` and `convertToNettyForSsl` evolve.
   
   A similar thing applies to `stream.readChunk` as well.
   
   Thoughts?
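
A minimal sketch of the clamping idea in the suggestion above, using only `java.io`. The helper `nextChunkSize` and its parameters are hypothetical names introduced for illustration, not part of the PR. Because `Math.min(int, long)` evaluates to a `long`, the sketch narrows the result explicitly; that is safe here since the result can never exceed `stream.available()`, which is already an `int`.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

final class ClampedAvailableSketch {
  /**
   * Returns how many bytes to request for the next chunk: never more than the
   * stream reports as available, and never more than the message still owes.
   */
  static int nextChunkSize(InputStream stream, long totalLength, long transferred)
      throws IOException {
    // Treat an over-counted transfer as "nothing left" rather than going negative.
    long remaining = Math.max(0L, totalLength - transferred);
    // Math.min(int, long) yields a long, so narrow it back to int explicitly.
    return (int) Math.min(stream.available(), remaining);
  }

  public static void main(String[] args) throws IOException {
    InputStream stream = new ByteArrayInputStream(new byte[10]);
    // Only 4 bytes are still owed, so the 10 available bytes are clamped.
    System.out.println(nextChunkSize(stream, 6, 2));
    // Plenty is still owed, so the stream's own availability is the limit.
    System.out.println(nextChunkSize(stream, 64, 2));
  }
}
```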
   





Re: [PR] [SPARK-45429][CORE] Add helper classes for SSL RPC communication [spark]

Posted by "hasnain-db (via GitHub)" <gi...@apache.org>.
hasnain-db commented on code in PR #43244:
URL: https://github.com/apache/spark/pull/43244#discussion_r1353032599


##########
common/network-common/src/main/java/org/apache/spark/network/protocol/EncryptedMessageWithHeader.java:
##########
@@ -0,0 +1,132 @@
+  @Override
+  public ByteBuf readChunk(ByteBufAllocator allocator) throws Exception {
+    if (isEndOfInput()) {
+      return null;
+    }
+
+    if (totalBytesTransferred < headerLength) {
+      totalBytesTransferred += headerLength;
+      return header.retain();
+    } else if (body instanceof InputStream) {
+      InputStream stream = (InputStream) body;
+      int available = stream.available();

Review Comment:
   Updated, but I'm not sure how this applies to `stream.readChunk`. That does seem to do its own checks, so we don't need to change the branch here?





Re: [PR] [SPARK-45429][CORE] Add helper classes for SSL RPC communication [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #43244:
URL: https://github.com/apache/spark/pull/43244#issuecomment-1762701527

   Can you retrigger the tests, please? There are a bunch of failures.




Re: [PR] [SPARK-45429][CORE] Add helper classes for SSL RPC communication [spark]

Posted by "hasnain-db (via GitHub)" <gi...@apache.org>.
hasnain-db commented on code in PR #43244:
URL: https://github.com/apache/spark/pull/43244#discussion_r1357450204


##########
common/network-common/src/main/java/org/apache/spark/network/protocol/EncryptedMessageWithHeader.java:
##########
@@ -0,0 +1,132 @@
+  @Override
+  public ByteBuf readChunk(ByteBufAllocator allocator) throws Exception {
+    if (isEndOfInput()) {
+      return null;
+    }
+
+    if (totalBytesTransferred < headerLength) {
+      totalBytesTransferred += headerLength;
+      return header.retain();
+    } else if (body instanceof InputStream) {
+      InputStream stream = (InputStream) body;
+      int available = stream.available();

Review Comment:
   @mridulm could you clarify what check is needed where? If I understand correctly, the `isEndOfInput()` check at the start of this method would cover what you're asking, so I'm probably missing something.





Re: [PR] [SPARK-45429][CORE] Add helper classes for SSL RPC communication [spark]

Posted by "hasnain-db (via GitHub)" <gi...@apache.org>.
hasnain-db commented on code in PR #43244:
URL: https://github.com/apache/spark/pull/43244#discussion_r1353972708


##########
common/network-common/src/main/java/org/apache/spark/network/protocol/EncryptedMessageWithHeader.java:
##########
@@ -0,0 +1,132 @@
+  @Override
+  public ByteBuf readChunk(ByteBufAllocator allocator) throws Exception {
+    if (isEndOfInput()) {
+      return null;
+    }
+
+    if (totalBytesTransferred < headerLength) {
+      totalBytesTransferred += headerLength;
+      return header.retain();
+    } else if (body instanceof InputStream) {
+      InputStream stream = (InputStream) body;
+      int available = stream.available();

Review Comment:
   @mridulm I think I'm a little confused (probably missing something).
   
   Inside our implementation of `readChunk`, we need to return a `ByteBuf` to the caller. When we're backed by an `InputStream`, we have to allocate a `ByteBuf` ourselves, size it appropriately, and ask the `InputStream` to write bytes into it. We thus have to properly account for how much is available, how much we ask for, and how much is actually read. I've made the changes you suggested (correctly, I hope), as this makes sense.
   
   However, when we're backed by a `ChunkedStream`, the only API it gives us is `readChunk`, which returns a `ByteBuf` directly, so we just need to count how many new bytes were transferred and return that buffer. Am I missing something?
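
A minimal sketch of the `InputStream` accounting described above, using only JDK types so it runs without Netty. The class name `StreamChunkReader`, the `maxChunkSize` parameter, and the use of a `byte[]` in place of a `ByteBuf` are assumptions made for illustration and are not part of the PR:

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Arrays;

// Hypothetical JDK-only stand-in for the InputStream branch of readChunk().
// A byte[] plays the role of the ByteBuf that the real class would allocate.
final class StreamChunkReader {
  private final InputStream body;
  private final long bodyLength;
  private long bytesTransferred = 0;

  StreamChunkReader(InputStream body, long bodyLength) {
    this.body = body;
    this.bodyLength = bodyLength;
  }

  boolean isEndOfInput() {
    return bytesTransferred >= bodyLength;
  }

  long bytesTransferred() {
    return bytesTransferred;
  }

  // Returns the next chunk, or null once bodyLength bytes have been handed out.
  byte[] readChunk(int maxChunkSize) {
    if (maxChunkSize <= 0) {
      throw new IllegalArgumentException("maxChunkSize must be positive");
    }
    if (isEndOfInput()) {
      return null;
    }
    try {
      // How much is available: trust available() only when it is positive.
      int available = body.available();
      long remaining = bodyLength - bytesTransferred;
      long wanted = available > 0 ? Math.min(available, remaining) : remaining;
      // How much we ask for: never more than one chunk.
      int toRead = (int) Math.min(wanted, maxChunkSize);
      byte[] chunk = new byte[toRead];
      // How much was read: read() may return fewer bytes than requested.
      int read = body.read(chunk, 0, toRead);
      if (read < 0) {
        throw new EOFException("Stream ended before bodyLength bytes were read");
      }
      bytesTransferred += read;
      return read == toRead ? chunk : Arrays.copyOf(chunk, read);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Only the bytes actually read are added to the counter, so a short read never causes the total to overshoot `bodyLength`.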





Re: [PR] [SPARK-45429][CORE] Add helper classes for SSL RPC communication [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #43244:
URL: https://github.com/apache/spark/pull/43244#discussion_r1357172193


##########
common/network-common/src/main/java/org/apache/spark/network/protocol/EncryptedMessageWithHeader.java:
##########
@@ -0,0 +1,132 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.network.protocol;
+
+import javax.annotation.Nullable;
+import java.io.InputStream;
+
+import com.google.common.base.Preconditions;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.stream.ChunkedStream;
+import io.netty.handler.stream.ChunkedInput;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+
+/**
+ * A wrapper message that holds two separate pieces (a header and a body).
+ *
+ * The header must be a ByteBuf, while the body can be any InputStream or ChunkedStream
+ */
+public class EncryptedMessageWithHeader implements ChunkedInput<ByteBuf> {
+
+  @Nullable private final ManagedBuffer managedBuffer;
+  private final ByteBuf header;
+  private final int headerLength;
+  private final Object body;
+  private final long bodyLength;
+  private long totalBytesTransferred;
+
+  /**
+   * Construct a new EncryptedMessageWithHeader.
+   *
+   * @param managedBuffer the {@link ManagedBuffer} that the message body came from. This needs to
+   *                      be passed in so that the buffer can be freed when this message is
+   *                      deallocated. Ownership of the caller's reference to this buffer is
+   *                      transferred to this class, so if the caller wants to continue to use the
+   *                      ManagedBuffer in other messages then they will need to call retain() on
+   *                      it before passing it to this constructor.
+   * @param header the message header.
+   * @param body the message body.
+   * @param bodyLength the length of the message body, in bytes.
+   */
+
+  public EncryptedMessageWithHeader(
+      @Nullable ManagedBuffer managedBuffer, ByteBuf header, Object body, long bodyLength) {
+    Preconditions.checkArgument(body instanceof InputStream || body instanceof ChunkedStream,
+      "Body must be an InputStream or a ChunkedStream.");
+    this.managedBuffer = managedBuffer;
+    this.header = header;
+    this.headerLength = header.readableBytes();
+    this.body = body;
+    this.bodyLength = bodyLength;
+    this.totalBytesTransferred = 0;
+  }
+
+  @Override
+  public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
+    return readChunk(ctx.alloc());
+  }
+
+  @Override
+  public ByteBuf readChunk(ByteBufAllocator allocator) throws Exception {
+    if (isEndOfInput()) {
+      return null;
+    }
+
+    if (totalBytesTransferred < headerLength) {
+      totalBytesTransferred += headerLength;
+      return header.retain();
+    } else if (body instanceof InputStream) {
+      InputStream stream = (InputStream) body;
+      int available = stream.available();

Review Comment:
   You are right, our current implementations ensure that the returned `ByteBuf` will always be within bounds.
   Could you add a precondition check there to ensure `totalBytesTransferred <= length()` (after updating `totalBytesTransferred`)? The rest should be fine.
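
The suggested bound check can be sketched in plain Java as follows. `TransferCounter` and its method names are hypothetical and not part of the PR; in the real class the two progress values would presumably come from the chunked source before and after its `readChunk` call:

```java
// Hypothetical helper, not part of the PR: tracks bytes handed out and fails
// fast if the running total ever exceeds the declared message length.
final class TransferCounter {
  private final long length;
  private long totalBytesTransferred = 0;

  TransferCounter(long length) {
    this.length = length;
  }

  // Records the bytes produced by one readChunk() call, computed as the
  // difference between the source's progress after and before the call.
  void record(long progressBefore, long progressAfter) {
    long read = progressAfter - progressBefore;
    if (read < 0) {
      throw new IllegalStateException("Source progress went backwards");
    }
    totalBytesTransferred += read;
    // Explicit check rather than `assert`, so it also runs without -ea.
    if (totalBytesTransferred > length) {
      throw new IllegalStateException(
          "Transferred " + totalBytesTransferred + " bytes, expected at most " + length);
    }
  }

  long total() {
    return totalBytesTransferred;
  }
}
```

One design note: a Java `assert` statement is skipped unless assertions are enabled with `-ea`, whereas an explicit check like the one above always runs.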
   





Re: [PR] [SPARK-45429][CORE] Add helper classes for SSL RPC communication [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #43244:
URL: https://github.com/apache/spark/pull/43244#discussion_r1357744469


##########
common/network-common/src/main/java/org/apache/spark/network/protocol/EncryptedMessageWithHeader.java:
##########
@@ -0,0 +1,132 @@
+
+package org.apache.spark.network.protocol;
+
+import javax.annotation.Nullable;
+import java.io.InputStream;
+
+import com.google.common.base.Preconditions;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.stream.ChunkedStream;
+import io.netty.handler.stream.ChunkedInput;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+
+/**
+ * A wrapper message that holds two separate pieces (a header and a body).
+ *
+ * The header must be a ByteBuf, while the body can be any InputStream or ChunkedStream
+ */
+public class EncryptedMessageWithHeader implements ChunkedInput<ByteBuf> {
+
+  @Nullable private final ManagedBuffer managedBuffer;
+  private final ByteBuf header;
+  private final int headerLength;
+  private final Object body;
+  private final long bodyLength;
+  private long totalBytesTransferred;
+
+  /**
+   * Construct a new EncryptedMessageWithHeader.
+   *
+   * @param managedBuffer the {@link ManagedBuffer} that the message body came from. This needs to
+   *                      be passed in so that the buffer can be freed when this message is
+   *                      deallocated. Ownership of the caller's reference to this buffer is
+   *                      transferred to this class, so if the caller wants to continue to use the
+   *                      ManagedBuffer in other messages then they will need to call retain() on
+   *                      it before passing it to this constructor.
+   * @param header the message header.
+   * @param body the message body.
+   * @param bodyLength the length of the message body, in bytes.
+   */
+
+  public EncryptedMessageWithHeader(
+      @Nullable ManagedBuffer managedBuffer, ByteBuf header, Object body, long bodyLength) {
+    Preconditions.checkArgument(body instanceof InputStream || body instanceof ChunkedStream,
+      "Body must be an InputStream or a ChunkedStream.");
+    this.managedBuffer = managedBuffer;
+    this.header = header;
+    this.headerLength = header.readableBytes();
+    this.body = body;
+    this.bodyLength = bodyLength;
+    this.totalBytesTransferred = 0;
+  }
+
+  @Override
+  public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
+    return readChunk(ctx.alloc());
+  }
+
+  @Override
+  public ByteBuf readChunk(ByteBufAllocator allocator) throws Exception {
+    if (isEndOfInput()) {
+      return null;
+    }
+
+    if (totalBytesTransferred < headerLength) {
+      totalBytesTransferred += headerLength;
+      return header.retain();
+    } else if (body instanceof InputStream) {
+      InputStream stream = (InputStream) body;
+      int available = stream.available();

Review Comment:
   To clarify, we don't need this for the `InputStream` path, given the way we already compute how much needs to be read; it is only needed for the `readChunk` path (the `ChunkedStream` branch).





Re: [PR] [SPARK-45429][CORE] Add helper classes for SSL RPC communication [spark]

Posted by "hasnain-db (via GitHub)" <gi...@apache.org>.
hasnain-db commented on code in PR #43244:
URL: https://github.com/apache/spark/pull/43244#discussion_r1349632160


##########
common/network-common/src/test/java/org/apache/spark/network/protocol/EncryptedMessageWithHeaderSuite.java:
##########
@@ -0,0 +1,143 @@
+
+package org.apache.spark.network.protocol;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.Random;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.handler.stream.ChunkedStream;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.buffer.NettyManagedBuffer;
+
+public class EncryptedMessageWithHeaderSuite {

Review Comment:
   Acknowledged, will add a test which validates that we throw when passed a `ByteBuf`.
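
A rough sketch of what such a test checks, using only JDK types. `BodyHolder` and `BodyHolderCheck` are hypothetical stand-ins, `java.nio.ByteBuffer` takes the place of a Netty `ByteBuf`, and the `ChunkedStream` case is left out to avoid the Netty dependency:

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.ByteBuffer;

// Hypothetical JDK-only stand-in for the constructor check; the real class also
// accepts Netty's ChunkedStream, which is omitted here to avoid the dependency.
final class BodyHolder {
  final Object body;

  BodyHolder(Object body) {
    if (!(body instanceof InputStream)) {
      // Guava's Preconditions.checkArgument throws the same exception type.
      throw new IllegalArgumentException("Body must be an InputStream.");
    }
    this.body = body;
  }
}

final class BodyHolderCheck {
  // Returns true if constructing a BodyHolder with this body is rejected.
  static boolean isRejected(Object body) {
    try {
      new BodyHolder(body);
      return false;
    } catch (IllegalArgumentException expected) {
      return true;
    }
  }

  public static void main(String[] args) {
    // java.nio.ByteBuffer stands in for a Netty ByteBuf body.
    System.out.println(isRejected(ByteBuffer.allocate(4)));
    System.out.println(isRejected(new ByteArrayInputStream(new byte[4])));
  }
}
```

In the actual suite, which already imports JUnit 5 assertions, the same idea would more likely be written with `assertThrows(IllegalArgumentException.class, ...)` around the real constructor.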





Re: [PR] [SPARK-45429][CORE] Add helper classes for SSL RPC communication [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #43244:
URL: https://github.com/apache/spark/pull/43244#discussion_r1357656123


##########
common/network-common/src/main/java/org/apache/spark/network/protocol/EncryptedMessageWithHeader.java:
##########
@@ -0,0 +1,132 @@
+
+package org.apache.spark.network.protocol;
+
+import javax.annotation.Nullable;
+import java.io.InputStream;
+
+import com.google.common.base.Preconditions;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.stream.ChunkedStream;
+import io.netty.handler.stream.ChunkedInput;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+
+/**
+ * A wrapper message that holds two separate pieces (a header and a body).
+ *
+ * The header must be a ByteBuf, while the body can be any InputStream or ChunkedStream
+ */
+public class EncryptedMessageWithHeader implements ChunkedInput<ByteBuf> {
+
+  @Nullable private final ManagedBuffer managedBuffer;
+  private final ByteBuf header;
+  private final int headerLength;
+  private final Object body;
+  private final long bodyLength;
+  private long totalBytesTransferred;
+
+  /**
+   * Construct a new EncryptedMessageWithHeader.
+   *
+   * @param managedBuffer the {@link ManagedBuffer} that the message body came from. This needs to
+   *                      be passed in so that the buffer can be freed when this message is
+   *                      deallocated. Ownership of the caller's reference to this buffer is
+   *                      transferred to this class, so if the caller wants to continue to use the
+   *                      ManagedBuffer in other messages then they will need to call retain() on
+   *                      it before passing it to this constructor.
+   * @param header the message header.
+   * @param body the message body.
+   * @param bodyLength the length of the message body, in bytes.
+   */
+
+  public EncryptedMessageWithHeader(
+      @Nullable ManagedBuffer managedBuffer, ByteBuf header, Object body, long bodyLength) {
+    Preconditions.checkArgument(body instanceof InputStream || body instanceof ChunkedStream,
+      "Body must be an InputStream or a ChunkedStream.");
+    this.managedBuffer = managedBuffer;
+    this.header = header;
+    this.headerLength = header.readableBytes();
+    this.body = body;
+    this.bodyLength = bodyLength;
+    this.totalBytesTransferred = 0;
+  }
+
+  @Override
+  public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
+    return readChunk(ctx.alloc());
+  }
+
+  @Override
+  public ByteBuf readChunk(ByteBufAllocator allocator) throws Exception {
+    if (isEndOfInput()) {
+      return null;
+    }
+
+    if (totalBytesTransferred < headerLength) {
+      totalBytesTransferred += headerLength;
+      return header.retain();
+    } else if (body instanceof InputStream) {
+      InputStream stream = (InputStream) body;
+      int available = stream.available();

Review Comment:
   Something like `assert(totalBytesTransferred <= length())` after updating `totalBytesTransferred`.
   





Re: [PR] [SPARK-45429][CORE] Add helper classes for SSL RPC communication [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #43244:
URL: https://github.com/apache/spark/pull/43244#discussion_r1357656123


##########
common/network-common/src/main/java/org/apache/spark/network/protocol/EncryptedMessageWithHeader.java:
##########
@@ -0,0 +1,132 @@
+
+package org.apache.spark.network.protocol;
+
+import javax.annotation.Nullable;
+import java.io.InputStream;
+
+import com.google.common.base.Preconditions;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.stream.ChunkedStream;
+import io.netty.handler.stream.ChunkedInput;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+
+/**
+ * A wrapper message that holds two separate pieces (a header and a body).
+ *
+ * The header must be a ByteBuf, while the body can be any InputStream or ChunkedStream
+ */
+public class EncryptedMessageWithHeader implements ChunkedInput<ByteBuf> {
+
+  @Nullable private final ManagedBuffer managedBuffer;
+  private final ByteBuf header;
+  private final int headerLength;
+  private final Object body;
+  private final long bodyLength;
+  private long totalBytesTransferred;
+
+  /**
+   * Construct a new EncryptedMessageWithHeader.
+   *
+   * @param managedBuffer the {@link ManagedBuffer} that the message body came from. This needs to
+   *                      be passed in so that the buffer can be freed when this message is
+   *                      deallocated. Ownership of the caller's reference to this buffer is
+   *                      transferred to this class, so if the caller wants to continue to use the
+   *                      ManagedBuffer in other messages then they will need to call retain() on
+   *                      it before passing it to this constructor.
+   * @param header the message header.
+   * @param body the message body.
+   * @param bodyLength the length of the message body, in bytes.
+   */
+
+  public EncryptedMessageWithHeader(
+      @Nullable ManagedBuffer managedBuffer, ByteBuf header, Object body, long bodyLength) {
+    Preconditions.checkArgument(body instanceof InputStream || body instanceof ChunkedStream,
+      "Body must be an InputStream or a ChunkedStream.");
+    this.managedBuffer = managedBuffer;
+    this.header = header;
+    this.headerLength = header.readableBytes();
+    this.body = body;
+    this.bodyLength = bodyLength;
+    this.totalBytesTransferred = 0;
+  }
+
+  @Override
+  public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
+    return readChunk(ctx.alloc());
+  }
+
+  @Override
+  public ByteBuf readChunk(ByteBufAllocator allocator) throws Exception {
+    if (isEndOfInput()) {
+      return null;
+    }
+
+    if (totalBytesTransferred < headerLength) {
+      totalBytesTransferred += headerLength;
+      return header.retain();
+    } else if (body instanceof InputStream) {
+      InputStream stream = (InputStream) body;
+      int available = stream.available();

Review Comment:
   Add `assert(totalBytesTransferred <= length())` after updating `totalBytesTransferred`.





Re: [PR] [SPARK-45429][CORE] Add helper classes for SSL RPC communication [spark]

Posted by "hasnain-db (via GitHub)" <gi...@apache.org>.
hasnain-db commented on code in PR #43244:
URL: https://github.com/apache/spark/pull/43244#discussion_r1357747747


##########
common/network-common/src/main/java/org/apache/spark/network/protocol/EncryptedMessageWithHeader.java:
##########
@@ -0,0 +1,132 @@
+
+package org.apache.spark.network.protocol;
+
+import javax.annotation.Nullable;
+import java.io.InputStream;
+
+import com.google.common.base.Preconditions;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.stream.ChunkedStream;
+import io.netty.handler.stream.ChunkedInput;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+
+/**
+ * A wrapper message that holds two separate pieces (a header and a body).
+ *
+ * The header must be a ByteBuf, while the body can be any InputStream or ChunkedStream
+ */
+public class EncryptedMessageWithHeader implements ChunkedInput<ByteBuf> {
+
+  @Nullable private final ManagedBuffer managedBuffer;
+  private final ByteBuf header;
+  private final int headerLength;
+  private final Object body;
+  private final long bodyLength;
+  private long totalBytesTransferred;
+
+  /**
+   * Construct a new EncryptedMessageWithHeader.
+   *
+   * @param managedBuffer the {@link ManagedBuffer} that the message body came from. This needs to
+   *                      be passed in so that the buffer can be freed when this message is
+   *                      deallocated. Ownership of the caller's reference to this buffer is
+   *                      transferred to this class, so if the caller wants to continue to use the
+   *                      ManagedBuffer in other messages then they will need to call retain() on
+   *                      it before passing it to this constructor.
+   * @param header the message header.
+   * @param body the message body.
+   * @param bodyLength the length of the message body, in bytes.
+   */
+
+  public EncryptedMessageWithHeader(
+      @Nullable ManagedBuffer managedBuffer, ByteBuf header, Object body, long bodyLength) {
+    Preconditions.checkArgument(body instanceof InputStream || body instanceof ChunkedStream,
+      "Body must be an InputStream or a ChunkedStream.");
+    this.managedBuffer = managedBuffer;
+    this.header = header;
+    this.headerLength = header.readableBytes();
+    this.body = body;
+    this.bodyLength = bodyLength;
+    this.totalBytesTransferred = 0;
+  }
+
+  @Override
+  public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
+    return readChunk(ctx.alloc());
+  }
+
+  @Override
+  public ByteBuf readChunk(ByteBufAllocator allocator) throws Exception {
+    if (isEndOfInput()) {
+      return null;
+    }
+
+    if (totalBytesTransferred < headerLength) {
+      totalBytesTransferred += headerLength;
+      return header.retain();
+    } else if (body instanceof InputStream) {
+      InputStream stream = (InputStream) body;
+      int available = stream.available();

Review Comment:
   Thanks! Updated.





Re: [PR] [SPARK-45429][CORE] Add helper classes for SSL RPC communication [spark]

Posted by "hasnain-db (via GitHub)" <gi...@apache.org>.
hasnain-db commented on PR #43244:
URL: https://github.com/apache/spark/pull/43244#issuecomment-1763115565

   Done, will re-request review once failures are minimized.




Re: [PR] [SPARK-45429][CORE] Add helper classes for SSL RPC communication [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm closed pull request #43244: [SPARK-45429][CORE] Add helper classes for SSL RPC communication
URL: https://github.com/apache/spark/pull/43244

