You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by tg...@apache.org on 2017/04/27 19:06:17 UTC

spark git commit: [SPARK-20426] Lazy initialization of FileSegmentManagedBuffer for shuffle service.

Repository: spark
Updated Branches:
  refs/heads/master 561e9cc39 -> 85c6ce619


[SPARK-20426] Lazy initialization of FileSegmentManagedBuffer for shuffle service.

## What changes were proposed in this pull request?
When an application contains a large number of shuffle blocks, the NodeManager requires a lot of memory to keep metadata (`FileSegmentManagedBuffer`) in `StreamManager`. When the number of shuffle blocks is large enough, the NodeManager can run out of memory (OOM). This PR proposes lazy initialization of `FileSegmentManagedBuffer` in the shuffle service.

## How was this patch tested?

Manually tested.

Author: jinxing <ji...@126.com>

Closes #17744 from jinxing64/SPARK-20426.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/85c6ce61
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/85c6ce61
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/85c6ce61

Branch: refs/heads/master
Commit: 85c6ce61930490e2247fb4b0e22dfebbb8b6a1ee
Parents: 561e9cc
Author: jinxing <ji...@126.com>
Authored: Thu Apr 27 14:06:07 2017 -0500
Committer: Tom Graves <tg...@yahoo-inc.com>
Committed: Thu Apr 27 14:06:07 2017 -0500

----------------------------------------------------------------------
 .../shuffle/ExternalShuffleBlockHandler.java    | 31 +++++++++++++-------
 .../ExternalShuffleBlockHandlerSuite.java       |  4 +--
 .../ExternalShuffleIntegrationSuite.java        |  5 ++--
 .../network/netty/NettyBlockRpcServer.scala     |  9 +++---
 4 files changed, 29 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/85c6ce61/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
index 6daf960..c0f1da5 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
@@ -21,7 +21,7 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
-import java.util.List;
+import java.util.Iterator;
 import java.util.Map;
 
 import com.codahale.metrics.Gauge;
@@ -30,7 +30,6 @@ import com.codahale.metrics.Metric;
 import com.codahale.metrics.MetricSet;
 import com.codahale.metrics.Timer;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -93,14 +92,25 @@ public class ExternalShuffleBlockHandler extends RpcHandler {
         OpenBlocks msg = (OpenBlocks) msgObj;
         checkAuth(client, msg.appId);
 
-        List<ManagedBuffer> blocks = Lists.newArrayList();
-        long totalBlockSize = 0;
-        for (String blockId : msg.blockIds) {
-          final ManagedBuffer block = blockManager.getBlockData(msg.appId, msg.execId, blockId);
-          totalBlockSize += block != null ? block.size() : 0;
-          blocks.add(block);
-        }
-        long streamId = streamManager.registerStream(client.getClientId(), blocks.iterator());
+        Iterator<ManagedBuffer> iter = new Iterator<ManagedBuffer>() {
+          private int index = 0;
+
+          @Override
+          public boolean hasNext() {
+            return index < msg.blockIds.length;
+          }
+
+          @Override
+          public ManagedBuffer next() {
+            final ManagedBuffer block = blockManager.getBlockData(msg.appId, msg.execId,
+              msg.blockIds[index]);
+            index++;
+            metrics.blockTransferRateBytes.mark(block != null ? block.size() : 0);
+            return block;
+          }
+        };
+
+        long streamId = streamManager.registerStream(client.getClientId(), iter);
         if (logger.isTraceEnabled()) {
           logger.trace("Registered streamId {} with {} buffers for client {} from host {}",
                        streamId,
@@ -109,7 +119,6 @@ public class ExternalShuffleBlockHandler extends RpcHandler {
                        getRemoteAddress(client.getChannel()));
         }
         callback.onSuccess(new StreamHandle(streamId, msg.blockIds.length).toByteBuffer());
-        metrics.blockTransferRateBytes.mark(totalBlockSize);
       } finally {
         responseDelayContext.stop();
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/85c6ce61/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
index e47a72c..4d48b18 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
@@ -88,8 +88,6 @@ public class ExternalShuffleBlockHandlerSuite {
     ByteBuffer openBlocks = new OpenBlocks("app0", "exec1", new String[] { "b0", "b1" })
       .toByteBuffer();
     handler.receive(client, openBlocks, callback);
-    verify(blockResolver, times(1)).getBlockData("app0", "exec1", "b0");
-    verify(blockResolver, times(1)).getBlockData("app0", "exec1", "b1");
 
     ArgumentCaptor<ByteBuffer> response = ArgumentCaptor.forClass(ByteBuffer.class);
     verify(callback, times(1)).onSuccess(response.capture());
@@ -107,6 +105,8 @@ public class ExternalShuffleBlockHandlerSuite {
     assertEquals(block0Marker, buffers.next());
     assertEquals(block1Marker, buffers.next());
     assertFalse(buffers.hasNext());
+    verify(blockResolver, times(1)).getBlockData("app0", "exec1", "b0");
+    verify(blockResolver, times(1)).getBlockData("app0", "exec1", "b1");
 
     // Verify open block request latency metrics
     Timer openBlockRequestLatencyMillis = (Timer) ((ExternalShuffleBlockHandler) handler)

http://git-wip-us.apache.org/repos/asf/spark/blob/85c6ce61/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
index b8ae04e..7a33b68 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
@@ -216,9 +216,8 @@ public class ExternalShuffleIntegrationSuite {
     registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
     FetchResult execFetch = fetchBlocks("exec-0",
       new String[] { "shuffle_0_0_0" /* right */, "shuffle_1_0_0" /* wrong */ });
-    // Both still fail, as we start by checking for all block.
-    assertTrue(execFetch.successBlocks.isEmpty());
-    assertEquals(Sets.newHashSet("shuffle_0_0_0", "shuffle_1_0_0"), execFetch.failedBlocks);
+    assertEquals(Sets.newHashSet("shuffle_0_0_0"), execFetch.successBlocks);
+    assertEquals(Sets.newHashSet("shuffle_1_0_0"), execFetch.failedBlocks);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/spark/blob/85c6ce61/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
index 2ed8a00..305fd9a 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
@@ -56,11 +56,12 @@ class NettyBlockRpcServer(
 
     message match {
       case openBlocks: OpenBlocks =>
-        val blocks: Seq[ManagedBuffer] =
-          openBlocks.blockIds.map(BlockId.apply).map(blockManager.getBlockData)
+        val blocksNum = openBlocks.blockIds.length
+        val blocks = for (i <- (0 until blocksNum).view)
+          yield blockManager.getBlockData(BlockId.apply(openBlocks.blockIds(i)))
         val streamId = streamManager.registerStream(appId, blocks.iterator.asJava)
-        logTrace(s"Registered streamId $streamId with ${blocks.size} buffers")
-        responseContext.onSuccess(new StreamHandle(streamId, blocks.size).toByteBuffer)
+        logTrace(s"Registered streamId $streamId with $blocksNum buffers")
+        responseContext.onSuccess(new StreamHandle(streamId, blocksNum).toByteBuffer)
 
       case uploadBlock: UploadBlock =>
         // StorageLevel and ClassTag are serialized as bytes using our JavaSerializer.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org