Posted to commits@spark.apache.org by rx...@apache.org on 2016/02/29 02:25:18 UTC

[02/14] spark git commit: [SPARK-13529][BUILD] Move network/* modules into common/network-*

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java
deleted file mode 100644
index 6758203..0000000
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.shuffle.mesos;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.spark.network.client.RpcResponseCallback;
-import org.apache.spark.network.client.TransportClient;
-import org.apache.spark.network.sasl.SecretKeyHolder;
-import org.apache.spark.network.shuffle.ExternalShuffleClient;
-import org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver;
-import org.apache.spark.network.util.TransportConf;
-
-/**
- * A client for talking to the external shuffle service in Mesos coarse-grained mode.
- *
- * This is used by the Spark driver to register with each external shuffle service on the cluster.
- * The driver registers with each service so that shuffle files can be cleaned up reliably
- * after the application exits. Mesos does not provide a good mechanism for this, so Spark
- * has to detect the application's termination itself.
- */
-public class MesosExternalShuffleClient extends ExternalShuffleClient {
-  private final Logger logger = LoggerFactory.getLogger(MesosExternalShuffleClient.class);
-
-  /**
-   * Creates a Mesos external shuffle client that wraps the {@link ExternalShuffleClient}.
-   * Please refer to docs on {@link ExternalShuffleClient} for more information.
-   */
-  public MesosExternalShuffleClient(
-      TransportConf conf,
-      SecretKeyHolder secretKeyHolder,
-      boolean saslEnabled,
-      boolean saslEncryptionEnabled) {
-    super(conf, secretKeyHolder, saslEnabled, saslEncryptionEnabled);
-  }
-
-  public void registerDriverWithShuffleService(String host, int port) throws IOException {
-    checkInit();
-    ByteBuffer registerDriver = new RegisterDriver(appId).toByteBuffer();
-    TransportClient client = clientFactory.createClient(host, port);
-    client.sendRpc(registerDriver, new RpcResponseCallback() {
-      @Override
-      public void onSuccess(ByteBuffer response) {
-        logger.info("Successfully registered app " + appId + " with external shuffle service.");
-      }
-
-      @Override
-      public void onFailure(Throwable e) {
-        logger.warn("Unable to register app " + appId + " with external shuffle service. " +
-          "Please remove shuffle data manually after the driver exits. Error: " + e);
-      }
-    });
-  }
-}
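
A usage sketch for the class deleted above (it moves to common/network-shuffle rather than disappearing): a driver registering with one external shuffle service. Illustrative only — the init(appId) call is assumed to be inherited from ExternalShuffleClient (it is what checkInit() verifies), SASL is disabled so the SecretKeyHolder may be null, and the host, port, and app ID are placeholders.

    import java.io.IOException;

    import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient;
    import org.apache.spark.network.util.SystemPropertyConfigProvider;
    import org.apache.spark.network.util.TransportConf;

    public class DriverRegistrationSketch {
      public static void main(String[] args) throws IOException {
        TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
        // SASL disabled via the two flags, so no SecretKeyHolder is required (null).
        MesosExternalShuffleClient client =
            new MesosExternalShuffleClient(conf, null, false, false);
        client.init("app-20160229-0001");  // assumed: inherited from ExternalShuffleClient
        // Fire-and-forget RPC; the callback in registerDriverWithShuffleService only logs.
        client.registerDriverWithShuffleService("shuffle-host.example.com", 7337);
      }
    }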

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
deleted file mode 100644
index 7fbe338..0000000
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.shuffle.protocol;
-
-import java.nio.ByteBuffer;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-
-import org.apache.spark.network.protocol.Encodable;
-import org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver;
-
-/**
- * Messages handled by the {@link org.apache.spark.network.shuffle.ExternalShuffleBlockHandler}, or
- * by Spark's NettyBlockTransferService.
- *
- * At a high level:
- *   - OpenBlocks is handled by both services, but the external shuffle service serves only
- *     shuffle files. It returns a StreamHandle.
- *   - UploadBlock is only handled by the NettyBlockTransferService.
- *   - RegisterExecutor is only handled by the external shuffle service.
- */
-public abstract class BlockTransferMessage implements Encodable {
-  protected abstract Type type();
-
-  /** Preceding every serialized message is its type, which allows us to deserialize it. */
-  public static enum Type {
-    OPEN_BLOCKS(0), UPLOAD_BLOCK(1), REGISTER_EXECUTOR(2), STREAM_HANDLE(3), REGISTER_DRIVER(4);
-
-    private final byte id;
-
-    private Type(int id) {
-      assert id < 128 : "Cannot have more than 128 message types";
-      this.id = (byte) id;
-    }
-
-    public byte id() { return id; }
-  }
-
-  // NB: Java does not support static methods in interfaces, so we must put this in a static class.
-  public static class Decoder {
-    /** Deserializes the 'type' byte followed by the message itself. */
-    public static BlockTransferMessage fromByteBuffer(ByteBuffer msg) {
-      ByteBuf buf = Unpooled.wrappedBuffer(msg);
-      byte type = buf.readByte();
-      switch (type) {
-        case 0: return OpenBlocks.decode(buf);
-        case 1: return UploadBlock.decode(buf);
-        case 2: return RegisterExecutor.decode(buf);
-        case 3: return StreamHandle.decode(buf);
-        case 4: return RegisterDriver.decode(buf);
-        default: throw new IllegalArgumentException("Unknown message type: " + type);
-      }
-    }
-  }
-
-  /** Serializes the 'type' byte followed by the message itself. */
-  public ByteBuffer toByteBuffer() {
-    // Allow room for encoded message, plus the type byte
-    ByteBuf buf = Unpooled.buffer(encodedLength() + 1);
-    buf.writeByte(type().id);
-    encode(buf);
-    assert buf.writableBytes() == 0 : "Writable bytes remain: " + buf.writableBytes();
-    return buf.nioBuffer();
-  }
-}
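
The framing contract above is worth a short illustration: toByteBuffer() prepends the one-byte type tag, and Decoder.fromByteBuffer() strips it and dispatches to the matching decode(). A minimal round trip — StreamHandle is chosen only because it is the simplest message; run with -ea to enable the asserts in these sketches.

    import java.nio.ByteBuffer;

    import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
    import org.apache.spark.network.shuffle.protocol.StreamHandle;

    public class FramingRoundTrip {
      public static void main(String[] args) {
        // Wire layout: [type byte STREAM_HANDLE(3)][long streamId][int numChunks].
        StreamHandle original = new StreamHandle(12345L, 16);
        ByteBuffer wire = original.toByteBuffer();

        // The decoder consumes the tag, then delegates to StreamHandle.decode().
        BlockTransferMessage decoded = BlockTransferMessage.Decoder.fromByteBuffer(wire);
        assert original.equals(decoded) : "round trip should preserve the message";
      }
    }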

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java
deleted file mode 100644
index 102d4ef..0000000
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.shuffle.protocol;
-
-import java.util.Arrays;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Objects;
-import io.netty.buffer.ByteBuf;
-
-import org.apache.spark.network.protocol.Encodable;
-import org.apache.spark.network.protocol.Encoders;
-
-/** Contains all configuration necessary for locating the shuffle files of an executor. */
-public class ExecutorShuffleInfo implements Encodable {
-  /** The base set of local directories that the executor stores its shuffle files in. */
-  public final String[] localDirs;
-  /** Number of subdirectories created within each localDir. */
-  public final int subDirsPerLocalDir;
-  /** Shuffle manager (SortShuffleManager or HashShuffleManager) that the executor is using. */
-  public final String shuffleManager;
-
-  @JsonCreator
-  public ExecutorShuffleInfo(
-      @JsonProperty("localDirs") String[] localDirs,
-      @JsonProperty("subDirsPerLocalDir") int subDirsPerLocalDir,
-      @JsonProperty("shuffleManager") String shuffleManager) {
-    this.localDirs = localDirs;
-    this.subDirsPerLocalDir = subDirsPerLocalDir;
-    this.shuffleManager = shuffleManager;
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hashCode(subDirsPerLocalDir, shuffleManager) * 41 + Arrays.hashCode(localDirs);
-  }
-
-  @Override
-  public String toString() {
-    return Objects.toStringHelper(this)
-      .add("localDirs", Arrays.toString(localDirs))
-      .add("subDirsPerLocalDir", subDirsPerLocalDir)
-      .add("shuffleManager", shuffleManager)
-      .toString();
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (other != null && other instanceof ExecutorShuffleInfo) {
-      ExecutorShuffleInfo o = (ExecutorShuffleInfo) other;
-      return Arrays.equals(localDirs, o.localDirs)
-        && Objects.equal(subDirsPerLocalDir, o.subDirsPerLocalDir)
-        && Objects.equal(shuffleManager, o.shuffleManager);
-    }
-    return false;
-  }
-
-  @Override
-  public int encodedLength() {
-    return Encoders.StringArrays.encodedLength(localDirs)
-        + 4 // int
-        + Encoders.Strings.encodedLength(shuffleManager);
-  }
-
-  @Override
-  public void encode(ByteBuf buf) {
-    Encoders.StringArrays.encode(buf, localDirs);
-    buf.writeInt(subDirsPerLocalDir);
-    Encoders.Strings.encode(buf, shuffleManager);
-  }
-
-  public static ExecutorShuffleInfo decode(ByteBuf buf) {
-    String[] localDirs = Encoders.StringArrays.decode(buf);
-    int subDirsPerLocalDir = buf.readInt();
-    String shuffleManager = Encoders.Strings.decode(buf);
-    return new ExecutorShuffleInfo(localDirs, subDirsPerLocalDir, shuffleManager);
-  }
-}
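
The invariant to notice above is that encodedLength() must account for exactly the bytes encode() writes: the framed string array, one 4-byte int, and one framed string. A round-trip sketch over a Netty buffer, with illustrative values:

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;

    import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;

    public class ExecutorShuffleInfoRoundTrip {
      public static void main(String[] args) {
        ExecutorShuffleInfo info =
            new ExecutorShuffleInfo(new String[] {"/tmp/spark-local-1"}, 64, "sort");

        // Size the buffer from encodedLength(); any mismatch with encode() shows up here.
        ByteBuf buf = Unpooled.buffer(info.encodedLength());
        info.encode(buf);
        assert buf.writableBytes() == 0 : "encodedLength() must equal bytes written";

        assert info.equals(ExecutorShuffleInfo.decode(buf));
      }
    }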

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/OpenBlocks.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/OpenBlocks.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/OpenBlocks.java
deleted file mode 100644
index ce954b8..0000000
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/OpenBlocks.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.shuffle.protocol;
-
-import java.util.Arrays;
-
-import com.google.common.base.Objects;
-import io.netty.buffer.ByteBuf;
-
-import org.apache.spark.network.protocol.Encoders;
-
-// Needed by ScalaDoc. See SPARK-7726
-import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type;
-
-/** Request to read a set of blocks. Returns {@link StreamHandle}. */
-public class OpenBlocks extends BlockTransferMessage {
-  public final String appId;
-  public final String execId;
-  public final String[] blockIds;
-
-  public OpenBlocks(String appId, String execId, String[] blockIds) {
-    this.appId = appId;
-    this.execId = execId;
-    this.blockIds = blockIds;
-  }
-
-  @Override
-  protected Type type() { return Type.OPEN_BLOCKS; }
-
-  @Override
-  public int hashCode() {
-    return Objects.hashCode(appId, execId) * 41 + Arrays.hashCode(blockIds);
-  }
-
-  @Override
-  public String toString() {
-    return Objects.toStringHelper(this)
-      .add("appId", appId)
-      .add("execId", execId)
-      .add("blockIds", Arrays.toString(blockIds))
-      .toString();
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (other != null && other instanceof OpenBlocks) {
-      OpenBlocks o = (OpenBlocks) other;
-      return Objects.equal(appId, o.appId)
-        && Objects.equal(execId, o.execId)
-        && Arrays.equals(blockIds, o.blockIds);
-    }
-    return false;
-  }
-
-  @Override
-  public int encodedLength() {
-    return Encoders.Strings.encodedLength(appId)
-      + Encoders.Strings.encodedLength(execId)
-      + Encoders.StringArrays.encodedLength(blockIds);
-  }
-
-  @Override
-  public void encode(ByteBuf buf) {
-    Encoders.Strings.encode(buf, appId);
-    Encoders.Strings.encode(buf, execId);
-    Encoders.StringArrays.encode(buf, blockIds);
-  }
-
-  public static OpenBlocks decode(ByteBuf buf) {
-    String appId = Encoders.Strings.decode(buf);
-    String execId = Encoders.Strings.decode(buf);
-    String[] blockIds = Encoders.StringArrays.decode(buf);
-    return new OpenBlocks(appId, execId, blockIds);
-  }
-}
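
One way to read encodedLength() above, assuming the usual Encoders convention in this codebase (Encoders itself is not part of this diff): Strings writes a 4-byte length prefix plus UTF-8 bytes, and StringArrays writes a 4-byte count followed by each framed string. The message size can then be computed by hand:

    import org.apache.spark.network.shuffle.protocol.OpenBlocks;

    public class OpenBlocksSizing {
      public static void main(String[] args) {
        OpenBlocks open = new OpenBlocks("app-1", "exec-2", new String[] {"b1", "b2"});
        // "app-1"  -> 4 + 5 bytes;  "exec-2" -> 4 + 6 bytes
        // blockIds -> 4 (count) + (4 + 2) + (4 + 2) bytes
        assert open.encodedLength() == 9 + 10 + 16;
      }
    }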

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RegisterExecutor.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RegisterExecutor.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RegisterExecutor.java
deleted file mode 100644
index 167ef33..0000000
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RegisterExecutor.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.shuffle.protocol;
-
-import com.google.common.base.Objects;
-import io.netty.buffer.ByteBuf;
-
-import org.apache.spark.network.protocol.Encoders;
-
-// Needed by ScalaDoc. See SPARK-7726
-import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type;
-
-/**
- * Initial registration message between an executor and its local shuffle server.
- * Returns nothing (empty byte array).
- */
-public class RegisterExecutor extends BlockTransferMessage {
-  public final String appId;
-  public final String execId;
-  public final ExecutorShuffleInfo executorInfo;
-
-  public RegisterExecutor(
-      String appId,
-      String execId,
-      ExecutorShuffleInfo executorInfo) {
-    this.appId = appId;
-    this.execId = execId;
-    this.executorInfo = executorInfo;
-  }
-
-  @Override
-  protected Type type() { return Type.REGISTER_EXECUTOR; }
-
-  @Override
-  public int hashCode() {
-    return Objects.hashCode(appId, execId, executorInfo);
-  }
-
-  @Override
-  public String toString() {
-    return Objects.toStringHelper(this)
-      .add("appId", appId)
-      .add("execId", execId)
-      .add("executorInfo", executorInfo)
-      .toString();
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (other != null && other instanceof RegisterExecutor) {
-      RegisterExecutor o = (RegisterExecutor) other;
-      return Objects.equal(appId, o.appId)
-        && Objects.equal(execId, o.execId)
-        && Objects.equal(executorInfo, o.executorInfo);
-    }
-    return false;
-  }
-
-  @Override
-  public int encodedLength() {
-    return Encoders.Strings.encodedLength(appId)
-      + Encoders.Strings.encodedLength(execId)
-      + executorInfo.encodedLength();
-  }
-
-  @Override
-  public void encode(ByteBuf buf) {
-    Encoders.Strings.encode(buf, appId);
-    Encoders.Strings.encode(buf, execId);
-    executorInfo.encode(buf);
-  }
-
-  public static RegisterExecutor decode(ByteBuf buf) {
-    String appId = Encoders.Strings.decode(buf);
-    String execId = Encoders.Strings.decode(buf);
-    ExecutorShuffleInfo executorShuffleInfo = ExecutorShuffleInfo.decode(buf);
-    return new RegisterExecutor(appId, execId, executorShuffleInfo);
-  }
-}
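
As the javadoc notes, the reply to a RegisterExecutor RPC is an empty byte array; the structural point here is that the nested ExecutorShuffleInfo is encoded inline rather than framed separately. A short round trip with placeholder IDs:

    import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
    import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
    import org.apache.spark.network.shuffle.protocol.RegisterExecutor;

    public class RegisterExecutorRoundTrip {
      public static void main(String[] args) {
        ExecutorShuffleInfo info =
            new ExecutorShuffleInfo(new String[] {"/tmp/spark-local-1"}, 64, "sort");
        RegisterExecutor msg = new RegisterExecutor("app-1", "exec-0", info);

        // Length = two framed strings + info.encodedLength(), matching encodedLength() above.
        BlockTransferMessage decoded =
            BlockTransferMessage.Decoder.fromByteBuffer(msg.toByteBuffer());
        assert msg.equals(decoded);
      }
    }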

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/StreamHandle.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/StreamHandle.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/StreamHandle.java
deleted file mode 100644
index 1915295..0000000
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/StreamHandle.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.shuffle.protocol;
-
-import com.google.common.base.Objects;
-import io.netty.buffer.ByteBuf;
-
-// Needed by ScalaDoc. See SPARK-7726
-import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type;
-
-/**
- * Identifier for a fixed number of chunks to read from a stream created by an "open blocks"
- * message. This is used by {@link org.apache.spark.network.shuffle.OneForOneBlockFetcher}.
- */
-public class StreamHandle extends BlockTransferMessage {
-  public final long streamId;
-  public final int numChunks;
-
-  public StreamHandle(long streamId, int numChunks) {
-    this.streamId = streamId;
-    this.numChunks = numChunks;
-  }
-
-  @Override
-  protected Type type() { return Type.STREAM_HANDLE; }
-
-  @Override
-  public int hashCode() {
-    return Objects.hashCode(streamId, numChunks);
-  }
-
-  @Override
-  public String toString() {
-    return Objects.toStringHelper(this)
-      .add("streamId", streamId)
-      .add("numChunks", numChunks)
-      .toString();
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (other != null && other instanceof StreamHandle) {
-      StreamHandle o = (StreamHandle) other;
-      return Objects.equal(streamId, o.streamId)
-        && Objects.equal(numChunks, o.numChunks);
-    }
-    return false;
-  }
-
-  @Override
-  public int encodedLength() {
-    return 8 + 4;
-  }
-
-  @Override
-  public void encode(ByteBuf buf) {
-    buf.writeLong(streamId);
-    buf.writeInt(numChunks);
-  }
-
-  public static StreamHandle decode(ByteBuf buf) {
-    long streamId = buf.readLong();
-    int numChunks = buf.readInt();
-    return new StreamHandle(streamId, numChunks);
-  }
-}
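
StreamHandle is the only fixed-size message in this set: encodedLength() is the constant 8 + 4 (one long, one int), so the serialized form is always 13 bytes once toByteBuffer() adds the type tag. Checking that arithmetic:

    import org.apache.spark.network.shuffle.protocol.StreamHandle;

    public class StreamHandleSize {
      public static void main(String[] args) {
        StreamHandle handle = new StreamHandle(99L, 4);
        assert handle.encodedLength() == 12;             // payload only
        assert handle.toByteBuffer().remaining() == 13;  // payload plus one type byte
      }
    }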

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/UploadBlock.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/UploadBlock.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/UploadBlock.java
deleted file mode 100644
index 3caed59..0000000
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/UploadBlock.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.shuffle.protocol;
-
-import java.util.Arrays;
-
-import com.google.common.base.Objects;
-import io.netty.buffer.ByteBuf;
-
-import org.apache.spark.network.protocol.Encoders;
-
-// Needed by ScalaDoc. See SPARK-7726
-import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type;
-
-
-/** Request to upload a block with a certain StorageLevel. Returns nothing (empty byte array). */
-public class UploadBlock extends BlockTransferMessage {
-  public final String appId;
-  public final String execId;
-  public final String blockId;
-  // TODO: StorageLevel is serialized separately here because StorageLevel is not available in
-  // this package. We should avoid this hack.
-  public final byte[] metadata;
-  public final byte[] blockData;
-
-  /**
-   * @param metadata Meta-information about the block, typically a serialized StorageLevel.
-   * @param blockData The actual block's bytes.
-   */
-  public UploadBlock(
-      String appId,
-      String execId,
-      String blockId,
-      byte[] metadata,
-      byte[] blockData) {
-    this.appId = appId;
-    this.execId = execId;
-    this.blockId = blockId;
-    this.metadata = metadata;
-    this.blockData = blockData;
-  }
-
-  @Override
-  protected Type type() { return Type.UPLOAD_BLOCK; }
-
-  @Override
-  public int hashCode() {
-    int objectsHashCode = Objects.hashCode(appId, execId, blockId);
-    return (objectsHashCode * 41 + Arrays.hashCode(metadata)) * 41 + Arrays.hashCode(blockData);
-  }
-
-  @Override
-  public String toString() {
-    return Objects.toStringHelper(this)
-      .add("appId", appId)
-      .add("execId", execId)
-      .add("blockId", blockId)
-      .add("metadata size", metadata.length)
-      .add("block size", blockData.length)
-      .toString();
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (other != null && other instanceof UploadBlock) {
-      UploadBlock o = (UploadBlock) other;
-      return Objects.equal(appId, o.appId)
-        && Objects.equal(execId, o.execId)
-        && Objects.equal(blockId, o.blockId)
-        && Arrays.equals(metadata, o.metadata)
-        && Arrays.equals(blockData, o.blockData);
-    }
-    return false;
-  }
-
-  @Override
-  public int encodedLength() {
-    return Encoders.Strings.encodedLength(appId)
-      + Encoders.Strings.encodedLength(execId)
-      + Encoders.Strings.encodedLength(blockId)
-      + Encoders.ByteArrays.encodedLength(metadata)
-      + Encoders.ByteArrays.encodedLength(blockData);
-  }
-
-  @Override
-  public void encode(ByteBuf buf) {
-    Encoders.Strings.encode(buf, appId);
-    Encoders.Strings.encode(buf, execId);
-    Encoders.Strings.encode(buf, blockId);
-    Encoders.ByteArrays.encode(buf, metadata);
-    Encoders.ByteArrays.encode(buf, blockData);
-  }
-
-  public static UploadBlock decode(ByteBuf buf) {
-    String appId = Encoders.Strings.decode(buf);
-    String execId = Encoders.Strings.decode(buf);
-    String blockId = Encoders.Strings.decode(buf);
-    byte[] metadata = Encoders.ByteArrays.decode(buf);
-    byte[] blockData = Encoders.ByteArrays.decode(buf);
-    return new UploadBlock(appId, execId, blockId, metadata, blockData);
-  }
-}
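
Per the TODO above, the StorageLevel travels as an opaque metadata byte array because the StorageLevel class is not visible from this package. A round-trip sketch; the metadata bytes are arbitrary stand-ins, not a real serialized StorageLevel:

    import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
    import org.apache.spark.network.shuffle.protocol.UploadBlock;

    public class UploadBlockRoundTrip {
      public static void main(String[] args) {
        byte[] fakeStorageLevel = new byte[] {1, 2};  // placeholder metadata
        byte[] blockData = new byte[] {4, 5, 6, 7};
        UploadBlock msg =
            new UploadBlock("app-1", "exec-2", "block-3", fakeStorageLevel, blockData);

        BlockTransferMessage decoded =
            BlockTransferMessage.Decoder.fromByteBuffer(msg.toByteBuffer());
        assert msg.equals(decoded);  // equals() compares both byte arrays element-wise
      }
    }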

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/RegisterDriver.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/RegisterDriver.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/RegisterDriver.java
deleted file mode 100644
index 94a61d6..0000000
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/RegisterDriver.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.shuffle.protocol.mesos;
-
-import com.google.common.base.Objects;
-import io.netty.buffer.ByteBuf;
-
-import org.apache.spark.network.protocol.Encoders;
-import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
-
-// Needed by ScalaDoc. See SPARK-7726
-import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type;
-
-/**
- * A message sent from the driver to register with the MesosExternalShuffleService.
- */
-public class RegisterDriver extends BlockTransferMessage {
-  private final String appId;
-
-  public RegisterDriver(String appId) {
-    this.appId = appId;
-  }
-
-  public String getAppId() { return appId; }
-
-  @Override
-  protected Type type() { return Type.REGISTER_DRIVER; }
-
-  @Override
-  public int encodedLength() {
-    return Encoders.Strings.encodedLength(appId);
-  }
-
-  @Override
-  public void encode(ByteBuf buf) {
-    Encoders.Strings.encode(buf, appId);
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hashCode(appId);
-  }
-
-  public static RegisterDriver decode(ByteBuf buf) {
-    String appId = Encoders.Strings.decode(buf);
-    return new RegisterDriver(appId);
-  }
-}
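
Unlike its siblings, RegisterDriver overrides hashCode() but not equals(), so a decoded copy is not equal to the original under the default Object.equals; comparing getAppId() is the reliable check. A small illustration with a placeholder app ID:

    import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
    import org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver;

    public class RegisterDriverRoundTrip {
      public static void main(String[] args) {
        RegisterDriver msg = new RegisterDriver("app-20160229-0001");
        RegisterDriver decoded =
            (RegisterDriver) BlockTransferMessage.Decoder.fromByteBuffer(msg.toByteBuffer());
        // No equals() override here, so compare the only field instead.
        assert msg.getAppId().equals(decoded.getAppId());
      }
    }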

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
deleted file mode 100644
index 0ea631e..0000000
--- a/network/shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
+++ /dev/null
@@ -1,294 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.sasl;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.concurrent.atomic.AtomicReference;
-
-import com.google.common.collect.Lists;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
-import org.apache.spark.network.TestUtils;
-import org.apache.spark.network.TransportContext;
-import org.apache.spark.network.buffer.ManagedBuffer;
-import org.apache.spark.network.client.ChunkReceivedCallback;
-import org.apache.spark.network.client.RpcResponseCallback;
-import org.apache.spark.network.client.TransportClient;
-import org.apache.spark.network.client.TransportClientBootstrap;
-import org.apache.spark.network.client.TransportClientFactory;
-import org.apache.spark.network.server.OneForOneStreamManager;
-import org.apache.spark.network.server.RpcHandler;
-import org.apache.spark.network.server.StreamManager;
-import org.apache.spark.network.server.TransportServer;
-import org.apache.spark.network.server.TransportServerBootstrap;
-import org.apache.spark.network.shuffle.BlockFetchingListener;
-import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;
-import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver;
-import org.apache.spark.network.shuffle.OneForOneBlockFetcher;
-import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
-import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
-import org.apache.spark.network.shuffle.protocol.OpenBlocks;
-import org.apache.spark.network.shuffle.protocol.RegisterExecutor;
-import org.apache.spark.network.shuffle.protocol.StreamHandle;
-import org.apache.spark.network.util.JavaUtils;
-import org.apache.spark.network.util.SystemPropertyConfigProvider;
-import org.apache.spark.network.util.TransportConf;
-
-public class SaslIntegrationSuite {
-
-  // Use a long timeout to account for slow / overloaded build machines. In the normal case,
-  // tests should finish well before the timeout expires.
-  private static final long TIMEOUT_MS = 10_000;
-
-  static TransportServer server;
-  static TransportConf conf;
-  static TransportContext context;
-  static SecretKeyHolder secretKeyHolder;
-
-  TransportClientFactory clientFactory;
-
-  @BeforeClass
-  public static void beforeAll() throws IOException {
-    conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
-    context = new TransportContext(conf, new TestRpcHandler());
-
-    secretKeyHolder = mock(SecretKeyHolder.class);
-    when(secretKeyHolder.getSaslUser(eq("app-1"))).thenReturn("app-1");
-    when(secretKeyHolder.getSecretKey(eq("app-1"))).thenReturn("app-1");
-    when(secretKeyHolder.getSaslUser(eq("app-2"))).thenReturn("app-2");
-    when(secretKeyHolder.getSecretKey(eq("app-2"))).thenReturn("app-2");
-    when(secretKeyHolder.getSaslUser(anyString())).thenReturn("other-app");
-    when(secretKeyHolder.getSecretKey(anyString())).thenReturn("correct-password");
-
-    TransportServerBootstrap bootstrap = new SaslServerBootstrap(conf, secretKeyHolder);
-    server = context.createServer(Arrays.asList(bootstrap));
-  }
-
-
-  @AfterClass
-  public static void afterAll() {
-    server.close();
-  }
-
-  @After
-  public void afterEach() {
-    if (clientFactory != null) {
-      clientFactory.close();
-      clientFactory = null;
-    }
-  }
-
-  @Test
-  public void testGoodClient() throws IOException {
-    clientFactory = context.createClientFactory(
-      Lists.<TransportClientBootstrap>newArrayList(
-        new SaslClientBootstrap(conf, "app-1", secretKeyHolder)));
-
-    TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
-    String msg = "Hello, World!";
-    ByteBuffer resp = client.sendRpcSync(JavaUtils.stringToBytes(msg), TIMEOUT_MS);
-    assertEquals(msg, JavaUtils.bytesToString(resp));
-  }
-
-  @Test
-  public void testBadClient() {
-    SecretKeyHolder badKeyHolder = mock(SecretKeyHolder.class);
-    when(badKeyHolder.getSaslUser(anyString())).thenReturn("other-app");
-    when(badKeyHolder.getSecretKey(anyString())).thenReturn("wrong-password");
-    clientFactory = context.createClientFactory(
-      Lists.<TransportClientBootstrap>newArrayList(
-        new SaslClientBootstrap(conf, "unknown-app", badKeyHolder)));
-
-    try {
-      // Bootstrap should fail on startup.
-      clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
-      fail("Connection should have failed.");
-    } catch (Exception e) {
-      assertTrue(e.getMessage(), e.getMessage().contains("Mismatched response"));
-    }
-  }
-
-  @Test
-  public void testNoSaslClient() throws IOException {
-    clientFactory = context.createClientFactory(
-      Lists.<TransportClientBootstrap>newArrayList());
-
-    TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
-    try {
-      client.sendRpcSync(ByteBuffer.allocate(13), TIMEOUT_MS);
-      fail("Should have failed");
-    } catch (Exception e) {
-      assertTrue(e.getMessage(), e.getMessage().contains("Expected SaslMessage"));
-    }
-
-    try {
-      // Guessing the right tag byte doesn't magically get you in...
-      client.sendRpcSync(ByteBuffer.wrap(new byte[] { (byte) 0xEA }), TIMEOUT_MS);
-      fail("Should have failed");
-    } catch (Exception e) {
-      assertTrue(e.getMessage(), e.getMessage().contains("java.lang.IndexOutOfBoundsException"));
-    }
-  }
-
-  @Test
-  public void testNoSaslServer() {
-    RpcHandler handler = new TestRpcHandler();
-    TransportContext context = new TransportContext(conf, handler);
-    clientFactory = context.createClientFactory(
-      Lists.<TransportClientBootstrap>newArrayList(
-        new SaslClientBootstrap(conf, "app-1", secretKeyHolder)));
-    TransportServer server = context.createServer();
-    try {
-      clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
-    } catch (Exception e) {
-      assertTrue(e.getMessage(), e.getMessage().contains("Digest-challenge format violation"));
-    } finally {
-      server.close();
-    }
-  }
-
-  /**
-   * This test is not actually testing SASL behavior, but testing that the shuffle service
-   * performs correct authorization checks based on the SASL authentication data.
-   */
-  @Test
-  public void testAppIsolation() throws Exception {
-    // Start a new server with the correct RPC handler to serve block data.
-    ExternalShuffleBlockResolver blockResolver = mock(ExternalShuffleBlockResolver.class);
-    ExternalShuffleBlockHandler blockHandler = new ExternalShuffleBlockHandler(
-      new OneForOneStreamManager(), blockResolver);
-    TransportServerBootstrap bootstrap = new SaslServerBootstrap(conf, secretKeyHolder);
-    TransportContext blockServerContext = new TransportContext(conf, blockHandler);
-    TransportServer blockServer = blockServerContext.createServer(Arrays.asList(bootstrap));
-
-    TransportClient client1 = null;
-    TransportClient client2 = null;
-    TransportClientFactory clientFactory2 = null;
-    try {
-      // Create a client, and make a request to fetch blocks from a different app.
-      clientFactory = blockServerContext.createClientFactory(
-        Lists.<TransportClientBootstrap>newArrayList(
-          new SaslClientBootstrap(conf, "app-1", secretKeyHolder)));
-      client1 = clientFactory.createClient(TestUtils.getLocalHost(),
-        blockServer.getPort());
-
-      final AtomicReference<Throwable> exception = new AtomicReference<>();
-
-      BlockFetchingListener listener = new BlockFetchingListener() {
-        @Override
-        public synchronized void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
-          notifyAll();
-        }
-
-        @Override
-        public synchronized void onBlockFetchFailure(String blockId, Throwable t) {
-          exception.set(t);
-          notifyAll();
-        }
-      };
-
-      String[] blockIds = new String[] { "shuffle_2_3_4", "shuffle_6_7_8" };
-      OneForOneBlockFetcher fetcher = new OneForOneBlockFetcher(client1, "app-2", "0",
-        blockIds, listener);
-      synchronized (listener) {
-        fetcher.start();
-        listener.wait();
-      }
-      checkSecurityException(exception.get());
-
-      // Register an executor so that the next steps work.
-      ExecutorShuffleInfo executorInfo = new ExecutorShuffleInfo(
-        new String[] { System.getProperty("java.io.tmpdir") }, 1, "sort");
-      RegisterExecutor regmsg = new RegisterExecutor("app-1", "0", executorInfo);
-      client1.sendRpcSync(regmsg.toByteBuffer(), TIMEOUT_MS);
-
-      // Make a successful request to fetch blocks, which creates a new stream. But do not actually
-      // fetch any blocks, to keep the stream open.
-      OpenBlocks openMessage = new OpenBlocks("app-1", "0", blockIds);
-      ByteBuffer response = client1.sendRpcSync(openMessage.toByteBuffer(), TIMEOUT_MS);
-      StreamHandle stream = (StreamHandle) BlockTransferMessage.Decoder.fromByteBuffer(response);
-      long streamId = stream.streamId;
-
-      // Create a second client, authenticated with a different app ID, and try to read from
-      // the stream created for the previous app.
-      clientFactory2 = blockServerContext.createClientFactory(
-        Lists.<TransportClientBootstrap>newArrayList(
-          new SaslClientBootstrap(conf, "app-2", secretKeyHolder)));
-      client2 = clientFactory2.createClient(TestUtils.getLocalHost(),
-        blockServer.getPort());
-
-      ChunkReceivedCallback callback = new ChunkReceivedCallback() {
-        @Override
-        public synchronized void onSuccess(int chunkIndex, ManagedBuffer buffer) {
-          notifyAll();
-        }
-
-        @Override
-        public synchronized void onFailure(int chunkIndex, Throwable t) {
-          exception.set(t);
-          notifyAll();
-        }
-      };
-
-      exception.set(null);
-      synchronized (callback) {
-        client2.fetchChunk(streamId, 0, callback);
-        callback.wait();
-      }
-      checkSecurityException(exception.get());
-    } finally {
-      if (client1 != null) {
-        client1.close();
-      }
-      if (client2 != null) {
-        client2.close();
-      }
-      if (clientFactory2 != null) {
-        clientFactory2.close();
-      }
-      blockServer.close();
-    }
-  }
-
-  /** RPC handler which simply responds with the message it received. */
-  public static class TestRpcHandler extends RpcHandler {
-    @Override
-    public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {
-      callback.onSuccess(message);
-    }
-
-    @Override
-    public StreamManager getStreamManager() {
-      return new OneForOneStreamManager();
-    }
-  }
-
-  private void checkSecurityException(Throwable t) {
-    assertNotNull("No exception was caught.", t);
-    assertTrue("Expected SecurityException.",
-      t.getMessage().contains(SecurityException.class.getName()));
-  }
-}
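
Distilled from testGoodClient above: the minimal SASL-authenticated echo exchange is a server bootstrapped with SaslServerBootstrap and a client bootstrapped with SaslClientBootstrap, both consulting the same SecretKeyHolder. This is a sketch under the assumption that "localhost" behaves like TestUtils.getLocalHost() does in the suite; the app ID and secret are illustrative.

    import java.nio.ByteBuffer;
    import java.util.Arrays;

    import com.google.common.collect.Lists;

    import org.apache.spark.network.TransportContext;
    import org.apache.spark.network.client.RpcResponseCallback;
    import org.apache.spark.network.client.TransportClient;
    import org.apache.spark.network.client.TransportClientBootstrap;
    import org.apache.spark.network.sasl.SaslClientBootstrap;
    import org.apache.spark.network.sasl.SaslServerBootstrap;
    import org.apache.spark.network.sasl.SecretKeyHolder;
    import org.apache.spark.network.server.OneForOneStreamManager;
    import org.apache.spark.network.server.RpcHandler;
    import org.apache.spark.network.server.StreamManager;
    import org.apache.spark.network.server.TransportServer;
    import org.apache.spark.network.server.TransportServerBootstrap;
    import org.apache.spark.network.util.JavaUtils;
    import org.apache.spark.network.util.SystemPropertyConfigProvider;
    import org.apache.spark.network.util.TransportConf;

    public class SaslEchoSketch {
      public static void main(String[] args) throws Exception {
        TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());

        // Echo handler with the same behavior as TestRpcHandler in the suite above.
        RpcHandler echoHandler = new RpcHandler() {
          @Override
          public void receive(TransportClient client, ByteBuffer msg, RpcResponseCallback cb) {
            cb.onSuccess(msg);
          }
          @Override
          public StreamManager getStreamManager() { return new OneForOneStreamManager(); }
        };

        // Both sides look up the same shared secret for any app ID.
        SecretKeyHolder keys = new SecretKeyHolder() {
          @Override public String getSaslUser(String appId) { return appId; }
          @Override public String getSecretKey(String appId) { return "correct-password"; }
        };

        TransportContext context = new TransportContext(conf, echoHandler);
        TransportServerBootstrap serverBootstrap = new SaslServerBootstrap(conf, keys);
        TransportServer server = context.createServer(Arrays.asList(serverBootstrap));

        TransportClient client = context
            .createClientFactory(Lists.<TransportClientBootstrap>newArrayList(
                new SaslClientBootstrap(conf, "app-1", keys)))
            .createClient("localhost", server.getPort());

        ByteBuffer resp = client.sendRpcSync(JavaUtils.stringToBytes("ping"), 10_000);
        assert "ping".equals(JavaUtils.bytesToString(resp));  // handler echoes the message
        client.close();
        server.close();
      }
    }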

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/shuffle/src/test/java/org/apache/spark/network/shuffle/BlockTransferMessagesSuite.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/BlockTransferMessagesSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/BlockTransferMessagesSuite.java
deleted file mode 100644
index 86c8609..0000000
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/BlockTransferMessagesSuite.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.shuffle;
-
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-import org.apache.spark.network.shuffle.protocol.*;
-
-/** Verifies that all BlockTransferMessages can be serialized correctly. */
-public class BlockTransferMessagesSuite {
-  @Test
-  public void serializeOpenShuffleBlocks() {
-    checkSerializeDeserialize(new OpenBlocks("app-1", "exec-2", new String[] { "b1", "b2" }));
-    checkSerializeDeserialize(new RegisterExecutor("app-1", "exec-2", new ExecutorShuffleInfo(
-      new String[] { "/local1", "/local2" }, 32, "MyShuffleManager")));
-    checkSerializeDeserialize(new UploadBlock("app-1", "exec-2", "block-3", new byte[] { 1, 2 },
-      new byte[] { 4, 5, 6, 7} ));
-    checkSerializeDeserialize(new StreamHandle(12345, 16));
-  }
-
-  private void checkSerializeDeserialize(BlockTransferMessage msg) {
-    BlockTransferMessage msg2 = BlockTransferMessage.Decoder.fromByteBuffer(msg.toByteBuffer());
-    assertEquals(msg, msg2);
-    assertEquals(msg.hashCode(), msg2.hashCode());
-    assertEquals(msg.toString(), msg2.toString());
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
deleted file mode 100644
index 9379412..0000000
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.shuffle;
-
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-
-import static org.junit.Assert.*;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.*;
-
-import org.apache.spark.network.buffer.ManagedBuffer;
-import org.apache.spark.network.buffer.NioManagedBuffer;
-import org.apache.spark.network.client.RpcResponseCallback;
-import org.apache.spark.network.client.TransportClient;
-import org.apache.spark.network.server.OneForOneStreamManager;
-import org.apache.spark.network.server.RpcHandler;
-import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
-import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
-import org.apache.spark.network.shuffle.protocol.OpenBlocks;
-import org.apache.spark.network.shuffle.protocol.RegisterExecutor;
-import org.apache.spark.network.shuffle.protocol.StreamHandle;
-import org.apache.spark.network.shuffle.protocol.UploadBlock;
-
-public class ExternalShuffleBlockHandlerSuite {
-  TransportClient client = mock(TransportClient.class);
-
-  OneForOneStreamManager streamManager;
-  ExternalShuffleBlockResolver blockResolver;
-  RpcHandler handler;
-
-  @Before
-  public void beforeEach() {
-    streamManager = mock(OneForOneStreamManager.class);
-    blockResolver = mock(ExternalShuffleBlockResolver.class);
-    handler = new ExternalShuffleBlockHandler(streamManager, blockResolver);
-  }
-
-  @Test
-  public void testRegisterExecutor() {
-    RpcResponseCallback callback = mock(RpcResponseCallback.class);
-
-    ExecutorShuffleInfo config = new ExecutorShuffleInfo(new String[] {"/a", "/b"}, 16, "sort");
-    ByteBuffer registerMessage = new RegisterExecutor("app0", "exec1", config).toByteBuffer();
-    handler.receive(client, registerMessage, callback);
-    verify(blockResolver, times(1)).registerExecutor("app0", "exec1", config);
-
-    verify(callback, times(1)).onSuccess(any(ByteBuffer.class));
-    verify(callback, never()).onFailure(any(Throwable.class));
-  }
-
-  @SuppressWarnings("unchecked")
-  @Test
-  public void testOpenShuffleBlocks() {
-    RpcResponseCallback callback = mock(RpcResponseCallback.class);
-
-    ManagedBuffer block0Marker = new NioManagedBuffer(ByteBuffer.wrap(new byte[3]));
-    ManagedBuffer block1Marker = new NioManagedBuffer(ByteBuffer.wrap(new byte[7]));
-    when(blockResolver.getBlockData("app0", "exec1", "b0")).thenReturn(block0Marker);
-    when(blockResolver.getBlockData("app0", "exec1", "b1")).thenReturn(block1Marker);
-    ByteBuffer openBlocks = new OpenBlocks("app0", "exec1", new String[] { "b0", "b1" })
-      .toByteBuffer();
-    handler.receive(client, openBlocks, callback);
-    verify(blockResolver, times(1)).getBlockData("app0", "exec1", "b0");
-    verify(blockResolver, times(1)).getBlockData("app0", "exec1", "b1");
-
-    ArgumentCaptor<ByteBuffer> response = ArgumentCaptor.forClass(ByteBuffer.class);
-    verify(callback, times(1)).onSuccess(response.capture());
-    verify(callback, never()).onFailure((Throwable) any());
-
-    StreamHandle handle =
-      (StreamHandle) BlockTransferMessage.Decoder.fromByteBuffer(response.getValue());
-    assertEquals(2, handle.numChunks);
-
-    @SuppressWarnings("unchecked")
-    ArgumentCaptor<Iterator<ManagedBuffer>> stream = (ArgumentCaptor<Iterator<ManagedBuffer>>)
-        (ArgumentCaptor<?>) ArgumentCaptor.forClass(Iterator.class);
-    verify(streamManager, times(1)).registerStream(anyString(), stream.capture());
-    Iterator<ManagedBuffer> buffers = stream.getValue();
-    assertEquals(block0Marker, buffers.next());
-    assertEquals(block1Marker, buffers.next());
-    assertFalse(buffers.hasNext());
-  }
-
-  @Test
-  public void testBadMessages() {
-    RpcResponseCallback callback = mock(RpcResponseCallback.class);
-
-    ByteBuffer unserializableMsg = ByteBuffer.wrap(new byte[] { 0x12, 0x34, 0x56 });
-    try {
-      handler.receive(client, unserializableMsg, callback);
-      fail("Should have thrown");
-    } catch (Exception e) {
-      // pass
-    }
-
-    ByteBuffer unexpectedMsg = new UploadBlock("a", "e", "b", new byte[1], new byte[2]).toByteBuffer();
-    try {
-      handler.receive(client, unexpectedMsg, callback);
-      fail("Should have thrown");
-    } catch (UnsupportedOperationException e) {
-      // pass
-    }
-
-    verify(callback, never()).onSuccess(any(ByteBuffer.class));
-    verify(callback, never()).onFailure(any(Throwable.class));
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
deleted file mode 100644
index 60a1b8b..0000000
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.shuffle;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.io.CharStreams;
-import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
-import org.apache.spark.network.util.SystemPropertyConfigProvider;
-import org.apache.spark.network.util.TransportConf;
-import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.AppExecId;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-public class ExternalShuffleBlockResolverSuite {
-  static String sortBlock0 = "Hello!";
-  static String sortBlock1 = "World!";
-
-  static String hashBlock0 = "Elementary";
-  static String hashBlock1 = "Tabular";
-
-  static TestShuffleDataContext dataContext;
-
-  static TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
-
-  @BeforeClass
-  public static void beforeAll() throws IOException {
-    dataContext = new TestShuffleDataContext(2, 5);
-
-    dataContext.create();
-    // Write some sort and hash data.
-    dataContext.insertSortShuffleData(0, 0,
-      new byte[][] { sortBlock0.getBytes(), sortBlock1.getBytes() } );
-    dataContext.insertHashShuffleData(1, 0,
-      new byte[][] { hashBlock0.getBytes(), hashBlock1.getBytes() } );
-  }
-
-  @AfterClass
-  public static void afterAll() {
-    dataContext.cleanup();
-  }
-
-  @Test
-  public void testBadRequests() throws IOException {
-    ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, null);
-    // Unregistered executor
-    try {
-      resolver.getBlockData("app0", "exec1", "shuffle_1_1_0");
-      fail("Should have failed");
-    } catch (RuntimeException e) {
-      assertTrue("Bad error message: " + e, e.getMessage().contains("not registered"));
-    }
-
-    // Invalid shuffle manager
-    resolver.registerExecutor("app0", "exec2", dataContext.createExecutorInfo("foobar"));
-    try {
-      resolver.getBlockData("app0", "exec2", "shuffle_1_1_0");
-      fail("Should have failed");
-    } catch (UnsupportedOperationException e) {
-      // pass
-    }
-
-    // Nonexistent shuffle block
-    resolver.registerExecutor("app0", "exec3",
-      dataContext.createExecutorInfo("sort"));
-    try {
-      resolver.getBlockData("app0", "exec3", "shuffle_1_1_0");
-      fail("Should have failed");
-    } catch (Exception e) {
-      // pass
-    }
-  }
-
-  @Test
-  public void testSortShuffleBlocks() throws IOException {
-    ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, null);
-    resolver.registerExecutor("app0", "exec0",
-      dataContext.createExecutorInfo("sort"));
-
-    InputStream block0Stream =
-      resolver.getBlockData("app0", "exec0", "shuffle_0_0_0").createInputStream();
-    String block0 = CharStreams.toString(new InputStreamReader(block0Stream));
-    block0Stream.close();
-    assertEquals(sortBlock0, block0);
-
-    InputStream block1Stream =
-      resolver.getBlockData("app0", "exec0", "shuffle_0_0_1").createInputStream();
-    String block1 = CharStreams.toString(new InputStreamReader(block1Stream));
-    block1Stream.close();
-    assertEquals(sortBlock1, block1);
-  }
-
-  @Test
-  public void testHashShuffleBlocks() throws IOException {
-    ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, null);
-    resolver.registerExecutor("app0", "exec0",
-      dataContext.createExecutorInfo("hash"));
-
-    InputStream block0Stream =
-      resolver.getBlockData("app0", "exec0", "shuffle_1_0_0").createInputStream();
-    String block0 = CharStreams.toString(new InputStreamReader(block0Stream));
-    block0Stream.close();
-    assertEquals(hashBlock0, block0);
-
-    InputStream block1Stream =
-      resolver.getBlockData("app0", "exec0", "shuffle_1_0_1").createInputStream();
-    String block1 = CharStreams.toString(new InputStreamReader(block1Stream));
-    block1Stream.close();
-    assertEquals(hashBlock1, block1);
-  }
-
-  @Test
-  public void jsonSerializationOfExecutorRegistration() throws IOException {
-    ObjectMapper mapper = new ObjectMapper();
-    AppExecId appId = new AppExecId("foo", "bar");
-    String appIdJson = mapper.writeValueAsString(appId);
-    AppExecId parsedAppId = mapper.readValue(appIdJson, AppExecId.class);
-    assertEquals(parsedAppId, appId);
-
-    ExecutorShuffleInfo shuffleInfo =
-      new ExecutorShuffleInfo(new String[]{"/bippy", "/flippy"}, 7, "hash");
-    String shuffleJson = mapper.writeValueAsString(shuffleInfo);
-    ExecutorShuffleInfo parsedShuffleInfo =
-      mapper.readValue(shuffleJson, ExecutorShuffleInfo.class);
-    assertEquals(parsedShuffleInfo, shuffleInfo);
-
-    // Intentionally keep these hard-coded strings here, to check backwards-compatibility.
-    // It's not legacy yet, but keep it here in case anybody changes the format.
-    String legacyAppIdJson = "{\"appId\":\"foo\", \"execId\":\"bar\"}";
-    assertEquals(appId, mapper.readValue(legacyAppIdJson, AppExecId.class));
-    String legacyShuffleJson = "{\"localDirs\": [\"/bippy\", \"/flippy\"], " +
-      "\"subDirsPerLocalDir\": 7, \"shuffleManager\": \"hash\"}";
-    assertEquals(shuffleInfo, mapper.readValue(legacyShuffleJson, ExecutorShuffleInfo.class));
-  }
-}
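
A note on the pattern above: jsonSerializationOfExecutorRegistration round-trips a value
through Jackson's ObjectMapper and then parses a hard-coded JSON string to pin the stored
format. A minimal self-contained sketch of the same idea (the Point class is illustrative,
not part of the suite):

    import com.fasterxml.jackson.databind.ObjectMapper;

    public class JsonRoundTripSketch {
      // Illustrative value class; Jackson needs public fields (or getters)
      // and a no-arg constructor to deserialize it.
      public static class Point {
        public int x;
        public int y;
        public Point() { }
        public Point(int x, int y) { this.x = x; this.y = y; }
        @Override public boolean equals(Object o) {
          return o instanceof Point && ((Point) o).x == x && ((Point) o).y == y;
        }
        @Override public int hashCode() { return 31 * x + y; }
      }

      public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        Point original = new Point(1, 2);
        String json = mapper.writeValueAsString(original);   // {"x":1,"y":2}
        Point parsed = mapper.readValue(json, Point.class);
        if (!original.equals(parsed)) throw new AssertionError("round trip changed value");
        // The hard-coded string guards the stored format against accidental
        // field renames, just as the suite does for AppExecId and ExecutorShuffleInfo.
        Point legacy = mapper.readValue("{\"x\":1, \"y\":2}", Point.class);
        if (!original.equals(legacy)) throw new AssertionError("stored format changed");
      }
    }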

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
deleted file mode 100644
index 532d7ab..0000000
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.shuffle;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Random;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import com.google.common.util.concurrent.MoreExecutors;
-import org.junit.Test;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.spark.network.util.SystemPropertyConfigProvider;
-import org.apache.spark.network.util.TransportConf;
-
-public class ExternalShuffleCleanupSuite {
-
-  // Same-thread Executor used to ensure cleanup happens synchronously in the test thread.
-  Executor sameThreadExecutor = MoreExecutors.sameThreadExecutor();
-  TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
-
-  @Test
-  public void noCleanupAndCleanup() throws IOException {
-    TestShuffleDataContext dataContext = createSomeData();
-
-    ExternalShuffleBlockResolver resolver =
-      new ExternalShuffleBlockResolver(conf, null, sameThreadExecutor);
-    resolver.registerExecutor("app", "exec0", dataContext.createExecutorInfo("shuffleMgr"));
-    resolver.applicationRemoved("app", false /* cleanup */);
-
-    assertStillThere(dataContext);
-
-    resolver.registerExecutor("app", "exec1", dataContext.createExecutorInfo("shuffleMgr"));
-    resolver.applicationRemoved("app", true /* cleanup */);
-
-    assertCleanedUp(dataContext);
-  }
-
-  @Test
-  public void cleanupUsesExecutor() throws IOException {
-    TestShuffleDataContext dataContext = createSomeData();
-
-    final AtomicBoolean cleanupCalled = new AtomicBoolean(false);
-
-    // Executor which does nothing, so we can verify that cleanup is actually handed off to it.
-    Executor noThreadExecutor = new Executor() {
-      @Override public void execute(Runnable runnable) { cleanupCalled.set(true); }
-    };
-
-    ExternalShuffleBlockResolver manager =
-      new ExternalShuffleBlockResolver(conf, null, noThreadExecutor);
-
-    manager.registerExecutor("app", "exec0", dataContext.createExecutorInfo("shuffleMgr"));
-    manager.applicationRemoved("app", true);
-
-    assertTrue(cleanupCalled.get());
-    assertStillThere(dataContext);
-
-    dataContext.cleanup();
-    assertCleanedUp(dataContext);
-  }
-
-  @Test
-  public void cleanupMultipleExecutors() throws IOException {
-    TestShuffleDataContext dataContext0 = createSomeData();
-    TestShuffleDataContext dataContext1 = createSomeData();
-
-    ExternalShuffleBlockResolver resolver =
-      new ExternalShuffleBlockResolver(conf, null, sameThreadExecutor);
-
-    resolver.registerExecutor("app", "exec0", dataContext0.createExecutorInfo("shuffleMgr"));
-    resolver.registerExecutor("app", "exec1", dataContext1.createExecutorInfo("shuffleMgr"));
-    resolver.applicationRemoved("app", true);
-
-    assertCleanedUp(dataContext0);
-    assertCleanedUp(dataContext1);
-  }
-
-  @Test
-  public void cleanupOnlyRemovedApp() throws IOException {
-    TestShuffleDataContext dataContext0 = createSomeData();
-    TestShuffleDataContext dataContext1 = createSomeData();
-
-    ExternalShuffleBlockResolver resolver =
-      new ExternalShuffleBlockResolver(conf, null, sameThreadExecutor);
-
-    resolver.registerExecutor("app-0", "exec0", dataContext0.createExecutorInfo("shuffleMgr"));
-    resolver.registerExecutor("app-1", "exec0", dataContext1.createExecutorInfo("shuffleMgr"));
-
-    resolver.applicationRemoved("app-nonexistent", true);
-    assertStillThere(dataContext0);
-    assertStillThere(dataContext1);
-
-    resolver.applicationRemoved("app-0", true);
-    assertCleanedUp(dataContext0);
-    assertStillThere(dataContext1);
-
-    resolver.applicationRemoved("app-1", true);
-    assertCleanedUp(dataContext0);
-    assertCleanedUp(dataContext1);
-
-    // Make sure it's not an error to clean up multiple times
-    resolver.applicationRemoved("app-1", true);
-    assertCleanedUp(dataContext0);
-    assertCleanedUp(dataContext1);
-  }
-
-  private void assertStillThere(TestShuffleDataContext dataContext) {
-    for (String localDir : dataContext.localDirs) {
-      assertTrue(localDir + " was cleaned up prematurely", new File(localDir).exists());
-    }
-  }
-
-  private void assertCleanedUp(TestShuffleDataContext dataContext) {
-    for (String localDir : dataContext.localDirs) {
-      assertFalse(localDir + " wasn't cleaned up", new File(localDir).exists());
-    }
-  }
-
-  private TestShuffleDataContext createSomeData() throws IOException {
-    Random rand = new Random(123);
-    TestShuffleDataContext dataContext = new TestShuffleDataContext(10, 5);
-
-    dataContext.create();
-    dataContext.insertSortShuffleData(rand.nextInt(1000), rand.nextInt(1000),
-      new byte[][] { "ABC".getBytes(), "DEF".getBytes() } );
-    dataContext.insertHashShuffleData(rand.nextInt(1000), rand.nextInt(1000) + 1000,
-      new byte[][] { "GHI".getBytes(), "JKLMNOPQRSTUVWXYZ".getBytes() } );
-    return dataContext;
-  }
-}
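
The cleanup suite above injects MoreExecutors.sameThreadExecutor() (Guava; newer versions
call the equivalent directExecutor()) so that directory deletion finishes before the
assertions run. A minimal sketch of that pattern without Guava:

    import java.util.concurrent.Executor;

    public class DirectExecutorSketch {
      public static void main(String[] args) {
        // Runs each task on the calling thread, so work submitted during a
        // test has completed by the time the next statement executes.
        Executor direct = new Executor() {
          @Override public void execute(Runnable task) { task.run(); }
        };
        direct.execute(new Runnable() {
          @Override public void run() { System.out.println("runs synchronously"); }
        });
        System.out.println("reached only after the task above has finished");
      }
    }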

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
deleted file mode 100644
index 5e706bf..0000000
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
+++ /dev/null
@@ -1,301 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.shuffle;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-import org.apache.spark.network.TestUtils;
-import org.apache.spark.network.TransportContext;
-import org.apache.spark.network.buffer.ManagedBuffer;
-import org.apache.spark.network.buffer.NioManagedBuffer;
-import org.apache.spark.network.server.TransportServer;
-import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
-import org.apache.spark.network.util.SystemPropertyConfigProvider;
-import org.apache.spark.network.util.TransportConf;
-
-public class ExternalShuffleIntegrationSuite {
-
-  static String APP_ID = "app-id";
-  static String SORT_MANAGER = "sort";
-  static String HASH_MANAGER = "hash";
-
-  // Executor 0 is sort-based
-  static TestShuffleDataContext dataContext0;
-  // Executor 1 is hash-based
-  static TestShuffleDataContext dataContext1;
-
-  static ExternalShuffleBlockHandler handler;
-  static TransportServer server;
-  static TransportConf conf;
-
-  static byte[][] exec0Blocks = new byte[][] {
-    new byte[123],
-    new byte[12345],
-    new byte[1234567],
-  };
-
-  static byte[][] exec1Blocks = new byte[][] {
-    new byte[321],
-    new byte[54321],
-  };
-
-  @BeforeClass
-  public static void beforeAll() throws IOException {
-    Random rand = new Random();
-
-    for (byte[] block : exec0Blocks) {
-      rand.nextBytes(block);
-    }
-    for (byte[] block: exec1Blocks) {
-      rand.nextBytes(block);
-    }
-
-    dataContext0 = new TestShuffleDataContext(2, 5);
-    dataContext0.create();
-    dataContext0.insertSortShuffleData(0, 0, exec0Blocks);
-
-    dataContext1 = new TestShuffleDataContext(6, 2);
-    dataContext1.create();
-    dataContext1.insertHashShuffleData(1, 0, exec1Blocks);
-
-    conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
-    handler = new ExternalShuffleBlockHandler(conf, null);
-    TransportContext transportContext = new TransportContext(conf, handler);
-    server = transportContext.createServer();
-  }
-
-  @AfterClass
-  public static void afterAll() {
-    dataContext0.cleanup();
-    dataContext1.cleanup();
-    server.close();
-  }
-
-  @After
-  public void afterEach() {
-    handler.applicationRemoved(APP_ID, false /* cleanupLocalDirs */);
-  }
-
-  class FetchResult {
-    public Set<String> successBlocks;
-    public Set<String> failedBlocks;
-    public List<ManagedBuffer> buffers;
-
-    public void releaseBuffers() {
-      for (ManagedBuffer buffer : buffers) {
-        buffer.release();
-      }
-    }
-  }
-
-  // Fetch a set of blocks from a pre-registered executor.
-  private FetchResult fetchBlocks(String execId, String[] blockIds) throws Exception {
-    return fetchBlocks(execId, blockIds, server.getPort());
-  }
-
-  // Fetch a set of blocks from a pre-registered executor. Connects to the server on the given
-  // port, which lets tests deliberately target an invalid server.
-  private FetchResult fetchBlocks(String execId, String[] blockIds, int port) throws Exception {
-    final FetchResult res = new FetchResult();
-    res.successBlocks = Collections.synchronizedSet(new HashSet<String>());
-    res.failedBlocks = Collections.synchronizedSet(new HashSet<String>());
-    res.buffers = Collections.synchronizedList(new LinkedList<ManagedBuffer>());
-
-    final Semaphore requestsRemaining = new Semaphore(0);
-
-    ExternalShuffleClient client = new ExternalShuffleClient(conf, null, false, false);
-    client.init(APP_ID);
-    client.fetchBlocks(TestUtils.getLocalHost(), port, execId, blockIds,
-      new BlockFetchingListener() {
-        @Override
-        public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
-          synchronized (this) {
-            if (!res.successBlocks.contains(blockId) && !res.failedBlocks.contains(blockId)) {
-              data.retain();
-              res.successBlocks.add(blockId);
-              res.buffers.add(data);
-              requestsRemaining.release();
-            }
-          }
-        }
-
-        @Override
-        public void onBlockFetchFailure(String blockId, Throwable exception) {
-          synchronized (this) {
-            if (!res.successBlocks.contains(blockId) && !res.failedBlocks.contains(blockId)) {
-              res.failedBlocks.add(blockId);
-              requestsRemaining.release();
-            }
-          }
-        }
-      });
-
-    if (!requestsRemaining.tryAcquire(blockIds.length, 5, TimeUnit.SECONDS)) {
-      fail("Timeout getting response from the server");
-    }
-    client.close();
-    return res;
-  }
-
-  @Test
-  public void testFetchOneSort() throws Exception {
-    registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
-    FetchResult exec0Fetch = fetchBlocks("exec-0", new String[] { "shuffle_0_0_0" });
-    assertEquals(Sets.newHashSet("shuffle_0_0_0"), exec0Fetch.successBlocks);
-    assertTrue(exec0Fetch.failedBlocks.isEmpty());
-    assertBufferListsEqual(exec0Fetch.buffers, Lists.newArrayList(exec0Blocks[0]));
-    exec0Fetch.releaseBuffers();
-  }
-
-  @Test
-  public void testFetchThreeSort() throws Exception {
-    registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
-    FetchResult exec0Fetch = fetchBlocks("exec-0",
-      new String[] { "shuffle_0_0_0", "shuffle_0_0_1", "shuffle_0_0_2" });
-    assertEquals(Sets.newHashSet("shuffle_0_0_0", "shuffle_0_0_1", "shuffle_0_0_2"),
-      exec0Fetch.successBlocks);
-    assertTrue(exec0Fetch.failedBlocks.isEmpty());
-    assertBufferListsEqual(exec0Fetch.buffers, Lists.newArrayList(exec0Blocks));
-    exec0Fetch.releaseBuffers();
-  }
-
-  @Test
-  public void testFetchHash() throws Exception {
-    registerExecutor("exec-1", dataContext1.createExecutorInfo(HASH_MANAGER));
-    FetchResult execFetch = fetchBlocks("exec-1",
-      new String[] { "shuffle_1_0_0", "shuffle_1_0_1" });
-    assertEquals(Sets.newHashSet("shuffle_1_0_0", "shuffle_1_0_1"), execFetch.successBlocks);
-    assertTrue(execFetch.failedBlocks.isEmpty());
-    assertBufferListsEqual(execFetch.buffers, Lists.newArrayList(exec1Blocks));
-    execFetch.releaseBuffers();
-  }
-
-  @Test
-  public void testFetchWrongShuffle() throws Exception {
-    registerExecutor("exec-1", dataContext1.createExecutorInfo(SORT_MANAGER /* wrong manager */));
-    FetchResult execFetch = fetchBlocks("exec-1",
-      new String[] { "shuffle_1_0_0", "shuffle_1_0_1" });
-    assertTrue(execFetch.successBlocks.isEmpty());
-    assertEquals(Sets.newHashSet("shuffle_1_0_0", "shuffle_1_0_1"), execFetch.failedBlocks);
-  }
-
-  @Test
-  public void testFetchInvalidShuffle() throws Exception {
-    registerExecutor("exec-1", dataContext1.createExecutorInfo("unknown sort manager"));
-    FetchResult execFetch = fetchBlocks("exec-1",
-      new String[] { "shuffle_1_0_0" });
-    assertTrue(execFetch.successBlocks.isEmpty());
-    assertEquals(Sets.newHashSet("shuffle_1_0_0"), execFetch.failedBlocks);
-  }
-
-  @Test
-  public void testFetchWrongBlockId() throws Exception {
-    registerExecutor("exec-1", dataContext1.createExecutorInfo(SORT_MANAGER /* wrong manager */));
-    FetchResult execFetch = fetchBlocks("exec-1",
-      new String[] { "rdd_1_0_0" });
-    assertTrue(execFetch.successBlocks.isEmpty());
-    assertEquals(Sets.newHashSet("rdd_1_0_0"), execFetch.failedBlocks);
-  }
-
-  @Test
-  public void testFetchNonexistent() throws Exception {
-    registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
-    FetchResult execFetch = fetchBlocks("exec-0",
-      new String[] { "shuffle_2_0_0" });
-    assertTrue(execFetch.successBlocks.isEmpty());
-    assertEquals(Sets.newHashSet("shuffle_2_0_0"), execFetch.failedBlocks);
-  }
-
-  @Test
-  public void testFetchWrongExecutor() throws Exception {
-    registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
-    FetchResult execFetch = fetchBlocks("exec-0",
-      new String[] { "shuffle_0_0_0" /* right */, "shuffle_1_0_0" /* wrong */ });
-    // Both still fail, as we start by checking for all blocks.
-    assertTrue(execFetch.successBlocks.isEmpty());
-    assertEquals(Sets.newHashSet("shuffle_0_0_0", "shuffle_1_0_0"), execFetch.failedBlocks);
-  }
-
-  @Test
-  public void testFetchUnregisteredExecutor() throws Exception {
-    registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
-    FetchResult execFetch = fetchBlocks("exec-2",
-      new String[] { "shuffle_0_0_0", "shuffle_1_0_0" });
-    assertTrue(execFetch.successBlocks.isEmpty());
-    assertEquals(Sets.newHashSet("shuffle_0_0_0", "shuffle_1_0_0"), execFetch.failedBlocks);
-  }
-
-  @Test
-  public void testFetchNoServer() throws Exception {
-    System.setProperty("spark.shuffle.io.maxRetries", "0");
-    try {
-      registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
-      FetchResult execFetch = fetchBlocks("exec-0",
-        new String[]{"shuffle_1_0_0", "shuffle_1_0_1"}, 1 /* port */);
-      assertTrue(execFetch.successBlocks.isEmpty());
-      assertEquals(Sets.newHashSet("shuffle_1_0_0", "shuffle_1_0_1"), execFetch.failedBlocks);
-    } finally {
-      System.clearProperty("spark.shuffle.io.maxRetries");
-    }
-  }
-
-  private void registerExecutor(String executorId, ExecutorShuffleInfo executorInfo)
-      throws IOException {
-    ExternalShuffleClient client = new ExternalShuffleClient(conf, null, false, false);
-    client.init(APP_ID);
-    client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(),
-      executorId, executorInfo);
-  }
-
-  private void assertBufferListsEqual(List<ManagedBuffer> list0, List<byte[]> list1)
-    throws Exception {
-    assertEquals(list0.size(), list1.size());
-    for (int i = 0; i < list0.size(); i ++) {
-      assertBuffersEqual(list0.get(i), new NioManagedBuffer(ByteBuffer.wrap(list1.get(i))));
-    }
-  }
-
-  private void assertBuffersEqual(ManagedBuffer buffer0, ManagedBuffer buffer1) throws Exception {
-    ByteBuffer nio0 = buffer0.nioByteBuffer();
-    ByteBuffer nio1 = buffer1.nioByteBuffer();
-
-    int len = nio0.remaining();
-    assertEquals(nio0.remaining(), nio1.remaining());
-    for (int i = 0; i < len; i ++) {
-      assertEquals(nio0.get(), nio1.get());
-    }
-  }
-}
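
The fetchBlocks helper above bridges asynchronous callbacks back to the test thread with a
Semaphore: each callback releases one permit, and tryAcquire bounds the wait. The pattern in
isolation (worker threads stand in for the network callbacks; names are illustrative):

    import java.util.concurrent.Semaphore;
    import java.util.concurrent.TimeUnit;

    public class SemaphoreWaitSketch {
      public static void main(String[] args) throws InterruptedException {
        final Semaphore remaining = new Semaphore(0);
        int expected = 3;
        for (int i = 0; i < expected; i++) {
          new Thread(new Runnable() {
            @Override public void run() {
              // ... asynchronous work would happen here ...
              remaining.release();  // one permit per completed callback
            }
          }).start();
        }
        // Block until all callbacks have fired, but never longer than five seconds.
        if (!remaining.tryAcquire(expected, 5, TimeUnit.SECONDS)) {
          throw new AssertionError("Timed out waiting for callbacks");
        }
      }
    }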

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
deleted file mode 100644
index 08ddb37..0000000
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.shuffle;
-
-import java.io.IOException;
-import java.util.Arrays;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-import org.apache.spark.network.TestUtils;
-import org.apache.spark.network.TransportContext;
-import org.apache.spark.network.sasl.SaslServerBootstrap;
-import org.apache.spark.network.sasl.SecretKeyHolder;
-import org.apache.spark.network.server.RpcHandler;
-import org.apache.spark.network.server.TransportServer;
-import org.apache.spark.network.server.TransportServerBootstrap;
-import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
-import org.apache.spark.network.util.SystemPropertyConfigProvider;
-import org.apache.spark.network.util.TransportConf;
-
-public class ExternalShuffleSecuritySuite {
-
-  TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
-  TransportServer server;
-
-  @Before
-  public void beforeEach() throws IOException {
-    TransportContext context =
-      new TransportContext(conf, new ExternalShuffleBlockHandler(conf, null));
-    TransportServerBootstrap bootstrap = new SaslServerBootstrap(conf,
-        new TestSecretKeyHolder("my-app-id", "secret"));
-    this.server = context.createServer(Arrays.asList(bootstrap));
-  }
-
-  @After
-  public void afterEach() {
-    if (server != null) {
-      server.close();
-      server = null;
-    }
-  }
-
-  @Test
-  public void testValid() throws IOException {
-    validate("my-app-id", "secret", false);
-  }
-
-  @Test
-  public void testBadAppId() {
-    try {
-      validate("wrong-app-id", "secret", false);
-      fail("Should have failed");
-    } catch (Exception e) {
-      assertTrue(e.getMessage(), e.getMessage().contains("Wrong appId!"));
-    }
-  }
-
-  @Test
-  public void testBadSecret() {
-    try {
-      validate("my-app-id", "bad-secret", false);
-      fail("Should have failed");
-    } catch (Exception e) {
-      assertTrue(e.getMessage(), e.getMessage().contains("Mismatched response"));
-    }
-  }
-
-  @Test
-  public void testEncryption() throws IOException {
-    validate("my-app-id", "secret", true);
-  }
-
-  /** Creates an ExternalShuffleClient and attempts to register with the server. */
-  private void validate(String appId, String secretKey, boolean encrypt) throws IOException {
-    ExternalShuffleClient client =
-      new ExternalShuffleClient(conf, new TestSecretKeyHolder(appId, secretKey), true, encrypt);
-    client.init(appId);
-    // Registration either succeeds or throws an exception.
-    client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(), "exec0",
-      new ExecutorShuffleInfo(new String[0], 0, ""));
-    client.close();
-  }
-
-  /** Provides a secret key holder which always returns the given secret key, for a single appId. */
-  static class TestSecretKeyHolder implements SecretKeyHolder {
-    private final String appId;
-    private final String secretKey;
-
-    TestSecretKeyHolder(String appId, String secretKey) {
-      this.appId = appId;
-      this.secretKey = secretKey;
-    }
-
-    @Override
-    public String getSaslUser(String appId) {
-      return "user";
-    }
-
-    @Override
-    public String getSecretKey(String appId) {
-      if (!appId.equals(this.appId)) {
-        throw new IllegalArgumentException("Wrong appId!");
-      }
-      return secretKey;
-    }
-  }
-}
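
For reference, the SASL setup this suite exercises pairs a SaslServerBootstrap on the server
with a SecretKeyHolder on the client; both sides must agree on the secret for a given appId.
A condensed sketch using only classes already shown above (error handling omitted):

    TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
    SecretKeyHolder keys = new TestSecretKeyHolder("my-app-id", "secret");

    // Server side: SASL negotiation runs as a bootstrap before any RPCs are served.
    TransportContext context =
      new TransportContext(conf, new ExternalShuffleBlockHandler(conf, null));
    TransportServerBootstrap bootstrap = new SaslServerBootstrap(conf, keys);
    TransportServer server = context.createServer(Arrays.asList(bootstrap));

    // Client side: the last two flags enable SASL and, optionally, encryption.
    ExternalShuffleClient client = new ExternalShuffleClient(conf, keys, true, false);
    client.init("my-app-id");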

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockFetcherSuite.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockFetcherSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockFetcherSuite.java
deleted file mode 100644
index 2590b9c..0000000
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockFetcherSuite.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.shuffle;
-
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import com.google.common.collect.Maps;
-import io.netty.buffer.Unpooled;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import org.apache.spark.network.buffer.ManagedBuffer;
-import org.apache.spark.network.buffer.NettyManagedBuffer;
-import org.apache.spark.network.buffer.NioManagedBuffer;
-import org.apache.spark.network.client.ChunkReceivedCallback;
-import org.apache.spark.network.client.RpcResponseCallback;
-import org.apache.spark.network.client.TransportClient;
-import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
-import org.apache.spark.network.shuffle.protocol.OpenBlocks;
-import org.apache.spark.network.shuffle.protocol.StreamHandle;
-
-public class OneForOneBlockFetcherSuite {
-  @Test
-  public void testFetchOne() {
-    LinkedHashMap<String, ManagedBuffer> blocks = Maps.newLinkedHashMap();
-    blocks.put("shuffle_0_0_0", new NioManagedBuffer(ByteBuffer.wrap(new byte[0])));
-
-    BlockFetchingListener listener = fetchBlocks(blocks);
-
-    verify(listener).onBlockFetchSuccess("shuffle_0_0_0", blocks.get("shuffle_0_0_0"));
-  }
-
-  @Test
-  public void testFetchThree() {
-    LinkedHashMap<String, ManagedBuffer> blocks = Maps.newLinkedHashMap();
-    blocks.put("b0", new NioManagedBuffer(ByteBuffer.wrap(new byte[12])));
-    blocks.put("b1", new NioManagedBuffer(ByteBuffer.wrap(new byte[23])));
-    blocks.put("b2", new NettyManagedBuffer(Unpooled.wrappedBuffer(new byte[23])));
-
-    BlockFetchingListener listener = fetchBlocks(blocks);
-
-    for (int i = 0; i < 3; i ++) {
-      verify(listener, times(1)).onBlockFetchSuccess("b" + i, blocks.get("b" + i));
-    }
-  }
-
-  @Test
-  public void testFailure() {
-    LinkedHashMap<String, ManagedBuffer> blocks = Maps.newLinkedHashMap();
-    blocks.put("b0", new NioManagedBuffer(ByteBuffer.wrap(new byte[12])));
-    blocks.put("b1", null);
-    blocks.put("b2", null);
-
-    BlockFetchingListener listener = fetchBlocks(blocks);
-
-    // Each failure will cause failure callbacks to be invoked for all remaining block fetches.
-    verify(listener, times(1)).onBlockFetchSuccess("b0", blocks.get("b0"));
-    verify(listener, times(1)).onBlockFetchFailure(eq("b1"), (Throwable) any());
-    verify(listener, times(2)).onBlockFetchFailure(eq("b2"), (Throwable) any());
-  }
-
-  @Test
-  public void testFailureAndSuccess() {
-    LinkedHashMap<String, ManagedBuffer> blocks = Maps.newLinkedHashMap();
-    blocks.put("b0", new NioManagedBuffer(ByteBuffer.wrap(new byte[12])));
-    blocks.put("b1", null);
-    blocks.put("b2", new NioManagedBuffer(ByteBuffer.wrap(new byte[21])));
-
-    BlockFetchingListener listener = fetchBlocks(blocks);
-
-    // We may call both success and failure for the same block.
-    verify(listener, times(1)).onBlockFetchSuccess("b0", blocks.get("b0"));
-    verify(listener, times(1)).onBlockFetchFailure(eq("b1"), (Throwable) any());
-    verify(listener, times(1)).onBlockFetchSuccess("b2", blocks.get("b2"));
-    verify(listener, times(1)).onBlockFetchFailure(eq("b2"), (Throwable) any());
-  }
-
-  @Test
-  public void testEmptyBlockFetch() {
-    try {
-      fetchBlocks(Maps.<String, ManagedBuffer>newLinkedHashMap());
-      fail();
-    } catch (IllegalArgumentException e) {
-      assertEquals("Zero-sized blockIds array", e.getMessage());
-    }
-  }
-
-  /**
-   * Begins a fetch on the given set of blocks by mocking out the server side of the RPC, which
-   * simply returns the given (BlockId, Block) pairs.
-   * As "blocks" is a LinkedHashMap, the blocks are guaranteed to be returned in the same order
-   * in which they were inserted.
-   *
-   * If a block's buffer is "null", an exception will be thrown instead.
-   */
-  private BlockFetchingListener fetchBlocks(final LinkedHashMap<String, ManagedBuffer> blocks) {
-    TransportClient client = mock(TransportClient.class);
-    BlockFetchingListener listener = mock(BlockFetchingListener.class);
-    final String[] blockIds = blocks.keySet().toArray(new String[blocks.size()]);
-    OneForOneBlockFetcher fetcher =
-      new OneForOneBlockFetcher(client, "app-id", "exec-id", blockIds, listener);
-
-    // Respond to the "OpenBlocks" message with an appropriate StreamHandle with streamId 123.
-    doAnswer(new Answer<Void>() {
-      @Override
-      public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
-        BlockTransferMessage message = BlockTransferMessage.Decoder.fromByteBuffer(
-          (ByteBuffer) invocationOnMock.getArguments()[0]);
-        RpcResponseCallback callback = (RpcResponseCallback) invocationOnMock.getArguments()[1];
-        callback.onSuccess(new StreamHandle(123, blocks.size()).toByteBuffer());
-        assertEquals(new OpenBlocks("app-id", "exec-id", blockIds), message);
-        return null;
-      }
-    }).when(client).sendRpc(any(ByteBuffer.class), any(RpcResponseCallback.class));
-
-    // Respond to each chunk request with a single buffer from our blocks array.
-    final AtomicInteger expectedChunkIndex = new AtomicInteger(0);
-    final Iterator<ManagedBuffer> blockIterator = blocks.values().iterator();
-    doAnswer(new Answer<Void>() {
-      @Override
-      public Void answer(InvocationOnMock invocation) throws Throwable {
-        try {
-          long streamId = (Long) invocation.getArguments()[0];
-          int myChunkIndex = (Integer) invocation.getArguments()[1];
-          assertEquals(123, streamId);
-          assertEquals(expectedChunkIndex.getAndIncrement(), myChunkIndex);
-
-          ChunkReceivedCallback callback = (ChunkReceivedCallback) invocation.getArguments()[2];
-          ManagedBuffer result = blockIterator.next();
-          if (result != null) {
-            callback.onSuccess(myChunkIndex, result);
-          } else {
-            callback.onFailure(myChunkIndex, new RuntimeException("Failed " + myChunkIndex));
-          }
-        } catch (Exception e) {
-          e.printStackTrace();
-          fail("Unexpected failure");
-        }
-        return null;
-      }
-    }).when(client).fetchChunk(anyLong(), anyInt(), (ChunkReceivedCallback) any());
-
-    fetcher.start();
-    return listener;
-  }
-}
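
The fetchBlocks helper above leans on Mockito's doAnswer to stub void methods that take
callbacks: the stub invokes the callback synchronously, so verify(...) can run as soon as
fetcher.start() returns. The pattern on its own, with illustrative Service and Callback
interfaces that are not part of the suite:

    import static org.mockito.Matchers.any;
    import static org.mockito.Matchers.anyString;
    import static org.mockito.Mockito.doAnswer;
    import static org.mockito.Mockito.mock;

    import org.mockito.invocation.InvocationOnMock;
    import org.mockito.stubbing.Answer;

    public class DoAnswerSketch {
      interface Callback { void done(String result); }
      interface Service { void call(String arg, Callback cb); }

      public static void main(String[] args) {
        Service service = mock(Service.class);
        // Drive the callback from inside the stub, on the caller's thread, so
        // the interaction is complete as soon as call(...) returns.
        doAnswer(new Answer<Void>() {
          @Override public Void answer(InvocationOnMock invocation) throws Throwable {
            Callback cb = (Callback) invocation.getArguments()[1];
            cb.done("ok:" + invocation.getArguments()[0]);
            return null;
          }
        }).when(service).call(anyString(), any(Callback.class));

        final String[] seen = new String[1];
        service.call("block-0", new Callback() {
          @Override public void done(String result) { seen[0] = result; }
        });
        if (!"ok:block-0".equals(seen[0])) throw new AssertionError(seen[0]);
      }
    }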

