You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@celeborn.apache.org by et...@apache.org on 2022/11/23 12:23:25 UTC
[incubator-celeborn] branch branch-0.1 updated: 1.[CELEBORN-47][IMPROVEMENT] Refine logs about tracking fetch chunk #995 2.[CELEBORN-49] Deadlock when kill worker in shuffle read #998 3.[CELEBORN-50] Channel inActive may cause new client use old stream id to fetch data cause IllegalStateException. #1000 4.[CELEBORN-50][FOLLOWUP] Channel inactive may cause new client use old stream id to fetch data #999
This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch branch-0.1
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.1 by this push:
new 2fb47857 1.[CELEBORN-47][IMPROVEMENT] Refine logs about tracking fetch chunk #995 2.[CELEBORN-49] Deadlock when kill worker in shuffle read #998 3.[CELEBORN-50] Channel inActive may cause new client use old stream id to fetch data cause IllegalStateException. #1000 4.[CELEBORN-50][FOLLOWUP] Channel inactive may cause new client use old stream id to fetch data #999
2fb47857 is described below
commit 2fb478576c29f6b134ba388803c25c94154f6133
Author: Ethan Feng <et...@apache.org>
AuthorDate: Wed Nov 23 20:23:09 2022 +0800
1. [CELEBORN-47][IMPROVEMENT] Refine logs about tracking fetch chunk #995
2. [CELEBORN-49] Deadlock when killing a worker during shuffle read #998
3. [CELEBORN-50] Channel inactive may cause a new client to use an old stream ID to fetch data, causing IllegalStateException. #1000
4. [CELEBORN-50][FOLLOWUP] Channel inactive may cause a new client to use an old stream ID to fetch data #999
---
.../aliyun/emr/rss/client/read/ChunkClient.java | 9 ++++---
.../com/aliyun/emr/rss/client/read/Replica.java | 30 +++++++++++++++++++---
.../emr/rss/client/read/WorkerPartitionReader.java | 1 +
.../network/server/OneForOneStreamManager.java | 3 ++-
.../rss/service/deploy/worker/FetchHandler.scala | 3 +++
5 files changed, 37 insertions(+), 9 deletions(-)
diff --git a/client/src/main/java/com/aliyun/emr/rss/client/read/ChunkClient.java b/client/src/main/java/com/aliyun/emr/rss/client/read/ChunkClient.java
index 7ef96315..7c1f5e27 100644
--- a/client/src/main/java/com/aliyun/emr/rss/client/read/ChunkClient.java
+++ b/client/src/main/java/com/aliyun/emr/rss/client/read/ChunkClient.java
@@ -178,9 +178,9 @@ public class ChunkClient {
client.fetchChunk(replica.getStreamId(), chunkIndex, callback);
} catch (Exception e) {
logger.error(
- "Exception raised while beginning fetch chunk {}{}.",
- chunkIndex,
- numTries > 0 ? " (after " + numTries + " retries)" : "",
+ "Exception raised while beginning fetch chunk "
+ + chunkIndex
+ + (numTries > 0 ? " (after " + numTries + " retries)" : ""),
e);
if (shouldRetry(e)) {
@@ -238,7 +238,8 @@ public class ChunkClient {
if (shouldRetry(e)) {
initiateRetry(chunkIndex, this.currentNumTries);
} else {
- logger.error("Abandon to fetch chunk {} after {} tries.", chunkIndex, this.currentNumTries);
+ logger.error(
+ "Abandon to fetch chunk " + chunkIndex + " after " + this.currentNumTries + " tries.", e);
callback.onFailure(chunkIndex, ChunkClient.this.location, e);
}
}
diff --git a/client/src/main/java/com/aliyun/emr/rss/client/read/Replica.java b/client/src/main/java/com/aliyun/emr/rss/client/read/Replica.java
index 60fb46ac..2fb1b0e6 100644
--- a/client/src/main/java/com/aliyun/emr/rss/client/read/Replica.java
+++ b/client/src/main/java/com/aliyun/emr/rss/client/read/Replica.java
@@ -28,8 +28,11 @@ import com.aliyun.emr.rss.common.network.protocol.Message;
import com.aliyun.emr.rss.common.network.protocol.OpenStream;
import com.aliyun.emr.rss.common.network.protocol.StreamHandle;
import com.aliyun.emr.rss.common.protocol.PartitionLocation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
class Replica {
+ private Logger logger = LoggerFactory.getLogger(Replica.class);
private final long timeoutMs;
private final String shuffleKey;
private final PartitionLocation location;
@@ -57,17 +60,36 @@ class Replica {
public synchronized TransportClient getOrOpenStream() throws IOException, InterruptedException {
if (client == null || !client.isActive()) {
+ if (client != null) {
+ logger.warn(
+ "Current channel from "
+ + client.getChannel().localAddress()
+ + " to "
+ + client.getChannel().remoteAddress()
+ + " for "
+ + this
+ + " is not active.");
+ }
client = clientFactory.createClient(location.getHost(), location.getFetchPort());
+ // When client is not active, the origin client's corresponding streamId may be removed
+ // by channel inactive. Replica should request a new StreamHandle for the new client again.
+ // Newly returned numChunks should be the same.
+ openStreamInternal();
}
+ // For retried open stream if openStream rpc is failed.
if (streamHandle == null) {
- OpenStream openBlocks =
- new OpenStream(shuffleKey, location.getFileName(), startMapIndex, endMapIndex);
- ByteBuffer response = client.sendRpcSync(openBlocks.toByteBuffer(), timeoutMs);
- streamHandle = (StreamHandle) Message.decode(response);
+ openStreamInternal();
}
return client;
}
+ private void openStreamInternal() {
+ OpenStream openBlocks =
+ new OpenStream(shuffleKey, location.getFileName(), startMapIndex, endMapIndex);
+ ByteBuffer response = client.sendRpcSync(openBlocks.toByteBuffer(), timeoutMs);
+ streamHandle = (StreamHandle) Message.decode(response);
+ }
+
public long getStreamId() {
return streamHandle.streamId;
}
diff --git a/client/src/main/java/com/aliyun/emr/rss/client/read/WorkerPartitionReader.java b/client/src/main/java/com/aliyun/emr/rss/client/read/WorkerPartitionReader.java
index 5cc28520..79b1bc9c 100644
--- a/client/src/main/java/com/aliyun/emr/rss/client/read/WorkerPartitionReader.java
+++ b/client/src/main/java/com/aliyun/emr/rss/client/read/WorkerPartitionReader.java
@@ -96,6 +96,7 @@ public class WorkerPartitionReader implements PartitionReader {
currentChunkIndex = 0;
returnedChunks = 0;
numChunks = client.openChunks();
+ fetchChunks();
}
}
} catch (IOException e1) {
diff --git a/common/src/main/java/com/aliyun/emr/rss/common/network/server/OneForOneStreamManager.java b/common/src/main/java/com/aliyun/emr/rss/common/network/server/OneForOneStreamManager.java
index 4412e100..ac25d03a 100644
--- a/common/src/main/java/com/aliyun/emr/rss/common/network/server/OneForOneStreamManager.java
+++ b/common/src/main/java/com/aliyun/emr/rss/common/network/server/OneForOneStreamManager.java
@@ -91,7 +91,7 @@ public class OneForOneStreamManager extends StreamManager {
// Normally, when all chunks are returned to the client, the stream should be removed here.
// But if there is a switch on the client side, it will not go here at this time, so we need
// to remove the stream when the connection is terminated, and release the unused buffer.
- logger.trace("Removing stream id {}", streamId);
+ logger.debug("Remove stream id {}", streamId);
streams.remove(streamId);
}
@@ -119,6 +119,7 @@ public class OneForOneStreamManager extends StreamManager {
for (Map.Entry<Long, StreamState> entry: streams.entrySet()) {
StreamState state = entry.getValue();
if (state.associatedChannel == channel) {
+ logger.debug("Remove stream id {} of channel {}", entry.getKey(), channel.remoteAddress());
streams.remove(entry.getKey());
}
}
diff --git a/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/FetchHandler.scala b/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/FetchHandler.scala
index aeec80bf..ff100b8c 100644
--- a/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/FetchHandler.scala
+++ b/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/FetchHandler.scala
@@ -96,6 +96,9 @@ class FetchHandler(val conf: TransportConf) extends BaseMessageHandler with Logg
if (fileInfo.numChunks == 0) {
logDebug(s"StreamId $streamId fileName $fileName startMapIndex" +
s" $startMapIndex endMapIndex $endMapIndex is empty.")
+ } else {
+ logDebug(s"StreamId $streamId fileName $fileName numChunks ${fileInfo.numChunks} " +
+ s"startMapIndex $startMapIndex endMapIndex $endMapIndex")
}
client.getChannel.writeAndFlush(new RpcResponse(request.requestId,
new NioManagedBuffer(streamHandle.toByteBuffer)))