You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by et...@apache.org on 2022/11/23 10:08:02 UTC
[incubator-celeborn] branch main updated: [CELEBORN-50] An inactive channel may cause a new client to fetch data with an old stream id, causing IllegalStateException. (#1000)
This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new f1c4d675 [CELEBORN-50] An inactive channel may cause a new client to fetch data with an old stream id, causing IllegalStateException. (#1000)
f1c4d675 is described below
commit f1c4d675d62eed9b9776167f76ffddfec08fe8d7
Author: Ethan Feng <et...@apache.org>
AuthorDate: Wed Nov 23 18:07:57 2022 +0800
[CELEBORN-50] An inactive channel may cause a new client to fetch data with an old stream id, causing IllegalStateException. (#1000)
---
.../org/apache/celeborn/client/read/Replica.java | 27 ++++++++++++++++++----
1 file changed, 23 insertions(+), 4 deletions(-)
diff --git a/client/src/main/java/org/apache/celeborn/client/read/Replica.java b/client/src/main/java/org/apache/celeborn/client/read/Replica.java
index 40e60b88..5b8f1ed1 100644
--- a/client/src/main/java/org/apache/celeborn/client/read/Replica.java
+++ b/client/src/main/java/org/apache/celeborn/client/read/Replica.java
@@ -21,6 +21,8 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.celeborn.common.network.client.TransportClient;
import org.apache.celeborn.common.network.client.TransportClientFactory;
@@ -30,6 +32,7 @@ import org.apache.celeborn.common.network.protocol.StreamHandle;
import org.apache.celeborn.common.protocol.PartitionLocation;
class Replica {
+ private Logger logger = LoggerFactory.getLogger(Replica.class);
private final long timeoutMs;
private final String shuffleKey;
private final PartitionLocation location;
@@ -57,17 +60,33 @@ class Replica {
+ /**
+ * Returns a client connected to this replica's fetch port with an opened stream, creating a
+ * new client (and reopening the stream) when the cached channel is missing or inactive.
+ *
+ * <p>The cached {@code streamHandle} was opened on the previous channel, so it is discarded
+ * whenever the channel is replaced. Otherwise a failed reopen after reconnecting would leave
+ * a new, active client paired with the old stream id.
+ *
+ * @return the (possibly newly created) transport client for this replica
+ * @throws IOException if creating the client fails
+ * @throws InterruptedException if interrupted while creating the client
+ */
public synchronized TransportClient getOrOpenStream() throws IOException, InterruptedException {
if (client == null || !client.isActive()) {
+ if (client != null) {
+ logger.warn(
+ "Current channel from {} to {} for {} is not active.",
+ client.getChannel().localAddress(),
+ client.getChannel().remoteAddress(),
+ this);
+ }
+ // The old stream id belongs to the old channel; it must never be reused on a new client.
+ streamHandle = null;
client = clientFactory.createClient(location.getHost(), location.getFetchPort());
}
+ // Opens the stream for a fresh client, or retries after a previously failed OpenStream RPC.
if (streamHandle == null) {
- OpenStream openBlocks =
- new OpenStream(shuffleKey, location.getFileName(), startMapIndex, endMapIndex);
- ByteBuffer response = client.sendRpcSync(openBlocks.toByteBuffer(), timeoutMs);
- streamHandle = (StreamHandle) Message.decode(response);
+ openStreamInternal();
}
return client;
}
+ /**
+ * Sends an {@code OpenStream} RPC over the current {@code client} for this replica's
+ * partition file and map range [startMapIndex, endMapIndex), then caches the decoded
+ * {@link StreamHandle} in {@code streamHandle}. The field is only assigned after the RPC and
+ * decode both succeed.
+ */
+ private void openStreamInternal() {
+ ByteBuffer openRequest =
+ new OpenStream(shuffleKey, location.getFileName(), startMapIndex, endMapIndex)
+ .toByteBuffer();
+ StreamHandle opened = (StreamHandle) Message.decode(client.sendRpcSync(openRequest, timeoutMs));
+ streamHandle = opened;
+ }
+
+ /**
+ * Returns the id of the stream currently cached in {@code streamHandle}, i.e. the one set by
+ * the most recent successful {@code openStreamInternal()} call.
+ * NOTE(review): dereferences {@code streamHandle} directly, so this throws
+ * NullPointerException if no stream has been opened yet; callers presumably invoke
+ * getOrOpenStream() first — confirm.
+ */
public long getStreamId() {
return streamHandle.streamId;
}