You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/07/25 12:52:20 UTC
spark git commit: [SPARK-21175] Reject OpenBlocks when memory
shortage on shuffle service.
Repository: spark
Updated Branches:
refs/heads/master 996a809c5 -> 799e13161
[SPARK-21175] Reject OpenBlocks when memory shortage on shuffle service.
## What changes were proposed in this pull request?
A shuffle service can serve blocks from multiple apps/tasks, so it can suffer high memory usage when many shuffle reads happen at the same time. In my cluster, the shuffle service repeatedly runs out of memory (OOM). Analyzing a heap dump shows that the memory used by Netty (ChannelOutboundBufferEntry) can reach 2~3 GB. It might make sense to reject "open blocks" requests when memory usage on the shuffle service is high.
https://github.com/apache/spark/commit/93dd0c518d040155b04e5ab258c5835aec7776fc and https://github.com/apache/spark/commit/85c6ce61930490e2247fb4b0e22dfebbb8b6a1ee tried to alleviate the memory pressure on the shuffle service but could not solve the root cause. This PR proposes to control (cap) the concurrency of shuffle reads.
## How was this patch tested?
Added unit test.
Author: jinxing <ji...@126.com>
Closes #18388 from jinxing64/SPARK-21175.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/799e1316
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/799e1316
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/799e1316
Branch: refs/heads/master
Commit: 799e13161e89f1ea96cb1bc7b507a05af2e89cd0
Parents: 996a809
Author: jinxing <ji...@126.com>
Authored: Tue Jul 25 20:52:07 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Tue Jul 25 20:52:07 2017 +0800
----------------------------------------------------------------------
.../apache/spark/network/TransportContext.java | 2 +-
.../network/server/OneForOneStreamManager.java | 60 ++++++++-
.../spark/network/server/StreamManager.java | 27 ++++
.../network/server/TransportRequestHandler.java | 42 +++++-
.../spark/network/util/TransportConf.java | 6 +
.../network/TransportRequestHandlerSuite.java | 134 +++++++++++++++++++
docs/configuration.md | 7 +
7 files changed, 265 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/799e1316/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
index 965c4ae..ae91bc9 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
@@ -168,7 +168,7 @@ public class TransportContext {
TransportResponseHandler responseHandler = new TransportResponseHandler(channel);
TransportClient client = new TransportClient(channel, responseHandler);
TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,
- rpcHandler);
+ rpcHandler, conf.maxChunksBeingTransferred());
return new TransportChannelHandler(client, responseHandler, requestHandler,
conf.connectionTimeoutMs(), closeIdleConnections);
}
http://git-wip-us.apache.org/repos/asf/spark/blob/799e1316/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java b/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java
index 85ca2f1..0f6a882 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java
@@ -25,6 +25,8 @@ import java.util.concurrent.atomic.AtomicLong;
import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,6 +55,9 @@ public class OneForOneStreamManager extends StreamManager {
// that the caller only requests each chunk one at a time, in order.
int curChunk = 0;
+ // Used to keep track of the number of chunks being transferred and not finished yet.
+ volatile long chunksBeingTransferred = 0L;
+
StreamState(String appId, Iterator<ManagedBuffer> buffers) {
this.appId = appId;
this.buffers = Preconditions.checkNotNull(buffers);
@@ -96,18 +101,25 @@ public class OneForOneStreamManager extends StreamManager {
@Override
public ManagedBuffer openStream(String streamChunkId) {
- String[] array = streamChunkId.split("_");
- assert array.length == 2:
- "Stream id and chunk index should be specified when open stream for fetching block.";
- long streamId = Long.valueOf(array[0]);
- int chunkIndex = Integer.valueOf(array[1]);
- return getChunk(streamId, chunkIndex);
+ Pair<Long, Integer> parsedIds = parseStreamChunkId(streamChunkId);
+ return getChunk(parsedIds.getLeft(), parsedIds.getRight());
}
public static String genStreamChunkId(long streamId, int chunkId) {
return String.format("%d_%d", streamId, chunkId);
}
+ // Splits a "<streamId>_<chunkIndex>" id (the format built by genStreamChunkId) into its parts.
+ // Used when a remote chunk is fetched as a stream; ids without exactly two parts are rejected.
+ public static Pair<Long, Integer> parseStreamChunkId(String streamChunkId) {
+ String[] array = streamChunkId.split("_");
+ Preconditions.checkArgument(array.length == 2,
+ "Stream id and chunk index should be specified.");
+ long streamId = Long.parseLong(array[0]);
+ int chunkIndex = Integer.parseInt(array[1]);
+ return ImmutablePair.of(streamId, chunkIndex);
+ }
+
@Override
public void connectionTerminated(Channel channel) {
// Close all streams which have been associated with the channel.
@@ -139,6 +151,42 @@ public class OneForOneStreamManager extends StreamManager {
}
}
+ @Override
+ public void chunkBeingSent(long streamId) {
+ StreamState streamState = streams.get(streamId);
+ if (streamState != null) {
+ // `++` on a volatile is a non-atomic read-modify-write; lock so concurrent updates aren't lost.
+ synchronized (streamState) { streamState.chunksBeingTransferred++; }
+ }
+ }
+
+ @Override
+ public void streamBeingSent(String streamId) {
+ chunkBeingSent(parseStreamChunkId(streamId).getLeft());
+ }
+
+ @Override
+ public void chunkSent(long streamId) {
+ StreamState streamState = streams.get(streamId);
+ if (streamState != null) {
+ synchronized (streamState) { streamState.chunksBeingTransferred--; }
+ }
+ }
+
+ @Override
+ public void streamSent(String streamId) {
+ chunkSent(parseStreamChunkId(streamId).getLeft());
+ }
+
+ @Override
+ public long chunksBeingTransferred() {
+ long sum = 0L;
+ for (StreamState streamState: streams.values()) {
+ sum += streamState.chunksBeingTransferred;
+ }
+ return sum;
+ }
+
/**
* Registers a stream of ManagedBuffers which are served as individual chunks one at a time to
* callers. Each ManagedBuffer will be release()'d after it is transferred on the wire. If a
http://git-wip-us.apache.org/repos/asf/spark/blob/799e1316/common/network-common/src/main/java/org/apache/spark/network/server/StreamManager.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/StreamManager.java b/common/network-common/src/main/java/org/apache/spark/network/server/StreamManager.java
index 07f161a..c535295 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/server/StreamManager.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/server/StreamManager.java
@@ -83,4 +83,31 @@ public abstract class StreamManager {
*/
public void checkAuthorization(TransportClient client, long streamId) { }
+ /**
+ * Returns the number of chunks being transferred and not finished yet; 0 unless overridden.
+ */
+ public long chunksBeingTransferred() {
+ return 0;
+ }
+
+ /**
+ * Called right before a chunk of the given stream is written to the channel.
+ */
+ public void chunkBeingSent(long streamId) { }
+
+ /**
+ * Called right before a stream with the given stream id is written to the channel.
+ */
+ public void streamBeingSent(String streamId) { }
+
+ /**
+ * Called when sending a chunk has completed, whether or not the write succeeded.
+ */
+ public void chunkSent(long streamId) { }
+
+ /**
+ * Called when sending a stream has completed, whether or not the write succeeded.
+ */
+ public void streamSent(String streamId) { }
+
}
http://git-wip-us.apache.org/repos/asf/spark/blob/799e1316/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
index 8193bc1..e944535 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
import com.google.common.base.Throwables;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -65,14 +66,19 @@ public class TransportRequestHandler extends MessageHandler<RequestMessage> {
/** Returns each chunk part of a stream. */
private final StreamManager streamManager;
+ /** Max chunks in flight; fetch/stream requests arriving once it is reached close the channel. */
+ private final long maxChunksBeingTransferred;
+
public TransportRequestHandler(
Channel channel,
TransportClient reverseClient,
- RpcHandler rpcHandler) {
+ RpcHandler rpcHandler,
+ long maxChunksBeingTransferred) {
this.channel = channel;
this.reverseClient = reverseClient;
this.rpcHandler = rpcHandler;
this.streamManager = rpcHandler.getStreamManager();
+ this.maxChunksBeingTransferred = maxChunksBeingTransferred;
}
@Override
@@ -117,7 +123,13 @@ public class TransportRequestHandler extends MessageHandler<RequestMessage> {
logger.trace("Received req from {} to fetch block {}", getRemoteAddress(channel),
req.streamChunkId);
}
-
+ long chunksBeingTransferred = streamManager.chunksBeingTransferred();
+ if (chunksBeingTransferred >= maxChunksBeingTransferred) {
+ logger.warn("The number of chunks being transferred {} is above {}, close the connection.",
+ chunksBeingTransferred, maxChunksBeingTransferred);
+ channel.close();
+ return;
+ }
ManagedBuffer buf;
try {
streamManager.checkAuthorization(reverseClient, req.streamChunkId.streamId);
@@ -130,10 +142,25 @@ public class TransportRequestHandler extends MessageHandler<RequestMessage> {
return;
}
- respond(new ChunkFetchSuccess(req.streamChunkId, buf));
+ streamManager.chunkBeingSent(req.streamChunkId.streamId);
+ respond(new ChunkFetchSuccess(req.streamChunkId, buf)).addListener(future -> {
+ streamManager.chunkSent(req.streamChunkId.streamId);
+ });
}
private void processStreamRequest(final StreamRequest req) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Received req from {} to fetch stream {}", getRemoteAddress(channel),
+ req.streamId);
+ }
+
+ long chunksBeingTransferred = streamManager.chunksBeingTransferred();
+ if (chunksBeingTransferred >= maxChunksBeingTransferred) {
+ logger.warn("The number of chunks being transferred {} is above {}, close the connection.",
+ chunksBeingTransferred, maxChunksBeingTransferred);
+ channel.close();
+ return;
+ }
ManagedBuffer buf;
try {
buf = streamManager.openStream(req.streamId);
@@ -145,7 +172,10 @@ public class TransportRequestHandler extends MessageHandler<RequestMessage> {
}
if (buf != null) {
- respond(new StreamResponse(req.streamId, buf.size(), buf));
+ streamManager.streamBeingSent(req.streamId);
+ respond(new StreamResponse(req.streamId, buf.size(), buf)).addListener(future -> {
+ streamManager.streamSent(req.streamId);
+ });
} else {
respond(new StreamFailure(req.streamId, String.format(
"Stream '%s' was not found.", req.streamId)));
@@ -187,9 +217,9 @@ public class TransportRequestHandler extends MessageHandler<RequestMessage> {
* Responds to a single message with some Encodable object. If a failure occurs while sending,
* it will be logged and the channel closed.
*/
- private void respond(Encodable result) {
+ private ChannelFuture respond(Encodable result) {
SocketAddress remoteAddress = channel.remoteAddress();
- channel.writeAndFlush(result).addListener(future -> {
+ return channel.writeAndFlush(result).addListener(future -> {
if (future.isSuccess()) {
logger.trace("Sent result {} to client {}", result, remoteAddress);
} else {
http://git-wip-us.apache.org/repos/asf/spark/blob/799e1316/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
index a25078e..ea52e9f 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -257,4 +257,10 @@ public class TransportConf {
return CryptoUtils.toCryptoConf("spark.network.crypto.config.", conf.getAll());
}
+ /** The max number of chunks allowed to be transferred at the same time on shuffle service.
+ * Defaults to Long.MAX_VALUE, i.e. effectively unlimited.
+ */
+ public long maxChunksBeingTransferred() {
+ return conf.getLong("spark.shuffle.maxChunksBeingTransferred", Long.MAX_VALUE);
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/799e1316/common/network-common/src/test/java/org/apache/spark/network/TransportRequestHandlerSuite.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/test/java/org/apache/spark/network/TransportRequestHandlerSuite.java b/common/network-common/src/test/java/org/apache/spark/network/TransportRequestHandlerSuite.java
new file mode 100644
index 0000000..1fb987a
--- /dev/null
+++ b/common/network-common/src/test/java/org/apache/spark/network/TransportRequestHandlerSuite.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.DefaultChannelPromise;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+import org.junit.Test;
+
+import static org.mockito.Mockito.*;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.protocol.*;
+import org.apache.spark.network.server.NoOpRpcHandler;
+import org.apache.spark.network.server.OneForOneStreamManager;
+import org.apache.spark.network.server.RpcHandler;
+import org.apache.spark.network.server.TransportRequestHandler;
+
+public class TransportRequestHandlerSuite {
+ // Requests are served until 2 chunks are in flight; the next request must close the channel.
+ @Test
+ public void handleFetchRequestAndStreamRequest() throws Exception {
+ RpcHandler rpcHandler = new NoOpRpcHandler();
+ OneForOneStreamManager streamManager = (OneForOneStreamManager) (rpcHandler.getStreamManager());
+ Channel channel = mock(Channel.class);
+ List<Pair<Object, ExtendedChannelPromise>> responseAndPromisePairs =
+ new ArrayList<>();
+ when(channel.writeAndFlush(any()))
+ .thenAnswer(invocationOnMock0 -> {
+ Object response = invocationOnMock0.getArguments()[0];
+ ExtendedChannelPromise channelFuture = new ExtendedChannelPromise(channel);
+ responseAndPromisePairs.add(ImmutablePair.of(response, channelFuture));
+ return channelFuture;
+ });
+
+ // Prepare the stream.
+ List<ManagedBuffer> managedBuffers = new ArrayList<>();
+ managedBuffers.add(new TestManagedBuffer(10));
+ managedBuffers.add(new TestManagedBuffer(20));
+ managedBuffers.add(new TestManagedBuffer(30));
+ managedBuffers.add(new TestManagedBuffer(40));
+ long streamId = streamManager.registerStream("test-app", managedBuffers.iterator());
+ streamManager.registerChannel(channel, streamId);
+ TransportClient reverseClient = mock(TransportClient.class);
+ TransportRequestHandler requestHandler = new TransportRequestHandler(channel, reverseClient,
+ rpcHandler, 2L);
+
+ RequestMessage request0 = new ChunkFetchRequest(new StreamChunkId(streamId, 0));
+ requestHandler.handle(request0);
+ assert responseAndPromisePairs.size() == 1;
+ assert responseAndPromisePairs.get(0).getLeft() instanceof ChunkFetchSuccess;
+ assert ((ChunkFetchSuccess) (responseAndPromisePairs.get(0).getLeft())).body() ==
+ managedBuffers.get(0);
+
+ RequestMessage request1 = new ChunkFetchRequest(new StreamChunkId(streamId, 1));
+ requestHandler.handle(request1);
+ assert responseAndPromisePairs.size() == 2;
+ assert responseAndPromisePairs.get(1).getLeft() instanceof ChunkFetchSuccess;
+ assert ((ChunkFetchSuccess) (responseAndPromisePairs.get(1).getLeft())).body() ==
+ managedBuffers.get(1);
+
+ // Finish flushing the response for request0.
+ responseAndPromisePairs.get(0).getRight().finish(true);
+
+ RequestMessage request2 = new StreamRequest(String.format("%d_%d", streamId, 2));
+ requestHandler.handle(request2);
+ assert responseAndPromisePairs.size() == 3;
+ assert responseAndPromisePairs.get(2).getLeft() instanceof StreamResponse;
+ assert ((StreamResponse) (responseAndPromisePairs.get(2).getLeft())).body() ==
+ managedBuffers.get(2);
+
+ // Request3 closes the channel: request1 and request2 are still in flight, which
+ // reaches the limit of 2 passed to the handler above.
+ RequestMessage request3 = new StreamRequest(String.format("%d_%d", streamId, 3));
+ requestHandler.handle(request3);
+ verify(channel, times(1)).close();
+ assert responseAndPromisePairs.size() == 3;
+ }
+ // Promise whose completion is driven manually via finish(), so the test decides when sends end.
+ private static class ExtendedChannelPromise extends DefaultChannelPromise {
+
+ private final List<GenericFutureListener> listeners = new ArrayList<>();
+ private boolean success;
+
+ public ExtendedChannelPromise(Channel channel) {
+ super(channel);
+ success = false;
+ }
+
+ @Override
+ public ChannelPromise addListener(
+ GenericFutureListener<? extends Future<? super Void>> listener) {
+ listeners.add(listener);
+ return super.addListener(listener);
+ }
+
+ @Override
+ public boolean isSuccess() {
+ return success;
+ }
+
+ public void finish(boolean success) {
+ this.success = success;
+ listeners.forEach(listener -> {
+ try {
+ listener.operationComplete(this);
+ } catch (Exception e) { throw new RuntimeException(e); }
+ });
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/799e1316/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index d3df923..f4b6f46 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -632,6 +632,13 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
<tr>
+ <td><code>spark.shuffle.maxChunksBeingTransferred</code></td>
+ <td>Long.MAX_VALUE</td>
+ <td>
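+ The max number of chunks allowed to be transferred at the same time on shuffle service. Once this many chunks are in flight, the connection that sends any further chunk or stream request is closed.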
+ </td>
+</tr>
+<tr>
<td><code>spark.shuffle.sort.bypassMergeThreshold</code></td>
<td>200</td>
<td>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org