You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by tg...@apache.org on 2017/04/27 19:06:17 UTC
spark git commit: [SPARK-20426] Lazy initialization of
FileSegmentManagedBuffer for shuffle service.
Repository: spark
Updated Branches:
refs/heads/master 561e9cc39 -> 85c6ce619
[SPARK-20426] Lazy initialization of FileSegmentManagedBuffer for shuffle service.
## What changes were proposed in this pull request?
When an application contains a large number of shuffle blocks, the NodeManager requires lots of memory to keep metadata (`FileSegmentManagedBuffer`) in `StreamManager`. When the number of shuffle blocks is big enough, the NodeManager can run OOM. This pr proposes to do lazy initialization of `FileSegmentManagedBuffer` in shuffle service.
## How was this patch tested?
Manually tested.
Author: jinxing <ji...@126.com>
Closes #17744 from jinxing64/SPARK-20426.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/85c6ce61
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/85c6ce61
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/85c6ce61
Branch: refs/heads/master
Commit: 85c6ce61930490e2247fb4b0e22dfebbb8b6a1ee
Parents: 561e9cc
Author: jinxing <ji...@126.com>
Authored: Thu Apr 27 14:06:07 2017 -0500
Committer: Tom Graves <tg...@yahoo-inc.com>
Committed: Thu Apr 27 14:06:07 2017 -0500
----------------------------------------------------------------------
.../shuffle/ExternalShuffleBlockHandler.java | 31 +++++++++++++-------
.../ExternalShuffleBlockHandlerSuite.java | 4 +--
.../ExternalShuffleIntegrationSuite.java | 5 ++--
.../network/netty/NettyBlockRpcServer.scala | 9 +++---
4 files changed, 29 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/85c6ce61/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
index 6daf960..c0f1da5 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
@@ -21,7 +21,7 @@ import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
-import java.util.List;
+import java.util.Iterator;
import java.util.Map;
import com.codahale.metrics.Gauge;
@@ -30,7 +30,6 @@ import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricSet;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -93,14 +92,25 @@ public class ExternalShuffleBlockHandler extends RpcHandler {
OpenBlocks msg = (OpenBlocks) msgObj;
checkAuth(client, msg.appId);
- List<ManagedBuffer> blocks = Lists.newArrayList();
- long totalBlockSize = 0;
- for (String blockId : msg.blockIds) {
- final ManagedBuffer block = blockManager.getBlockData(msg.appId, msg.execId, blockId);
- totalBlockSize += block != null ? block.size() : 0;
- blocks.add(block);
- }
- long streamId = streamManager.registerStream(client.getClientId(), blocks.iterator());
+ Iterator<ManagedBuffer> iter = new Iterator<ManagedBuffer>() {
+ private int index = 0;
+
+ @Override
+ public boolean hasNext() {
+ return index < msg.blockIds.length;
+ }
+
+ @Override
+ public ManagedBuffer next() {
+ final ManagedBuffer block = blockManager.getBlockData(msg.appId, msg.execId,
+ msg.blockIds[index]);
+ index++;
+ metrics.blockTransferRateBytes.mark(block != null ? block.size() : 0);
+ return block;
+ }
+ };
+
+ long streamId = streamManager.registerStream(client.getClientId(), iter);
if (logger.isTraceEnabled()) {
logger.trace("Registered streamId {} with {} buffers for client {} from host {}",
streamId,
@@ -109,7 +119,6 @@ public class ExternalShuffleBlockHandler extends RpcHandler {
getRemoteAddress(client.getChannel()));
}
callback.onSuccess(new StreamHandle(streamId, msg.blockIds.length).toByteBuffer());
- metrics.blockTransferRateBytes.mark(totalBlockSize);
} finally {
responseDelayContext.stop();
}
http://git-wip-us.apache.org/repos/asf/spark/blob/85c6ce61/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
index e47a72c..4d48b18 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
@@ -88,8 +88,6 @@ public class ExternalShuffleBlockHandlerSuite {
ByteBuffer openBlocks = new OpenBlocks("app0", "exec1", new String[] { "b0", "b1" })
.toByteBuffer();
handler.receive(client, openBlocks, callback);
- verify(blockResolver, times(1)).getBlockData("app0", "exec1", "b0");
- verify(blockResolver, times(1)).getBlockData("app0", "exec1", "b1");
ArgumentCaptor<ByteBuffer> response = ArgumentCaptor.forClass(ByteBuffer.class);
verify(callback, times(1)).onSuccess(response.capture());
@@ -107,6 +105,8 @@ public class ExternalShuffleBlockHandlerSuite {
assertEquals(block0Marker, buffers.next());
assertEquals(block1Marker, buffers.next());
assertFalse(buffers.hasNext());
+ verify(blockResolver, times(1)).getBlockData("app0", "exec1", "b0");
+ verify(blockResolver, times(1)).getBlockData("app0", "exec1", "b1");
// Verify open block request latency metrics
Timer openBlockRequestLatencyMillis = (Timer) ((ExternalShuffleBlockHandler) handler)
http://git-wip-us.apache.org/repos/asf/spark/blob/85c6ce61/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
index b8ae04e..7a33b68 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
@@ -216,9 +216,8 @@ public class ExternalShuffleIntegrationSuite {
registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
FetchResult execFetch = fetchBlocks("exec-0",
new String[] { "shuffle_0_0_0" /* right */, "shuffle_1_0_0" /* wrong */ });
- // Both still fail, as we start by checking for all block.
- assertTrue(execFetch.successBlocks.isEmpty());
- assertEquals(Sets.newHashSet("shuffle_0_0_0", "shuffle_1_0_0"), execFetch.failedBlocks);
+ assertEquals(Sets.newHashSet("shuffle_0_0_0"), execFetch.successBlocks);
+ assertEquals(Sets.newHashSet("shuffle_1_0_0"), execFetch.failedBlocks);
}
@Test
http://git-wip-us.apache.org/repos/asf/spark/blob/85c6ce61/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
index 2ed8a00..305fd9a 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
@@ -56,11 +56,12 @@ class NettyBlockRpcServer(
message match {
case openBlocks: OpenBlocks =>
- val blocks: Seq[ManagedBuffer] =
- openBlocks.blockIds.map(BlockId.apply).map(blockManager.getBlockData)
+ val blocksNum = openBlocks.blockIds.length
+ val blocks = for (i <- (0 until blocksNum).view)
+ yield blockManager.getBlockData(BlockId.apply(openBlocks.blockIds(i)))
val streamId = streamManager.registerStream(appId, blocks.iterator.asJava)
- logTrace(s"Registered streamId $streamId with ${blocks.size} buffers")
- responseContext.onSuccess(new StreamHandle(streamId, blocks.size).toByteBuffer)
+ logTrace(s"Registered streamId $streamId with $blocksNum buffers")
+ responseContext.onSuccess(new StreamHandle(streamId, blocksNum).toByteBuffer)
case uploadBlock: UploadBlock =>
// StorageLevel and ClassTag are serialized as bytes using our JavaSerializer.
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org