You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/06/16 12:09:52 UTC
spark git commit: [SPARK-20994] Remove redundant characters in
OpenBlocks to save memory for shuffle service.
Repository: spark
Updated Branches:
refs/heads/master 45824fb60 -> 93dd0c518
[SPARK-20994] Remove redundant characters in OpenBlocks to save memory for shuffle service.
## What changes were proposed in this pull request?
In current code, blockIds in `OpenBlocks` are stored in the iterator on shuffle service.
There are some redundant characters in blockId(`"shuffle_" + shuffleId + "_" + mapId + "_" + reduceId`). This pr proposes to improve the footprint and alleviate the memory pressure on shuffle service.
Author: jinxing <ji...@126.com>
Closes #18231 from jinxing64/SPARK-20994-v2.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/93dd0c51
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/93dd0c51
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/93dd0c51
Branch: refs/heads/master
Commit: 93dd0c518d040155b04e5ab258c5835aec7776fc
Parents: 45824fb
Author: jinxing <ji...@126.com>
Authored: Fri Jun 16 20:09:45 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Fri Jun 16 20:09:45 2017 +0800
----------------------------------------------------------------------
.../shuffle/ExternalShuffleBlockHandler.java | 70 ++++++++++++++------
.../shuffle/ExternalShuffleBlockResolver.java | 23 +++----
.../network/sasl/SaslIntegrationSuite.java | 2 +-
.../ExternalShuffleBlockHandlerSuite.java | 11 +--
.../ExternalShuffleBlockResolverSuite.java | 10 +--
.../ExternalShuffleIntegrationSuite.java | 8 +--
6 files changed, 73 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/93dd0c51/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
index c0f1da5..fc7bba4 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
@@ -44,7 +44,6 @@ import org.apache.spark.network.shuffle.protocol.*;
import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;
import org.apache.spark.network.util.TransportConf;
-
/**
* RPC Handler for a server which can serve shuffle blocks from outside of an Executor process.
*
@@ -91,26 +90,8 @@ public class ExternalShuffleBlockHandler extends RpcHandler {
try {
OpenBlocks msg = (OpenBlocks) msgObj;
checkAuth(client, msg.appId);
-
- Iterator<ManagedBuffer> iter = new Iterator<ManagedBuffer>() {
- private int index = 0;
-
- @Override
- public boolean hasNext() {
- return index < msg.blockIds.length;
- }
-
- @Override
- public ManagedBuffer next() {
- final ManagedBuffer block = blockManager.getBlockData(msg.appId, msg.execId,
- msg.blockIds[index]);
- index++;
- metrics.blockTransferRateBytes.mark(block != null ? block.size() : 0);
- return block;
- }
- };
-
- long streamId = streamManager.registerStream(client.getClientId(), iter);
+ long streamId = streamManager.registerStream(client.getClientId(),
+ new ManagedBufferIterator(msg.appId, msg.execId, msg.blockIds));
if (logger.isTraceEnabled()) {
logger.trace("Registered streamId {} with {} buffers for client {} from host {}",
streamId,
@@ -209,4 +190,51 @@ public class ExternalShuffleBlockHandler extends RpcHandler {
}
}
+ private class ManagedBufferIterator implements Iterator<ManagedBuffer> {
+
+ private int index = 0;
+ private final String appId;
+ private final String execId;
+ private final int shuffleId;
+ // An array containing mapId and reduceId pairs.
+ private final int[] mapIdAndReduceIds;
+
+ ManagedBufferIterator(String appId, String execId, String[] blockIds) {
+ this.appId = appId;
+ this.execId = execId;
+ String[] blockId0Parts = blockIds[0].split("_");
+ if (blockId0Parts.length != 4 || !blockId0Parts[0].equals("shuffle")) {
+ throw new IllegalArgumentException("Unexpected shuffle block id format: " + blockIds[0]);
+ }
+ this.shuffleId = Integer.parseInt(blockId0Parts[1]);
+ mapIdAndReduceIds = new int[2 * blockIds.length];
+ for (int i = 0; i < blockIds.length; i++) {
+ String[] blockIdParts = blockIds[i].split("_");
+ if (blockIdParts.length != 4 || !blockIdParts[0].equals("shuffle")) {
+ throw new IllegalArgumentException("Unexpected shuffle block id format: " + blockIds[i]);
+ }
+ if (Integer.parseInt(blockIdParts[1]) != shuffleId) {
+ throw new IllegalArgumentException("Expected shuffleId=" + shuffleId +
+ ", got:" + blockIds[i]);
+ }
+ mapIdAndReduceIds[2 * i] = Integer.parseInt(blockIdParts[2]);
+ mapIdAndReduceIds[2 * i + 1] = Integer.parseInt(blockIdParts[3]);
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ return index < mapIdAndReduceIds.length;
+ }
+
+ @Override
+ public ManagedBuffer next() {
+ final ManagedBuffer block = blockManager.getBlockData(appId, execId, shuffleId,
+ mapIdAndReduceIds[index], mapIdAndReduceIds[index + 1]);
+ index += 2;
+ metrics.blockTransferRateBytes.mark(block != null ? block.size() : 0);
+ return block;
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/spark/blob/93dd0c51/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
index 62d58ab..d7ec0e2 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
@@ -150,27 +150,20 @@ public class ExternalShuffleBlockResolver {
}
/**
- * Obtains a FileSegmentManagedBuffer from a shuffle block id. We expect the blockId has the
- * format "shuffle_ShuffleId_MapId_ReduceId" (from ShuffleBlockId), and additionally make
- * assumptions about how the hash and sort based shuffles store their data.
+ * Obtains a FileSegmentManagedBuffer from (shuffleId, mapId, reduceId). We make assumptions
+ * about how the hash and sort based shuffles store their data.
*/
- public ManagedBuffer getBlockData(String appId, String execId, String blockId) {
- String[] blockIdParts = blockId.split("_");
- if (blockIdParts.length < 4) {
- throw new IllegalArgumentException("Unexpected block id format: " + blockId);
- } else if (!blockIdParts[0].equals("shuffle")) {
- throw new IllegalArgumentException("Expected shuffle block id, got: " + blockId);
- }
- int shuffleId = Integer.parseInt(blockIdParts[1]);
- int mapId = Integer.parseInt(blockIdParts[2]);
- int reduceId = Integer.parseInt(blockIdParts[3]);
-
+ public ManagedBuffer getBlockData(
+ String appId,
+ String execId,
+ int shuffleId,
+ int mapId,
+ int reduceId) {
ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId));
if (executor == null) {
throw new RuntimeException(
String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
}
-
return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId);
}
http://git-wip-us.apache.org/repos/asf/spark/blob/93dd0c51/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
index 0c054fc..8110f1e 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
@@ -202,7 +202,7 @@ public class SaslIntegrationSuite {
}
};
- String[] blockIds = { "shuffle_2_3_4", "shuffle_6_7_8" };
+ String[] blockIds = { "shuffle_0_1_2", "shuffle_0_3_4" };
OneForOneBlockFetcher fetcher =
new OneForOneBlockFetcher(client1, "app-2", "0", blockIds, listener, conf, null);
fetcher.start();
http://git-wip-us.apache.org/repos/asf/spark/blob/93dd0c51/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
index 4d48b18..7846b71 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
@@ -83,9 +83,10 @@ public class ExternalShuffleBlockHandlerSuite {
ManagedBuffer block0Marker = new NioManagedBuffer(ByteBuffer.wrap(new byte[3]));
ManagedBuffer block1Marker = new NioManagedBuffer(ByteBuffer.wrap(new byte[7]));
- when(blockResolver.getBlockData("app0", "exec1", "b0")).thenReturn(block0Marker);
- when(blockResolver.getBlockData("app0", "exec1", "b1")).thenReturn(block1Marker);
- ByteBuffer openBlocks = new OpenBlocks("app0", "exec1", new String[] { "b0", "b1" })
+ when(blockResolver.getBlockData("app0", "exec1", 0, 0, 0)).thenReturn(block0Marker);
+ when(blockResolver.getBlockData("app0", "exec1", 0, 0, 1)).thenReturn(block1Marker);
+ ByteBuffer openBlocks = new OpenBlocks("app0", "exec1",
+ new String[] { "shuffle_0_0_0", "shuffle_0_0_1" })
.toByteBuffer();
handler.receive(client, openBlocks, callback);
@@ -105,8 +106,8 @@ public class ExternalShuffleBlockHandlerSuite {
assertEquals(block0Marker, buffers.next());
assertEquals(block1Marker, buffers.next());
assertFalse(buffers.hasNext());
- verify(blockResolver, times(1)).getBlockData("app0", "exec1", "b0");
- verify(blockResolver, times(1)).getBlockData("app0", "exec1", "b1");
+ verify(blockResolver, times(1)).getBlockData("app0", "exec1", 0, 0, 0);
+ verify(blockResolver, times(1)).getBlockData("app0", "exec1", 0, 0, 1);
// Verify open block request latency metrics
Timer openBlockRequestLatencyMillis = (Timer) ((ExternalShuffleBlockHandler) handler)
http://git-wip-us.apache.org/repos/asf/spark/blob/93dd0c51/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
index bc97594..23438a0 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
@@ -65,7 +65,7 @@ public class ExternalShuffleBlockResolverSuite {
ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, null);
// Unregistered executor
try {
- resolver.getBlockData("app0", "exec1", "shuffle_1_1_0");
+ resolver.getBlockData("app0", "exec1", 1, 1, 0);
fail("Should have failed");
} catch (RuntimeException e) {
assertTrue("Bad error message: " + e, e.getMessage().contains("not registered"));
@@ -74,7 +74,7 @@ public class ExternalShuffleBlockResolverSuite {
// Invalid shuffle manager
try {
resolver.registerExecutor("app0", "exec2", dataContext.createExecutorInfo("foobar"));
- resolver.getBlockData("app0", "exec2", "shuffle_1_1_0");
+ resolver.getBlockData("app0", "exec2", 1, 1, 0);
fail("Should have failed");
} catch (UnsupportedOperationException e) {
// pass
@@ -84,7 +84,7 @@ public class ExternalShuffleBlockResolverSuite {
resolver.registerExecutor("app0", "exec3",
dataContext.createExecutorInfo(SORT_MANAGER));
try {
- resolver.getBlockData("app0", "exec3", "shuffle_1_1_0");
+ resolver.getBlockData("app0", "exec3", 1, 1, 0);
fail("Should have failed");
} catch (Exception e) {
// pass
@@ -98,14 +98,14 @@ public class ExternalShuffleBlockResolverSuite {
dataContext.createExecutorInfo(SORT_MANAGER));
InputStream block0Stream =
- resolver.getBlockData("app0", "exec0", "shuffle_0_0_0").createInputStream();
+ resolver.getBlockData("app0", "exec0", 0, 0, 0).createInputStream();
String block0 = CharStreams.toString(
new InputStreamReader(block0Stream, StandardCharsets.UTF_8));
block0Stream.close();
assertEquals(sortBlock0, block0);
InputStream block1Stream =
- resolver.getBlockData("app0", "exec0", "shuffle_0_0_1").createInputStream();
+ resolver.getBlockData("app0", "exec0", 0, 0, 1).createInputStream();
String block1 = CharStreams.toString(
new InputStreamReader(block1Stream, StandardCharsets.UTF_8));
block1Stream.close();
http://git-wip-us.apache.org/repos/asf/spark/blob/93dd0c51/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
index d1d8f5b..4391e30 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
@@ -214,10 +214,10 @@ public class ExternalShuffleIntegrationSuite {
@Test
public void testFetchWrongExecutor() throws Exception {
registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
- FetchResult execFetch = fetchBlocks("exec-0",
- new String[] { "shuffle_0_0_0" /* right */, "shuffle_1_0_0" /* wrong */ });
- assertEquals(Sets.newHashSet("shuffle_0_0_0"), execFetch.successBlocks);
- assertEquals(Sets.newHashSet("shuffle_1_0_0"), execFetch.failedBlocks);
+ FetchResult execFetch0 = fetchBlocks("exec-0", new String[] { "shuffle_0_0_0" /* right */});
+ FetchResult execFetch1 = fetchBlocks("exec-0", new String[] { "shuffle_1_0_0" /* wrong */ });
+ assertEquals(Sets.newHashSet("shuffle_0_0_0"), execFetch0.successBlocks);
+ assertEquals(Sets.newHashSet("shuffle_1_0_0"), execFetch1.failedBlocks);
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org