You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/02/29 02:25:18 UTC
[02/14] spark git commit: [SPARK-13529][BUILD] Move network/* modules
into common/network-*
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java
deleted file mode 100644
index 6758203..0000000
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.shuffle.mesos;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.spark.network.client.RpcResponseCallback;
-import org.apache.spark.network.client.TransportClient;
-import org.apache.spark.network.sasl.SecretKeyHolder;
-import org.apache.spark.network.shuffle.ExternalShuffleClient;
-import org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver;
-import org.apache.spark.network.util.TransportConf;
-
-/**
- * A client for talking to the external shuffle service in Mesos coarse-grained mode.
- *
- * This is used by the Spark driver to register with each external shuffle service on the cluster.
- * The reason why the driver has to talk to the service is for cleaning up shuffle files reliably
- * after the application exits. Mesos does not provide a great alternative to do this, so Spark
- * has to detect this itself.
- */
-public class MesosExternalShuffleClient extends ExternalShuffleClient {
- private final Logger logger = LoggerFactory.getLogger(MesosExternalShuffleClient.class);
-
- /**
- * Creates an Mesos external shuffle client that wraps the {@link ExternalShuffleClient}.
- * Please refer to docs on {@link ExternalShuffleClient} for more information.
- */
- public MesosExternalShuffleClient(
- TransportConf conf,
- SecretKeyHolder secretKeyHolder,
- boolean saslEnabled,
- boolean saslEncryptionEnabled) {
- super(conf, secretKeyHolder, saslEnabled, saslEncryptionEnabled);
- }
-
- public void registerDriverWithShuffleService(String host, int port) throws IOException {
- checkInit();
- ByteBuffer registerDriver = new RegisterDriver(appId).toByteBuffer();
- TransportClient client = clientFactory.createClient(host, port);
- client.sendRpc(registerDriver, new RpcResponseCallback() {
- @Override
- public void onSuccess(ByteBuffer response) {
- logger.info("Successfully registered app " + appId + " with external shuffle service.");
- }
-
- @Override
- public void onFailure(Throwable e) {
- logger.warn("Unable to register app " + appId + " with external shuffle service. " +
- "Please manually remove shuffle data after driver exit. Error: " + e);
- }
- });
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
deleted file mode 100644
index 7fbe338..0000000
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.shuffle.protocol;
-
-import java.nio.ByteBuffer;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-
-import org.apache.spark.network.protocol.Encodable;
-import org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver;
-
-/**
- * Messages handled by the {@link org.apache.spark.network.shuffle.ExternalShuffleBlockHandler}, or
- * by Spark's NettyBlockTransferService.
- *
- * At a high level:
- * - OpenBlock is handled by both services, but only services shuffle files for the external
- * shuffle service. It returns a StreamHandle.
- * - UploadBlock is only handled by the NettyBlockTransferService.
- * - RegisterExecutor is only handled by the external shuffle service.
- */
-public abstract class BlockTransferMessage implements Encodable {
- protected abstract Type type();
-
- /** Preceding every serialized message is its type, which allows us to deserialize it. */
- public static enum Type {
- OPEN_BLOCKS(0), UPLOAD_BLOCK(1), REGISTER_EXECUTOR(2), STREAM_HANDLE(3), REGISTER_DRIVER(4);
-
- private final byte id;
-
- private Type(int id) {
- assert id < 128 : "Cannot have more than 128 message types";
- this.id = (byte) id;
- }
-
- public byte id() { return id; }
- }
-
- // NB: Java does not support static methods in interfaces, so we must put this in a static class.
- public static class Decoder {
- /** Deserializes the 'type' byte followed by the message itself. */
- public static BlockTransferMessage fromByteBuffer(ByteBuffer msg) {
- ByteBuf buf = Unpooled.wrappedBuffer(msg);
- byte type = buf.readByte();
- switch (type) {
- case 0: return OpenBlocks.decode(buf);
- case 1: return UploadBlock.decode(buf);
- case 2: return RegisterExecutor.decode(buf);
- case 3: return StreamHandle.decode(buf);
- case 4: return RegisterDriver.decode(buf);
- default: throw new IllegalArgumentException("Unknown message type: " + type);
- }
- }
- }
-
- /** Serializes the 'type' byte followed by the message itself. */
- public ByteBuffer toByteBuffer() {
- // Allow room for encoded message, plus the type byte
- ByteBuf buf = Unpooled.buffer(encodedLength() + 1);
- buf.writeByte(type().id);
- encode(buf);
- assert buf.writableBytes() == 0 : "Writable bytes remain: " + buf.writableBytes();
- return buf.nioBuffer();
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java
deleted file mode 100644
index 102d4ef..0000000
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.shuffle.protocol;
-
-import java.util.Arrays;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Objects;
-import io.netty.buffer.ByteBuf;
-
-import org.apache.spark.network.protocol.Encodable;
-import org.apache.spark.network.protocol.Encoders;
-
-/** Contains all configuration necessary for locating the shuffle files of an executor. */
-public class ExecutorShuffleInfo implements Encodable {
- /** The base set of local directories that the executor stores its shuffle files in. */
- public final String[] localDirs;
- /** Number of subdirectories created within each localDir. */
- public final int subDirsPerLocalDir;
- /** Shuffle manager (SortShuffleManager or HashShuffleManager) that the executor is using. */
- public final String shuffleManager;
-
- @JsonCreator
- public ExecutorShuffleInfo(
- @JsonProperty("localDirs") String[] localDirs,
- @JsonProperty("subDirsPerLocalDir") int subDirsPerLocalDir,
- @JsonProperty("shuffleManager") String shuffleManager) {
- this.localDirs = localDirs;
- this.subDirsPerLocalDir = subDirsPerLocalDir;
- this.shuffleManager = shuffleManager;
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(subDirsPerLocalDir, shuffleManager) * 41 + Arrays.hashCode(localDirs);
- }
-
- @Override
- public String toString() {
- return Objects.toStringHelper(this)
- .add("localDirs", Arrays.toString(localDirs))
- .add("subDirsPerLocalDir", subDirsPerLocalDir)
- .add("shuffleManager", shuffleManager)
- .toString();
- }
-
- @Override
- public boolean equals(Object other) {
- if (other != null && other instanceof ExecutorShuffleInfo) {
- ExecutorShuffleInfo o = (ExecutorShuffleInfo) other;
- return Arrays.equals(localDirs, o.localDirs)
- && Objects.equal(subDirsPerLocalDir, o.subDirsPerLocalDir)
- && Objects.equal(shuffleManager, o.shuffleManager);
- }
- return false;
- }
-
- @Override
- public int encodedLength() {
- return Encoders.StringArrays.encodedLength(localDirs)
- + 4 // int
- + Encoders.Strings.encodedLength(shuffleManager);
- }
-
- @Override
- public void encode(ByteBuf buf) {
- Encoders.StringArrays.encode(buf, localDirs);
- buf.writeInt(subDirsPerLocalDir);
- Encoders.Strings.encode(buf, shuffleManager);
- }
-
- public static ExecutorShuffleInfo decode(ByteBuf buf) {
- String[] localDirs = Encoders.StringArrays.decode(buf);
- int subDirsPerLocalDir = buf.readInt();
- String shuffleManager = Encoders.Strings.decode(buf);
- return new ExecutorShuffleInfo(localDirs, subDirsPerLocalDir, shuffleManager);
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/OpenBlocks.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/OpenBlocks.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/OpenBlocks.java
deleted file mode 100644
index ce954b8..0000000
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/OpenBlocks.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.shuffle.protocol;
-
-import java.util.Arrays;
-
-import com.google.common.base.Objects;
-import io.netty.buffer.ByteBuf;
-
-import org.apache.spark.network.protocol.Encoders;
-
-// Needed by ScalaDoc. See SPARK-7726
-import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type;
-
-/** Request to read a set of blocks. Returns {@link StreamHandle}. */
-public class OpenBlocks extends BlockTransferMessage {
- public final String appId;
- public final String execId;
- public final String[] blockIds;
-
- public OpenBlocks(String appId, String execId, String[] blockIds) {
- this.appId = appId;
- this.execId = execId;
- this.blockIds = blockIds;
- }
-
- @Override
- protected Type type() { return Type.OPEN_BLOCKS; }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(appId, execId) * 41 + Arrays.hashCode(blockIds);
- }
-
- @Override
- public String toString() {
- return Objects.toStringHelper(this)
- .add("appId", appId)
- .add("execId", execId)
- .add("blockIds", Arrays.toString(blockIds))
- .toString();
- }
-
- @Override
- public boolean equals(Object other) {
- if (other != null && other instanceof OpenBlocks) {
- OpenBlocks o = (OpenBlocks) other;
- return Objects.equal(appId, o.appId)
- && Objects.equal(execId, o.execId)
- && Arrays.equals(blockIds, o.blockIds);
- }
- return false;
- }
-
- @Override
- public int encodedLength() {
- return Encoders.Strings.encodedLength(appId)
- + Encoders.Strings.encodedLength(execId)
- + Encoders.StringArrays.encodedLength(blockIds);
- }
-
- @Override
- public void encode(ByteBuf buf) {
- Encoders.Strings.encode(buf, appId);
- Encoders.Strings.encode(buf, execId);
- Encoders.StringArrays.encode(buf, blockIds);
- }
-
- public static OpenBlocks decode(ByteBuf buf) {
- String appId = Encoders.Strings.decode(buf);
- String execId = Encoders.Strings.decode(buf);
- String[] blockIds = Encoders.StringArrays.decode(buf);
- return new OpenBlocks(appId, execId, blockIds);
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RegisterExecutor.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RegisterExecutor.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RegisterExecutor.java
deleted file mode 100644
index 167ef33..0000000
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RegisterExecutor.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.shuffle.protocol;
-
-import com.google.common.base.Objects;
-import io.netty.buffer.ByteBuf;
-
-import org.apache.spark.network.protocol.Encoders;
-
-// Needed by ScalaDoc. See SPARK-7726
-import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type;
-
-/**
- * Initial registration message between an executor and its local shuffle server.
- * Returns nothing (empty byte array).
- */
-public class RegisterExecutor extends BlockTransferMessage {
- public final String appId;
- public final String execId;
- public final ExecutorShuffleInfo executorInfo;
-
- public RegisterExecutor(
- String appId,
- String execId,
- ExecutorShuffleInfo executorInfo) {
- this.appId = appId;
- this.execId = execId;
- this.executorInfo = executorInfo;
- }
-
- @Override
- protected Type type() { return Type.REGISTER_EXECUTOR; }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(appId, execId, executorInfo);
- }
-
- @Override
- public String toString() {
- return Objects.toStringHelper(this)
- .add("appId", appId)
- .add("execId", execId)
- .add("executorInfo", executorInfo)
- .toString();
- }
-
- @Override
- public boolean equals(Object other) {
- if (other != null && other instanceof RegisterExecutor) {
- RegisterExecutor o = (RegisterExecutor) other;
- return Objects.equal(appId, o.appId)
- && Objects.equal(execId, o.execId)
- && Objects.equal(executorInfo, o.executorInfo);
- }
- return false;
- }
-
- @Override
- public int encodedLength() {
- return Encoders.Strings.encodedLength(appId)
- + Encoders.Strings.encodedLength(execId)
- + executorInfo.encodedLength();
- }
-
- @Override
- public void encode(ByteBuf buf) {
- Encoders.Strings.encode(buf, appId);
- Encoders.Strings.encode(buf, execId);
- executorInfo.encode(buf);
- }
-
- public static RegisterExecutor decode(ByteBuf buf) {
- String appId = Encoders.Strings.decode(buf);
- String execId = Encoders.Strings.decode(buf);
- ExecutorShuffleInfo executorShuffleInfo = ExecutorShuffleInfo.decode(buf);
- return new RegisterExecutor(appId, execId, executorShuffleInfo);
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/StreamHandle.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/StreamHandle.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/StreamHandle.java
deleted file mode 100644
index 1915295..0000000
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/StreamHandle.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.shuffle.protocol;
-
-import com.google.common.base.Objects;
-import io.netty.buffer.ByteBuf;
-
-// Needed by ScalaDoc. See SPARK-7726
-import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type;
-
-/**
- * Identifier for a fixed number of chunks to read from a stream created by an "open blocks"
- * message. This is used by {@link org.apache.spark.network.shuffle.OneForOneBlockFetcher}.
- */
-public class StreamHandle extends BlockTransferMessage {
- public final long streamId;
- public final int numChunks;
-
- public StreamHandle(long streamId, int numChunks) {
- this.streamId = streamId;
- this.numChunks = numChunks;
- }
-
- @Override
- protected Type type() { return Type.STREAM_HANDLE; }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(streamId, numChunks);
- }
-
- @Override
- public String toString() {
- return Objects.toStringHelper(this)
- .add("streamId", streamId)
- .add("numChunks", numChunks)
- .toString();
- }
-
- @Override
- public boolean equals(Object other) {
- if (other != null && other instanceof StreamHandle) {
- StreamHandle o = (StreamHandle) other;
- return Objects.equal(streamId, o.streamId)
- && Objects.equal(numChunks, o.numChunks);
- }
- return false;
- }
-
- @Override
- public int encodedLength() {
- return 8 + 4;
- }
-
- @Override
- public void encode(ByteBuf buf) {
- buf.writeLong(streamId);
- buf.writeInt(numChunks);
- }
-
- public static StreamHandle decode(ByteBuf buf) {
- long streamId = buf.readLong();
- int numChunks = buf.readInt();
- return new StreamHandle(streamId, numChunks);
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/UploadBlock.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/UploadBlock.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/UploadBlock.java
deleted file mode 100644
index 3caed59..0000000
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/UploadBlock.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.shuffle.protocol;
-
-import java.util.Arrays;
-
-import com.google.common.base.Objects;
-import io.netty.buffer.ByteBuf;
-
-import org.apache.spark.network.protocol.Encoders;
-
-// Needed by ScalaDoc. See SPARK-7726
-import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type;
-
-
-/** Request to upload a block with a certain StorageLevel. Returns nothing (empty byte array). */
-public class UploadBlock extends BlockTransferMessage {
- public final String appId;
- public final String execId;
- public final String blockId;
- // TODO: StorageLevel is serialized separately in here because StorageLevel is not available in
- // this package. We should avoid this hack.
- public final byte[] metadata;
- public final byte[] blockData;
-
- /**
- * @param metadata Meta-information about block, typically StorageLevel.
- * @param blockData The actual block's bytes.
- */
- public UploadBlock(
- String appId,
- String execId,
- String blockId,
- byte[] metadata,
- byte[] blockData) {
- this.appId = appId;
- this.execId = execId;
- this.blockId = blockId;
- this.metadata = metadata;
- this.blockData = blockData;
- }
-
- @Override
- protected Type type() { return Type.UPLOAD_BLOCK; }
-
- @Override
- public int hashCode() {
- int objectsHashCode = Objects.hashCode(appId, execId, blockId);
- return (objectsHashCode * 41 + Arrays.hashCode(metadata)) * 41 + Arrays.hashCode(blockData);
- }
-
- @Override
- public String toString() {
- return Objects.toStringHelper(this)
- .add("appId", appId)
- .add("execId", execId)
- .add("blockId", blockId)
- .add("metadata size", metadata.length)
- .add("block size", blockData.length)
- .toString();
- }
-
- @Override
- public boolean equals(Object other) {
- if (other != null && other instanceof UploadBlock) {
- UploadBlock o = (UploadBlock) other;
- return Objects.equal(appId, o.appId)
- && Objects.equal(execId, o.execId)
- && Objects.equal(blockId, o.blockId)
- && Arrays.equals(metadata, o.metadata)
- && Arrays.equals(blockData, o.blockData);
- }
- return false;
- }
-
- @Override
- public int encodedLength() {
- return Encoders.Strings.encodedLength(appId)
- + Encoders.Strings.encodedLength(execId)
- + Encoders.Strings.encodedLength(blockId)
- + Encoders.ByteArrays.encodedLength(metadata)
- + Encoders.ByteArrays.encodedLength(blockData);
- }
-
- @Override
- public void encode(ByteBuf buf) {
- Encoders.Strings.encode(buf, appId);
- Encoders.Strings.encode(buf, execId);
- Encoders.Strings.encode(buf, blockId);
- Encoders.ByteArrays.encode(buf, metadata);
- Encoders.ByteArrays.encode(buf, blockData);
- }
-
- public static UploadBlock decode(ByteBuf buf) {
- String appId = Encoders.Strings.decode(buf);
- String execId = Encoders.Strings.decode(buf);
- String blockId = Encoders.Strings.decode(buf);
- byte[] metadata = Encoders.ByteArrays.decode(buf);
- byte[] blockData = Encoders.ByteArrays.decode(buf);
- return new UploadBlock(appId, execId, blockId, metadata, blockData);
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/RegisterDriver.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/RegisterDriver.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/RegisterDriver.java
deleted file mode 100644
index 94a61d6..0000000
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/RegisterDriver.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.shuffle.protocol.mesos;
-
-import com.google.common.base.Objects;
-import io.netty.buffer.ByteBuf;
-
-import org.apache.spark.network.protocol.Encoders;
-import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
-
-// Needed by ScalaDoc. See SPARK-7726
-import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type;
-
-/**
- * A message sent from the driver to register with the MesosExternalShuffleService.
- */
-public class RegisterDriver extends BlockTransferMessage {
- private final String appId;
-
- public RegisterDriver(String appId) {
- this.appId = appId;
- }
-
- public String getAppId() { return appId; }
-
- @Override
- protected Type type() { return Type.REGISTER_DRIVER; }
-
- @Override
- public int encodedLength() {
- return Encoders.Strings.encodedLength(appId);
- }
-
- @Override
- public void encode(ByteBuf buf) {
- Encoders.Strings.encode(buf, appId);
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(appId);
- }
-
- public static RegisterDriver decode(ByteBuf buf) {
- String appId = Encoders.Strings.decode(buf);
- return new RegisterDriver(appId);
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
deleted file mode 100644
index 0ea631e..0000000
--- a/network/shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
+++ /dev/null
@@ -1,294 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.sasl;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.concurrent.atomic.AtomicReference;
-
-import com.google.common.collect.Lists;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
-import org.apache.spark.network.TestUtils;
-import org.apache.spark.network.TransportContext;
-import org.apache.spark.network.buffer.ManagedBuffer;
-import org.apache.spark.network.client.ChunkReceivedCallback;
-import org.apache.spark.network.client.RpcResponseCallback;
-import org.apache.spark.network.client.TransportClient;
-import org.apache.spark.network.client.TransportClientBootstrap;
-import org.apache.spark.network.client.TransportClientFactory;
-import org.apache.spark.network.server.OneForOneStreamManager;
-import org.apache.spark.network.server.RpcHandler;
-import org.apache.spark.network.server.StreamManager;
-import org.apache.spark.network.server.TransportServer;
-import org.apache.spark.network.server.TransportServerBootstrap;
-import org.apache.spark.network.shuffle.BlockFetchingListener;
-import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;
-import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver;
-import org.apache.spark.network.shuffle.OneForOneBlockFetcher;
-import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
-import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
-import org.apache.spark.network.shuffle.protocol.OpenBlocks;
-import org.apache.spark.network.shuffle.protocol.RegisterExecutor;
-import org.apache.spark.network.shuffle.protocol.StreamHandle;
-import org.apache.spark.network.util.JavaUtils;
-import org.apache.spark.network.util.SystemPropertyConfigProvider;
-import org.apache.spark.network.util.TransportConf;
-
-public class SaslIntegrationSuite {
-
- // Use a long timeout to account for slow / overloaded build machines. In the normal case,
- // tests should finish way before the timeout expires.
- private static final long TIMEOUT_MS = 10_000;
-
- static TransportServer server;
- static TransportConf conf;
- static TransportContext context;
- static SecretKeyHolder secretKeyHolder;
-
- TransportClientFactory clientFactory;
-
- @BeforeClass
- public static void beforeAll() throws IOException {
- conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
- context = new TransportContext(conf, new TestRpcHandler());
-
- secretKeyHolder = mock(SecretKeyHolder.class);
- when(secretKeyHolder.getSaslUser(eq("app-1"))).thenReturn("app-1");
- when(secretKeyHolder.getSecretKey(eq("app-1"))).thenReturn("app-1");
- when(secretKeyHolder.getSaslUser(eq("app-2"))).thenReturn("app-2");
- when(secretKeyHolder.getSecretKey(eq("app-2"))).thenReturn("app-2");
- when(secretKeyHolder.getSaslUser(anyString())).thenReturn("other-app");
- when(secretKeyHolder.getSecretKey(anyString())).thenReturn("correct-password");
-
- TransportServerBootstrap bootstrap = new SaslServerBootstrap(conf, secretKeyHolder);
- server = context.createServer(Arrays.asList(bootstrap));
- }
-
-
- @AfterClass
- public static void afterAll() {
- server.close();
- }
-
- @After
- public void afterEach() {
- if (clientFactory != null) {
- clientFactory.close();
- clientFactory = null;
- }
- }
-
- @Test
- public void testGoodClient() throws IOException {
- clientFactory = context.createClientFactory(
- Lists.<TransportClientBootstrap>newArrayList(
- new SaslClientBootstrap(conf, "app-1", secretKeyHolder)));
-
- TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
- String msg = "Hello, World!";
- ByteBuffer resp = client.sendRpcSync(JavaUtils.stringToBytes(msg), TIMEOUT_MS);
- assertEquals(msg, JavaUtils.bytesToString(resp));
- }
-
- @Test
- public void testBadClient() {
- SecretKeyHolder badKeyHolder = mock(SecretKeyHolder.class);
- when(badKeyHolder.getSaslUser(anyString())).thenReturn("other-app");
- when(badKeyHolder.getSecretKey(anyString())).thenReturn("wrong-password");
- clientFactory = context.createClientFactory(
- Lists.<TransportClientBootstrap>newArrayList(
- new SaslClientBootstrap(conf, "unknown-app", badKeyHolder)));
-
- try {
- // Bootstrap should fail on startup.
- clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
- fail("Connection should have failed.");
- } catch (Exception e) {
- assertTrue(e.getMessage(), e.getMessage().contains("Mismatched response"));
- }
- }
-
- @Test
- public void testNoSaslClient() throws IOException {
- clientFactory = context.createClientFactory(
- Lists.<TransportClientBootstrap>newArrayList());
-
- TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
- try {
- client.sendRpcSync(ByteBuffer.allocate(13), TIMEOUT_MS);
- fail("Should have failed");
- } catch (Exception e) {
- assertTrue(e.getMessage(), e.getMessage().contains("Expected SaslMessage"));
- }
-
- try {
- // Guessing the right tag byte doesn't magically get you in...
- client.sendRpcSync(ByteBuffer.wrap(new byte[] { (byte) 0xEA }), TIMEOUT_MS);
- fail("Should have failed");
- } catch (Exception e) {
- assertTrue(e.getMessage(), e.getMessage().contains("java.lang.IndexOutOfBoundsException"));
- }
- }
-
- @Test
- public void testNoSaslServer() {
- RpcHandler handler = new TestRpcHandler();
- TransportContext context = new TransportContext(conf, handler);
- clientFactory = context.createClientFactory(
- Lists.<TransportClientBootstrap>newArrayList(
- new SaslClientBootstrap(conf, "app-1", secretKeyHolder)));
- TransportServer server = context.createServer();
- try {
- clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
- } catch (Exception e) {
- assertTrue(e.getMessage(), e.getMessage().contains("Digest-challenge format violation"));
- } finally {
- server.close();
- }
- }
-
- /**
- * This test is not actually testing SASL behavior, but testing that the shuffle service
- * performs correct authorization checks based on the SASL authentication data.
- */
- @Test
- public void testAppIsolation() throws Exception {
- // Start a new server with the correct RPC handler to serve block data.
- ExternalShuffleBlockResolver blockResolver = mock(ExternalShuffleBlockResolver.class);
- ExternalShuffleBlockHandler blockHandler = new ExternalShuffleBlockHandler(
- new OneForOneStreamManager(), blockResolver);
- TransportServerBootstrap bootstrap = new SaslServerBootstrap(conf, secretKeyHolder);
- TransportContext blockServerContext = new TransportContext(conf, blockHandler);
- TransportServer blockServer = blockServerContext.createServer(Arrays.asList(bootstrap));
-
- TransportClient client1 = null;
- TransportClient client2 = null;
- TransportClientFactory clientFactory2 = null;
- try {
- // Create a client, and make a request to fetch blocks from a different app.
- clientFactory = blockServerContext.createClientFactory(
- Lists.<TransportClientBootstrap>newArrayList(
- new SaslClientBootstrap(conf, "app-1", secretKeyHolder)));
- client1 = clientFactory.createClient(TestUtils.getLocalHost(),
- blockServer.getPort());
-
- final AtomicReference<Throwable> exception = new AtomicReference<>();
-
- BlockFetchingListener listener = new BlockFetchingListener() {
- @Override
- public synchronized void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
- notifyAll();
- }
-
- @Override
- public synchronized void onBlockFetchFailure(String blockId, Throwable t) {
- exception.set(t);
- notifyAll();
- }
- };
-
- String[] blockIds = new String[] { "shuffle_2_3_4", "shuffle_6_7_8" };
- OneForOneBlockFetcher fetcher = new OneForOneBlockFetcher(client1, "app-2", "0",
- blockIds, listener);
- synchronized (listener) {
- fetcher.start();
- listener.wait();
- }
- checkSecurityException(exception.get());
-
- // Register an executor so that the next steps work.
- ExecutorShuffleInfo executorInfo = new ExecutorShuffleInfo(
- new String[] { System.getProperty("java.io.tmpdir") }, 1, "sort");
- RegisterExecutor regmsg = new RegisterExecutor("app-1", "0", executorInfo);
- client1.sendRpcSync(regmsg.toByteBuffer(), TIMEOUT_MS);
-
- // Make a successful request to fetch blocks, which creates a new stream. But do not actually
- // fetch any blocks, to keep the stream open.
- OpenBlocks openMessage = new OpenBlocks("app-1", "0", blockIds);
- ByteBuffer response = client1.sendRpcSync(openMessage.toByteBuffer(), TIMEOUT_MS);
- StreamHandle stream = (StreamHandle) BlockTransferMessage.Decoder.fromByteBuffer(response);
- long streamId = stream.streamId;
-
- // Create a second client, authenticated with a different app ID, and try to read from
- // the stream created for the previous app.
- clientFactory2 = blockServerContext.createClientFactory(
- Lists.<TransportClientBootstrap>newArrayList(
- new SaslClientBootstrap(conf, "app-2", secretKeyHolder)));
- client2 = clientFactory2.createClient(TestUtils.getLocalHost(),
- blockServer.getPort());
-
- ChunkReceivedCallback callback = new ChunkReceivedCallback() {
- @Override
- public synchronized void onSuccess(int chunkIndex, ManagedBuffer buffer) {
- notifyAll();
- }
-
- @Override
- public synchronized void onFailure(int chunkIndex, Throwable t) {
- exception.set(t);
- notifyAll();
- }
- };
-
- exception.set(null);
- synchronized (callback) {
- client2.fetchChunk(streamId, 0, callback);
- callback.wait();
- }
- checkSecurityException(exception.get());
- } finally {
- if (client1 != null) {
- client1.close();
- }
- if (client2 != null) {
- client2.close();
- }
- if (clientFactory2 != null) {
- clientFactory2.close();
- }
- blockServer.close();
- }
- }
-
- /** RPC handler which simply responds with the message it received. */
- public static class TestRpcHandler extends RpcHandler {
- @Override
- public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {
- callback.onSuccess(message);
- }
-
- @Override
- public StreamManager getStreamManager() {
- return new OneForOneStreamManager();
- }
- }
-
- private void checkSecurityException(Throwable t) {
- assertNotNull("No exception was caught.", t);
- assertTrue("Expected SecurityException.",
- t.getMessage().contains(SecurityException.class.getName()));
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/shuffle/src/test/java/org/apache/spark/network/shuffle/BlockTransferMessagesSuite.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/BlockTransferMessagesSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/BlockTransferMessagesSuite.java
deleted file mode 100644
index 86c8609..0000000
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/BlockTransferMessagesSuite.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.shuffle;
-
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-import org.apache.spark.network.shuffle.protocol.*;
-
-/** Verifies that all BlockTransferMessages can be serialized correctly. */
-public class BlockTransferMessagesSuite {
- @Test
- public void serializeOpenShuffleBlocks() {
- checkSerializeDeserialize(new OpenBlocks("app-1", "exec-2", new String[] { "b1", "b2" }));
- checkSerializeDeserialize(new RegisterExecutor("app-1", "exec-2", new ExecutorShuffleInfo(
- new String[] { "/local1", "/local2" }, 32, "MyShuffleManager")));
- checkSerializeDeserialize(new UploadBlock("app-1", "exec-2", "block-3", new byte[] { 1, 2 },
- new byte[] { 4, 5, 6, 7} ));
- checkSerializeDeserialize(new StreamHandle(12345, 16));
- }
-
- private void checkSerializeDeserialize(BlockTransferMessage msg) {
- BlockTransferMessage msg2 = BlockTransferMessage.Decoder.fromByteBuffer(msg.toByteBuffer());
- assertEquals(msg, msg2);
- assertEquals(msg.hashCode(), msg2.hashCode());
- assertEquals(msg.toString(), msg2.toString());
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
deleted file mode 100644
index 9379412..0000000
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.shuffle;
-
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-
-import static org.junit.Assert.*;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.*;
-
-import org.apache.spark.network.buffer.ManagedBuffer;
-import org.apache.spark.network.buffer.NioManagedBuffer;
-import org.apache.spark.network.client.RpcResponseCallback;
-import org.apache.spark.network.client.TransportClient;
-import org.apache.spark.network.server.OneForOneStreamManager;
-import org.apache.spark.network.server.RpcHandler;
-import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
-import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
-import org.apache.spark.network.shuffle.protocol.OpenBlocks;
-import org.apache.spark.network.shuffle.protocol.RegisterExecutor;
-import org.apache.spark.network.shuffle.protocol.StreamHandle;
-import org.apache.spark.network.shuffle.protocol.UploadBlock;
-
-public class ExternalShuffleBlockHandlerSuite {
- TransportClient client = mock(TransportClient.class);
-
- OneForOneStreamManager streamManager;
- ExternalShuffleBlockResolver blockResolver;
- RpcHandler handler;
-
- @Before
- public void beforeEach() {
- streamManager = mock(OneForOneStreamManager.class);
- blockResolver = mock(ExternalShuffleBlockResolver.class);
- handler = new ExternalShuffleBlockHandler(streamManager, blockResolver);
- }
-
- @Test
- public void testRegisterExecutor() {
- RpcResponseCallback callback = mock(RpcResponseCallback.class);
-
- ExecutorShuffleInfo config = new ExecutorShuffleInfo(new String[] {"/a", "/b"}, 16, "sort");
- ByteBuffer registerMessage = new RegisterExecutor("app0", "exec1", config).toByteBuffer();
- handler.receive(client, registerMessage, callback);
- verify(blockResolver, times(1)).registerExecutor("app0", "exec1", config);
-
- verify(callback, times(1)).onSuccess(any(ByteBuffer.class));
- verify(callback, never()).onFailure(any(Throwable.class));
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testOpenShuffleBlocks() {
- RpcResponseCallback callback = mock(RpcResponseCallback.class);
-
- ManagedBuffer block0Marker = new NioManagedBuffer(ByteBuffer.wrap(new byte[3]));
- ManagedBuffer block1Marker = new NioManagedBuffer(ByteBuffer.wrap(new byte[7]));
- when(blockResolver.getBlockData("app0", "exec1", "b0")).thenReturn(block0Marker);
- when(blockResolver.getBlockData("app0", "exec1", "b1")).thenReturn(block1Marker);
- ByteBuffer openBlocks = new OpenBlocks("app0", "exec1", new String[] { "b0", "b1" })
- .toByteBuffer();
- handler.receive(client, openBlocks, callback);
- verify(blockResolver, times(1)).getBlockData("app0", "exec1", "b0");
- verify(blockResolver, times(1)).getBlockData("app0", "exec1", "b1");
-
- ArgumentCaptor<ByteBuffer> response = ArgumentCaptor.forClass(ByteBuffer.class);
- verify(callback, times(1)).onSuccess(response.capture());
- verify(callback, never()).onFailure((Throwable) any());
-
- StreamHandle handle =
- (StreamHandle) BlockTransferMessage.Decoder.fromByteBuffer(response.getValue());
- assertEquals(2, handle.numChunks);
-
- @SuppressWarnings("unchecked")
- ArgumentCaptor<Iterator<ManagedBuffer>> stream = (ArgumentCaptor<Iterator<ManagedBuffer>>)
- (ArgumentCaptor<?>) ArgumentCaptor.forClass(Iterator.class);
- verify(streamManager, times(1)).registerStream(anyString(), stream.capture());
- Iterator<ManagedBuffer> buffers = stream.getValue();
- assertEquals(block0Marker, buffers.next());
- assertEquals(block1Marker, buffers.next());
- assertFalse(buffers.hasNext());
- }
-
- @Test
- public void testBadMessages() {
- RpcResponseCallback callback = mock(RpcResponseCallback.class);
-
- ByteBuffer unserializableMsg = ByteBuffer.wrap(new byte[] { 0x12, 0x34, 0x56 });
- try {
- handler.receive(client, unserializableMsg, callback);
- fail("Should have thrown");
- } catch (Exception e) {
- // pass
- }
-
- ByteBuffer unexpectedMsg = new UploadBlock("a", "e", "b", new byte[1], new byte[2]).toByteBuffer();
- try {
- handler.receive(client, unexpectedMsg, callback);
- fail("Should have thrown");
- } catch (UnsupportedOperationException e) {
- // pass
- }
-
- verify(callback, never()).onSuccess(any(ByteBuffer.class));
- verify(callback, never()).onFailure(any(Throwable.class));
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
deleted file mode 100644
index 60a1b8b..0000000
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.shuffle;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.io.CharStreams;
-import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
-import org.apache.spark.network.util.SystemPropertyConfigProvider;
-import org.apache.spark.network.util.TransportConf;
-import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.AppExecId;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-public class ExternalShuffleBlockResolverSuite {
- static String sortBlock0 = "Hello!";
- static String sortBlock1 = "World!";
-
- static String hashBlock0 = "Elementary";
- static String hashBlock1 = "Tabular";
-
- static TestShuffleDataContext dataContext;
-
- static TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
-
- @BeforeClass
- public static void beforeAll() throws IOException {
- dataContext = new TestShuffleDataContext(2, 5);
-
- dataContext.create();
- // Write some sort and hash data.
- dataContext.insertSortShuffleData(0, 0,
- new byte[][] { sortBlock0.getBytes(), sortBlock1.getBytes() } );
- dataContext.insertHashShuffleData(1, 0,
- new byte[][] { hashBlock0.getBytes(), hashBlock1.getBytes() } );
- }
-
- @AfterClass
- public static void afterAll() {
- dataContext.cleanup();
- }
-
- @Test
- public void testBadRequests() throws IOException {
- ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, null);
- // Unregistered executor
- try {
- resolver.getBlockData("app0", "exec1", "shuffle_1_1_0");
- fail("Should have failed");
- } catch (RuntimeException e) {
- assertTrue("Bad error message: " + e, e.getMessage().contains("not registered"));
- }
-
- // Invalid shuffle manager
- resolver.registerExecutor("app0", "exec2", dataContext.createExecutorInfo("foobar"));
- try {
- resolver.getBlockData("app0", "exec2", "shuffle_1_1_0");
- fail("Should have failed");
- } catch (UnsupportedOperationException e) {
- // pass
- }
-
- // Nonexistent shuffle block
- resolver.registerExecutor("app0", "exec3",
- dataContext.createExecutorInfo("sort"));
- try {
- resolver.getBlockData("app0", "exec3", "shuffle_1_1_0");
- fail("Should have failed");
- } catch (Exception e) {
- // pass
- }
- }
-
- @Test
- public void testSortShuffleBlocks() throws IOException {
- ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, null);
- resolver.registerExecutor("app0", "exec0",
- dataContext.createExecutorInfo("sort"));
-
- InputStream block0Stream =
- resolver.getBlockData("app0", "exec0", "shuffle_0_0_0").createInputStream();
- String block0 = CharStreams.toString(new InputStreamReader(block0Stream));
- block0Stream.close();
- assertEquals(sortBlock0, block0);
-
- InputStream block1Stream =
- resolver.getBlockData("app0", "exec0", "shuffle_0_0_1").createInputStream();
- String block1 = CharStreams.toString(new InputStreamReader(block1Stream));
- block1Stream.close();
- assertEquals(sortBlock1, block1);
- }
-
- @Test
- public void testHashShuffleBlocks() throws IOException {
- ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, null);
- resolver.registerExecutor("app0", "exec0",
- dataContext.createExecutorInfo("hash"));
-
- InputStream block0Stream =
- resolver.getBlockData("app0", "exec0", "shuffle_1_0_0").createInputStream();
- String block0 = CharStreams.toString(new InputStreamReader(block0Stream));
- block0Stream.close();
- assertEquals(hashBlock0, block0);
-
- InputStream block1Stream =
- resolver.getBlockData("app0", "exec0", "shuffle_1_0_1").createInputStream();
- String block1 = CharStreams.toString(new InputStreamReader(block1Stream));
- block1Stream.close();
- assertEquals(hashBlock1, block1);
- }
-
- @Test
- public void jsonSerializationOfExecutorRegistration() throws IOException {
- ObjectMapper mapper = new ObjectMapper();
- AppExecId appId = new AppExecId("foo", "bar");
- String appIdJson = mapper.writeValueAsString(appId);
- AppExecId parsedAppId = mapper.readValue(appIdJson, AppExecId.class);
- assertEquals(parsedAppId, appId);
-
- ExecutorShuffleInfo shuffleInfo =
- new ExecutorShuffleInfo(new String[]{"/bippy", "/flippy"}, 7, "hash");
- String shuffleJson = mapper.writeValueAsString(shuffleInfo);
- ExecutorShuffleInfo parsedShuffleInfo =
- mapper.readValue(shuffleJson, ExecutorShuffleInfo.class);
- assertEquals(parsedShuffleInfo, shuffleInfo);
-
- // Intentionally keep these hard-coded strings in here, to check backwards-compatability.
- // its not legacy yet, but keeping this here in case anybody changes it
- String legacyAppIdJson = "{\"appId\":\"foo\", \"execId\":\"bar\"}";
- assertEquals(appId, mapper.readValue(legacyAppIdJson, AppExecId.class));
- String legacyShuffleJson = "{\"localDirs\": [\"/bippy\", \"/flippy\"], " +
- "\"subDirsPerLocalDir\": 7, \"shuffleManager\": \"hash\"}";
- assertEquals(shuffleInfo, mapper.readValue(legacyShuffleJson, ExecutorShuffleInfo.class));
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
deleted file mode 100644
index 532d7ab..0000000
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.shuffle;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Random;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import com.google.common.util.concurrent.MoreExecutors;
-import org.junit.Test;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.spark.network.util.SystemPropertyConfigProvider;
-import org.apache.spark.network.util.TransportConf;
-
-public class ExternalShuffleCleanupSuite {
-
- // Same-thread Executor used to ensure cleanup happens synchronously in test thread.
- Executor sameThreadExecutor = MoreExecutors.sameThreadExecutor();
- TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
-
- @Test
- public void noCleanupAndCleanup() throws IOException {
- TestShuffleDataContext dataContext = createSomeData();
-
- ExternalShuffleBlockResolver resolver =
- new ExternalShuffleBlockResolver(conf, null, sameThreadExecutor);
- resolver.registerExecutor("app", "exec0", dataContext.createExecutorInfo("shuffleMgr"));
- resolver.applicationRemoved("app", false /* cleanup */);
-
- assertStillThere(dataContext);
-
- resolver.registerExecutor("app", "exec1", dataContext.createExecutorInfo("shuffleMgr"));
- resolver.applicationRemoved("app", true /* cleanup */);
-
- assertCleanedUp(dataContext);
- }
-
- @Test
- public void cleanupUsesExecutor() throws IOException {
- TestShuffleDataContext dataContext = createSomeData();
-
- final AtomicBoolean cleanupCalled = new AtomicBoolean(false);
-
- // Executor which does nothing to ensure we're actually using it.
- Executor noThreadExecutor = new Executor() {
- @Override public void execute(Runnable runnable) { cleanupCalled.set(true); }
- };
-
- ExternalShuffleBlockResolver manager =
- new ExternalShuffleBlockResolver(conf, null, noThreadExecutor);
-
- manager.registerExecutor("app", "exec0", dataContext.createExecutorInfo("shuffleMgr"));
- manager.applicationRemoved("app", true);
-
- assertTrue(cleanupCalled.get());
- assertStillThere(dataContext);
-
- dataContext.cleanup();
- assertCleanedUp(dataContext);
- }
-
- @Test
- public void cleanupMultipleExecutors() throws IOException {
- TestShuffleDataContext dataContext0 = createSomeData();
- TestShuffleDataContext dataContext1 = createSomeData();
-
- ExternalShuffleBlockResolver resolver =
- new ExternalShuffleBlockResolver(conf, null, sameThreadExecutor);
-
- resolver.registerExecutor("app", "exec0", dataContext0.createExecutorInfo("shuffleMgr"));
- resolver.registerExecutor("app", "exec1", dataContext1.createExecutorInfo("shuffleMgr"));
- resolver.applicationRemoved("app", true);
-
- assertCleanedUp(dataContext0);
- assertCleanedUp(dataContext1);
- }
-
- @Test
- public void cleanupOnlyRemovedApp() throws IOException {
- TestShuffleDataContext dataContext0 = createSomeData();
- TestShuffleDataContext dataContext1 = createSomeData();
-
- ExternalShuffleBlockResolver resolver =
- new ExternalShuffleBlockResolver(conf, null, sameThreadExecutor);
-
- resolver.registerExecutor("app-0", "exec0", dataContext0.createExecutorInfo("shuffleMgr"));
- resolver.registerExecutor("app-1", "exec0", dataContext1.createExecutorInfo("shuffleMgr"));
-
- resolver.applicationRemoved("app-nonexistent", true);
- assertStillThere(dataContext0);
- assertStillThere(dataContext1);
-
- resolver.applicationRemoved("app-0", true);
- assertCleanedUp(dataContext0);
- assertStillThere(dataContext1);
-
- resolver.applicationRemoved("app-1", true);
- assertCleanedUp(dataContext0);
- assertCleanedUp(dataContext1);
-
- // Make sure it's not an error to cleanup multiple times
- resolver.applicationRemoved("app-1", true);
- assertCleanedUp(dataContext0);
- assertCleanedUp(dataContext1);
- }
-
- private void assertStillThere(TestShuffleDataContext dataContext) {
- for (String localDir : dataContext.localDirs) {
- assertTrue(localDir + " was cleaned up prematurely", new File(localDir).exists());
- }
- }
-
- private void assertCleanedUp(TestShuffleDataContext dataContext) {
- for (String localDir : dataContext.localDirs) {
- assertFalse(localDir + " wasn't cleaned up", new File(localDir).exists());
- }
- }
-
- private TestShuffleDataContext createSomeData() throws IOException {
- Random rand = new Random(123);
- TestShuffleDataContext dataContext = new TestShuffleDataContext(10, 5);
-
- dataContext.create();
- dataContext.insertSortShuffleData(rand.nextInt(1000), rand.nextInt(1000),
- new byte[][] { "ABC".getBytes(), "DEF".getBytes() } );
- dataContext.insertHashShuffleData(rand.nextInt(1000), rand.nextInt(1000) + 1000,
- new byte[][] { "GHI".getBytes(), "JKLMNOPQRSTUVWXYZ".getBytes() } );
- return dataContext;
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
deleted file mode 100644
index 5e706bf..0000000
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
+++ /dev/null
@@ -1,301 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.shuffle;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-import org.apache.spark.network.TestUtils;
-import org.apache.spark.network.TransportContext;
-import org.apache.spark.network.buffer.ManagedBuffer;
-import org.apache.spark.network.buffer.NioManagedBuffer;
-import org.apache.spark.network.server.TransportServer;
-import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
-import org.apache.spark.network.util.SystemPropertyConfigProvider;
-import org.apache.spark.network.util.TransportConf;
-
-public class ExternalShuffleIntegrationSuite {
-
- static String APP_ID = "app-id";
- static String SORT_MANAGER = "sort";
- static String HASH_MANAGER = "hash";
-
- // Executor 0 is sort-based
- static TestShuffleDataContext dataContext0;
- // Executor 1 is hash-based
- static TestShuffleDataContext dataContext1;
-
- static ExternalShuffleBlockHandler handler;
- static TransportServer server;
- static TransportConf conf;
-
- static byte[][] exec0Blocks = new byte[][] {
- new byte[123],
- new byte[12345],
- new byte[1234567],
- };
-
- static byte[][] exec1Blocks = new byte[][] {
- new byte[321],
- new byte[54321],
- };
-
- @BeforeClass
- public static void beforeAll() throws IOException {
- Random rand = new Random();
-
- for (byte[] block : exec0Blocks) {
- rand.nextBytes(block);
- }
- for (byte[] block: exec1Blocks) {
- rand.nextBytes(block);
- }
-
- dataContext0 = new TestShuffleDataContext(2, 5);
- dataContext0.create();
- dataContext0.insertSortShuffleData(0, 0, exec0Blocks);
-
- dataContext1 = new TestShuffleDataContext(6, 2);
- dataContext1.create();
- dataContext1.insertHashShuffleData(1, 0, exec1Blocks);
-
- conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
- handler = new ExternalShuffleBlockHandler(conf, null);
- TransportContext transportContext = new TransportContext(conf, handler);
- server = transportContext.createServer();
- }
-
- @AfterClass
- public static void afterAll() {
- dataContext0.cleanup();
- dataContext1.cleanup();
- server.close();
- }
-
- @After
- public void afterEach() {
- handler.applicationRemoved(APP_ID, false /* cleanupLocalDirs */);
- }
-
- class FetchResult {
- public Set<String> successBlocks;
- public Set<String> failedBlocks;
- public List<ManagedBuffer> buffers;
-
- public void releaseBuffers() {
- for (ManagedBuffer buffer : buffers) {
- buffer.release();
- }
- }
- }
-
- // Fetch a set of blocks from a pre-registered executor.
- private FetchResult fetchBlocks(String execId, String[] blockIds) throws Exception {
- return fetchBlocks(execId, blockIds, server.getPort());
- }
-
- // Fetch a set of blocks from a pre-registered executor. Connects to the server on the given port,
- // to allow connecting to invalid servers.
- private FetchResult fetchBlocks(String execId, String[] blockIds, int port) throws Exception {
- final FetchResult res = new FetchResult();
- res.successBlocks = Collections.synchronizedSet(new HashSet<String>());
- res.failedBlocks = Collections.synchronizedSet(new HashSet<String>());
- res.buffers = Collections.synchronizedList(new LinkedList<ManagedBuffer>());
-
- final Semaphore requestsRemaining = new Semaphore(0);
-
- ExternalShuffleClient client = new ExternalShuffleClient(conf, null, false, false);
- client.init(APP_ID);
- client.fetchBlocks(TestUtils.getLocalHost(), port, execId, blockIds,
- new BlockFetchingListener() {
- @Override
- public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
- synchronized (this) {
- if (!res.successBlocks.contains(blockId) && !res.failedBlocks.contains(blockId)) {
- data.retain();
- res.successBlocks.add(blockId);
- res.buffers.add(data);
- requestsRemaining.release();
- }
- }
- }
-
- @Override
- public void onBlockFetchFailure(String blockId, Throwable exception) {
- synchronized (this) {
- if (!res.successBlocks.contains(blockId) && !res.failedBlocks.contains(blockId)) {
- res.failedBlocks.add(blockId);
- requestsRemaining.release();
- }
- }
- }
- });
-
- if (!requestsRemaining.tryAcquire(blockIds.length, 5, TimeUnit.SECONDS)) {
- fail("Timeout getting response from the server");
- }
- client.close();
- return res;
- }
-
- @Test
- public void testFetchOneSort() throws Exception {
- registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
- FetchResult exec0Fetch = fetchBlocks("exec-0", new String[] { "shuffle_0_0_0" });
- assertEquals(Sets.newHashSet("shuffle_0_0_0"), exec0Fetch.successBlocks);
- assertTrue(exec0Fetch.failedBlocks.isEmpty());
- assertBufferListsEqual(exec0Fetch.buffers, Lists.newArrayList(exec0Blocks[0]));
- exec0Fetch.releaseBuffers();
- }
-
- @Test
- public void testFetchThreeSort() throws Exception {
- registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
- FetchResult exec0Fetch = fetchBlocks("exec-0",
- new String[] { "shuffle_0_0_0", "shuffle_0_0_1", "shuffle_0_0_2" });
- assertEquals(Sets.newHashSet("shuffle_0_0_0", "shuffle_0_0_1", "shuffle_0_0_2"),
- exec0Fetch.successBlocks);
- assertTrue(exec0Fetch.failedBlocks.isEmpty());
- assertBufferListsEqual(exec0Fetch.buffers, Lists.newArrayList(exec0Blocks));
- exec0Fetch.releaseBuffers();
- }
-
- @Test
- public void testFetchHash() throws Exception {
- registerExecutor("exec-1", dataContext1.createExecutorInfo(HASH_MANAGER));
- FetchResult execFetch = fetchBlocks("exec-1",
- new String[] { "shuffle_1_0_0", "shuffle_1_0_1" });
- assertEquals(Sets.newHashSet("shuffle_1_0_0", "shuffle_1_0_1"), execFetch.successBlocks);
- assertTrue(execFetch.failedBlocks.isEmpty());
- assertBufferListsEqual(execFetch.buffers, Lists.newArrayList(exec1Blocks));
- execFetch.releaseBuffers();
- }
-
- @Test
- public void testFetchWrongShuffle() throws Exception {
- registerExecutor("exec-1", dataContext1.createExecutorInfo(SORT_MANAGER /* wrong manager */));
- FetchResult execFetch = fetchBlocks("exec-1",
- new String[] { "shuffle_1_0_0", "shuffle_1_0_1" });
- assertTrue(execFetch.successBlocks.isEmpty());
- assertEquals(Sets.newHashSet("shuffle_1_0_0", "shuffle_1_0_1"), execFetch.failedBlocks);
- }
-
- @Test
- public void testFetchInvalidShuffle() throws Exception {
- registerExecutor("exec-1", dataContext1.createExecutorInfo("unknown sort manager"));
- FetchResult execFetch = fetchBlocks("exec-1",
- new String[] { "shuffle_1_0_0" });
- assertTrue(execFetch.successBlocks.isEmpty());
- assertEquals(Sets.newHashSet("shuffle_1_0_0"), execFetch.failedBlocks);
- }
-
- @Test
- public void testFetchWrongBlockId() throws Exception {
- registerExecutor("exec-1", dataContext1.createExecutorInfo(SORT_MANAGER /* wrong manager */));
- FetchResult execFetch = fetchBlocks("exec-1",
- new String[] { "rdd_1_0_0" });
- assertTrue(execFetch.successBlocks.isEmpty());
- assertEquals(Sets.newHashSet("rdd_1_0_0"), execFetch.failedBlocks);
- }
-
- @Test
- public void testFetchNonexistent() throws Exception {
- registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
- FetchResult execFetch = fetchBlocks("exec-0",
- new String[] { "shuffle_2_0_0" });
- assertTrue(execFetch.successBlocks.isEmpty());
- assertEquals(Sets.newHashSet("shuffle_2_0_0"), execFetch.failedBlocks);
- }
-
- @Test
- public void testFetchWrongExecutor() throws Exception {
- registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
- FetchResult execFetch = fetchBlocks("exec-0",
- new String[] { "shuffle_0_0_0" /* right */, "shuffle_1_0_0" /* wrong */ });
- // Both still fail, as we start by checking for all block.
- assertTrue(execFetch.successBlocks.isEmpty());
- assertEquals(Sets.newHashSet("shuffle_0_0_0", "shuffle_1_0_0"), execFetch.failedBlocks);
- }
-
- @Test
- public void testFetchUnregisteredExecutor() throws Exception {
- registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
- FetchResult execFetch = fetchBlocks("exec-2",
- new String[] { "shuffle_0_0_0", "shuffle_1_0_0" });
- assertTrue(execFetch.successBlocks.isEmpty());
- assertEquals(Sets.newHashSet("shuffle_0_0_0", "shuffle_1_0_0"), execFetch.failedBlocks);
- }
-
- @Test
- public void testFetchNoServer() throws Exception {
- System.setProperty("spark.shuffle.io.maxRetries", "0");
- try {
- registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
- FetchResult execFetch = fetchBlocks("exec-0",
- new String[]{"shuffle_1_0_0", "shuffle_1_0_1"}, 1 /* port */);
- assertTrue(execFetch.successBlocks.isEmpty());
- assertEquals(Sets.newHashSet("shuffle_1_0_0", "shuffle_1_0_1"), execFetch.failedBlocks);
- } finally {
- System.clearProperty("spark.shuffle.io.maxRetries");
- }
- }
-
- private void registerExecutor(String executorId, ExecutorShuffleInfo executorInfo)
- throws IOException {
- ExternalShuffleClient client = new ExternalShuffleClient(conf, null, false, false);
- client.init(APP_ID);
- client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(),
- executorId, executorInfo);
- }
-
- private void assertBufferListsEqual(List<ManagedBuffer> list0, List<byte[]> list1)
- throws Exception {
- assertEquals(list0.size(), list1.size());
- for (int i = 0; i < list0.size(); i ++) {
- assertBuffersEqual(list0.get(i), new NioManagedBuffer(ByteBuffer.wrap(list1.get(i))));
- }
- }
-
- private void assertBuffersEqual(ManagedBuffer buffer0, ManagedBuffer buffer1) throws Exception {
- ByteBuffer nio0 = buffer0.nioByteBuffer();
- ByteBuffer nio1 = buffer1.nioByteBuffer();
-
- int len = nio0.remaining();
- assertEquals(nio0.remaining(), nio1.remaining());
- for (int i = 0; i < len; i ++) {
- assertEquals(nio0.get(), nio1.get());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
deleted file mode 100644
index 08ddb37..0000000
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.shuffle;
-
-import java.io.IOException;
-import java.util.Arrays;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-import org.apache.spark.network.TestUtils;
-import org.apache.spark.network.TransportContext;
-import org.apache.spark.network.sasl.SaslServerBootstrap;
-import org.apache.spark.network.sasl.SecretKeyHolder;
-import org.apache.spark.network.server.RpcHandler;
-import org.apache.spark.network.server.TransportServer;
-import org.apache.spark.network.server.TransportServerBootstrap;
-import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
-import org.apache.spark.network.util.SystemPropertyConfigProvider;
-import org.apache.spark.network.util.TransportConf;
-
-public class ExternalShuffleSecuritySuite {
-
- TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
- TransportServer server;
-
- @Before
- public void beforeEach() throws IOException {
- TransportContext context =
- new TransportContext(conf, new ExternalShuffleBlockHandler(conf, null));
- TransportServerBootstrap bootstrap = new SaslServerBootstrap(conf,
- new TestSecretKeyHolder("my-app-id", "secret"));
- this.server = context.createServer(Arrays.asList(bootstrap));
- }
-
- @After
- public void afterEach() {
- if (server != null) {
- server.close();
- server = null;
- }
- }
-
- @Test
- public void testValid() throws IOException {
- validate("my-app-id", "secret", false);
- }
-
- @Test
- public void testBadAppId() {
- try {
- validate("wrong-app-id", "secret", false);
- } catch (Exception e) {
- assertTrue(e.getMessage(), e.getMessage().contains("Wrong appId!"));
- }
- }
-
- @Test
- public void testBadSecret() {
- try {
- validate("my-app-id", "bad-secret", false);
- } catch (Exception e) {
- assertTrue(e.getMessage(), e.getMessage().contains("Mismatched response"));
- }
- }
-
- @Test
- public void testEncryption() throws IOException {
- validate("my-app-id", "secret", true);
- }
-
- /** Creates an ExternalShuffleClient and attempts to register with the server. */
- private void validate(String appId, String secretKey, boolean encrypt) throws IOException {
- ExternalShuffleClient client =
- new ExternalShuffleClient(conf, new TestSecretKeyHolder(appId, secretKey), true, encrypt);
- client.init(appId);
- // Registration either succeeds or throws an exception.
- client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(), "exec0",
- new ExecutorShuffleInfo(new String[0], 0, ""));
- client.close();
- }
-
- /** Provides a secret key holder which always returns the given secret key, for a single appId. */
- static class TestSecretKeyHolder implements SecretKeyHolder {
- private final String appId;
- private final String secretKey;
-
- TestSecretKeyHolder(String appId, String secretKey) {
- this.appId = appId;
- this.secretKey = secretKey;
- }
-
- @Override
- public String getSaslUser(String appId) {
- return "user";
- }
-
- @Override
- public String getSecretKey(String appId) {
- if (!appId.equals(this.appId)) {
- throw new IllegalArgumentException("Wrong appId!");
- }
- return secretKey;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockFetcherSuite.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockFetcherSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockFetcherSuite.java
deleted file mode 100644
index 2590b9c..0000000
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockFetcherSuite.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.shuffle;
-
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import com.google.common.collect.Maps;
-import io.netty.buffer.Unpooled;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import org.apache.spark.network.buffer.ManagedBuffer;
-import org.apache.spark.network.buffer.NettyManagedBuffer;
-import org.apache.spark.network.buffer.NioManagedBuffer;
-import org.apache.spark.network.client.ChunkReceivedCallback;
-import org.apache.spark.network.client.RpcResponseCallback;
-import org.apache.spark.network.client.TransportClient;
-import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
-import org.apache.spark.network.shuffle.protocol.OpenBlocks;
-import org.apache.spark.network.shuffle.protocol.StreamHandle;
-
-public class OneForOneBlockFetcherSuite {
- @Test
- public void testFetchOne() {
- LinkedHashMap<String, ManagedBuffer> blocks = Maps.newLinkedHashMap();
- blocks.put("shuffle_0_0_0", new NioManagedBuffer(ByteBuffer.wrap(new byte[0])));
-
- BlockFetchingListener listener = fetchBlocks(blocks);
-
- verify(listener).onBlockFetchSuccess("shuffle_0_0_0", blocks.get("shuffle_0_0_0"));
- }
-
- @Test
- public void testFetchThree() {
- LinkedHashMap<String, ManagedBuffer> blocks = Maps.newLinkedHashMap();
- blocks.put("b0", new NioManagedBuffer(ByteBuffer.wrap(new byte[12])));
- blocks.put("b1", new NioManagedBuffer(ByteBuffer.wrap(new byte[23])));
- blocks.put("b2", new NettyManagedBuffer(Unpooled.wrappedBuffer(new byte[23])));
-
- BlockFetchingListener listener = fetchBlocks(blocks);
-
- for (int i = 0; i < 3; i ++) {
- verify(listener, times(1)).onBlockFetchSuccess("b" + i, blocks.get("b" + i));
- }
- }
-
- @Test
- public void testFailure() {
- LinkedHashMap<String, ManagedBuffer> blocks = Maps.newLinkedHashMap();
- blocks.put("b0", new NioManagedBuffer(ByteBuffer.wrap(new byte[12])));
- blocks.put("b1", null);
- blocks.put("b2", null);
-
- BlockFetchingListener listener = fetchBlocks(blocks);
-
- // Each failure will cause a failure to be invoked in all remaining block fetches.
- verify(listener, times(1)).onBlockFetchSuccess("b0", blocks.get("b0"));
- verify(listener, times(1)).onBlockFetchFailure(eq("b1"), (Throwable) any());
- verify(listener, times(2)).onBlockFetchFailure(eq("b2"), (Throwable) any());
- }
-
- @Test
- public void testFailureAndSuccess() {
- LinkedHashMap<String, ManagedBuffer> blocks = Maps.newLinkedHashMap();
- blocks.put("b0", new NioManagedBuffer(ByteBuffer.wrap(new byte[12])));
- blocks.put("b1", null);
- blocks.put("b2", new NioManagedBuffer(ByteBuffer.wrap(new byte[21])));
-
- BlockFetchingListener listener = fetchBlocks(blocks);
-
- // We may call both success and failure for the same block.
- verify(listener, times(1)).onBlockFetchSuccess("b0", blocks.get("b0"));
- verify(listener, times(1)).onBlockFetchFailure(eq("b1"), (Throwable) any());
- verify(listener, times(1)).onBlockFetchSuccess("b2", blocks.get("b2"));
- verify(listener, times(1)).onBlockFetchFailure(eq("b2"), (Throwable) any());
- }
-
- @Test
- public void testEmptyBlockFetch() {
- try {
- fetchBlocks(Maps.<String, ManagedBuffer>newLinkedHashMap());
- fail();
- } catch (IllegalArgumentException e) {
- assertEquals("Zero-sized blockIds array", e.getMessage());
- }
- }
-
- /**
- * Begins a fetch on the given set of blocks by mocking out the server side of the RPC which
- * simply returns the given (BlockId, Block) pairs.
- * As "blocks" is a LinkedHashMap, the blocks are guaranteed to be returned in the same order
- * that they were inserted in.
- *
- * If a block's buffer is "null", an exception will be thrown instead.
- */
- private BlockFetchingListener fetchBlocks(final LinkedHashMap<String, ManagedBuffer> blocks) {
- TransportClient client = mock(TransportClient.class);
- BlockFetchingListener listener = mock(BlockFetchingListener.class);
- final String[] blockIds = blocks.keySet().toArray(new String[blocks.size()]);
- OneForOneBlockFetcher fetcher =
- new OneForOneBlockFetcher(client, "app-id", "exec-id", blockIds, listener);
-
- // Respond to the "OpenBlocks" message with an appropirate ShuffleStreamHandle with streamId 123
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
- BlockTransferMessage message = BlockTransferMessage.Decoder.fromByteBuffer(
- (ByteBuffer) invocationOnMock.getArguments()[0]);
- RpcResponseCallback callback = (RpcResponseCallback) invocationOnMock.getArguments()[1];
- callback.onSuccess(new StreamHandle(123, blocks.size()).toByteBuffer());
- assertEquals(new OpenBlocks("app-id", "exec-id", blockIds), message);
- return null;
- }
- }).when(client).sendRpc(any(ByteBuffer.class), any(RpcResponseCallback.class));
-
- // Respond to each chunk request with a single buffer from our blocks array.
- final AtomicInteger expectedChunkIndex = new AtomicInteger(0);
- final Iterator<ManagedBuffer> blockIterator = blocks.values().iterator();
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocation) throws Throwable {
- try {
- long streamId = (Long) invocation.getArguments()[0];
- int myChunkIndex = (Integer) invocation.getArguments()[1];
- assertEquals(123, streamId);
- assertEquals(expectedChunkIndex.getAndIncrement(), myChunkIndex);
-
- ChunkReceivedCallback callback = (ChunkReceivedCallback) invocation.getArguments()[2];
- ManagedBuffer result = blockIterator.next();
- if (result != null) {
- callback.onSuccess(myChunkIndex, result);
- } else {
- callback.onFailure(myChunkIndex, new RuntimeException("Failed " + myChunkIndex));
- }
- } catch (Exception e) {
- e.printStackTrace();
- fail("Unexpected failure");
- }
- return null;
- }
- }).when(client).fetchChunk(anyLong(), anyInt(), (ChunkReceivedCallback) any());
-
- fetcher.start();
- return listener;
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org