You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/07/11 03:26:22 UTC

spark git commit: [SPARK-21369][CORE] Don't use Scala Tuple2 in common/network-*

Repository: spark
Updated Branches:
  refs/heads/master 1471ee7af -> 833eab2c9


[SPARK-21369][CORE] Don't use Scala Tuple2 in common/network-*

## What changes were proposed in this pull request?

Remove all usages of Scala `Tuple2` from the common/network-* projects, and scope their `spark-tags` dependency to `test`. Otherwise, YARN users cannot use `spark.reducer.maxReqSizeShuffleToMem`.

## How was this patch tested?

Verified by the Jenkins build; this patch adds or changes no tests.

Author: Shixiong Zhu <sh...@databricks.com>

Closes #18593 from zsxwing/SPARK-21369.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/833eab2c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/833eab2c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/833eab2c

Branch: refs/heads/master
Commit: 833eab2c9bd273ee9577fbf9e480d3e3a4b7d203
Parents: 1471ee7
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Tue Jul 11 11:26:17 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Tue Jul 11 11:26:17 2017 +0800

----------------------------------------------------------------------
 common/network-common/pom.xml                   |  3 ++-
 .../client/TransportResponseHandler.java        | 20 ++++++++++----------
 .../network/server/OneForOneStreamManager.java  | 17 +++++------------
 common/network-shuffle/pom.xml                  |  1 +
 common/network-yarn/pom.xml                     |  1 +
 5 files changed, 19 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/833eab2c/common/network-common/pom.xml
----------------------------------------------------------------------
diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml
index 066970f..0254d0c 100644
--- a/common/network-common/pom.xml
+++ b/common/network-common/pom.xml
@@ -90,7 +90,8 @@
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-tags_${scala.binary.version}</artifactId>
-    </dependency>
+      <scope>test</scope>
+   </dependency>
 
     <!--
       This spark-tags test-dep is needed even though it isn't used in this module, otherwise testing-cmds that exclude

http://git-wip-us.apache.org/repos/asf/spark/blob/833eab2c/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
index 340b8b9..7a3d96c 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
@@ -24,10 +24,10 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicLong;
 
-import scala.Tuple2;
-
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.channel.Channel;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,7 +58,7 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
 
   private final Map<Long, RpcResponseCallback> outstandingRpcs;
 
-  private final Queue<Tuple2<String, StreamCallback>> streamCallbacks;
+  private final Queue<Pair<String, StreamCallback>> streamCallbacks;
   private volatile boolean streamActive;
 
   /** Records the time (in system nanoseconds) that the last fetch or RPC request was sent. */
@@ -92,7 +92,7 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
 
   public void addStreamCallback(String streamId, StreamCallback callback) {
     timeOfLastRequestNs.set(System.nanoTime());
-    streamCallbacks.offer(new Tuple2<>(streamId, callback));
+    streamCallbacks.offer(ImmutablePair.of(streamId, callback));
   }
 
   @VisibleForTesting
@@ -119,9 +119,9 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
         logger.warn("RpcResponseCallback.onFailure throws exception", e);
       }
     }
-    for (Tuple2<String, StreamCallback> entry : streamCallbacks) {
+    for (Pair<String, StreamCallback> entry : streamCallbacks) {
       try {
-        entry._2().onFailure(entry._1(), cause);
+        entry.getValue().onFailure(entry.getKey(), cause);
       } catch (Exception e) {
         logger.warn("StreamCallback.onFailure throws exception", e);
       }
@@ -208,9 +208,9 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
       }
     } else if (message instanceof StreamResponse) {
       StreamResponse resp = (StreamResponse) message;
-      Tuple2<String, StreamCallback> entry = streamCallbacks.poll();
+      Pair<String, StreamCallback> entry = streamCallbacks.poll();
       if (entry != null) {
-        StreamCallback callback = entry._2();
+        StreamCallback callback = entry.getValue();
         if (resp.byteCount > 0) {
           StreamInterceptor interceptor = new StreamInterceptor(this, resp.streamId, resp.byteCount,
             callback);
@@ -235,9 +235,9 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
       }
     } else if (message instanceof StreamFailure) {
       StreamFailure resp = (StreamFailure) message;
-      Tuple2<String, StreamCallback> entry = streamCallbacks.poll();
+      Pair<String, StreamCallback> entry = streamCallbacks.poll();
       if (entry != null) {
-        StreamCallback callback = entry._2();
+        StreamCallback callback = entry.getValue();
         try {
           callback.onFailure(resp.streamId, new RuntimeException(resp.error));
         } catch (IOException ioe) {

http://git-wip-us.apache.org/repos/asf/spark/blob/833eab2c/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java b/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java
index ad8e8b4..85ca2f1 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java
@@ -23,8 +23,6 @@ import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
-import scala.Tuple2;
-
 import com.google.common.base.Preconditions;
 import io.netty.channel.Channel;
 import org.slf4j.Logger;
@@ -98,21 +96,16 @@ public class OneForOneStreamManager extends StreamManager {
 
   @Override
   public ManagedBuffer openStream(String streamChunkId) {
-    Tuple2<Long, Integer> streamIdAndChunkId = parseStreamChunkId(streamChunkId);
-    return getChunk(streamIdAndChunkId._1, streamIdAndChunkId._2);
-  }
-
-  public static String genStreamChunkId(long streamId, int chunkId) {
-    return String.format("%d_%d", streamId, chunkId);
-  }
-
-  public static Tuple2<Long, Integer> parseStreamChunkId(String streamChunkId) {
     String[] array = streamChunkId.split("_");
     assert array.length == 2:
       "Stream id and chunk index should be specified when open stream for fetching block.";
     long streamId = Long.valueOf(array[0]);
     int chunkIndex = Integer.valueOf(array[1]);
-    return new Tuple2<>(streamId, chunkIndex);
+    return getChunk(streamId, chunkIndex);
+  }
+
+  public static String genStreamChunkId(long streamId, int chunkId) {
+    return String.format("%d_%d", streamId, chunkId);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/spark/blob/833eab2c/common/network-shuffle/pom.xml
----------------------------------------------------------------------
diff --git a/common/network-shuffle/pom.xml b/common/network-shuffle/pom.xml
index 2de882a..9968480 100644
--- a/common/network-shuffle/pom.xml
+++ b/common/network-shuffle/pom.xml
@@ -69,6 +69,7 @@
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-tags_${scala.binary.version}</artifactId>
+      <scope>test</scope>
     </dependency>
 
     <!--

http://git-wip-us.apache.org/repos/asf/spark/blob/833eab2c/common/network-yarn/pom.xml
----------------------------------------------------------------------
diff --git a/common/network-yarn/pom.xml b/common/network-yarn/pom.xml
index 7e24315..ec2db6e 100644
--- a/common/network-yarn/pom.xml
+++ b/common/network-yarn/pom.xml
@@ -48,6 +48,7 @@
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-tags_${scala.binary.version}</artifactId>
+      <scope>test</scope>
     </dependency>
 
     <!--


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org