You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/07/11 03:26:22 UTC
spark git commit: [SPARK-21369][CORE] Don't use Scala Tuple2 in
common/network-*
Repository: spark
Updated Branches:
refs/heads/master 1471ee7af -> 833eab2c9
[SPARK-21369][CORE] Don't use Scala Tuple2 in common/network-*
## What changes were proposed in this pull request?
Remove all usages of Scala Tuple2 from common/network-* projects. Otherwise, Yarn users cannot use `spark.reducer.maxReqSizeShuffleToMem`.
## How was this patch tested?
Jenkins.
Author: Shixiong Zhu <sh...@databricks.com>
Closes #18593 from zsxwing/SPARK-21369.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/833eab2c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/833eab2c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/833eab2c
Branch: refs/heads/master
Commit: 833eab2c9bd273ee9577fbf9e480d3e3a4b7d203
Parents: 1471ee7
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Tue Jul 11 11:26:17 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Tue Jul 11 11:26:17 2017 +0800
----------------------------------------------------------------------
common/network-common/pom.xml | 3 ++-
.../client/TransportResponseHandler.java | 20 ++++++++++----------
.../network/server/OneForOneStreamManager.java | 17 +++++------------
common/network-shuffle/pom.xml | 1 +
common/network-yarn/pom.xml | 1 +
5 files changed, 19 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/833eab2c/common/network-common/pom.xml
----------------------------------------------------------------------
diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml
index 066970f..0254d0c 100644
--- a/common/network-common/pom.xml
+++ b/common/network-common/pom.xml
@@ -90,7 +90,8 @@
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-tags_${scala.binary.version}</artifactId>
- </dependency>
+ <scope>test</scope>
+ </dependency>
<!--
This spark-tags test-dep is needed even though it isn't used in this module, otherwise testing-cmds that exclude
http://git-wip-us.apache.org/repos/asf/spark/blob/833eab2c/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
index 340b8b9..7a3d96c 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
@@ -24,10 +24,10 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
-import scala.Tuple2;
-
import com.google.common.annotations.VisibleForTesting;
import io.netty.channel.Channel;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,7 +58,7 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
private final Map<Long, RpcResponseCallback> outstandingRpcs;
- private final Queue<Tuple2<String, StreamCallback>> streamCallbacks;
+ private final Queue<Pair<String, StreamCallback>> streamCallbacks;
private volatile boolean streamActive;
/** Records the time (in system nanoseconds) that the last fetch or RPC request was sent. */
@@ -92,7 +92,7 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
public void addStreamCallback(String streamId, StreamCallback callback) {
timeOfLastRequestNs.set(System.nanoTime());
- streamCallbacks.offer(new Tuple2<>(streamId, callback));
+ streamCallbacks.offer(ImmutablePair.of(streamId, callback));
}
@VisibleForTesting
@@ -119,9 +119,9 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
logger.warn("RpcResponseCallback.onFailure throws exception", e);
}
}
- for (Tuple2<String, StreamCallback> entry : streamCallbacks) {
+ for (Pair<String, StreamCallback> entry : streamCallbacks) {
try {
- entry._2().onFailure(entry._1(), cause);
+ entry.getValue().onFailure(entry.getKey(), cause);
} catch (Exception e) {
logger.warn("StreamCallback.onFailure throws exception", e);
}
@@ -208,9 +208,9 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
}
} else if (message instanceof StreamResponse) {
StreamResponse resp = (StreamResponse) message;
- Tuple2<String, StreamCallback> entry = streamCallbacks.poll();
+ Pair<String, StreamCallback> entry = streamCallbacks.poll();
if (entry != null) {
- StreamCallback callback = entry._2();
+ StreamCallback callback = entry.getValue();
if (resp.byteCount > 0) {
StreamInterceptor interceptor = new StreamInterceptor(this, resp.streamId, resp.byteCount,
callback);
@@ -235,9 +235,9 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
}
} else if (message instanceof StreamFailure) {
StreamFailure resp = (StreamFailure) message;
- Tuple2<String, StreamCallback> entry = streamCallbacks.poll();
+ Pair<String, StreamCallback> entry = streamCallbacks.poll();
if (entry != null) {
- StreamCallback callback = entry._2();
+ StreamCallback callback = entry.getValue();
try {
callback.onFailure(resp.streamId, new RuntimeException(resp.error));
} catch (IOException ioe) {
http://git-wip-us.apache.org/repos/asf/spark/blob/833eab2c/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java b/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java
index ad8e8b4..85ca2f1 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java
@@ -23,8 +23,6 @@ import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
-import scala.Tuple2;
-
import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
import org.slf4j.Logger;
@@ -98,21 +96,16 @@ public class OneForOneStreamManager extends StreamManager {
@Override
public ManagedBuffer openStream(String streamChunkId) {
- Tuple2<Long, Integer> streamIdAndChunkId = parseStreamChunkId(streamChunkId);
- return getChunk(streamIdAndChunkId._1, streamIdAndChunkId._2);
- }
-
- public static String genStreamChunkId(long streamId, int chunkId) {
- return String.format("%d_%d", streamId, chunkId);
- }
-
- public static Tuple2<Long, Integer> parseStreamChunkId(String streamChunkId) {
String[] array = streamChunkId.split("_");
assert array.length == 2:
"Stream id and chunk index should be specified when open stream for fetching block.";
long streamId = Long.valueOf(array[0]);
int chunkIndex = Integer.valueOf(array[1]);
- return new Tuple2<>(streamId, chunkIndex);
+ return getChunk(streamId, chunkIndex);
+ }
+
+ public static String genStreamChunkId(long streamId, int chunkId) {
+ return String.format("%d_%d", streamId, chunkId);
}
@Override
http://git-wip-us.apache.org/repos/asf/spark/blob/833eab2c/common/network-shuffle/pom.xml
----------------------------------------------------------------------
diff --git a/common/network-shuffle/pom.xml b/common/network-shuffle/pom.xml
index 2de882a..9968480 100644
--- a/common/network-shuffle/pom.xml
+++ b/common/network-shuffle/pom.xml
@@ -69,6 +69,7 @@
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-tags_${scala.binary.version}</artifactId>
+ <scope>test</scope>
</dependency>
<!--
http://git-wip-us.apache.org/repos/asf/spark/blob/833eab2c/common/network-yarn/pom.xml
----------------------------------------------------------------------
diff --git a/common/network-yarn/pom.xml b/common/network-yarn/pom.xml
index 7e24315..ec2db6e 100644
--- a/common/network-yarn/pom.xml
+++ b/common/network-yarn/pom.xml
@@ -48,6 +48,7 @@
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-tags_${scala.binary.version}</artifactId>
+ <scope>test</scope>
</dependency>
<!--
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org