Posted to commits@spark.apache.org by pw...@apache.org on 2014/11/05 23:39:04 UTC

git commit: [SPARK-4242] [Core] Add SASL to external shuffle service

Repository: spark
Updated Branches:
  refs/heads/master 5b3b6f6f5 -> 4c42986cc


[SPARK-4242] [Core] Add SASL to external shuffle service

Does three things: (1) adds SASL support to ExternalShuffleClient, (2) passes the SecurityManager into BlockManager's constructor, and (3) adds a unit test.
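
For illustration, a minimal sketch (Java) of the new client-side wiring. The
app id, user, and secret are placeholders, and the anonymous key holder stands
in for the SecurityManager that (2) threads through to the client:

    import org.apache.spark.network.sasl.SecretKeyHolder;
    import org.apache.spark.network.shuffle.ExternalShuffleClient;
    import org.apache.spark.network.util.SystemPropertyConfigProvider;
    import org.apache.spark.network.util.TransportConf;

    public class SaslShuffleClientSketch {
      public static void main(String[] args) {
        TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
        // Illustrative credentials; in Spark itself the SecurityManager fills this role.
        SecretKeyHolder keyHolder = new SecretKeyHolder() {
          @Override public String getSaslUser(String appId) { return "sparkSaslUser"; }
          @Override public String getSecretKey(String appId) { return "secret"; }
        };
        // Pass (conf, null, false) instead to keep the old unauthenticated behavior.
        ExternalShuffleClient client = new ExternalShuffleClient(conf, keyHolder, true);
        client.init("my-app-id");  // the SASL bootstrap is attached here, once appId is known
        client.close();
      }
    }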

Author: Aaron Davidson <aa...@databricks.com>

Closes #3108 from aarondav/sasl-client and squashes the following commits:

48b622d [Aaron Davidson] Screw it, let's just get LimitedInputStream
3543b70 [Aaron Davidson] Back out of pom change due to unknown test issue?
b58518a [Aaron Davidson] ByteStreams.limit() not available :(
cbe451a [Aaron Davidson] Address comments
2bf2908 [Aaron Davidson] [SPARK-4242] [Core] Add SASL to external shuffle service


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4c42986c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4c42986c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4c42986c

Branch: refs/heads/master
Commit: 4c42986cc070d9c5c55c7bf8a2a67585967b1082
Parents: 5b3b6f6
Author: Aaron Davidson <aa...@databricks.com>
Authored: Wed Nov 5 14:38:43 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Wed Nov 5 14:38:43 2014 -0800

----------------------------------------------------------------------
 LICENSE                                         |  21 +++-
 .../main/scala/org/apache/spark/SparkEnv.scala  |   2 +-
 .../org/apache/spark/storage/BlockManager.scala |  12 +-
 .../storage/BlockManagerReplicationSuite.scala  |   4 +-
 .../spark/storage/BlockManagerSuite.scala       |   4 +-
 network/common/pom.xml                          |   1 +
 .../buffer/FileSegmentManagedBuffer.java        |   3 +-
 .../spark/network/util/LimitedInputStream.java  |  87 ++++++++++++++
 network/shuffle/pom.xml                         |   1 +
 .../spark/network/sasl/SparkSaslClient.java     |   1 -
 .../spark/network/sasl/SparkSaslServer.java     |   9 +-
 .../network/shuffle/ExternalShuffleClient.java  |  31 ++++-
 .../ExternalShuffleIntegrationSuite.java        |   4 +-
 .../shuffle/ExternalShuffleSecuritySuite.java   | 115 +++++++++++++++++++
 .../streaming/ReceivedBlockHandlerSuite.scala   |   2 +-
 15 files changed, 274 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4c42986c/LICENSE
----------------------------------------------------------------------
diff --git a/LICENSE b/LICENSE
index f1732fb..3c667bf 100644
--- a/LICENSE
+++ b/LICENSE
@@ -754,7 +754,7 @@ SUCH DAMAGE.
 
 
 ========================================================================
-For Timsort (core/src/main/java/org/apache/spark/util/collection/Sorter.java):
+For Timsort (core/src/main/java/org/apache/spark/util/collection/TimSort.java):
 ========================================================================
 Copyright (C) 2008 The Android Open Source Project
 
@@ -772,6 +772,25 @@ limitations under the License.
 
 
 ========================================================================
+For LimitedInputStream
+  (network/common/src/main/java/org/apache/spark/network/util/LimitedInputStream.java):
+========================================================================
+Copyright (C) 2007 The Guava Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+
+========================================================================
 BSD-style licenses
 ========================================================================
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4c42986c/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 45e9d7f..e7454be 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -287,7 +287,7 @@ object SparkEnv extends Logging {
 
     // NB: blockManager is not valid until initialize() is called later.
     val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
-      serializer, conf, mapOutputTracker, shuffleManager, blockTransferService)
+      serializer, conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager)
 
     val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4c42986c/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 655d16c..a5fb87b 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -72,7 +72,8 @@ private[spark] class BlockManager(
     val conf: SparkConf,
     mapOutputTracker: MapOutputTracker,
     shuffleManager: ShuffleManager,
-    blockTransferService: BlockTransferService)
+    blockTransferService: BlockTransferService,
+    securityManager: SecurityManager)
   extends BlockDataManager with Logging {
 
   val diskBlockManager = new DiskBlockManager(this, conf)
@@ -115,7 +116,8 @@ private[spark] class BlockManager(
   // Client to read other executors' shuffle files. This is either an external service, or just the
  // standard BlockTransferService to directly connect to other Executors.
   private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
-    new ExternalShuffleClient(SparkTransportConf.fromSparkConf(conf))
+    new ExternalShuffleClient(SparkTransportConf.fromSparkConf(conf), securityManager,
+      securityManager.isAuthenticationEnabled())
   } else {
     blockTransferService
   }
@@ -166,9 +168,10 @@ private[spark] class BlockManager(
       conf: SparkConf,
       mapOutputTracker: MapOutputTracker,
       shuffleManager: ShuffleManager,
-      blockTransferService: BlockTransferService) = {
+      blockTransferService: BlockTransferService,
+      securityManager: SecurityManager) = {
     this(execId, actorSystem, master, serializer, BlockManager.getMaxMemory(conf),
-      conf, mapOutputTracker, shuffleManager, blockTransferService)
+      conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager)
   }
 
   /**
@@ -219,7 +222,6 @@ private[spark] class BlockManager(
         return
       } catch {
         case e: Exception if i < MAX_ATTEMPTS =>
-          val attemptsRemaining =
           logError(s"Failed to connect to external shuffle server, will retry ${MAX_ATTEMPTS - i}}"
             + s" more times after waiting $SLEEP_TIME_SECS seconds...", e)
           Thread.sleep(SLEEP_TIME_SECS * 1000)
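
Note that the SecurityManager passed in above is also handed straight to
ExternalShuffleClient as its SecretKeyHolder, so it must implement that
interface. For reference, a sketch of the contract as the new test suite
below exercises it:

    package org.apache.spark.network.sasl;

    /** Provides the SASL user and shared secret for a given application id. */
    public interface SecretKeyHolder {
      String getSaslUser(String appId);
      String getSecretKey(String appId);
    }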

http://git-wip-us.apache.org/repos/asf/spark/blob/4c42986c/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index 1461fa6..f63e772 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -62,7 +62,7 @@ class BlockManagerReplicationSuite extends FunSuite with Matchers with BeforeAnd
       name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
     val transfer = new NioBlockTransferService(conf, securityMgr)
     val store = new BlockManager(name, actorSystem, master, serializer, maxMem, conf,
-      mapOutputTracker, shuffleManager, transfer)
+      mapOutputTracker, shuffleManager, transfer, securityMgr)
     store.initialize("app-id")
     allStores += store
     store
@@ -263,7 +263,7 @@ class BlockManagerReplicationSuite extends FunSuite with Matchers with BeforeAnd
     when(failableTransfer.hostName).thenReturn("some-hostname")
     when(failableTransfer.port).thenReturn(1000)
     val failableStore = new BlockManager("failable-store", actorSystem, master, serializer,
-      10000, conf, mapOutputTracker, shuffleManager, failableTransfer)
+      10000, conf, mapOutputTracker, shuffleManager, failableTransfer, securityMgr)
     failableStore.initialize("app-id")
     allStores += failableStore // so that this gets stopped after test
     assert(master.getPeers(store.blockManagerId).toSet === Set(failableStore.blockManagerId))

http://git-wip-us.apache.org/repos/asf/spark/blob/4c42986c/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 0782876..9529502 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -74,7 +74,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
       name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
     val transfer = new NioBlockTransferService(conf, securityMgr)
     val manager = new BlockManager(name, actorSystem, master, serializer, maxMem, conf,
-      mapOutputTracker, shuffleManager, transfer)
+      mapOutputTracker, shuffleManager, transfer, securityMgr)
     manager.initialize("app-id")
     manager
   }
@@ -795,7 +795,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     // Use Java serializer so we can create an unserializable error.
     val transfer = new NioBlockTransferService(conf, securityMgr)
     store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, actorSystem, master,
-      new JavaSerializer(conf), 1200, conf, mapOutputTracker, shuffleManager, transfer)
+      new JavaSerializer(conf), 1200, conf, mapOutputTracker, shuffleManager, transfer, securityMgr)
 
     // The put should fail since a1 is not serializable.
     class UnserializableClass

http://git-wip-us.apache.org/repos/asf/spark/blob/4c42986c/network/common/pom.xml
----------------------------------------------------------------------
diff --git a/network/common/pom.xml b/network/common/pom.xml
index ea88714..6144548 100644
--- a/network/common/pom.xml
+++ b/network/common/pom.xml
@@ -50,6 +50,7 @@
     <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
+      <version>11.0.2</version> <!-- yarn 2.4.0's version -->
       <scope>provided</scope>
     </dependency>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4c42986c/network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java b/network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
index 89ed79b..5fa1527 100644
--- a/network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
+++ b/network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
@@ -30,6 +30,7 @@ import com.google.common.io.ByteStreams;
 import io.netty.channel.DefaultFileRegion;
 
 import org.apache.spark.network.util.JavaUtils;
+import org.apache.spark.network.util.LimitedInputStream;
 
 /**
  * A {@link ManagedBuffer} backed by a segment in a file.
@@ -101,7 +102,7 @@ public final class FileSegmentManagedBuffer extends ManagedBuffer {
     try {
       is = new FileInputStream(file);
       ByteStreams.skipFully(is, offset);
-      return ByteStreams.limit(is, length);
+      return new LimitedInputStream(is, length);
     } catch (IOException e) {
       try {
         if (is != null) {

http://git-wip-us.apache.org/repos/asf/spark/blob/4c42986c/network/common/src/main/java/org/apache/spark/network/util/LimitedInputStream.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/util/LimitedInputStream.java b/network/common/src/main/java/org/apache/spark/network/util/LimitedInputStream.java
new file mode 100644
index 0000000..63ca43c
--- /dev/null
+++ b/network/common/src/main/java/org/apache/spark/network/util/LimitedInputStream.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.util;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Wraps an {@link InputStream}, limiting the number of bytes which can be read.
+ *
+ * This code is taken from Guava 14.0's source, because there is no compatible way to
+ * use this functionality in both a Guava 11 environment and a Guava 14+ environment.
+ */
+public final class LimitedInputStream extends FilterInputStream {
+  private long left;
+  private long mark = -1;
+
+  public LimitedInputStream(InputStream in, long limit) {
+    super(in);
+    Preconditions.checkNotNull(in);
+    Preconditions.checkArgument(limit >= 0, "limit must be non-negative");
+    left = limit;
+  }
+  @Override public int available() throws IOException {
+    return (int) Math.min(in.available(), left);
+  }
+  // it's okay to mark even if mark isn't supported, as reset won't work
+  @Override public synchronized void mark(int readLimit) {
+    in.mark(readLimit);
+    mark = left;
+  }
+  @Override public int read() throws IOException {
+    if (left == 0) {
+      return -1;
+    }
+    int result = in.read();
+    if (result != -1) {
+      --left;
+    }
+    return result;
+  }
+  @Override public int read(byte[] b, int off, int len) throws IOException {
+    if (left == 0) {
+      return -1;
+    }
+    len = (int) Math.min(len, left);
+    int result = in.read(b, off, len);
+    if (result != -1) {
+      left -= result;
+    }
+    return result;
+  }
+  @Override public synchronized void reset() throws IOException {
+    if (!in.markSupported()) {
+      throw new IOException("Mark not supported");
+    }
+    if (mark == -1) {
+      throw new IOException("Mark not set");
+    }
+    in.reset();
+    left = mark;
+  }
+  @Override public long skip(long n) throws IOException {
+    n = Math.min(n, left);
+    long skipped = in.skip(n);
+    left -= skipped;
+    return skipped;
+  }
+}
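
A quick usage sketch: the new stream is a drop-in replacement for Guava 14's
ByteStreams.limit(), reading exactly one file segment as in the
FileSegmentManagedBuffer change above (the path argument is a placeholder):

    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    import com.google.common.io.ByteStreams;

    import org.apache.spark.network.util.LimitedInputStream;

    public class SegmentReadSketch {
      /** Returns a stream over bytes [offset, offset + length) of the given file. */
      public static InputStream openSegment(String path, long offset, long length)
          throws IOException {
        FileInputStream is = new FileInputStream(path);
        ByteStreams.skipFully(is, offset);          // present in both Guava 11 and 14+
        return new LimitedInputStream(is, length);  // replaces ByteStreams.limit(is, length)
      }
    }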

http://git-wip-us.apache.org/repos/asf/spark/blob/4c42986c/network/shuffle/pom.xml
----------------------------------------------------------------------
diff --git a/network/shuffle/pom.xml b/network/shuffle/pom.xml
index d271704..fe5681d 100644
--- a/network/shuffle/pom.xml
+++ b/network/shuffle/pom.xml
@@ -51,6 +51,7 @@
     <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
+      <version>11.0.2</version> <!-- yarn 2.4.0's version -->
       <scope>provided</scope>
     </dependency>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4c42986c/network/shuffle/src/main/java/org/apache/spark/network/sasl/SparkSaslClient.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/sasl/SparkSaslClient.java b/network/shuffle/src/main/java/org/apache/spark/network/sasl/SparkSaslClient.java
index 72ba737..9abad1f 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/sasl/SparkSaslClient.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/sasl/SparkSaslClient.java
@@ -126,7 +126,6 @@ public class SparkSaslClient {
           logger.trace("SASL client callback: setting realm");
           RealmCallback rc = (RealmCallback) callback;
           rc.setText(rc.getDefaultText());
-          logger.info("Realm callback");
         } else if (callback instanceof RealmChoiceCallback) {
           // ignore (?)
         } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/4c42986c/network/shuffle/src/main/java/org/apache/spark/network/sasl/SparkSaslServer.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/sasl/SparkSaslServer.java b/network/shuffle/src/main/java/org/apache/spark/network/sasl/SparkSaslServer.java
index 2c0ce40..e87b17e 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/sasl/SparkSaslServer.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/sasl/SparkSaslServer.java
@@ -34,7 +34,8 @@ import com.google.common.base.Charsets;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.io.BaseEncoding;
+import io.netty.buffer.Unpooled;
+import io.netty.handler.codec.base64.Base64;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -159,12 +160,14 @@ public class SparkSaslServer {
  /** Encode a string identifier as a Base64-encoded string. */
   public static String encodeIdentifier(String identifier) {
     Preconditions.checkNotNull(identifier, "User cannot be null if SASL is enabled");
-    return BaseEncoding.base64().encode(identifier.getBytes(Charsets.UTF_8));
+    return Base64.encode(Unpooled.wrappedBuffer(identifier.getBytes(Charsets.UTF_8)))
+      .toString(Charsets.UTF_8);
   }
 
  /** Encode a password as a Base64-encoded char[]. */
   public static char[] encodePassword(String password) {
     Preconditions.checkNotNull(password, "Password cannot be null if SASL is enabled");
-    return BaseEncoding.base64().encode(password.getBytes(Charsets.UTF_8)).toCharArray();
+    return Base64.encode(Unpooled.wrappedBuffer(password.getBytes(Charsets.UTF_8)))
+      .toString(Charsets.UTF_8).toCharArray();
   }
 }
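
For reference, the Netty-based encoding in isolation: BaseEncoding only
appeared in Guava 14, while Netty's Base64 codec is already on the classpath.
A standalone sketch (releasing the buffer is our addition, not in the patch):

    import com.google.common.base.Charsets;

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.handler.codec.base64.Base64;

    public class Base64Sketch {
      /** Base64-encodes a UTF-8 string without requiring Guava 14's BaseEncoding. */
      public static String toBase64(String s) {
        ByteBuf encoded = Base64.encode(Unpooled.wrappedBuffer(s.getBytes(Charsets.UTF_8)));
        try {
          return encoded.toString(Charsets.UTF_8);
        } finally {
          encoded.release();  // Base64.encode allocates a new reference-counted buffer
        }
      }
    }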

http://git-wip-us.apache.org/repos/asf/spark/blob/4c42986c/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
index b0b19ba..3aa95d0 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
@@ -17,12 +17,18 @@
 
 package org.apache.spark.network.shuffle;
 
+import java.util.List;
+
+import com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.spark.network.TransportContext;
 import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.client.TransportClientBootstrap;
 import org.apache.spark.network.client.TransportClientFactory;
+import org.apache.spark.network.sasl.SaslClientBootstrap;
+import org.apache.spark.network.sasl.SecretKeyHolder;
 import org.apache.spark.network.server.NoOpRpcHandler;
 import org.apache.spark.network.shuffle.ExternalShuffleMessages.RegisterExecutor;
 import org.apache.spark.network.util.JavaUtils;
@@ -37,18 +43,35 @@ import org.apache.spark.network.util.TransportConf;
 public class ExternalShuffleClient extends ShuffleClient {
   private final Logger logger = LoggerFactory.getLogger(ExternalShuffleClient.class);
 
-  private final TransportClientFactory clientFactory;
+  private final TransportConf conf;
+  private final boolean saslEnabled;
+  private final SecretKeyHolder secretKeyHolder;
 
+  private TransportClientFactory clientFactory;
   private String appId;
 
-  public ExternalShuffleClient(TransportConf conf) {
-    TransportContext context = new TransportContext(conf, new NoOpRpcHandler());
-    this.clientFactory = context.createClientFactory();
+  /**
+   * Creates an external shuffle client, with SASL optionally enabled. If SASL is not enabled,
+   * then secretKeyHolder may be null.
+   */
+  public ExternalShuffleClient(
+      TransportConf conf,
+      SecretKeyHolder secretKeyHolder,
+      boolean saslEnabled) {
+    this.conf = conf;
+    this.secretKeyHolder = secretKeyHolder;
+    this.saslEnabled = saslEnabled;
   }
 
   @Override
   public void init(String appId) {
     this.appId = appId;
+    TransportContext context = new TransportContext(conf, new NoOpRpcHandler());
+    List<TransportClientBootstrap> bootstraps = Lists.newArrayList();
+    if (saslEnabled) {
+      bootstraps.add(new SaslClientBootstrap(conf, appId, secretKeyHolder));
+    }
+    clientFactory = context.createClientFactory(bootstraps);
   }
 
   @Override
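
Design note: clientFactory moves out of the constructor because the SASL
bootstrap needs the application id, which only becomes available in
init(appId). Continuing the sketch from the commit message above, the
resulting lifecycle (host, port, and block ids are placeholders; the
listener's callback signatures are assumed from this module's
BlockFetchingListener):

    // init() must be called before any fetches; it builds the client factory.
    ExternalShuffleClient client = new ExternalShuffleClient(conf, keyHolder, true);
    client.init("my-app-id");
    client.fetchBlocks("shuffle-host", 7337, "exec-0", new String[] { "shuffle_0_0_0" },
      new BlockFetchingListener() {
        @Override
        public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
          // consume the block's data
        }
        @Override
        public void onBlockFetchFailure(String blockId, Throwable t) {
          // authentication failures during the bootstrap surface here too
        }
      });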

http://git-wip-us.apache.org/repos/asf/spark/blob/4c42986c/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
index bc101f5..71e017b 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
@@ -135,7 +135,7 @@ public class ExternalShuffleIntegrationSuite {
 
     final Semaphore requestsRemaining = new Semaphore(0);
 
-    ExternalShuffleClient client = new ExternalShuffleClient(conf);
+    ExternalShuffleClient client = new ExternalShuffleClient(conf, null, false);
     client.init(APP_ID);
     client.fetchBlocks(TestUtils.getLocalHost(), port, execId, blockIds,
       new BlockFetchingListener() {
@@ -267,7 +267,7 @@ public class ExternalShuffleIntegrationSuite {
   }
 
   private void registerExecutor(String executorId, ExecutorShuffleInfo executorInfo) {
-    ExternalShuffleClient client = new ExternalShuffleClient(conf);
+    ExternalShuffleClient client = new ExternalShuffleClient(conf, null, false);
     client.init(APP_ID);
     client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(),
       executorId, executorInfo);

http://git-wip-us.apache.org/repos/asf/spark/blob/4c42986c/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
new file mode 100644
index 0000000..4c18fcd
--- /dev/null
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+import org.apache.spark.network.TestUtils;
+import org.apache.spark.network.TransportContext;
+import org.apache.spark.network.sasl.SaslRpcHandler;
+import org.apache.spark.network.sasl.SecretKeyHolder;
+import org.apache.spark.network.server.RpcHandler;
+import org.apache.spark.network.server.TransportServer;
+import org.apache.spark.network.util.SystemPropertyConfigProvider;
+import org.apache.spark.network.util.TransportConf;
+
+public class ExternalShuffleSecuritySuite {
+
+  TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
+  TransportServer server;
+
+  @Before
+  public void beforeEach() {
+    RpcHandler handler = new SaslRpcHandler(new ExternalShuffleBlockHandler(),
+      new TestSecretKeyHolder("my-app-id", "secret"));
+    TransportContext context = new TransportContext(conf, handler);
+    this.server = context.createServer();
+  }
+
+  @After
+  public void afterEach() {
+    if (server != null) {
+      server.close();
+      server = null;
+    }
+  }
+
+  @Test
+  public void testValid() {
+    validate("my-app-id", "secret");
+  }
+
+  @Test
+  public void testBadAppId() {
+    try {
+      validate("wrong-app-id", "secret");
+      fail("Validation should have failed with a wrong app id");
+    } catch (Exception e) {
+      assertTrue(e.getMessage(), e.getMessage().contains("Wrong appId!"));
+    }
+  }
+
+  @Test
+  public void testBadSecret() {
+    try {
+      validate("my-app-id", "bad-secret");
+      fail("Validation should have failed with a bad secret");
+    } catch (Exception e) {
+      assertTrue(e.getMessage(), e.getMessage().contains("Mismatched response"));
+    }
+  }
+
+  /** Creates an ExternalShuffleClient and attempts to register with the server. */
+  private void validate(String appId, String secretKey) {
+    ExternalShuffleClient client =
+      new ExternalShuffleClient(conf, new TestSecretKeyHolder(appId, secretKey), true);
+    client.init(appId);
+    // Registration either succeeds or throws an exception.
+    client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(), "exec0",
+      new ExecutorShuffleInfo(new String[0], 0, ""));
+    client.close();
+  }
+
+  /** Provides a secret key holder which always returns the given secret key, for a single appId. */
+  static class TestSecretKeyHolder implements SecretKeyHolder {
+    private final String appId;
+    private final String secretKey;
+
+    TestSecretKeyHolder(String appId, String secretKey) {
+      this.appId = appId;
+      this.secretKey = secretKey;
+    }
+
+    @Override
+    public String getSaslUser(String appId) {
+      return "user";
+    }
+
+    @Override
+    public String getSecretKey(String appId) {
+      if (!appId.equals(this.appId)) {
+        throw new IllegalArgumentException("Wrong appId!");
+      }
+      return secretKey;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/4c42986c/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index 0f27f55..9efe15d 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -73,7 +73,7 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche
 
     blockManager = new BlockManager("bm", actorSystem, blockManagerMaster, serializer,
       blockManagerSize, conf, mapOutputTracker, shuffleManager,
-      new NioBlockTransferService(conf, securityMgr))
+      new NioBlockTransferService(conf, securityMgr), securityMgr)
     blockManager.initialize("app-id")
 
     tempDirectory = Files.createTempDir()

