Posted to reviews@spark.apache.org by vanzin <gi...@git.apache.org> on 2015/04/07 01:05:22 UTC

[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

GitHub user vanzin opened a pull request:

    https://github.com/apache/spark/pull/5377

    [SPARK-6229] Add SASL encryption to network library.

    There are two main parts of this change:
    
    - Extending the bootstrap mechanism in the network library to add a server-side
      bootstrap (which works a little bit differently than the client-side bootstrap), and
      to allow the bootstraps to modify the underlying channel.
    
    - Use SASL to encrypt data going through the RPC channel.
    
    The second item requires some non-optimal code to be able to work around the
    fact that the outbound path in netty is not thread-safe, and ordering is very important
    when encryption is in the picture.
    
    A lot of the changes outside the network/common library are just to adjust to the
    changed API for initializing the RPC server.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/vanzin/spark SPARK-6229

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/5377.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5377
    
----
commit fbe6ccb4bbf5cd8dd23b5bcbe5d013d7efb40f88
Author: Marcelo Vanzin <va...@cloudera.com>
Date:   2015-03-24T21:48:01Z

    Add TransportServerBootstrap, make SASL code use it.
    
    This commit changes the way a transport server is configured to use SASL. It
    tries to be more in line with how you set up a client, by registering a
    "bootstrap" that runs before the server is actually available. Due to how
    servers internally work, the server bootstrap works differently from the client
    one, but a similar external interface is provided to clients of the library.
    
    The SASL backend was reorganized to use this new mechanism. Later, the
    bootstrap will be modified so that it's possible to also use SASL for
    encryption, which requires inserting a new channel handler into the pipeline
    - the main reason why this change was necessary in the first place.

commit 351a86f882ca082da963d645e5a66065fe7c21f2
Author: Marcelo Vanzin <va...@cloudera.com>
Date:   2015-03-25T00:39:26Z

    Add SASL encryption to network library.
    
    This change extends the "bootstrap" classes to expose the underlying netty
    channel to the bootstrap implementations. This allows the SASL bootstraps
    to register channel handlers that can then perform data encryption, when
    that is enabled by the user.
    
    Encryption is performed in a manner that is transparent to the other existing
    channel handlers, so perhaps it's not the most efficient implementation,
    especially since it requires having 2 frame decoders in the pipeline. But it's
    the least intrusive way to do it.
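
    The second frame decoder exists because encrypted blocks have to be
    delimited on the wire before they can be decrypted as whole units. A
    minimal sketch of that idea, assuming a simple 4-byte length prefix (the
    frame format actually used by the patch is not shown in this thread, and
    LengthPrefixedFrames is a hypothetical name):

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the patch: illustrates length-prefixed framing.
class LengthPrefixedFrames {

  // Prefix a block with its length as a 4-byte big-endian int (assumed format).
  static byte[] frame(byte[] block) {
    return ByteBuffer.allocate(4 + block.length).putInt(block.length).put(block).array();
  }

  // Extract every complete frame from the buffer; a trailing partial frame is left unread.
  static List<byte[]> decode(ByteBuffer in) {
    List<byte[]> frames = new ArrayList<>();
    while (in.remaining() >= 4) {
      in.mark();
      int length = in.getInt();
      if (length < 0) {
        throw new IllegalArgumentException("Negative frame length: " + length);
      }
      if (in.remaining() < length) {
        in.reset();  // Not enough bytes yet: rewind to the length field and wait for more data.
        break;
      }
      byte[] block = new byte[length];
      in.get(block);
      frames.add(block);
    }
    return frames;
  }
}
```

    With this shape, one decoder splits the stream into encrypted blocks and,
    after decryption, a second one splits the plaintext into messages.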

commit 39539a7d4f889102ec9e12c61a7e2984011af594
Author: Marcelo Vanzin <va...@cloudera.com>
Date:   2015-03-26T20:18:04Z

    Add config option to enable SASL encryption.

commit b923cae7b9e1e5616b5be0759ec810ee9d9fe5a4
Author: Marcelo Vanzin <va...@cloudera.com>
Date:   2015-03-27T18:50:31Z

    Make SASL encryption handler thread-safe, handle FileRegion messages.

commit b00999a877c0a8562db7f7bcd2d8d79be6196924
Author: Marcelo Vanzin <va...@cloudera.com>
Date:   2015-04-03T20:25:10Z

    Consolidate ByteArrayWritableChannel, fix SASL code to match master changes.

commit dad42fcff225fded1763ea2b7602b1fe6743b157
Author: Marcelo Vanzin <va...@cloudera.com>
Date:   2015-04-03T22:08:33Z

    Make encryption thread-safe, less memory-intensive.
    
    Two changes here:
    
    First, wrap the code that does encryption in a FileRegion. Without resorting
    to using a ThreadPerChannelEventLoopGroup, that's the only thread-safe way to
    do encryption in netty, apparently, since message ordering is extremely
    important here. This requires some funny accounting in the SASL FileRegion
    implementation, because we can't know beforehand how much data we'll have
    to write.
    
    Second, instead of encrypting the whole original message, do it in smaller
    chunks, to avoid using too much memory on the sender side. So it should be
    possible to transfer a large file block without having to load the whole
    block into memory at once.
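
    The chunking idea can be sketched roughly as follows. This is an
    illustration under stated assumptions, not the code in the patch:
    ChunkedEncryptor and its Wrapper interface are hypothetical names, with
    wrap() modelled loosely on the wrap(byte[], int, int) methods in
    javax.security.sasl.

```java
import java.nio.ByteBuffer;
import java.util.function.Consumer;

// Hypothetical sketch: encrypt a large source one bounded chunk at a time.
class ChunkedEncryptor {

  // Stand-in for the SASL backend; only the shape of wrap() matters here.
  interface Wrapper {
    byte[] wrap(byte[] data, int offset, int length);
  }

  // Feeds the source through the wrapper in chunks of at most maxChunkSize bytes,
  // so only one plaintext chunk is held in memory at a time. Returns the chunk count.
  static int encrypt(ByteBuffer source, Wrapper wrapper, int maxChunkSize, Consumer<byte[]> sink) {
    if (maxChunkSize <= 0) {
      throw new IllegalArgumentException("maxChunkSize must be positive");
    }
    byte[] chunk = new byte[maxChunkSize];
    int chunks = 0;
    while (source.hasRemaining()) {
      int length = Math.min(source.remaining(), maxChunkSize);
      source.get(chunk, 0, length);
      sink.accept(wrapper.wrap(chunk, 0, length));
      chunks++;
    }
    return chunks;
  }
}
```

    The encrypted size of each chunk is only known after wrap() returns, which
    is why the total output length cannot be computed up front.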

commit e98bc5552591c5e33ae4e8b050128bd9b73f7440
Author: Marcelo Vanzin <va...@cloudera.com>
Date:   2015-04-06T22:50:56Z

    Add option to only allow encrypted connections to the server.

commit 85843230d947f85939363388f5ceaa6fbc9e4896
Author: Marcelo Vanzin <va...@cloudera.com>
Date:   2015-04-06T23:00:04Z

    Fix a comment.

commit cf2a605d02afd4f462270a0d808ff82c75ec04fd
Author: Marcelo Vanzin <va...@cloudera.com>
Date:   2015-04-06T23:02:40Z

    Clean up some code.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/5377#issuecomment-94072288
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/30497/


[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on the pull request:

    https://github.com/apache/spark/pull/5377#issuecomment-92436434
  
    I'm definitely not an expert on this, but the code seemed OK to me.  My only concern -- is it possible to add a test that ensures data is actually getting encrypted?  Otherwise the unit tests could pass even if `encrypt = true` were a no-op.  I can't think of any way to do that, though -- maybe it's not possible.
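
    One possible shape for such a test, offered as an assumption rather than as what the patch does: capture the bytes that reach the wire and assert that a known plaintext marker does not appear in them. `WireCheck` and `scramble` below are hypothetical names, and the XOR transform is only a stand-in to make the sketch runnable; it is not encryption.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical test helper: checks whether plaintext leaks into captured wire bytes.
class WireCheck {

  // Returns true if needle occurs as a contiguous run inside haystack.
  static boolean contains(byte[] haystack, byte[] needle) {
    for (int i = 0; i + needle.length <= haystack.length; i++) {
      int j = 0;
      while (j < needle.length && haystack[i + j] == needle[j]) {
        j++;
      }
      if (j == needle.length) {
        return true;
      }
    }
    return false;
  }

  // Stand-in transform for illustration only. XOR with a fixed key is NOT encryption.
  static byte[] scramble(byte[] data, byte key) {
    byte[] out = new byte[data.length];
    for (int i = 0; i < data.length; i++) {
      out[i] = (byte) (data[i] ^ key);
    }
    return out;
  }

  public static void main(String[] args) {
    byte[] marker = "SECRET-MARKER".getBytes(StandardCharsets.UTF_8);
    byte[] wire = "xxSECRET-MARKERxx".getBytes(StandardCharsets.UTF_8);
    System.out.println("plain wire leaks marker: " + contains(wire, marker));
    System.out.println("scrambled wire leaks marker: " + contains(scramble(wire, (byte) 0x5A), marker));
  }
}
```

    A real test would replace `scramble` with the bytes observed on the channel when `encrypt = true`, so a no-op implementation would fail the check.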


[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5377#discussion_r29121527
  
    --- Diff: network/common/src/main/java/org/apache/spark/network/sasl/SparkSaslServer.java ---
    @@ -60,13 +60,19 @@
       static final String DIGEST = "DIGEST-MD5";
     
       /**
    -   * The quality of protection is just "auth". This means that we are doing
    -   * authentication only, we are not supporting integrity or privacy protection of the
    -   * communication channel after authentication. This could be changed to be configurable
    -   * in the future.
    +   * QOP value that includes encryption.
    +   */
    +  static final String QOP_AUTH_CONF = "auth-conf";
    +
    +  /**
    +   * QOP value that does not include encryption.
    +   */
    +  static final String QOP_AUTH = "auth";
    +
    +  /**
    +   * Common SASL config properties for both client and server.
        */
       static final Map<String, String> SASL_PROPS = ImmutableMap.<String, String>builder()
    -    .put(Sasl.QOP, "auth")
         .put(Sasl.SERVER_AUTH, "true")
    --- End diff --
    
    Is this property relevant for the client? Potentially we could just do away with this static map if not.


[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5377#discussion_r29121544
  
    --- Diff: network/common/src/main/java/org/apache/spark/network/sasl/SparkSaslServer.java ---
    @@ -60,13 +60,19 @@
       static final String DIGEST = "DIGEST-MD5";
     
       /**
    -   * The quality of protection is just "auth". This means that we are doing
    -   * authentication only, we are not supporting integrity or privacy protection of the
    -   * communication channel after authentication. This could be changed to be configurable
    -   * in the future.
    +   * QOP value that includes encryption.
    --- End diff --
    
    I liked the spelled-out "quality of protection" better :)


[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5377#discussion_r28328017
  
    --- Diff: network/common/src/main/java/org/apache/spark/network/sasl/SaslClientBootstrap.java ---
    @@ -62,13 +76,26 @@ public void doBootstrap(TransportClient client) {
             byte[] response = client.sendRpcSync(buf.array(), conf.saslRTTimeoutMs());
             payload = saslClient.response(response);
           }
    +
    +      if (encrypt) {
    +        if (!SparkSaslServer.QOP_AUTH_CONF.equals(saslClient.getNegotiatedProperty(Sasl.QOP))) {
    +          throw new RuntimeException(
    +            new SaslException("Encryption requests by negotiated non-encrypted connection."));
    +        }
    +        SaslEncryption.addToChannel(channel, saslClient, conf.maxSaslEncryptedBlockSize());
    +        saslClient = null;
    +        logger.debug("Channel {} configured for SASL encryption.", client);
    +      }
         } finally {
    -      try {
    -        // Once authentication is complete, the server will trust all remaining communication.
    -        saslClient.dispose();
    -      } catch (RuntimeException e) {
    -        logger.error("Error while disposing SASL client", e);
    +      if (saslClient != null) {
    --- End diff --
    
    oh, sorry, I should have realized that.  thanks


[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on the pull request:

    https://github.com/apache/spark/pull/5377#issuecomment-93109516
  
    Will take a look tomorrow, thanks for the ping!


[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5377#discussion_r28253379
  
    --- Diff: network/common/src/main/java/org/apache/spark/network/sasl/SaslClientBootstrap.java ---
    @@ -62,13 +76,26 @@ public void doBootstrap(TransportClient client) {
             byte[] response = client.sendRpcSync(buf.array(), conf.saslRTTimeoutMs());
             payload = saslClient.response(response);
           }
    +
    +      if (encrypt) {
    +        if (!SparkSaslServer.QOP_AUTH_CONF.equals(saslClient.getNegotiatedProperty(Sasl.QOP))) {
    +          throw new RuntimeException(
    +            new SaslException("Encryption requests by negotiated non-encrypted connection."));
    +        }
    +        SaslEncryption.addToChannel(channel, saslClient, conf.maxSaslEncryptedBlockSize());
    +        saslClient = null;
    +        logger.debug("Channel {} configured for SASL encryption.", client);
    +      }
         } finally {
    -      try {
    -        // Once authentication is complete, the server will trust all remaining communication.
    -        saslClient.dispose();
    -      } catch (RuntimeException e) {
    -        logger.error("Error while disposing SASL client", e);
    +      if (saslClient != null) {
    --- End diff --
    
    Rather than setting `saslClient = null` in the `encrypt` block and then checking for null here, why not just use `if (!encrypt)` here?  If there is no other reason to set `saslClient = null`, I think that would be clearer.


[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/5377#issuecomment-97553028
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31299/


[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5377#discussion_r29121094
  
    --- Diff: network/common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java ---
    @@ -0,0 +1,260 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.sasl;
    +
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.nio.channels.WritableByteChannel;
    +import java.util.List;
    +
    +import com.google.common.annotations.VisibleForTesting;
    +import com.google.common.base.Preconditions;
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.Unpooled;
    +import io.netty.channel.Channel;
    +import io.netty.channel.ChannelHandlerContext;
    +import io.netty.channel.ChannelOutboundHandlerAdapter;
    +import io.netty.channel.ChannelPromise;
    +import io.netty.channel.FileRegion;
    +import io.netty.handler.codec.MessageToMessageDecoder;
    +import io.netty.util.AbstractReferenceCounted;
    +import io.netty.util.ReferenceCountUtil;
    +
    +import org.apache.spark.network.util.ByteArrayWritableChannel;
    +import org.apache.spark.network.util.NettyUtils;
    +
    +class SaslEncryption {
    +
    +  @VisibleForTesting
    +  static final String ENCRYPTION_HANDLER_NAME = "saslEncryption";
    +
    +  /**
    +   * Adds channel handlers that perform encryption / decryption of data using SASL.
    +   *
    +   * @param channel The channel.
    +   * @param backend The SASL backend.
    +   * @param maxOutboundBlockSize Max size in bytes of outgoing encrypted blocks, to control
    +   *                             memory usage.
    +   */
    +  static void addToChannel(
    +      Channel channel,
    +      SaslEncryptionBackend backend,
    +      int maxOutboundBlockSize) {
    +    channel.pipeline()
    +      .addFirst(ENCRYPTION_HANDLER_NAME, new EncryptionHandler(backend, maxOutboundBlockSize))
    +      .addFirst("saslDecryption", new DecryptionHandler(backend))
    +      .addFirst("saslFrameDecoder", NettyUtils.createFrameDecoder());
    +  }
    +
    +  private static class EncryptionHandler extends ChannelOutboundHandlerAdapter {
    +
    +    private final int maxOutboundBlockSize;
    +    private final SaslEncryptionBackend backend;
    +
    +    EncryptionHandler(SaslEncryptionBackend backend, int maxOutboundBlockSize) {
    +      this.backend = backend;
    +      this.maxOutboundBlockSize = maxOutboundBlockSize;
    +    }
    +
    +    /**
    +     * Wrap the incoming message in an implementation that will perform encryption lazily. This is
    +     * needed to guarantee ordering of the outgoing encrypted packets - they need to be decrypted in
    +     * the same order, and netty doesn't have an atomic ChannelHandlerContext.write() API, so it
    +     * does not guarantee any ordering.
    +     */
    +    @Override
    +    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
    +      throws Exception {
    +
    +      ctx.write(new EncryptedMessage(backend, msg, maxOutboundBlockSize), promise);
    +    }
    +
    +    @Override
    +    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    +      try {
    +        backend.dispose();
    +      } finally {
    +        super.handlerRemoved(ctx);
    +      }
    +    }
    +
    +  }
    +
    +  private static class DecryptionHandler extends MessageToMessageDecoder<ByteBuf> {
    +
    +    private final SaslEncryptionBackend backend;
    +
    +    DecryptionHandler(SaslEncryptionBackend backend) {
    +      this.backend = backend;
    +    }
    +
    +    @Override
    +    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out)
    +      throws Exception {
    +
    +      byte[] data;
    +      int offset;
    +      int length = msg.readableBytes();
    +      if (msg.hasArray()) {
    +        data = msg.array();
    +        offset = msg.arrayOffset();
    --- End diff --
    
    Should we advance `msg` by `readableBytes()`?
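
    The concern can be illustrated with `java.nio.ByteBuffer`, used here only because it ships with the JDK; the diff itself works on Netty's `ByteBuf`, where the read position is `readerIndex()` rather than `position()`. In both, reading through the backing array leaves the read position untouched, so the caller has to advance it. `BackingArrayRead` is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical sketch: consume a buffer via its backing array and keep the position honest.
class BackingArrayRead {

  // Copies the readable bytes out of the buffer and marks them as consumed.
  static byte[] drain(ByteBuffer msg) {
    int length = msg.remaining();
    byte[] out;
    if (msg.hasArray()) {
      // The readable region starts at arrayOffset() plus the current position.
      int offset = msg.arrayOffset() + msg.position();
      out = Arrays.copyOfRange(msg.array(), offset, offset + length);
      // Access through array() does not move the position, so advance it by hand.
      msg.position(msg.position() + length);
    } else {
      out = new byte[length];
      msg.get(out);  // A relative get() advances the position itself.
    }
    return out;
  }
}
```

    Without the explicit advance in the array-backed branch, the two branches would leave the buffer in different states.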


[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/5377#issuecomment-96842198
  
      [Test build #31073 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31073/consoleFull) for   PR 5377 at commit [`73cff0e`](https://github.com/apache/spark/commit/73cff0ecb6629f2308c11eff2644423bd528acda).


[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/5377#issuecomment-93847257
  
      [Test build #30434 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/30434/consoleFull) for   PR 5377 at commit [`9908ada`](https://github.com/apache/spark/commit/9908ada3e2e07f4e725c76174d7b7d2e0f6a7a23).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class SaslEncryption `
      * `class SaslRpcHandler extends RpcHandler `
      * `public class SaslServerBootstrap implements TransportServerBootstrap `
      * `public class SparkSaslClient implements SaslEncryptionBackend `
      * `public class SparkSaslServer implements SaslEncryptionBackend `
      * `public class ByteArrayWritableChannel implements WritableByteChannel `
    
     * This patch does not change any dependencies.


[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5377#discussion_r28282594
  
    --- Diff: network/common/src/main/java/org/apache/spark/network/sasl/SaslClientBootstrap.java ---
    @@ -62,13 +76,26 @@ public void doBootstrap(TransportClient client) {
             byte[] response = client.sendRpcSync(buf.array(), conf.saslRTTimeoutMs());
             payload = saslClient.response(response);
           }
    +
    +      if (encrypt) {
    +        if (!SparkSaslServer.QOP_AUTH_CONF.equals(saslClient.getNegotiatedProperty(Sasl.QOP))) {
    +          throw new RuntimeException(
    +            new SaslException("Encryption requests by negotiated non-encrypted connection."));
    +        }
    +        SaslEncryption.addToChannel(channel, saslClient, conf.maxSaslEncryptedBlockSize());
    +        saslClient = null;
    +        logger.debug("Channel {} configured for SASL encryption.", client);
    +      }
         } finally {
    -      try {
    -        // Once authentication is complete, the server will trust all remaining communication.
    -        saslClient.dispose();
    -      } catch (RuntimeException e) {
    -        logger.error("Error while disposing SASL client", e);
    +      if (saslClient != null) {
    --- End diff --
    
    Checking for `encrypt` wouldn't work because we still want to dispose of the client instance when some error is thrown.
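
    The ownership hand-off described here can be sketched in isolation. This is a simplified illustration, not the bootstrap code: `OwnershipTransfer`, `Resource`, and the `failNegotiation` flag are hypothetical stand-ins for the SASL client and a failed negotiation.

```java
// Hypothetical sketch of "null out the reference once someone else owns it".
class OwnershipTransfer {

  // Stand-in for the SASL client.
  static class Resource {
    boolean disposed = false;

    void dispose() {
      disposed = true;
    }
  }

  // Returns the resource if ownership was handed off, or null if it was disposed here.
  static Resource bootstrap(Resource client, boolean encrypt, boolean failNegotiation) {
    Resource owned = client;
    Resource handedOff = null;
    try {
      if (encrypt) {
        if (failNegotiation) {
          throw new IllegalStateException("negotiation failed");
        }
        handedOff = owned;  // The channel handler now owns the resource.
        owned = null;       // Signal to the finally block that it must not dispose.
      }
      return handedOff;
    } finally {
      // An `if (!encrypt)` check here would leak the resource when encrypt is true
      // and the exception above is thrown before the hand-off.
      if (owned != null) {
        owned.dispose();
      }
    }
  }
}
```

    The null check covers three cases with one condition: no encryption, encryption with a successful hand-off, and encryption that fails before the hand-off.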


[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/5377#issuecomment-96857355
  
      [Test build #31089 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31089/consoleFull) for   PR 5377 at commit [`7a2a805`](https://github.com/apache/spark/commit/7a2a805b35f87040e8c5ea9b82d50ef6cf965d95).


[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5377#discussion_r29121561
  
    --- Diff: network/common/src/main/java/org/apache/spark/network/sasl/SparkSaslServer.java ---
    @@ -75,11 +81,20 @@
       private final SecretKeyHolder secretKeyHolder;
       private SaslServer saslServer;
     
    -  public SparkSaslServer(String secretKeyId, SecretKeyHolder secretKeyHolder) {
    +  public SparkSaslServer(
    +      String secretKeyId,
    +      SecretKeyHolder secretKeyHolder,
    +      boolean alwaysEncrypt) {
         this.secretKeyId = secretKeyId;
         this.secretKeyHolder = secretKeyHolder;
    +
    +    String qop = alwaysEncrypt ? QOP_AUTH_CONF : String.format("%s,%s", QOP_AUTH_CONF, QOP_AUTH);
    --- End diff --
    
    I assume this is a comma-separated list of the supported formats, for negotiation? Maybe add a comment to this effect.
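
    For reference, the JDK documents `Sasl.QOP` as a comma-separated list of acceptable quality-of-protection values in preference order. A small sketch of how the server properties could be built on that basis; `QopConfig` and `serverProps` are hypothetical names, and only the constants and the `alwaysEncrypt` flag come from the diff.

```java
import java.util.HashMap;
import java.util.Map;
import javax.security.sasl.Sasl;

// Hypothetical sketch: build SASL server properties with a negotiable QOP list.
class QopConfig {

  static final String QOP_AUTH_CONF = "auth-conf";  // Authentication plus confidentiality.
  static final String QOP_AUTH = "auth";            // Authentication only.

  static Map<String, String> serverProps(boolean alwaysEncrypt) {
    // Listing "auth-conf" first states a preference for encryption while still
    // letting clients that only offer "auth" connect, unless alwaysEncrypt is set.
    String qop = alwaysEncrypt ? QOP_AUTH_CONF : QOP_AUTH_CONF + "," + QOP_AUTH;
    Map<String, String> props = new HashMap<>();
    props.put(Sasl.QOP, qop);
    props.put(Sasl.SERVER_AUTH, "true");
    return props;
  }
}
```

    After negotiation, the agreed value can be read back with `getNegotiatedProperty(Sasl.QOP)`, as the client bootstrap in this pull request does.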


[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/5377#issuecomment-94072277
  
      [Test build #30497 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/30497/consoleFull) for   PR 5377 at commit [`4797519`](https://github.com/apache/spark/commit/4797519b16c237c2573b235b49b656650eb9d0ae).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class SaslEncryption `
      * `class SaslRpcHandler extends RpcHandler `
      * `public class SaslServerBootstrap implements TransportServerBootstrap `
      * `public class SparkSaslClient implements SaslEncryptionBackend `
      * `public class SparkSaslServer implements SaslEncryptionBackend `
      * `public class ByteArrayWritableChannel implements WritableByteChannel `
    
     * This patch does not change any dependencies.


[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/5377#issuecomment-90297034
  
      [Test build #29759 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29759/consoleFull) for   PR 5377 at commit [`cf2a605`](https://github.com/apache/spark/commit/cf2a605d02afd4f462270a0d808ff82c75ec04fd).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class SaslEncryption `
      * `class SaslRpcHandler extends RpcHandler `
      * `public class SaslServerBootstrap implements TransportServerBootstrap `
      * `public class SparkSaslClient implements SaslEncryptionBackend `
      * `public class SparkSaslServer implements SaslEncryptionBackend `
      * `public class ByteArrayWritableChannel implements WritableByteChannel `
    
     * This patch does not change any dependencies.


[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/5377#issuecomment-97237487
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31165/


[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5377#discussion_r29121182
  
    --- Diff: network/common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java ---
    @@ -0,0 +1,260 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.sasl;
    +
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.nio.channels.WritableByteChannel;
    +import java.util.List;
    +
    +import com.google.common.annotations.VisibleForTesting;
    +import com.google.common.base.Preconditions;
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.Unpooled;
    +import io.netty.channel.Channel;
    +import io.netty.channel.ChannelHandlerContext;
    +import io.netty.channel.ChannelOutboundHandlerAdapter;
    +import io.netty.channel.ChannelPromise;
    +import io.netty.channel.FileRegion;
    +import io.netty.handler.codec.MessageToMessageDecoder;
    +import io.netty.util.AbstractReferenceCounted;
    +import io.netty.util.ReferenceCountUtil;
    +
    +import org.apache.spark.network.util.ByteArrayWritableChannel;
    +import org.apache.spark.network.util.NettyUtils;
    +
    +class SaslEncryption {
    +
    +  @VisibleForTesting
    +  static final String ENCRYPTION_HANDLER_NAME = "saslEncryption";
    +
    +  /**
    +   * Adds channel handlers that perform encryption / decryption of data using SASL.
    +   *
    +   * @param channel The channel.
    +   * @param backend The SASL backend.
    +   * @param maxOutboundBlockSize Max size in bytes of outgoing encrypted blocks, to control
    +   *                             memory usage.
    +   */
    +  static void addToChannel(
    +      Channel channel,
    +      SaslEncryptionBackend backend,
    +      int maxOutboundBlockSize) {
    +    channel.pipeline()
    +      .addFirst(ENCRYPTION_HANDLER_NAME, new EncryptionHandler(backend, maxOutboundBlockSize))
    +      .addFirst("saslDecryption", new DecryptionHandler(backend))
    +      .addFirst("saslFrameDecoder", NettyUtils.createFrameDecoder());
    +  }
    +
    +  private static class EncryptionHandler extends ChannelOutboundHandlerAdapter {
    +
    +    private final int maxOutboundBlockSize;
    +    private final SaslEncryptionBackend backend;
    +
    +    EncryptionHandler(SaslEncryptionBackend backend, int maxOutboundBlockSize) {
    +      this.backend = backend;
    +      this.maxOutboundBlockSize = maxOutboundBlockSize;
    +    }
    +
    +    /**
    +     * Wrap the incoming message in an implementation that will perform encryption lazily. This is
    +     * needed to guarantee ordering of the outgoing encrypted packets - they need to be decrypted in
    +     * the same order, and netty doesn't have an atomic ChannelHandlerContext.write() API, so it
    +     * does not guarantee any ordering.
    +     */
    +    @Override
    +    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
    +      throws Exception {
    +
    +      ctx.write(new EncryptedMessage(backend, msg, maxOutboundBlockSize), promise);
    +    }
    +
    +    @Override
    +    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    +      try {
    +        backend.dispose();
    +      } finally {
    +        super.handlerRemoved(ctx);
    +      }
    +    }
    +
    +  }
    +
    +  private static class DecryptionHandler extends MessageToMessageDecoder<ByteBuf> {
    +
    +    private final SaslEncryptionBackend backend;
    +
    +    DecryptionHandler(SaslEncryptionBackend backend) {
    +      this.backend = backend;
    +    }
    +
    +    @Override
    +    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out)
    +      throws Exception {
    +
    +      byte[] data;
    +      int offset;
    +      int length = msg.readableBytes();
    +      if (msg.hasArray()) {
    +        data = msg.array();
    +        offset = msg.arrayOffset();
    +      } else {
    +        data = new byte[length];
    +        msg.readBytes(data);
    +        offset = 0;
    +      }
    +
    +      out.add(Unpooled.wrappedBuffer(backend.unwrap(data, offset, length)));
    +    }
    +
    +  }
    +
    +  private static class EncryptedMessage extends AbstractReferenceCounted implements FileRegion {
    +
    +    private final SaslEncryptionBackend backend;
    +    private final boolean isByteBuf;
    +    private final ByteBuf buf;
    +    private final FileRegion region;
    +    private final ByteArrayWritableChannel byteChannel;
    +
    +    private ByteBuf currentHeader;
    +    private ByteBuffer currentChunk;
    +    private long currentChunkSize;
    +    private long unencryptedChunkSize;
    +    private long transferred;
    +
    +    EncryptedMessage(SaslEncryptionBackend backend, Object msg, int maxOutboundBlockSize) {
    +      Preconditions.checkArgument(msg instanceof ByteBuf || msg instanceof FileRegion,
    +        "Unrecognized message type: %s", msg.getClass().getName());
    +      this.backend = backend;
    +      this.isByteBuf = msg instanceof ByteBuf;
    +      this.buf = isByteBuf ? (ByteBuf) msg : null;
    +      this.region = isByteBuf ? null : (FileRegion) msg;
    +      this.byteChannel = new ByteArrayWritableChannel(maxOutboundBlockSize);
    +    }
    +
    +    /**
    +     * Returns the size of the original (unencrypted) message.
    +     *
    +     * This makes assumptions about how netty treats FileRegion instances, because there's no way
    +     * to know beforehand what will be the size of the encrypted message. Namely, it assumes
    +     * that netty will try to transfer data from this message while
    +     * <code>transfered() < count()</code>. So these two methods return, technically, wrong data,
    +     * but netty doesn't know better.
    +     */
    +    @Override
    +    public long count() {
    +      return isByteBuf ? buf.readableBytes() : region.count();
    +    }
    +
    +    @Override
    +    public long position() {
    +      return 0;
    +    }
    +
    +    /**
    +     * Returns an approximation of the amount of data transferred. See {@link #count()}.
    +     */
    +    @Override
    +    public long transfered() {
    +      return transferred;
    +    }
    +
    +    /**
    +     * Transfers data from the original message to the channel, encrypting it in the process.
    +     *
    +     * This method also breaks down the original message into smaller chunks when needed. This
    +     * is done to keep memory usage under control. This avoids having to copy the whole message
    +     * data into memory at once, and can avoid ballooning memory usage when transferring large
    +     * messages such as shuffle blocks.
    +     *
    +     * The {@link #transfered()} counter also behaves a little funny, in that it won't go forward
    +     * until a whole chunk has been written. This is done because the code can't use the actual
    +     * number of bytes written to the channel as the transferred count (see {@link #count()}).
    +     * Instead, once an encrypted chunk is written to the output (including its header), the
    +     * size of the original block will be added to the {@link #transfered()} amount.
    +     */
    +    @Override
    +    public long transferTo(final WritableByteChannel target, final long position)
    +      throws IOException {
    +
    +      Preconditions.checkArgument(position == transfered(), "Invalid position.");
    +
    +      long written = 0;
    +      do {
    +        if (currentChunk == null) {
    +          nextChunk();
    +        }
    +
    +        if (currentHeader.readableBytes() > 0) {
    +          int bytesWritten = target.write(currentHeader.nioBuffer());
    +          currentHeader.skipBytes(bytesWritten);
    +          if (currentHeader.readableBytes() > 0) {
    +            // Break out of loop if there are still header bytes left to write.
    +            break;
    +          }
    +        }
    +
    +        target.write(currentChunk);
    +        if (!currentChunk.hasRemaining()) {
    +          // Only update the count of written bytes once a full chunk has been written.
    +          // See method javadoc.
    +          written += unencryptedChunkSize;
    +          currentHeader.release();
    +          currentHeader = null;
    +          currentChunk = null;
    +          currentChunkSize = 0;
    +        }
    +      } while (currentChunk == null && transfered() + written < count());
    +
    +      transferred += written;
    +      return written;
    +    }
    +
    +    private void nextChunk() throws IOException {
    +      byteChannel.reset();
    +      if (isByteBuf) {
    +        int copied = byteChannel.write(buf.nioBuffer());
    --- End diff --
    
    Could this be a ByteArrayOutputStream? If it were, I think you could use buf.readBytes(byteStream) or something of that sort.
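
For illustration, here is a minimal sketch of the suggestion above using only JDK classes. It is not the code from the PR: `java.nio.ByteBuffer` stands in for Netty's `ByteBuf` so the example has no dependencies, and the `copyChunk` helper and its `maxBlockSize` parameter are hypothetical names. It copies at most one block of the message into a `ByteArrayOutputStream` through `Channels.newChannel`, which is roughly what chunked encryption needs before handing the bytes to the SASL backend.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;

class ChunkCopySketch {

  /**
   * Copies at most maxBlockSize bytes from src into a fresh byte array and
   * advances src's position by the number of bytes copied.
   */
  static byte[] copyChunk(ByteBuffer src, int maxBlockSize) {
    ByteArrayOutputStream byteStream = new ByteArrayOutputStream(maxBlockSize);
    WritableByteChannel channel = Channels.newChannel(byteStream);

    int length = Math.min(src.remaining(), maxBlockSize);
    // Work on a duplicate so the limit of the caller's buffer is left untouched.
    ByteBuffer slice = src.duplicate();
    slice.limit(slice.position() + length);
    try {
      while (slice.hasRemaining()) {
        channel.write(slice);
      }
    } catch (IOException e) {
      // Not expected for an in-memory stream; rethrown unchecked to keep the sketch short.
      throw new UncheckedIOException(e);
    }
    src.position(src.position() + length);
    return byteStream.toByteArray();
  }

  public static void main(String[] args) {
    ByteBuffer message = ByteBuffer.wrap(new byte[10]);
    while (message.hasRemaining()) {
      byte[] chunk = copyChunk(message, 4);
      System.out.println("chunk of " + chunk.length + " bytes");
    }
  }
}
```

With a 10-byte message and a 4-byte block size, the loop in `main` produces chunks of 4, 4 and 2 bytes.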


[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/5377#issuecomment-97196878
  
    Merged build started.


[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5377#discussion_r28490148
  
    --- Diff: network/common/src/main/java/org/apache/spark/network/util/TransportConf.java ---
    @@ -105,4 +105,19 @@ public boolean lazyFileDescriptor() {
       public int portMaxRetries() {
         return conf.getInt("spark.port.maxRetries", 16);
       }
    +
    +  /**
    +   * Maximum number of bytes to be encrypted at a time when SASL encryption is enabled.
    +   */
    +  public int maxSaslEncryptedBlockSize() {
    +    return conf.getInt("spark.network.sasl.max_encrypted_block_size_kb", 64) * 1024;
    --- End diff --
    
    nit: spark config style is camelCase for things which are not separated by periods
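
To make the convention concrete: period-separated segments stay as they are, and a multi-word segment is written in camelCase instead of with underscores. The sketch below applies that rule mechanically. The `toCamelCaseKey` helper is hypothetical, and the key it prints is only what the rule yields for the name in the diff, not necessarily the name the patch ended up using.

```java
class ConfigKeyStyleSketch {

  /** Rewrites each period-separated segment of a key from snake_case to camelCase. */
  static String toCamelCaseKey(String key) {
    StringBuilder out = new StringBuilder();
    boolean upperNext = false;
    for (char c : key.toCharArray()) {
      if (c == '_') {
        // Drop the underscore and capitalize the character that follows it.
        upperNext = true;
      } else if (c == '.') {
        upperNext = false;
        out.append(c);
      } else {
        out.append(upperNext ? Character.toUpperCase(c) : c);
        upperNext = false;
      }
    }
    return out.toString();
  }

  public static void main(String[] args) {
    // prints "spark.network.sasl.maxEncryptedBlockSizeKb"
    System.out.println(toCamelCaseKey("spark.network.sasl.max_encrypted_block_size_kb"));
  }
}
```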


[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5377#discussion_r29121679
  
    --- Diff: network/common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java ---
    @@ -0,0 +1,260 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.sasl;
    +
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.nio.channels.WritableByteChannel;
    +import java.util.List;
    +
    +import com.google.common.annotations.VisibleForTesting;
    +import com.google.common.base.Preconditions;
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.Unpooled;
    +import io.netty.channel.Channel;
    +import io.netty.channel.ChannelHandlerContext;
    +import io.netty.channel.ChannelOutboundHandlerAdapter;
    +import io.netty.channel.ChannelPromise;
    +import io.netty.channel.FileRegion;
    +import io.netty.handler.codec.MessageToMessageDecoder;
    +import io.netty.util.AbstractReferenceCounted;
    +import io.netty.util.ReferenceCountUtil;
    +
    +import org.apache.spark.network.util.ByteArrayWritableChannel;
    +import org.apache.spark.network.util.NettyUtils;
    +
    +class SaslEncryption {
    --- End diff --
    
    Please add a class comment


[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/5377#issuecomment-93793360
  
      [Test build #30426 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/30426/consoleFull) for   PR 5377 at commit [`7fe1489`](https://github.com/apache/spark/commit/7fe148953b09e8f7c3fad66774ce6c8aa1850a45).


[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/5377#issuecomment-141255693
  
    How do we enable this feature? Would be good to add some docs for this.


[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/5377#issuecomment-90276086
  
    I tested this with netty/encryption, netty/no encryption, and nio. Didn't really run performance tests (don't have a cluster available where I can really do that), but I don't expect things with encryption enabled to exactly fly.


[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/5377#issuecomment-97196785
  
     Merged build triggered.


[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5377#discussion_r29204220
  
    --- Diff: network/common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java ---
    @@ -0,0 +1,302 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.sasl;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.nio.channels.Channels;
    +import java.nio.channels.WritableByteChannel;
    +import java.util.List;
    +
    +import com.google.common.annotations.VisibleForTesting;
    +import com.google.common.base.Preconditions;
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.Unpooled;
    +import io.netty.channel.Channel;
    +import io.netty.channel.ChannelHandlerContext;
    +import io.netty.channel.ChannelOutboundHandlerAdapter;
    +import io.netty.channel.ChannelPromise;
    +import io.netty.channel.FileRegion;
    +import io.netty.handler.codec.MessageToMessageDecoder;
    +import io.netty.util.AbstractReferenceCounted;
    +import io.netty.util.ReferenceCountUtil;
    +
    +import org.apache.spark.network.util.NettyUtils;
    +
    +/**
    + * Provides SASL-based encryption for transport channels. The single method exposed by this
    + * class installs the needed channel handlers on a connected channel.
    + */
    +class SaslEncryption {
    +
    +  @VisibleForTesting
    +  static final String ENCRYPTION_HANDLER_NAME = "saslEncryption";
    +
    +  /**
    +   * Adds channel handlers that perform encryption / decryption of data using SASL.
    +   *
    +   * @param channel The channel.
    +   * @param backend The SASL backend.
    +   * @param maxOutboundBlockSize Max size in bytes of outgoing encrypted blocks, to control
    +   *                             memory usage.
    +   */
    +  static void addToChannel(
    +      Channel channel,
    +      SaslEncryptionBackend backend,
    +      int maxOutboundBlockSize) {
    +    channel.pipeline()
    +      .addFirst(ENCRYPTION_HANDLER_NAME, new EncryptionHandler(backend, maxOutboundBlockSize))
    +      .addFirst("saslDecryption", new DecryptionHandler(backend))
    +      .addFirst("saslFrameDecoder", NettyUtils.createFrameDecoder());
    +  }
    +
    +  private static class EncryptionHandler extends ChannelOutboundHandlerAdapter {
    +
    +    private final int maxOutboundBlockSize;
    +    private final SaslEncryptionBackend backend;
    +
    +    EncryptionHandler(SaslEncryptionBackend backend, int maxOutboundBlockSize) {
    +      this.backend = backend;
    +      this.maxOutboundBlockSize = maxOutboundBlockSize;
    +    }
    +
    +    /**
    +     * Wrap the incoming message in an implementation that will perform encryption lazily. This is
    +     * needed to guarantee ordering of the outgoing encrypted packets - they need to be decrypted in
    +     * the same order, and netty doesn't have an atomic ChannelHandlerContext.write() API, so it
    +     * does not guarantee any ordering.
    +     */
    +    @Override
    +    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
    +      throws Exception {
    +
    +      ctx.write(new EncryptedMessage(backend, msg, maxOutboundBlockSize), promise);
    +    }
    +
    +    @Override
    +    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    +      try {
    +        backend.dispose();
    +      } finally {
    +        super.handlerRemoved(ctx);
    +      }
    +    }
    +
    +  }
    +
    +  private static class DecryptionHandler extends MessageToMessageDecoder<ByteBuf> {
    +
    +    private final SaslEncryptionBackend backend;
    +
    +    DecryptionHandler(SaslEncryptionBackend backend) {
    +      this.backend = backend;
    +    }
    +
    +    @Override
    +    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out)
    +      throws Exception {
    +
    +      byte[] data;
    +      int offset;
    +      int length = msg.readableBytes();
    +      if (msg.hasArray()) {
    +        data = msg.array();
    +        offset = msg.arrayOffset();
    +        msg.skipBytes(length);
    +      } else {
    +        data = new byte[length];
    +        msg.readBytes(data);
    +        offset = 0;
    +      }
    +
    +      out.add(Unpooled.wrappedBuffer(backend.unwrap(data, offset, length)));
    +    }
    +
    +  }
    +
    +  @VisibleForTesting
    +  static class EncryptedMessage extends AbstractReferenceCounted implements FileRegion {
    +
    +    private final SaslEncryptionBackend backend;
    +    private final boolean isByteBuf;
    +    private final ByteBuf buf;
    +    private final FileRegion region;
    +    private final int maxOutboundBlockSize;
    +    private final ExposedByteArrayOutputStream byteStream;
    +
    +    private ByteBuf currentHeader;
    +    private ByteBuffer currentChunk;
    +    private long currentChunkSize;
    +    private long currentReportedBytes;
    +    private long unencryptedChunkSize;
    +    private long transferred;
    +
    +    EncryptedMessage(SaslEncryptionBackend backend, Object msg, int maxOutboundBlockSize) {
    +      Preconditions.checkArgument(msg instanceof ByteBuf || msg instanceof FileRegion,
    +        "Unrecognized message type: %s", msg.getClass().getName());
    +      this.backend = backend;
    +      this.isByteBuf = msg instanceof ByteBuf;
    +      this.buf = isByteBuf ? (ByteBuf) msg : null;
    +      this.region = isByteBuf ? null : (FileRegion) msg;
    +      this.maxOutboundBlockSize = maxOutboundBlockSize;
    +      this.byteStream = new ExposedByteArrayOutputStream(maxOutboundBlockSize);
    +    }
    +
    +    /**
    +     * Returns the size of the original (unencrypted) message.
    +     *
    +     * This makes assumptions about how netty treats FileRegion instances, because there's no way
    +     * to know beforehand what will be the size of the encrypted message. Namely, it assumes
    +     * that netty will try to transfer data from this message while
    +     * <code>transfered() < count()</code>. So these two methods return, technically, wrong data,
    +     * but netty doesn't know better.
    +     */
    +    @Override
    +    public long count() {
    +      return isByteBuf ? buf.readableBytes() : region.count();
    --- End diff --
    
    won't this count() change as we read bytes?
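
The concern can be reproduced with a plain `java.nio.ByteBuffer`, used here as a stand-in for Netty's `ByteBuf` (an assumption made to keep the sketch dependency-free). `remaining()`, like `readableBytes()`, shrinks as bytes are consumed, so a `count()` computed on demand would drift during the transfer. One possible fix, sketched with the hypothetical `FixedCountMessage` class, is to capture the size once in the constructor. This may or may not be how the PR resolved it.

```java
import java.nio.ByteBuffer;

class FixedCountMessage {
  private final ByteBuffer buf;
  private final long count;  // captured once, before any bytes are read

  FixedCountMessage(ByteBuffer buf) {
    this.buf = buf;
    this.count = buf.remaining();
  }

  /** Size of the original message; stable across reads. */
  long count() {
    return count;
  }

  /** Size computed on demand; shrinks as the buffer is consumed. */
  long liveCount() {
    return buf.remaining();
  }

  /** Consumes up to n bytes from the underlying buffer. */
  void read(int n) {
    int toRead = Math.min(n, buf.remaining());
    buf.position(buf.position() + toRead);
  }

  public static void main(String[] args) {
    FixedCountMessage msg = new FixedCountMessage(ByteBuffer.wrap(new byte[8]));
    msg.read(3);
    // prints "8 vs 5"
    System.out.println(msg.count() + " vs " + msg.liveCount());
  }
}
```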


[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/5377#issuecomment-97516573
  
      [Test build #31299 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31299/consoleFull) for   PR 5377 at commit [`ff01966`](https://github.com/apache/spark/commit/ff019662055edeaf8d709087e0279afa37011353).


[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5377#discussion_r28489592
  
    --- Diff: core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala ---
    @@ -49,18 +49,21 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManage
       private[this] var appId: String = _
     
       override def init(blockDataManager: BlockDataManager): Unit = {
    -    val (rpcHandler: RpcHandler, bootstrap: Option[TransportClientBootstrap]) = {
    -      val nettyRpcHandler = new NettyBlockRpcServer(serializer, blockDataManager)
    -      if (!authEnabled) {
    -        (nettyRpcHandler, None)
    +    val rpcHandler = new NettyBlockRpcServer(serializer, blockDataManager)
    +    val (serverBootstrap: Option[TransportServerBootstrap],
    --- End diff --
    
    nit: This syntax gets unwieldy with more code in here; I'd advise switching to something like
    ```scala
    var serverBootstrap: Option[TransportServerBootstrap] = None
    var clientBootstrap: Option[TransportClientBootstrap] = None
    if (authEnabled) {
      serverBootstrap = Some(new SaslServerBootstrap(transportConf, securityManager))
      clientBootstrap = Some(new SaslClientBootstrap(transportConf, conf.getAppId, securityManager,
        securityManager.isSaslEncryptionEnabled()))
    }
    ```
    
    This uses the dreaded `var`, but I think it may be less syntactically cumbersome.


[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5377#discussion_r29195262
  
    --- Diff: network/common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java ---
    @@ -0,0 +1,260 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.sasl;
    +
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.nio.channels.WritableByteChannel;
    +import java.util.List;
    +
    +import com.google.common.annotations.VisibleForTesting;
    +import com.google.common.base.Preconditions;
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.Unpooled;
    +import io.netty.channel.Channel;
    +import io.netty.channel.ChannelHandlerContext;
    +import io.netty.channel.ChannelOutboundHandlerAdapter;
    +import io.netty.channel.ChannelPromise;
    +import io.netty.channel.FileRegion;
    +import io.netty.handler.codec.MessageToMessageDecoder;
    +import io.netty.util.AbstractReferenceCounted;
    +import io.netty.util.ReferenceCountUtil;
    +
    +import org.apache.spark.network.util.ByteArrayWritableChannel;
    +import org.apache.spark.network.util.NettyUtils;
    +
    +class SaslEncryption {
    +
    +  @VisibleForTesting
    +  static final String ENCRYPTION_HANDLER_NAME = "saslEncryption";
    +
    +  /**
    +   * Adds channel handlers that perform encryption / decryption of data using SASL.
    +   *
    +   * @param channel The channel.
    +   * @param backend The SASL backend.
    +   * @param maxOutboundBlockSize Max size in bytes of outgoing encrypted blocks, to control
    +   *                             memory usage.
    +   */
    +  static void addToChannel(
    +      Channel channel,
    +      SaslEncryptionBackend backend,
    +      int maxOutboundBlockSize) {
    +    channel.pipeline()
    +      .addFirst(ENCRYPTION_HANDLER_NAME, new EncryptionHandler(backend, maxOutboundBlockSize))
    +      .addFirst("saslDecryption", new DecryptionHandler(backend))
    +      .addFirst("saslFrameDecoder", NettyUtils.createFrameDecoder());
    +  }
    +
    +  private static class EncryptionHandler extends ChannelOutboundHandlerAdapter {
    +
    +    private final int maxOutboundBlockSize;
    +    private final SaslEncryptionBackend backend;
    +
    +    EncryptionHandler(SaslEncryptionBackend backend, int maxOutboundBlockSize) {
    +      this.backend = backend;
    +      this.maxOutboundBlockSize = maxOutboundBlockSize;
    +    }
    +
    +    /**
    +     * Wrap the incoming message in an implementation that will perform encryption lazily. This is
    +     * needed to guarantee ordering of the outgoing encrypted packets - they need to be decrypted in
    +     * the same order, and netty doesn't have an atomic ChannelHandlerContext.write() API, so it
    +     * does not guarantee any ordering.
    +     */
    +    @Override
    +    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
    +      throws Exception {
    +
    +      ctx.write(new EncryptedMessage(backend, msg, maxOutboundBlockSize), promise);
    +    }
    +
    +    @Override
    +    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    +      try {
    +        backend.dispose();
    +      } finally {
    +        super.handlerRemoved(ctx);
    +      }
    +    }
    +
    +  }
    +
    +  private static class DecryptionHandler extends MessageToMessageDecoder<ByteBuf> {
    +
    +    private final SaslEncryptionBackend backend;
    +
    +    DecryptionHandler(SaslEncryptionBackend backend) {
    +      this.backend = backend;
    +    }
    +
    +    @Override
    +    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out)
    +      throws Exception {
    +
    +      byte[] data;
    +      int offset;
    +      int length = msg.readableBytes();
    +      if (msg.hasArray()) {
    +        data = msg.array();
    +        offset = msg.arrayOffset();
    --- End diff --
    
    I see; it's just slightly odd that only one of the two cases moves msg's reader index. In the current implementation, you're right that it looks correct, but a seemingly unrelated change could cause issues down the line due to the discrepancy, and only in a very weird circumstance (off-heap vs on-heap messages). Consider using `msg.getBytes(msg.readerIndex(), data)` below.
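
A minimal sketch of the concern above, using `java.nio.ByteBuffer` as a stand-in for netty's `ByteBuf` (an assumption made to keep the example dependency-free; `position()` plays the role of the reader index). The helper `peekReadableBytes` is hypothetical and not part of the patch. It copies the readable bytes without moving the position, so heap-backed and direct buffers end up in the same state, which is the property the suggested absolute `getBytes` call is after.

```java
import java.nio.ByteBuffer;

class ReaderIndexSketch {

  /**
   * Copies the readable bytes of buf into a fresh array without moving
   * buf's position, whether buf is heap-backed or direct.
   */
  static byte[] peekReadableBytes(ByteBuffer buf) {
    byte[] data = new byte[buf.remaining()];
    // duplicate() shares the content but has an independent position, so the
    // relative get() below leaves the caller's buffer untouched.
    buf.duplicate().get(data);
    return data;
  }

  public static void main(String[] args) {
    byte[] payload = {1, 2, 3, 4};

    ByteBuffer heap = ByteBuffer.wrap(payload);
    ByteBuffer direct = ByteBuffer.allocateDirect(payload.length);
    direct.put(payload);
    direct.flip();

    for (ByteBuffer buf : new ByteBuffer[] {heap, direct}) {
      int before = buf.position();
      byte[] copy = peekReadableBytes(buf);
      System.out.println("hasArray=" + buf.hasArray()
        + " copied=" + copy.length
        + " positionMoved=" + (buf.position() != before));
    }
  }
}
```

Treating both buffer kinds the same way means a later change that relies on the position after decoding cannot behave differently for on-heap and off-heap messages.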




[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5377#discussion_r29205150
  
    --- Diff: network/common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java ---
    @@ -0,0 +1,302 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.sasl;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.nio.channels.Channels;
    +import java.nio.channels.WritableByteChannel;
    +import java.util.List;
    +
    +import com.google.common.annotations.VisibleForTesting;
    +import com.google.common.base.Preconditions;
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.Unpooled;
    +import io.netty.channel.Channel;
    +import io.netty.channel.ChannelHandlerContext;
    +import io.netty.channel.ChannelOutboundHandlerAdapter;
    +import io.netty.channel.ChannelPromise;
    +import io.netty.channel.FileRegion;
    +import io.netty.handler.codec.MessageToMessageDecoder;
    +import io.netty.util.AbstractReferenceCounted;
    +import io.netty.util.ReferenceCountUtil;
    +
    +import org.apache.spark.network.util.NettyUtils;
    +
    +/**
    + * Provides SASL-based encryption for transport channels. The single method exposed by this
    + * class installs the needed channel handlers on a connected channel.
    + */
    +class SaslEncryption {
    +
    +  @VisibleForTesting
    +  static final String ENCRYPTION_HANDLER_NAME = "saslEncryption";
    +
    +  /**
    +   * Adds channel handlers that perform encryption / decryption of data using SASL.
    +   *
    +   * @param channel The channel.
    +   * @param backend The SASL backend.
    +   * @param maxOutboundBlockSize Max size in bytes of outgoing encrypted blocks, to control
    +   *                             memory usage.
    +   */
    +  static void addToChannel(
    +      Channel channel,
    +      SaslEncryptionBackend backend,
    +      int maxOutboundBlockSize) {
    +    channel.pipeline()
    +      .addFirst(ENCRYPTION_HANDLER_NAME, new EncryptionHandler(backend, maxOutboundBlockSize))
    +      .addFirst("saslDecryption", new DecryptionHandler(backend))
    +      .addFirst("saslFrameDecoder", NettyUtils.createFrameDecoder());
    +  }
    +
    +  private static class EncryptionHandler extends ChannelOutboundHandlerAdapter {
    +
    +    private final int maxOutboundBlockSize;
    +    private final SaslEncryptionBackend backend;
    +
    +    EncryptionHandler(SaslEncryptionBackend backend, int maxOutboundBlockSize) {
    +      this.backend = backend;
    +      this.maxOutboundBlockSize = maxOutboundBlockSize;
    +    }
    +
    +    /**
    +     * Wrap the incoming message in an implementation that will perform encryption lazily. This is
    +     * needed to guarantee ordering of the outgoing encrypted packets - they need to be decrypted in
    +     * the same order, and netty doesn't have an atomic ChannelHandlerContext.write() API, so it
    +     * does not guarantee any ordering.
    +     */
    +    @Override
    +    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
    +      throws Exception {
    +
    +      ctx.write(new EncryptedMessage(backend, msg, maxOutboundBlockSize), promise);
    +    }
    +
    +    @Override
    +    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    +      try {
    +        backend.dispose();
    +      } finally {
    +        super.handlerRemoved(ctx);
    +      }
    +    }
    +
    +  }
    +
    +  private static class DecryptionHandler extends MessageToMessageDecoder<ByteBuf> {
    +
    +    private final SaslEncryptionBackend backend;
    +
    +    DecryptionHandler(SaslEncryptionBackend backend) {
    +      this.backend = backend;
    +    }
    +
    +    @Override
    +    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out)
    +      throws Exception {
    +
    +      byte[] data;
    +      int offset;
    +      int length = msg.readableBytes();
    +      if (msg.hasArray()) {
    +        data = msg.array();
    +        offset = msg.arrayOffset();
    +        msg.skipBytes(length);
    +      } else {
    +        data = new byte[length];
    +        msg.readBytes(data);
    +        offset = 0;
    +      }
    +
    +      out.add(Unpooled.wrappedBuffer(backend.unwrap(data, offset, length)));
    +    }
    +
    +  }
    +
    +  @VisibleForTesting
    +  static class EncryptedMessage extends AbstractReferenceCounted implements FileRegion {
    +
    +    private final SaslEncryptionBackend backend;
    +    private final boolean isByteBuf;
    +    private final ByteBuf buf;
    +    private final FileRegion region;
    +    private final int maxOutboundBlockSize;
    +    private final ExposedByteArrayOutputStream byteStream;
    +
    +    private ByteBuf currentHeader;
    +    private ByteBuffer currentChunk;
    +    private long currentChunkSize;
    +    private long currentReportedBytes;
    +    private long unencryptedChunkSize;
    +    private long transferred;
    +
    +    EncryptedMessage(SaslEncryptionBackend backend, Object msg, int maxOutboundBlockSize) {
    +      Preconditions.checkArgument(msg instanceof ByteBuf || msg instanceof FileRegion,
    +        "Unrecognized message type: %s", msg.getClass().getName());
    +      this.backend = backend;
    +      this.isByteBuf = msg instanceof ByteBuf;
    +      this.buf = isByteBuf ? (ByteBuf) msg : null;
    +      this.region = isByteBuf ? null : (FileRegion) msg;
    +      this.maxOutboundBlockSize = maxOutboundBlockSize;
    +      this.byteStream = new ExposedByteArrayOutputStream(maxOutboundBlockSize);
    +    }
    +
    +    /**
    +     * Returns the size of the original (unencrypted) message.
    +     *
    +     * This makes assumptions about how netty treats FileRegion instances, because there's no way
    +     * to know beforehand what will be the size of the encrypted message. Namely, it assumes
    +     * that netty will try to transfer data from this message while
    +     * <code>transfered() < count()</code>. So these two methods return, technically, wrong data,
    +     * but netty doesn't know better.
    +     */
    +    @Override
    +    public long count() {
    +      return isByteBuf ? buf.readableBytes() : region.count();
    --- End diff --
    
    It's unclear from the docs, but in the existing implementations (e.g. LazyFileRegion), `count()` always returns the same value. Also, netty does [this](https://github.com/netty/netty/blob/33f75d374091946058fc334c3cdbcd0f0a59d9b3/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java#L189):
    
        boolean done = region.transfered() >= region.count();
    
    Which implies that `count()` should not change as bytes are transferred.
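
To illustrate why a fixed `count()` matters for a completion check like the one quoted above, here is a small sketch. `Region`, `ChunkedRegion`, and `drain` are hypothetical stand-ins written for this example, not netty types, and the loop only approximates how a transport might drive a region.

```java
class FixedCountSketch {

  /** Hypothetical stand-in for the parts of a file region that matter here. */
  interface Region {
    long count();
    long transferred();
    void transferChunk();
  }

  /** count() is fixed at construction; only transferred() advances. */
  static final class ChunkedRegion implements Region {
    private final long total;
    private final long chunkSize;
    private long transferred;

    ChunkedRegion(long total, long chunkSize) {
      if (total < 0 || chunkSize <= 0) {
        throw new IllegalArgumentException("total must be >= 0 and chunkSize > 0");
      }
      this.total = total;
      this.chunkSize = chunkSize;
    }

    @Override public long count() { return total; }

    @Override public long transferred() { return transferred; }

    @Override public void transferChunk() {
      // Never move past the total, so transferred() ends exactly at count().
      transferred += Math.min(chunkSize, total - transferred);
    }
  }

  /** Keeps writing until transferred() >= count(); returns the number of writes. */
  static int drain(Region region) {
    int writes = 0;
    while (region.transferred() < region.count()) {
      region.transferChunk();
      writes++;
    }
    return writes;
  }

  public static void main(String[] args) {
    Region region = new ChunkedRegion(10, 4);
    int writes = drain(region);
    System.out.println(writes + " writes, " + region.transferred()
      + " of " + region.count() + " bytes");
  }
}
```

If `count()` instead reported the bytes still remaining, the same loop would stop early: with 10 bytes sent in 4-byte chunks, after the second write `transferred()` is 8 and the remaining count is 2, so the check passes with 2 bytes unsent.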




[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/5377#issuecomment-96875510
  
      [Test build #31089 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31089/consoleFull) for   PR 5377 at commit [`7a2a805`](https://github.com/apache/spark/commit/7a2a805b35f87040e8c5ea9b82d50ef6cf965d95).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class SaslEncryption `
      * `  static class EncryptedMessage extends AbstractReferenceCounted implements FileRegion `
      * `class SaslRpcHandler extends RpcHandler `
      * `public class SaslServerBootstrap implements TransportServerBootstrap `
      * `public class SparkSaslClient implements SaslEncryptionBackend `
      * `public class SparkSaslServer implements SaslEncryptionBackend `
      * `public class ByteArrayWritableChannel implements WritableByteChannel `
    
     * This patch **adds the following new dependencies:**
       * `commons-math3-3.4.1.jar`
       * `snappy-java-1.1.1.7.jar`
       * `tachyon-0.6.4.jar`
       * `tachyon-client-0.6.4.jar`
    
     * This patch **removes the following dependencies:**
       * `commons-math3-3.1.1.jar`
       * `snappy-java-1.1.1.6.jar`
       * `tachyon-0.5.0.jar`
       * `tachyon-client-0.5.0.jar`





[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5377#discussion_r29121814
  
    --- Diff: network/common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java ---
    @@ -86,4 +117,237 @@ public void testNonMatching() {
           assertFalse(server.isComplete());
         }
       }
    +
    +  @Test
    +  public void testSaslAuthentication() throws Exception {
    +    testBasicSasl(false);
    +  }
    +
    +  @Test
    +  public void testSaslEncryption() throws Exception {
    --- End diff --
    
    I think these methods should be right next to testBasicSasl's definition.




[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/5377#issuecomment-97553027
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/5377#issuecomment-90297050
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29759/
    Test PASSed.




[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/5377#issuecomment-96861787
  
      [Test build #31075 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31075/consoleFull) for   PR 5377 at commit [`3d1695d`](https://github.com/apache/spark/commit/3d1695dbcf254b181886f67162882ee0ce88b6c7).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class SaslEncryption `
      * `  static class EncryptedMessage extends AbstractReferenceCounted implements FileRegion `
      * `class SaslRpcHandler extends RpcHandler `
      * `public class SaslServerBootstrap implements TransportServerBootstrap `
      * `public class SparkSaslClient implements SaslEncryptionBackend `
      * `public class SparkSaslServer implements SaslEncryptionBackend `
    
     * This patch does not change any dependencies.




[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5377#discussion_r29188459
  
    --- Diff: network/common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java ---
    @@ -0,0 +1,260 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.sasl;
    +
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.nio.channels.WritableByteChannel;
    +import java.util.List;
    +
    +import com.google.common.annotations.VisibleForTesting;
    +import com.google.common.base.Preconditions;
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.Unpooled;
    +import io.netty.channel.Channel;
    +import io.netty.channel.ChannelHandlerContext;
    +import io.netty.channel.ChannelOutboundHandlerAdapter;
    +import io.netty.channel.ChannelPromise;
    +import io.netty.channel.FileRegion;
    +import io.netty.handler.codec.MessageToMessageDecoder;
    +import io.netty.util.AbstractReferenceCounted;
    +import io.netty.util.ReferenceCountUtil;
    +
    +import org.apache.spark.network.util.ByteArrayWritableChannel;
    +import org.apache.spark.network.util.NettyUtils;
    +
    +class SaslEncryption {
    +
    +  @VisibleForTesting
    +  static final String ENCRYPTION_HANDLER_NAME = "saslEncryption";
    +
    +  /**
    +   * Adds channel handlers that perform encryption / decryption of data using SASL.
    +   *
    +   * @param channel The channel.
    +   * @param backend The SASL backend.
    +   * @param maxOutboundBlockSize Max size in bytes of outgoing encrypted blocks, to control
    +   *                             memory usage.
    +   */
    +  static void addToChannel(
    +      Channel channel,
    +      SaslEncryptionBackend backend,
    +      int maxOutboundBlockSize) {
    +    channel.pipeline()
    +      .addFirst(ENCRYPTION_HANDLER_NAME, new EncryptionHandler(backend, maxOutboundBlockSize))
    +      .addFirst("saslDecryption", new DecryptionHandler(backend))
    +      .addFirst("saslFrameDecoder", NettyUtils.createFrameDecoder());
    +  }
    +
    +  private static class EncryptionHandler extends ChannelOutboundHandlerAdapter {
    +
    +    private final int maxOutboundBlockSize;
    +    private final SaslEncryptionBackend backend;
    +
    +    EncryptionHandler(SaslEncryptionBackend backend, int maxOutboundBlockSize) {
    +      this.backend = backend;
    +      this.maxOutboundBlockSize = maxOutboundBlockSize;
    +    }
    +
    +    /**
    +     * Wrap the incoming message in an implementation that will perform encryption lazily. This is
    +     * needed to guarantee ordering of the outgoing encrypted packets - they need to be decrypted in
    +     * the same order, and netty doesn't have an atomic ChannelHandlerContext.write() API, so it
    +     * does not guarantee any ordering.
    +     */
    +    @Override
    +    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
    +      throws Exception {
    +
    +      ctx.write(new EncryptedMessage(backend, msg, maxOutboundBlockSize), promise);
    +    }
    +
    +    @Override
    +    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    +      try {
    +        backend.dispose();
    +      } finally {
    +        super.handlerRemoved(ctx);
    +      }
    +    }
    +
    +  }
    +
    +  private static class DecryptionHandler extends MessageToMessageDecoder<ByteBuf> {
    +
    +    private final SaslEncryptionBackend backend;
    +
    +    DecryptionHandler(SaslEncryptionBackend backend) {
    +      this.backend = backend;
    +    }
    +
    +    @Override
    +    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out)
    +      throws Exception {
    +
    +      byte[] data;
    +      int offset;
    +      int length = msg.readableBytes();
    +      if (msg.hasArray()) {
    +        data = msg.array();
    +        offset = msg.arrayOffset();
    --- End diff --
    
    It's unnecessary since `MessageToMessageDecoder` will release the input message when this method returns.




[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/5377#issuecomment-97553004
  
      [Test build #31299 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31299/consoleFull) for   PR 5377 at commit [`ff01966`](https://github.com/apache/spark/commit/ff019662055edeaf8d709087e0279afa37011353).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class SaslEncryption `
      * `  static class EncryptedMessage extends AbstractReferenceCounted implements FileRegion `
      * `class SaslRpcHandler extends RpcHandler `
      * `public class SaslServerBootstrap implements TransportServerBootstrap `
      * `public class SparkSaslClient implements SaslEncryptionBackend `
      * `public class SparkSaslServer implements SaslEncryptionBackend `
      * `public class ByteArrayWritableChannel implements WritableByteChannel `
    
     * This patch does not change any dependencies.




[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5377#discussion_r28489665
  
    --- Diff: network/common/src/main/java/org/apache/spark/network/TransportContext.java ---
    @@ -55,14 +56,14 @@
       private final Logger logger = LoggerFactory.getLogger(TransportContext.class);
     
       private final TransportConf conf;
    -  private final RpcHandler rpcHandler;
    +  private final RpcHandler appRpcHandler;
    --- End diff --
    
    Why was this guy renamed? What is the app?




[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/5377#issuecomment-93830348
  
      [Test build #30434 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/30434/consoleFull) for   PR 5377 at commit [`9908ada`](https://github.com/apache/spark/commit/9908ada3e2e07f4e725c76174d7b7d2e0f6a7a23).




[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/5377#issuecomment-94054802
  
      [Test build #30497 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/30497/consoleFull) for   PR 5377 at commit [`4797519`](https://github.com/apache/spark/commit/4797519b16c237c2573b235b49b656650eb9d0ae).




[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/5377#issuecomment-93830340
  
    A note about the last commit: I noticed while debugging that calling `TransportServer.close()` doesn't close client connections that are still open. That may or may not be a bug in practice, but it could let malicious clients hog resources if the server process stays alive after the server socket is closed.
    
    That is something for a different change, though. One way to fix it could be to have a timeout on the SASL negotiation: if the negotiation is successful we trust the client, otherwise the timeout closes the door on malicious clients.
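
A minimal sketch of the timeout idea described above, assuming a server that periodically checks each accepted connection. `NegotiationDeadline` and its methods are hypothetical names used only for illustration; they are not part of Spark or netty, and a real implementation would more likely hook into the channel's event loop.

```java
// Hypothetical helper; not part of Spark or netty.
class NegotiationDeadline {
  private final long deadlineMs;
  private boolean authenticated;
  private boolean closed;

  NegotiationDeadline(long acceptedAtMs, long timeoutMs) {
    this.deadlineMs = acceptedAtMs + timeoutMs;
  }

  /** Marks the client as trusted, unless the deadline already closed the connection. */
  void negotiationSucceeded() {
    if (!closed) {
      authenticated = true;
    }
  }

  /**
   * Called periodically with the current time. Closes the connection if the client
   * has not authenticated by the deadline; returns whether the connection is closed.
   */
  boolean closeIfExpired(long nowMs) {
    if (!authenticated && nowMs >= deadlineMs) {
      closed = true;
    }
    return closed;
  }
}
```

A client that completes negotiation in time is never closed by the deadline, while one that stalls is dropped once the timeout elapses.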


[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/5377#issuecomment-97197545
  
      [Test build #31165 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31165/consoleFull) for PR 5377 at commit [`47d4aff`](https://github.com/apache/spark/commit/47d4aff51538e09252b0f6286dd0e49052b07c18).


[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5377#discussion_r29190765
  
    --- Diff: network/common/src/main/java/org/apache/spark/network/sasl/SparkSaslServer.java ---
    @@ -60,13 +60,19 @@
       static final String DIGEST = "DIGEST-MD5";
     
       /**
    -   * The quality of protection is just "auth". This means that we are doing
    -   * authentication only, we are not supporting integrity or privacy protection of the
    -   * communication channel after authentication. This could be changed to be configurable
    -   * in the future.
    +   * QOP value that includes encryption.
    +   */
    +  static final String QOP_AUTH_CONF = "auth-conf";
    +
    +  /**
    +   * QOP value that does not include encryption.
    +   */
    +  static final String QOP_AUTH = "auth";
    +
    +  /**
    +   * Common SASL config properties for both client and server.
        */
       static final Map<String, String> SASL_PROPS = ImmutableMap.<String, String>builder()
    -    .put(Sasl.QOP, "auth")
         .put(Sasl.SERVER_AUTH, "true")
    --- End diff --
    
    I don't think `Sasl.SERVER_AUTH` applies to the client. I'm also not sure whether it's needed at all, but I'll change the code so it's only set for the server.
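
A rough sketch of what setting `Sasl.SERVER_AUTH` only on the server could look like, using the standard `javax.security.sasl.Sasl` property names. The `SaslProps` class, its method names, and the `encrypt` flag are assumptions made for illustration, not the code in this pull request.

```java
import java.util.HashMap;
import java.util.Map;
import javax.security.sasl.Sasl;

// Hypothetical helper; the class and method names are illustrative only.
class SaslProps {
  static final String QOP_AUTH_CONF = "auth-conf"; // QOP value that includes encryption
  static final String QOP_AUTH = "auth";           // QOP value without encryption

  /** Client-side properties: only the quality of protection is set. */
  static Map<String, String> forClient(boolean encrypt) {
    Map<String, String> props = new HashMap<>();
    props.put(Sasl.QOP, encrypt ? QOP_AUTH_CONF : QOP_AUTH);
    return props;
  }

  /** Server-side properties: the same QOP, plus SERVER_AUTH. */
  static Map<String, String> forServer(boolean encrypt) {
    Map<String, String> props = forClient(encrypt);
    props.put(Sasl.SERVER_AUTH, "true");
    return props;
  }
}
```

Keeping two builders rather than one shared map makes it explicit which side each property is intended for.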


[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/5377#issuecomment-96855426
  
      [Test build #31073 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31073/consoleFull) for PR 5377 at commit [`73cff0e`](https://github.com/apache/spark/commit/73cff0ecb6629f2308c11eff2644423bd528acda).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class SaslEncryption `
      * `  static class EncryptedMessage extends AbstractReferenceCounted implements FileRegion `
      * `class SaslRpcHandler extends RpcHandler `
      * `public class SaslServerBootstrap implements TransportServerBootstrap `
      * `public class SparkSaslClient implements SaslEncryptionBackend `
      * `public class SparkSaslServer implements SaslEncryptionBackend `
    
     * This patch does not change any dependencies.


[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5377#discussion_r29189853
  
    --- Diff: network/common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java ---
    @@ -0,0 +1,260 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.sasl;
    +
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.nio.channels.WritableByteChannel;
    +import java.util.List;
    +
    +import com.google.common.annotations.VisibleForTesting;
    +import com.google.common.base.Preconditions;
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.Unpooled;
    +import io.netty.channel.Channel;
    +import io.netty.channel.ChannelHandlerContext;
    +import io.netty.channel.ChannelOutboundHandlerAdapter;
    +import io.netty.channel.ChannelPromise;
    +import io.netty.channel.FileRegion;
    +import io.netty.handler.codec.MessageToMessageDecoder;
    +import io.netty.util.AbstractReferenceCounted;
    +import io.netty.util.ReferenceCountUtil;
    +
    +import org.apache.spark.network.util.ByteArrayWritableChannel;
    +import org.apache.spark.network.util.NettyUtils;
    +
    +class SaslEncryption {
    +
    +  @VisibleForTesting
    +  static final String ENCRYPTION_HANDLER_NAME = "saslEncryption";
    +
    +  /**
    +   * Adds channel handlers that perform encryption / decryption of data using SASL.
    +   *
    +   * @param channel The channel.
    +   * @param backend The SASL backend.
    +   * @param maxOutboundBlockSize Max size in bytes of outgoing encrypted blocks, to control
    +   *                             memory usage.
    +   */
    +  static void addToChannel(
    +      Channel channel,
    +      SaslEncryptionBackend backend,
    +      int maxOutboundBlockSize) {
    +    channel.pipeline()
    +      .addFirst(ENCRYPTION_HANDLER_NAME, new EncryptionHandler(backend, maxOutboundBlockSize))
    +      .addFirst("saslDecryption", new DecryptionHandler(backend))
    +      .addFirst("saslFrameDecoder", NettyUtils.createFrameDecoder());
    +  }
    +
    +  private static class EncryptionHandler extends ChannelOutboundHandlerAdapter {
    +
    +    private final int maxOutboundBlockSize;
    +    private final SaslEncryptionBackend backend;
    +
    +    EncryptionHandler(SaslEncryptionBackend backend, int maxOutboundBlockSize) {
    +      this.backend = backend;
    +      this.maxOutboundBlockSize = maxOutboundBlockSize;
    +    }
    +
    +    /**
    +     * Wrap the incoming message in an implementation that will perform encryption lazily. This is
    +     * needed to guarantee ordering of the outgoing encrypted packets - they need to be decrypted in
    +     * the same order, and netty doesn't have an atomic ChannelHandlerContext.write() API, so it
    +     * does not guarantee any ordering.
    +     */
    +    @Override
    +    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
    +      throws Exception {
    +
    +      ctx.write(new EncryptedMessage(backend, msg, maxOutboundBlockSize), promise);
    +    }
    +
    +    @Override
    +    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    +      try {
    +        backend.dispose();
    +      } finally {
    +        super.handlerRemoved(ctx);
    +      }
    +    }
    +
    +  }
    +
    +  private static class DecryptionHandler extends MessageToMessageDecoder<ByteBuf> {
    +
    +    private final SaslEncryptionBackend backend;
    +
    +    DecryptionHandler(SaslEncryptionBackend backend) {
    +      this.backend = backend;
    +    }
    +
    +    @Override
    +    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out)
    +      throws Exception {
    +
    +      byte[] data;
    +      int offset;
    +      int length = msg.readableBytes();
    +      if (msg.hasArray()) {
    +        data = msg.array();
    +        offset = msg.arrayOffset();
    +      } else {
    +        data = new byte[length];
    +        msg.readBytes(data);
    +        offset = 0;
    +      }
    +
    +      out.add(Unpooled.wrappedBuffer(backend.unwrap(data, offset, length)));
    +    }
    +
    +  }
    +
    +  private static class EncryptedMessage extends AbstractReferenceCounted implements FileRegion {
    +
    +    private final SaslEncryptionBackend backend;
    +    private final boolean isByteBuf;
    +    private final ByteBuf buf;
    +    private final FileRegion region;
    +    private final ByteArrayWritableChannel byteChannel;
    +
    +    private ByteBuf currentHeader;
    +    private ByteBuffer currentChunk;
    +    private long currentChunkSize;
    +    private long unencryptedChunkSize;
    +    private long transferred;
    +
    +    EncryptedMessage(SaslEncryptionBackend backend, Object msg, int maxOutboundBlockSize) {
    +      Preconditions.checkArgument(msg instanceof ByteBuf || msg instanceof FileRegion,
    +        "Unrecognized message type: %s", msg.getClass().getName());
    +      this.backend = backend;
    +      this.isByteBuf = msg instanceof ByteBuf;
    +      this.buf = isByteBuf ? (ByteBuf) msg : null;
    +      this.region = isByteBuf ? null : (FileRegion) msg;
    +      this.byteChannel = new ByteArrayWritableChannel(maxOutboundBlockSize);
    +    }
    +
    +    /**
    +     * Returns the size of the original (unencrypted) message.
    +     *
    +     * This makes assumptions about how netty treats FileRegion instances, because there's no way
    +     * to know beforehand what will be the size of the encrypted message. Namely, it assumes
    +     * that netty will try to transfer data from this message while
    +     * <code>transfered() < count()</code>. So these two methods return, technically, wrong data,
    +     * but netty doesn't know better.
    +     */
    +    @Override
    +    public long count() {
    +      return isByteBuf ? buf.readableBytes() : region.count();
    +    }
    +
    +    @Override
    +    public long position() {
    +      return 0;
    +    }
    +
    +    /**
    +     * Returns an approximation of the amount of data transferred. See {@link #count()}.
    +     */
    +    @Override
    +    public long transfered() {
    +      return transferred;
    +    }
    +
    +    /**
    +     * Transfers data from the original message to the channel, encrypting it in the process.
    +     *
    +     * This method also breaks down the original message into smaller chunks when needed. This
    +     * is done to keep memory usage under control. This avoids having to copy the whole message
    +     * data into memory at once, and can avoid ballooning memory usage when transferring large
    +     * messages such as shuffle blocks.
    +     *
    +     * The {@link #transfered()} counter also behaves a little funny, in that it won't go forward
    +     * until a whole chunk has been written. This is done because the code can't use the actual
    +     * number of bytes written to the channel as the transferred count (see {@link #count()}).
    +     * Instead, once an encrypted chunk is written to the output (including its header), the
    +     * size of the original block will be added to the {@link #transfered()} amount.
    +     */
    +    @Override
    +    public long transferTo(final WritableByteChannel target, final long position)
    +      throws IOException {
    +
    +      Preconditions.checkArgument(position == transfered(), "Invalid position.");
    +
    +      long written = 0;
    +      do {
    +        if (currentChunk == null) {
    +          nextChunk();
    +        }
    +
    +        if (currentHeader.readableBytes() > 0) {
    +          int bytesWritten = target.write(currentHeader.nioBuffer());
    +          currentHeader.skipBytes(bytesWritten);
    +          if (currentHeader.readableBytes() > 0) {
    +            // Break out of loop if there are still header bytes left to write.
    +            break;
    +          }
    +        }
    +
    +        target.write(currentChunk);
    +        if (!currentChunk.hasRemaining()) {
    +          // Only update the count of written bytes once a full chunk has been written.
    +          // See method javadoc.
    +          written += unencryptedChunkSize;
    +          currentHeader.release();
    +          currentHeader = null;
    +          currentChunk = null;
    +          currentChunkSize = 0;
    +        }
    +      } while (currentChunk == null && transfered() + written < count());
    +
    +      transferred += written;
    +      return written;
    +    }
    +
    +    private void nextChunk() throws IOException {
    +      byteChannel.reset();
    +      if (isByteBuf) {
    +        int copied = byteChannel.write(buf.nioBuffer());
    --- End diff --
    
    Hmm... I could, but then I'd need different logic for the `FileRegion` case (since it needs a `WritableByteChannel`).
    
    It's not a lot of code, but it is more than just leaving the `skipBytes` call there.
    
    (I could also implement `GatheringByteChannel` instead of `WritableByteChannel`, but, again, that would be more code than just the one skip call...)
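
For context on why the explicit skip is needed: writing through an NIO view advances only the view's position, so the source has to be advanced by hand. The sketch below illustrates that with plain `java.nio`, using `ByteBuffer.duplicate()` as a stand-in for the `nioBuffer()` view; the `ViewCopy` class and its methods are hypothetical and only mirror the pattern, not the code in the diff.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;

// Hypothetical illustration using only java.nio; not the netty ByteBuf API.
class ViewCopy {
  /** Copies up to maxBytes from src to target through a view, then advances src manually. */
  static int copyChunk(ByteBuffer src, WritableByteChannel target, int maxBytes)
      throws IOException {
    ByteBuffer view = src.duplicate(); // shares content, but has its own position
    view.limit(view.position() + Math.min(maxBytes, view.remaining()));
    int copied = target.write(view);
    // The write moved only the view's position; this is the "skip" step for the source.
    src.position(src.position() + copied);
    return copied;
  }

  /** Copies all of data in chunks of at most chunkSize bytes and returns what was written. */
  static byte[] copyAll(byte[] data, int chunkSize) {
    if (chunkSize <= 0) {
      throw new IllegalArgumentException("chunkSize must be positive");
    }
    ByteBuffer src = ByteBuffer.wrap(data);
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    WritableByteChannel target = Channels.newChannel(sink);
    try {
      while (src.hasRemaining()) {
        copyChunk(src, target, chunkSize);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return sink.toByteArray();
  }
}
```

Without the manual position update, the loop in `copyAll` would keep re-sending the same leading bytes, which is the failure the single skip call prevents.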


[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/5377#issuecomment-96850125
  
      [Test build #31075 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31075/consoleFull) for PR 5377 at commit [`3d1695d`](https://github.com/apache/spark/commit/3d1695dbcf254b181886f67162882ee0ce88b6c7).


[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/5377#issuecomment-93816186
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/30426/


[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5377#discussion_r29121766
  
    --- Diff: network/common/src/main/java/org/apache/spark/network/TransportContext.java ---
    @@ -99,9 +108,9 @@ public TransportServer createServer() {
        * be used to communicate on this channel. The TransportClient is directly associated with a
        * ChannelHandler to ensure all users of the same channel get the same TransportClient object.
        */
    -  public TransportChannelHandler initializePipeline(SocketChannel channel) {
    +  public TransportChannelHandler initializePipeline(SocketChannel channel, RpcHandler rpcHandler) {
    --- End diff --
    
    The `rpcHandler` vs `appRpcHandler` distinction is definitely confusing to someone reading this for the first time; please add a comment here or in `TransportContext` about the difference.


[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5377#discussion_r29121332
  
    --- Diff: network/common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java ---
    @@ -0,0 +1,260 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.sasl;
    +
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.nio.channels.WritableByteChannel;
    +import java.util.List;
    +
    +import com.google.common.annotations.VisibleForTesting;
    +import com.google.common.base.Preconditions;
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.Unpooled;
    +import io.netty.channel.Channel;
    +import io.netty.channel.ChannelHandlerContext;
    +import io.netty.channel.ChannelOutboundHandlerAdapter;
    +import io.netty.channel.ChannelPromise;
    +import io.netty.channel.FileRegion;
    +import io.netty.handler.codec.MessageToMessageDecoder;
    +import io.netty.util.AbstractReferenceCounted;
    +import io.netty.util.ReferenceCountUtil;
    +
    +import org.apache.spark.network.util.ByteArrayWritableChannel;
    +import org.apache.spark.network.util.NettyUtils;
    +
    +class SaslEncryption {
    +
    +  @VisibleForTesting
    +  static final String ENCRYPTION_HANDLER_NAME = "saslEncryption";
    +
    +  /**
    +   * Adds channel handlers that perform encryption / decryption of data using SASL.
    +   *
    +   * @param channel The channel.
    +   * @param backend The SASL backend.
    +   * @param maxOutboundBlockSize Max size in bytes of outgoing encrypted blocks, to control
    +   *                             memory usage.
    +   */
    +  static void addToChannel(
    +      Channel channel,
    +      SaslEncryptionBackend backend,
    +      int maxOutboundBlockSize) {
    +    channel.pipeline()
    +      .addFirst(ENCRYPTION_HANDLER_NAME, new EncryptionHandler(backend, maxOutboundBlockSize))
    +      .addFirst("saslDecryption", new DecryptionHandler(backend))
    +      .addFirst("saslFrameDecoder", NettyUtils.createFrameDecoder());
    +  }
    +
    +  private static class EncryptionHandler extends ChannelOutboundHandlerAdapter {
    +
    +    private final int maxOutboundBlockSize;
    +    private final SaslEncryptionBackend backend;
    +
    +    EncryptionHandler(SaslEncryptionBackend backend, int maxOutboundBlockSize) {
    +      this.backend = backend;
    +      this.maxOutboundBlockSize = maxOutboundBlockSize;
    +    }
    +
    +    /**
    +     * Wrap the incoming message in an implementation that will perform encryption lazily. This is
    +     * needed to guarantee ordering of the outgoing encrypted packets - they need to be decrypted in
    +     * the same order, and netty doesn't have an atomic ChannelHandlerContext.write() API, so it
    +     * does not guarantee any ordering.
    +     */
    +    @Override
    +    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
    +      throws Exception {
    +
    +      ctx.write(new EncryptedMessage(backend, msg, maxOutboundBlockSize), promise);
    +    }
    +
    +    @Override
    +    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    +      try {
    +        backend.dispose();
    +      } finally {
    +        super.handlerRemoved(ctx);
    +      }
    +    }
    +
    +  }
    +
    +  private static class DecryptionHandler extends MessageToMessageDecoder<ByteBuf> {
    +
    +    private final SaslEncryptionBackend backend;
    +
    +    DecryptionHandler(SaslEncryptionBackend backend) {
    +      this.backend = backend;
    +    }
    +
    +    @Override
    +    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out)
    +      throws Exception {
    +
    +      byte[] data;
    +      int offset;
    +      int length = msg.readableBytes();
    +      if (msg.hasArray()) {
    +        data = msg.array();
    +        offset = msg.arrayOffset();
    +      } else {
    +        data = new byte[length];
    +        msg.readBytes(data);
    +        offset = 0;
    +      }
    +
    +      out.add(Unpooled.wrappedBuffer(backend.unwrap(data, offset, length)));
    +    }
    +
    +  }
    +
    +  private static class EncryptedMessage extends AbstractReferenceCounted implements FileRegion {
    +
    +    private final SaslEncryptionBackend backend;
    +    private final boolean isByteBuf;
    +    private final ByteBuf buf;
    +    private final FileRegion region;
    +    private final ByteArrayWritableChannel byteChannel;
    +
    +    private ByteBuf currentHeader;
    +    private ByteBuffer currentChunk;
    +    private long currentChunkSize;
    +    private long unencryptedChunkSize;
    +    private long transferred;
    +
    +    EncryptedMessage(SaslEncryptionBackend backend, Object msg, int maxOutboundBlockSize) {
    +      Preconditions.checkArgument(msg instanceof ByteBuf || msg instanceof FileRegion,
    +        "Unrecognized message type: %s", msg.getClass().getName());
    +      this.backend = backend;
    +      this.isByteBuf = msg instanceof ByteBuf;
    +      this.buf = isByteBuf ? (ByteBuf) msg : null;
    +      this.region = isByteBuf ? null : (FileRegion) msg;
    +      this.byteChannel = new ByteArrayWritableChannel(maxOutboundBlockSize);
    +    }
    +
    +    /**
    +     * Returns the size of the original (unencrypted) message.
    +     *
    +     * This makes assumptions about how netty treats FileRegion instances, because there's no way
    +     * to know beforehand what will be the size of the encrypted message. Namely, it assumes
    +     * that netty will try to transfer data from this message while
    +     * <code>transfered() < count()</code>. So these two methods return, technically, wrong data,
    +     * but netty doesn't know better.
    +     */
    +    @Override
    +    public long count() {
    +      return isByteBuf ? buf.readableBytes() : region.count();
    +    }
    +
    +    @Override
    +    public long position() {
    +      return 0;
    +    }
    +
    +    /**
    +     * Returns an approximation of the amount of data transferred. See {@link #count()}.
    +     */
    +    @Override
    +    public long transfered() {
    +      return transferred;
    +    }
    +
    +    /**
    +     * Transfers data from the original message to the channel, encrypting it in the process.
    +     *
    +     * This method also breaks down the original message into smaller chunks when needed. This
    +     * is done to keep memory usage under control. This avoids having to copy the whole message
    +     * data into memory at once, and can avoid ballooning memory usage when transferring large
    +     * messages such as shuffle blocks.
    +     *
    +     * The {@link #transfered()} counter also behaves a little funny, in that it won't go forward
    +     * until a whole chunk has been written. This is done because the code can't use the actual
    +     * number of bytes written to the channel as the transferred count (see {@link #count()}).
    +     * Instead, once an encrypted chunk is written to the output (including its header), the
    +     * size of the original block will be added to the {@link #transfered()} amount.
    +     */
    +    @Override
    +    public long transferTo(final WritableByteChannel target, final long position)
    +      throws IOException {
    +
    +      Preconditions.checkArgument(position == transfered(), "Invalid position.");
    +
    +      long written = 0;
    +      do {
    +        if (currentChunk == null) {
    +          nextChunk();
    +        }
    +
    +        if (currentHeader.readableBytes() > 0) {
    +          int bytesWritten = target.write(currentHeader.nioBuffer());
    +          currentHeader.skipBytes(bytesWritten);
    +          if (currentHeader.readableBytes() > 0) {
    +            // Break out of loop if there are still header bytes left to write.
    +            break;
    +          }
    +        }
    +
    +        target.write(currentChunk);
    +        if (!currentChunk.hasRemaining()) {
    +          // Only update the count of written bytes once a full chunk has been written.
    --- End diff --
    
    This mechanism may add even more performance burden, as I believe there is special logic which checks for zero-sized writes and backs off from trying to send until the channel appears writable again. However, I can't think of a good way to avoid this that isn't significantly hackier, so it's probably fine.
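
To make the concern concrete, here is a small simulation of the chunk-granular counting described in the `transferTo` javadoc: calls that only partially flush an encrypted chunk report zero progress. `ChunkProgress` is a hypothetical model, not the `EncryptedMessage` class from the diff, and it says nothing about how netty actually reacts to zero-sized writes.

```java
// Hypothetical model of the counting scheme; it performs no I/O and no encryption.
class ChunkProgress {
  private final long unencryptedChunkSize;
  private long encryptedRemaining;

  ChunkProgress(long encryptedChunkSize, long unencryptedChunkSize) {
    this.encryptedRemaining = encryptedChunkSize;
    this.unencryptedChunkSize = unencryptedChunkSize;
  }

  /**
   * Simulates one transfer call in which the socket accepts at most `accepted` bytes.
   * Returns the progress reported to the caller: zero until the whole encrypted chunk
   * has been written, then the size of the original (unencrypted) chunk, exactly once.
   */
  long transferTo(long accepted) {
    if (encryptedRemaining == 0) {
      return 0; // chunk already fully written and reported
    }
    encryptedRemaining -= Math.min(accepted, encryptedRemaining);
    return encryptedRemaining == 0 ? unencryptedChunkSize : 0;
  }
}
```

With a 10-byte encrypted chunk and a socket that takes 4 bytes per call, the first two calls return 0 even though bytes were written, which is the pattern the comment above is worried about.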


[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/5377#issuecomment-97516427
  
     Merged build triggered.


[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/5377#issuecomment-93109032
  
    /cc @rxin @aarondav in case you guys missed the JIRA update.


[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5377#discussion_r28491206
  
    --- Diff: network/common/src/main/java/org/apache/spark/network/TransportContext.java ---
    @@ -55,14 +56,14 @@
       private final Logger logger = LoggerFactory.getLogger(TransportContext.class);
     
       private final TransportConf conf;
    -  private final RpcHandler rpcHandler;
    +  private final RpcHandler appRpcHandler;
    --- End diff --
    
    Ah -- I see, this is the handler as it exists before the bootstraps are executed; the client uses it directly, and otherwise it serves as the original handler.
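
The relationship described in this comment can be sketched roughly as follows. This is only an illustration, not the PR's code: `Handler`, `ServerBootstrap`, and `serverHandler` are hypothetical stand-ins invented here, and the actual `RpcHandler` and `TransportServerBootstrap` signatures may differ. The assumption is that each server-side bootstrap receives the current handler and may return a wrapped one, while the client side keeps using the unwrapped application handler.

```java
import java.util.ArrayList;
import java.util.List;

public class BootstrapChainSketch {

  /** Hypothetical stand-in for an RPC handler; not Spark's RpcHandler. */
  public interface Handler {
    String handle(String message);
  }

  /** Hypothetical server-side bootstrap: it may wrap the handler it is given. */
  public interface ServerBootstrap {
    Handler bootstrap(Handler current);
  }

  /** Starts from the application handler and lets each bootstrap wrap it in turn. */
  public static Handler serverHandler(Handler appHandler, List<ServerBootstrap> bootstraps) {
    Handler current = appHandler;
    for (ServerBootstrap b : bootstraps) {
      current = b.bootstrap(current);
    }
    return current;
  }

  public static void main(String[] args) {
    // The "app" handler is what exists before any bootstrap runs.
    Handler app = msg -> "app(" + msg + ")";

    // A bootstrap that wraps the handler it receives (for example, to add authentication).
    List<ServerBootstrap> bootstraps = new ArrayList<>();
    bootstraps.add(inner -> msg -> "auth[" + inner.handle(msg) + "]");

    // Client side: the application handler is used directly.
    System.out.println(app.handle("ping"));
    // Server side: the bootstraps have wrapped the application handler.
    System.out.println(serverHandler(app, bootstraps).handle("ping"));
  }
}
```

With no bootstraps registered, `serverHandler` simply returns the application handler, which matches the "original handler otherwise" case in the comment.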




[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/5377#issuecomment-90277066
  
      [Test build #29759 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29759/consoleFull) for   PR 5377 at commit [`cf2a605`](https://github.com/apache/spark/commit/cf2a605d02afd4f462270a0d808ff82c75ec04fd).




[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5377#discussion_r29121626
  
    --- Diff: network/common/src/test/resources/log4j.properties ---
    @@ -23,5 +23,5 @@ log4j.appender.file.file=target/unit-tests.log
     log4j.appender.file.layout=org.apache.log4j.PatternLayout
     log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
     
    -# Silence verbose logs from 3rd-party libraries.
    +# Filter debug messages from noisy 3rd-party libs.
    --- End diff --
    
    kind of a funny change




[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5377#discussion_r29190170
  
    --- Diff: network/common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java ---
    @@ -0,0 +1,260 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.sasl;
    +
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.nio.channels.WritableByteChannel;
    +import java.util.List;
    +
    +import com.google.common.annotations.VisibleForTesting;
    +import com.google.common.base.Preconditions;
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.Unpooled;
    +import io.netty.channel.Channel;
    +import io.netty.channel.ChannelHandlerContext;
    +import io.netty.channel.ChannelOutboundHandlerAdapter;
    +import io.netty.channel.ChannelPromise;
    +import io.netty.channel.FileRegion;
    +import io.netty.handler.codec.MessageToMessageDecoder;
    +import io.netty.util.AbstractReferenceCounted;
    +import io.netty.util.ReferenceCountUtil;
    +
    +import org.apache.spark.network.util.ByteArrayWritableChannel;
    +import org.apache.spark.network.util.NettyUtils;
    +
    +class SaslEncryption {
    +
    +  @VisibleForTesting
    +  static final String ENCRYPTION_HANDLER_NAME = "saslEncryption";
    +
    +  /**
    +   * Adds channel handlers that perform encryption / decryption of data using SASL.
    +   *
    +   * @param channel The channel.
    +   * @param backend The SASL backend.
    +   * @param maxOutboundBlockSize Max size in bytes of outgoing encrypted blocks, to control
    +   *                             memory usage.
    +   */
    +  static void addToChannel(
    +      Channel channel,
    +      SaslEncryptionBackend backend,
    +      int maxOutboundBlockSize) {
    +    channel.pipeline()
    +      .addFirst(ENCRYPTION_HANDLER_NAME, new EncryptionHandler(backend, maxOutboundBlockSize))
    +      .addFirst("saslDecryption", new DecryptionHandler(backend))
    +      .addFirst("saslFrameDecoder", NettyUtils.createFrameDecoder());
    +  }
    +
    +  private static class EncryptionHandler extends ChannelOutboundHandlerAdapter {
    +
    +    private final int maxOutboundBlockSize;
    +    private final SaslEncryptionBackend backend;
    +
    +    EncryptionHandler(SaslEncryptionBackend backend, int maxOutboundBlockSize) {
    +      this.backend = backend;
    +      this.maxOutboundBlockSize = maxOutboundBlockSize;
    +    }
    +
    +    /**
    +     * Wrap the incoming message in an implementation that will perform encryption lazily. This is
    +     * needed to guarantee ordering of the outgoing encrypted packets - they need to be decrypted in
    +     * the same order, and netty doesn't have an atomic ChannelHandlerContext.write() API, so it
    +     * does not guarantee any ordering.
    +     */
    +    @Override
    +    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
    +      throws Exception {
    +
    +      ctx.write(new EncryptedMessage(backend, msg, maxOutboundBlockSize), promise);
    +    }
    +
    +    @Override
    +    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    +      try {
    +        backend.dispose();
    +      } finally {
    +        super.handlerRemoved(ctx);
    +      }
    +    }
    +
    +  }
    +
    +  private static class DecryptionHandler extends MessageToMessageDecoder<ByteBuf> {
    +
    +    private final SaslEncryptionBackend backend;
    +
    +    DecryptionHandler(SaslEncryptionBackend backend) {
    +      this.backend = backend;
    +    }
    +
    +    @Override
    +    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out)
    +      throws Exception {
    +
    +      byte[] data;
    +      int offset;
    +      int length = msg.readableBytes();
    +      if (msg.hasArray()) {
    +        data = msg.array();
    +        offset = msg.arrayOffset();
    +      } else {
    +        data = new byte[length];
    +        msg.readBytes(data);
    +        offset = 0;
    +      }
    +
    +      out.add(Unpooled.wrappedBuffer(backend.unwrap(data, offset, length)));
    +    }
    +
    +  }
    +
    +  private static class EncryptedMessage extends AbstractReferenceCounted implements FileRegion {
    +
    +    private final SaslEncryptionBackend backend;
    +    private final boolean isByteBuf;
    +    private final ByteBuf buf;
    +    private final FileRegion region;
    +    private final ByteArrayWritableChannel byteChannel;
    +
    +    private ByteBuf currentHeader;
    +    private ByteBuffer currentChunk;
    +    private long currentChunkSize;
    +    private long unencryptedChunkSize;
    +    private long transferred;
    +
    +    EncryptedMessage(SaslEncryptionBackend backend, Object msg, int maxOutboundBlockSize) {
    +      Preconditions.checkArgument(msg instanceof ByteBuf || msg instanceof FileRegion,
    +        "Unrecognized message type: %s", msg.getClass().getName());
    +      this.backend = backend;
    +      this.isByteBuf = msg instanceof ByteBuf;
    +      this.buf = isByteBuf ? (ByteBuf) msg : null;
    +      this.region = isByteBuf ? null : (FileRegion) msg;
    +      this.byteChannel = new ByteArrayWritableChannel(maxOutboundBlockSize);
    +    }
    +
    +    /**
    +     * Returns the size of the original (unencrypted) message.
    +     *
    +     * This makes assumptions about how netty treats FileRegion instances, because there's no way
    +     * to know beforehand what will be the size of the encrypted message. Namely, it assumes
    +     * that netty will try to transfer data from this message while
    +     * <code>transfered() < count()</code>. So these two methods return, technically, wrong data,
    +     * but netty doesn't know better.
    +     */
    +    @Override
    +    public long count() {
    +      return isByteBuf ? buf.readableBytes() : region.count();
    +    }
    +
    +    @Override
    +    public long position() {
    +      return 0;
    +    }
    +
    +    /**
    +     * Returns an approximation of the amount of data transferred. See {@link #count()}.
    +     */
    +    @Override
    +    public long transfered() {
    +      return transferred;
    +    }
    +
    +    /**
    +     * Transfers data from the original message to the channel, encrypting it in the process.
    +     *
    +     * This method also breaks down the original message into smaller chunks when needed. This
    +     * is done to keep memory usage under control. This avoids having to copy the whole message
    +     * data into memory at once, and can avoid ballooning memory usage when transferring large
    +     * messages such as shuffle blocks.
    +     *
    +     * The {@link #transfered()} counter also behaves a little funny, in that it won't go forward
    +     * until a whole chunk has been written. This is done because the code can't use the actual
    +     * number of bytes written to the channel as the transferred count (see {@link #count()}).
    +     * Instead, once an encrypted chunk is written to the output (including its header), the
    +     * size of the original block will be added to the {@link #transfered()} amount.
    +     */
    +    @Override
    +    public long transferTo(final WritableByteChannel target, final long position)
    +      throws IOException {
    +
    +      Preconditions.checkArgument(position == transfered(), "Invalid position.");
    +
    +      long written = 0;
    +      do {
    +        if (currentChunk == null) {
    +          nextChunk();
    +        }
    +
    +        if (currentHeader.readableBytes() > 0) {
    +          int bytesWritten = target.write(currentHeader.nioBuffer());
    +          currentHeader.skipBytes(bytesWritten);
    +          if (currentHeader.readableBytes() > 0) {
    +            // Break out of loop if there are still header bytes left to write.
    +            break;
    +          }
    +        }
    +
    +        target.write(currentChunk);
    +        if (!currentChunk.hasRemaining()) {
    +          // Only update the count of written bytes once a full chunk has been written.
    --- End diff --
    
    I see. I could return `1` instead, and keep the counts synchronized, but some pathological case might still require returning `0`. That should be rare, though, so let me do this to avoid the performance hit in the common case.




[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/5377#issuecomment-93816159
  
      [Test build #30426 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/30426/consoleFull) for   PR 5377 at commit [`7fe1489`](https://github.com/apache/spark/commit/7fe148953b09e8f7c3fad66774ce6c8aa1850a45).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class SaslEncryption `
      * `class SaslRpcHandler extends RpcHandler `
      * `public class SaslServerBootstrap implements TransportServerBootstrap `
      * `public class SparkSaslClient implements SaslEncryptionBackend `
      * `public class SparkSaslServer implements SaslEncryptionBackend `
      * `public class ByteArrayWritableChannel implements WritableByteChannel `
    
     * This patch does not change any dependencies.




[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/5377#issuecomment-97165210
  
    @vanzin Give me a day or two. I'm checking with the Netty guys on the overall design pattern. I'm worried about CPU-heavy encryption blocking the IO thread that calls transferTo. One solution might be to merge this, and follow up with a big TODO and a JIRA to improve it.





[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5377#discussion_r29121639
  
    --- Diff: network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java ---
    @@ -58,10 +60,15 @@
       public ExternalShuffleClient(
           TransportConf conf,
           SecretKeyHolder secretKeyHolder,
    -      boolean saslEnabled) {
    +      boolean saslEnabled,
    +      boolean saslEncryptionEnabled) {
    +    Preconditions.checkArgument(
    +        !saslEncryptionEnabled || saslEnabled,
    --- End diff --
    
    nit: I think a 2-space indent is appropriate here




[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/5377#issuecomment-93847274
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/30434/




[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5377#discussion_r29196457
  
    --- Diff: network/common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java ---
    @@ -0,0 +1,260 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.sasl;
    +
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.nio.channels.WritableByteChannel;
    +import java.util.List;
    +
    +import com.google.common.annotations.VisibleForTesting;
    +import com.google.common.base.Preconditions;
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.Unpooled;
    +import io.netty.channel.Channel;
    +import io.netty.channel.ChannelHandlerContext;
    +import io.netty.channel.ChannelOutboundHandlerAdapter;
    +import io.netty.channel.ChannelPromise;
    +import io.netty.channel.FileRegion;
    +import io.netty.handler.codec.MessageToMessageDecoder;
    +import io.netty.util.AbstractReferenceCounted;
    +import io.netty.util.ReferenceCountUtil;
    +
    +import org.apache.spark.network.util.ByteArrayWritableChannel;
    +import org.apache.spark.network.util.NettyUtils;
    +
    +class SaslEncryption {
    +
    +  @VisibleForTesting
    +  static final String ENCRYPTION_HANDLER_NAME = "saslEncryption";
    +
    +  /**
    +   * Adds channel handlers that perform encryption / decryption of data using SASL.
    +   *
    +   * @param channel The channel.
    +   * @param backend The SASL backend.
    +   * @param maxOutboundBlockSize Max size in bytes of outgoing encrypted blocks, to control
    +   *                             memory usage.
    +   */
    +  static void addToChannel(
    +      Channel channel,
    +      SaslEncryptionBackend backend,
    +      int maxOutboundBlockSize) {
    +    channel.pipeline()
    +      .addFirst(ENCRYPTION_HANDLER_NAME, new EncryptionHandler(backend, maxOutboundBlockSize))
    +      .addFirst("saslDecryption", new DecryptionHandler(backend))
    +      .addFirst("saslFrameDecoder", NettyUtils.createFrameDecoder());
    +  }
    +
    +  private static class EncryptionHandler extends ChannelOutboundHandlerAdapter {
    +
    +    private final int maxOutboundBlockSize;
    +    private final SaslEncryptionBackend backend;
    +
    +    EncryptionHandler(SaslEncryptionBackend backend, int maxOutboundBlockSize) {
    +      this.backend = backend;
    +      this.maxOutboundBlockSize = maxOutboundBlockSize;
    +    }
    +
    +    /**
    +     * Wrap the incoming message in an implementation that will perform encryption lazily. This is
    +     * needed to guarantee ordering of the outgoing encrypted packets - they need to be decrypted in
    +     * the same order, and netty doesn't have an atomic ChannelHandlerContext.write() API, so it
    +     * does not guarantee any ordering.
    +     */
    +    @Override
    +    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
    +      throws Exception {
    +
    +      ctx.write(new EncryptedMessage(backend, msg, maxOutboundBlockSize), promise);
    +    }
    +
    +    @Override
    +    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    +      try {
    +        backend.dispose();
    +      } finally {
    +        super.handlerRemoved(ctx);
    +      }
    +    }
    +
    +  }
    +
    +  private static class DecryptionHandler extends MessageToMessageDecoder<ByteBuf> {
    +
    +    private final SaslEncryptionBackend backend;
    +
    +    DecryptionHandler(SaslEncryptionBackend backend) {
    +      this.backend = backend;
    +    }
    +
    +    @Override
    +    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out)
    +      throws Exception {
    +
    +      byte[] data;
    +      int offset;
    +      int length = msg.readableBytes();
    +      if (msg.hasArray()) {
    +        data = msg.array();
    +        offset = msg.arrayOffset();
    +      } else {
    +        data = new byte[length];
    +        msg.readBytes(data);
    +        offset = 0;
    +      }
    +
    +      out.add(Unpooled.wrappedBuffer(backend.unwrap(data, offset, length)));
    +    }
    +
    +  }
    +
    +  private static class EncryptedMessage extends AbstractReferenceCounted implements FileRegion {
    +
    +    private final SaslEncryptionBackend backend;
    +    private final boolean isByteBuf;
    +    private final ByteBuf buf;
    +    private final FileRegion region;
    +    private final ByteArrayWritableChannel byteChannel;
    +
    +    private ByteBuf currentHeader;
    +    private ByteBuffer currentChunk;
    +    private long currentChunkSize;
    +    private long unencryptedChunkSize;
    +    private long transferred;
    +
    +    EncryptedMessage(SaslEncryptionBackend backend, Object msg, int maxOutboundBlockSize) {
    +      Preconditions.checkArgument(msg instanceof ByteBuf || msg instanceof FileRegion,
    +        "Unrecognized message type: %s", msg.getClass().getName());
    +      this.backend = backend;
    +      this.isByteBuf = msg instanceof ByteBuf;
    +      this.buf = isByteBuf ? (ByteBuf) msg : null;
    +      this.region = isByteBuf ? null : (FileRegion) msg;
    +      this.byteChannel = new ByteArrayWritableChannel(maxOutboundBlockSize);
    +    }
    +
    +    /**
    +     * Returns the size of the original (unencrypted) message.
    +     *
    +     * This makes assumptions about how netty treats FileRegion instances, because there's no way
    +     * to know beforehand what will be the size of the encrypted message. Namely, it assumes
    +     * that netty will try to transfer data from this message while
    +     * <code>transfered() < count()</code>. So these two methods return, technically, wrong data,
    +     * but netty doesn't know better.
    +     */
    +    @Override
    +    public long count() {
    +      return isByteBuf ? buf.readableBytes() : region.count();
    +    }
    +
    +    @Override
    +    public long position() {
    +      return 0;
    +    }
    +
    +    /**
    +     * Returns an approximation of the amount of data transferred. See {@link #count()}.
    +     */
    +    @Override
    +    public long transfered() {
    +      return transferred;
    +    }
    +
    +    /**
    +     * Transfers data from the original message to the channel, encrypting it in the process.
    +     *
    +     * This method also breaks down the original message into smaller chunks when needed. This
    +     * is done to keep memory usage under control. This avoids having to copy the whole message
    +     * data into memory at once, and can avoid ballooning memory usage when transferring large
    +     * messages such as shuffle blocks.
    +     *
    +     * The {@link #transfered()} counter also behaves a little funny, in that it won't go forward
    +     * until a whole chunk has been written. This is done because the code can't use the actual
    +     * number of bytes written to the channel as the transferred count (see {@link #count()}).
    +     * Instead, once an encrypted chunk is written to the output (including its header), the
    +     * size of the original block will be added to the {@link #transfered()} amount.
    +     */
    +    @Override
    +    public long transferTo(final WritableByteChannel target, final long position)
    +      throws IOException {
    +
    +      Preconditions.checkArgument(position == transfered(), "Invalid position.");
    +
    +      long written = 0;
    +      do {
    +        if (currentChunk == null) {
    +          nextChunk();
    +        }
    +
    +        if (currentHeader.readableBytes() > 0) {
    +          int bytesWritten = target.write(currentHeader.nioBuffer());
    +          currentHeader.skipBytes(bytesWritten);
    +          if (currentHeader.readableBytes() > 0) {
    +            // Break out of loop if there are still header bytes left to write.
    +            break;
    +          }
    +        }
    +
    +        target.write(currentChunk);
    +        if (!currentChunk.hasRemaining()) {
    +          // Only update the count of written bytes once a full chunk has been written.
    +          // See method javadoc.
    +          written += unencryptedChunkSize;
    +          currentHeader.release();
    +          currentHeader = null;
    +          currentChunk = null;
    +          currentChunkSize = 0;
    +        }
    +      } while (currentChunk == null && transfered() + written < count());
    +
    +      transferred += written;
    +      return written;
    +    }
    +
    +    private void nextChunk() throws IOException {
    +      byteChannel.reset();
    +      if (isByteBuf) {
    +        int copied = byteChannel.write(buf.nioBuffer());
    --- End diff --
    
    I think `Channels.newChannel(outputStream)` might work.
    
    (The reason I'd especially like this solution is that we could keep ByteArrayWritableChannel in src/test if we could just resolve this use-case without it.)
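
For reference, a minimal sketch of the `Channels.newChannel(outputStream)` idea using only the JDK. The class name `StreamChannelSketch` and the `drain` helper are made up for illustration and are not part of the PR. One difference to keep in mind: `ByteArrayWritableChannel` in the diff is constructed with `maxOutboundBlockSize` and so appears to have a bounded capacity, whereas a `ByteArrayOutputStream` grows as needed, so a caller taking this route would presumably have to limit how much it writes per chunk itself.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;

public class StreamChannelSketch {

  /** Copies everything remaining in src into a byte array through a stream-backed channel. */
  public static byte[] drain(ByteBuffer src) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    // Channels.newChannel adapts any OutputStream to the WritableByteChannel interface.
    WritableByteChannel channel = Channels.newChannel(out);
    while (src.hasRemaining()) {
      channel.write(src);
    }
    return out.toByteArray();
  }

  public static void main(String[] args) throws IOException {
    ByteBuffer src = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8));
    byte[] copied = drain(src);
    System.out.println(new String(copied, StandardCharsets.UTF_8));
  }
}
```

A `FileRegion` could be drained the same way by passing the wrapped channel to its `transferTo` method, though that path is not exercised here.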




[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/5377#issuecomment-97237476
  
      [Test build #31165 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31165/consoleFull) for   PR 5377 at commit [`47d4aff`](https://github.com/apache/spark/commit/47d4aff51538e09252b0f6286dd0e49052b07c18).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.
     * This patch does not change any dependencies.




[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/5377#issuecomment-97237486
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/5377#issuecomment-96856983
  
      [Test build #31084 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31084/consoleFull) for   PR 5377 at commit [`2f92237`](https://github.com/apache/spark/commit/2f9223729d0fedb9d16015bbc885867af6e742da).




[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5377#discussion_r29121411
  
    --- Diff: network/common/src/main/java/org/apache/spark/network/sasl/SaslRpcHandler.java ---
    @@ -46,19 +47,30 @@
       /** Class which provides secret keys which are shared by server and client on a per-app basis. */
       private final SecretKeyHolder secretKeyHolder;
     
    -  /** Maps each channel to its SASL authentication state. */
    -  private final ConcurrentMap<TransportClient, SparkSaslServer> channelAuthenticationMap;
    +  /** The client channel. */
    +  private final Channel channel;
     
    -  public SaslRpcHandler(RpcHandler delegate, SecretKeyHolder secretKeyHolder) {
    +  private final TransportConf conf;
    +
    +  private SparkSaslServer saslServer;
    +  private boolean isComplete;
    +
    +  SaslRpcHandler(
    +      TransportConf conf,
    +      Channel channel,
    +      RpcHandler delegate,
    +      SecretKeyHolder secretKeyHolder) {
    +    this.conf = conf;
    --- End diff --
    
    nit: reorder fields to follow same order as constructor parameters




[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on the pull request:

    https://github.com/apache/spark/pull/5377#issuecomment-93677780
  
    This is looking very good to me. I've reviewed the core transport part and I like the API. I will have to defer reviewing the rest of the SASL encryption side and the tests for a bit longer, though, so apologies for that.




[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/5377#issuecomment-97596699
  
    >  I'm worried about CPU heavy encryption blocking the IO thread calling transferTo
    
    Yeah, that's the part that sucks with the current approach. But I couldn't find an alternative.
    
    - ThreadPerChannelEventLoop can solve the problem but doesn't sound like the right approach.
    - Using a separate thread pool for the encryption / decryption handlers is possible, but we still need to guarantee ordering when pushing messages on the outbound path; I don't think you're allowed to call blocking methods (like `ChannelPromise.sync()`) in channel threads.
    
    Alternative suggestions are most welcome.
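
The ordering constraint in the second bullet can be pictured with plain JDK classes. What follows is a minimal sketch under stated assumptions, not what this PR does and not netty code: `OrderedOffloadSketch`, its `transform` function (standing in for the SASL wrap step), and its `sink` (standing in for the channel write) are all hypothetical names. Expensive work runs on a pool, while writes are chained so they happen in submission order and the submitting thread never blocks.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Function;

/**
 * Hypothetical helper, not part of Spark or netty: runs an expensive transform on a
 * pool, but hands results to the sink strictly in submission order.
 */
final class OrderedOffloadSketch<I, O> {
  private final ExecutorService pool;
  private final Function<I, O> transform; // assumption: stands in for the SASL wrap step
  private final Consumer<O> sink;         // assumption: stands in for the channel write
  // Completes once every message submitted so far has reached the sink.
  private CompletableFuture<Void> tail = CompletableFuture.completedFuture(null);

  OrderedOffloadSketch(ExecutorService pool, Function<I, O> transform, Consumer<O> sink) {
    this.pool = pool;
    this.transform = transform;
    this.sink = sink;
  }

  /** Never blocks the caller; the returned future completes after msg is written. */
  synchronized CompletableFuture<Void> submit(I msg) {
    // Transforms for different messages may run concurrently and finish out of order.
    CompletableFuture<O> transformed =
        CompletableFuture.supplyAsync(() -> transform.apply(msg), pool);
    // Each write waits for the previous write and for its own transform.
    tail = tail.thenAcceptBoth(transformed, (ignored, out) -> sink.accept(out));
    return tail;
  }

  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    List<String> written = Collections.synchronizedList(new ArrayList<String>());
    OrderedOffloadSketch<Integer, String> queue = new OrderedOffloadSketch<Integer, String>(
        pool,
        id -> {
          // Earlier messages take longer, so transforms complete in reverse order.
          try {
            Thread.sleep((4 - id) * 20L);
          } catch (InterruptedException e) {
            throw new IllegalStateException(e);
          }
          return "msg-" + id;
        },
        written::add);
    CompletableFuture<Void> last = CompletableFuture.completedFuture(null);
    for (int n = 0; n < 4; n++) {
      last = queue.submit(n);
    }
    last.join();
    pool.shutdown();
    System.out.println(written); // messages appear in submission order
  }
}
```

In this sketch the sink runs on whichever thread completes the chain last, so a real netty handler would still need to hop back onto the channel's event loop before writing. A failed transform also fails every later write in the chain, which a production version would have to handle.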




[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/5377#issuecomment-96849236
  
    > Can probably eliminate this by constructing it in nextChunk(), I believe that's the only place it's used
    
    That would generate more garbage (a new instance per call to `nextChunk`). Probably not a big deal, though...




[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/5377#issuecomment-96875580
  
      [Test build #31084 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31084/consoleFull) for PR 5377 at commit [`2f92237`](https://github.com/apache/spark/commit/2f9223729d0fedb9d16015bbc885867af6e742da).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class SaslEncryption `
      * `  static class EncryptedMessage extends AbstractReferenceCounted implements FileRegion `
      * `class SaslRpcHandler extends RpcHandler `
      * `public class SaslServerBootstrap implements TransportServerBootstrap `
      * `public class SparkSaslClient implements SaslEncryptionBackend `
      * `public class SparkSaslServer implements SaslEncryptionBackend `
      * `public class ByteArrayWritableChannel implements WritableByteChannel `
    
     * This patch does not change any dependencies.




[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/5377#issuecomment-96842148
  
    I made the changes to report non-zero written bytes (https://github.com/vanzin/spark/commit/346f8291a91b0b73742263d7a23ee4027d76adb9). But after writing and testing it, I kinda started questioning whether it will improve anything...
    
    `0` and `1` are basically returned in the same situation: the underlying channel does not have enough buffer space. So returning non-zero would cause netty to "busy loop" and call `transferTo` again, which would fail to write anything and would then return 0.
    
    But I guess this makes the code behave more like it would with other non-hacky `FileRegion` implementations.
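
For readers unfamiliar with the `transferTo` contract being discussed, here is a minimal sketch using only JDK classes. It is an illustration under assumptions, not the code from the linked commit: `BufferRegionSketch` is a hypothetical, simplified stand-in for a `FileRegion`-like object (netty's real interface has more methods and a position argument), and `ThrottledChannelSketch` is a hypothetical test double for a channel whose buffer fills up. The region reports exactly how many bytes the target accepted, and returns 0 when the target has no room.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

/** Hypothetical, simplified region backed by memory; not netty's FileRegion. */
final class BufferRegionSketch {
  private final ByteBuffer data;

  BufferRegionSketch(byte[] bytes) {
    this.data = ByteBuffer.wrap(bytes);
  }

  /** Total number of bytes in the region. */
  long count() {
    return data.capacity();
  }

  /** Number of bytes handed to a target so far. */
  long transferred() {
    return data.position();
  }

  /** Writes as much as the target accepts and returns that amount (0 if no room). */
  long transferTo(WritableByteChannel target) throws IOException {
    return target.write(data);
  }

  public static void main(String[] args) throws IOException {
    BufferRegionSketch region = new BufferRegionSketch(new byte[10]);
    ThrottledChannelSketch channel = new ThrottledChannelSketch(4);
    System.out.println(region.transferTo(channel)); // partial write
    System.out.println(region.transferTo(channel)); // channel is full
    channel.makeRoom(100);
    System.out.println(region.transferTo(channel)); // remainder goes through
  }
}

/** Hypothetical test double: accepts bytes only while it has room, like a full socket buffer. */
final class ThrottledChannelSketch implements WritableByteChannel {
  private final ByteArrayOutputStream accepted = new ByteArrayOutputStream();
  private int room;

  ThrottledChannelSketch(int room) {
    this.room = room;
  }

  /** Simulates the peer draining the buffer. */
  void makeRoom(int bytes) {
    room += bytes;
  }

  byte[] accepted() {
    return accepted.toByteArray();
  }

  @Override
  public int write(ByteBuffer src) {
    int n = Math.min(room, src.remaining());
    for (int i = 0; i < n; i++) {
      accepted.write(src.get());
    }
    room -= n;
    return n;
  }

  @Override
  public boolean isOpen() {
    return true;
  }

  @Override
  public void close() {
  }
}
```

With a 10-byte region and 4 bytes of room, the three calls in `main` return 4, 0 and 6: a caller that retries immediately after a short write gets 0 until the channel drains, which matches the behavior described above.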




[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on the pull request:

    https://github.com/apache/spark/pull/5377#issuecomment-96514035
  
    LGTM, only minor comments. The tests look good. Apologies for taking so long to review!




[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/5377#issuecomment-97516450
  
    Merged build started.

