Posted to commits@celeborn.apache.org by et...@apache.org on 2022/11/23 12:23:25 UTC

[incubator-celeborn] branch branch-0.1 updated: 1.[CELEBORN-47][IMPROVEMENT] Refine logs about tracking fetch chunk #995 2.[CELEBORN-49] Deadlock when killing worker in shuffle read #998 3.[CELEBORN-50] Channel inactive may cause a new client to use an old stream id to fetch data, causing IllegalStateException #1000 4.[CELEBORN-50][FOLLOWUP] Channel inactive may cause a new client to use an old stream id to fetch data #999

This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch branch-0.1
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.1 by this push:
     new 2fb47857 1.[CELEBORN-47][IMPROVEMENT] Refine logs about tracking fetch chunk #995 2.[CELEBORN-49] Deadlock when killing worker in shuffle read #998 3.[CELEBORN-50] Channel inactive may cause a new client to use an old stream id to fetch data, causing IllegalStateException #1000 4.[CELEBORN-50][FOLLOWUP] Channel inactive may cause a new client to use an old stream id to fetch data #999
2fb47857 is described below

commit 2fb478576c29f6b134ba388803c25c94154f6133
Author: Ethan Feng <et...@apache.org>
AuthorDate: Wed Nov 23 20:23:09 2022 +0800

    1.[CELEBORN-47][IMPROVEMENT] Refine logs about tracking fetch chunk #995
    2.[CELEBORN-49] Deadlock when killing worker in shuffle read #998
    3.[CELEBORN-50] Channel inactive may cause a new client to use an old stream id to fetch data, causing IllegalStateException #1000
    4.[CELEBORN-50][FOLLOWUP] Channel inactive may cause a new client to use an old stream id to fetch data #999
---
 .../aliyun/emr/rss/client/read/ChunkClient.java    |  9 ++++---
 .../com/aliyun/emr/rss/client/read/Replica.java    | 30 +++++++++++++++++++---
 .../emr/rss/client/read/WorkerPartitionReader.java |  1 +
 .../network/server/OneForOneStreamManager.java     |  3 ++-
 .../rss/service/deploy/worker/FetchHandler.scala   |  3 +++
 5 files changed, 37 insertions(+), 9 deletions(-)

diff --git a/client/src/main/java/com/aliyun/emr/rss/client/read/ChunkClient.java b/client/src/main/java/com/aliyun/emr/rss/client/read/ChunkClient.java
index 7ef96315..7c1f5e27 100644
--- a/client/src/main/java/com/aliyun/emr/rss/client/read/ChunkClient.java
+++ b/client/src/main/java/com/aliyun/emr/rss/client/read/ChunkClient.java
@@ -178,9 +178,9 @@ public class ChunkClient {
       client.fetchChunk(replica.getStreamId(), chunkIndex, callback);
     } catch (Exception e) {
       logger.error(
-              "Exception raised while beginning fetch chunk {}{}.",
-              chunkIndex,
-              numTries > 0 ? " (after " + numTries + " retries)" : "",
+              "Exception raised while beginning fetch chunk "
+                      + chunkIndex
+                      + (numTries > 0 ? " (after " + numTries + " retries)" : ""),
               e);
 
       if (shouldRetry(e)) {
@@ -238,7 +238,8 @@ public class ChunkClient {
       if (shouldRetry(e)) {
         initiateRetry(chunkIndex, this.currentNumTries);
       } else {
-        logger.error("Abandon to fetch chunk {} after {} tries.", chunkIndex, this.currentNumTries);
+        logger.error(
+          "Abandon to fetch chunk " + chunkIndex + " after " + this.currentNumTries + " tries.", e);
         callback.onFailure(chunkIndex, ChunkClient.this.location, e);
       }
     }
diff --git a/client/src/main/java/com/aliyun/emr/rss/client/read/Replica.java b/client/src/main/java/com/aliyun/emr/rss/client/read/Replica.java
index 60fb46ac..2fb1b0e6 100644
--- a/client/src/main/java/com/aliyun/emr/rss/client/read/Replica.java
+++ b/client/src/main/java/com/aliyun/emr/rss/client/read/Replica.java
@@ -28,8 +28,11 @@ import com.aliyun.emr.rss.common.network.protocol.Message;
 import com.aliyun.emr.rss.common.network.protocol.OpenStream;
 import com.aliyun.emr.rss.common.network.protocol.StreamHandle;
 import com.aliyun.emr.rss.common.protocol.PartitionLocation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 class Replica {
+  private Logger logger = LoggerFactory.getLogger(Replica.class);
   private final long timeoutMs;
   private final String shuffleKey;
   private final PartitionLocation location;
@@ -57,17 +60,36 @@ class Replica {
 
   public synchronized TransportClient getOrOpenStream() throws IOException, InterruptedException {
     if (client == null || !client.isActive()) {
+      if (client != null) {
+        logger.warn(
+                "Current channel from "
+                        + client.getChannel().localAddress()
+                        + " to "
+                        + client.getChannel().remoteAddress()
+                        + " for "
+                        + this
+                        + " is not active.");
+      }
       client = clientFactory.createClient(location.getHost(), location.getFetchPort());
+      // When client is not active, the origin client's corresponding streamId may be removed
+      // by channel inactive. Replica should request a new StreamHandle for the new client again.
+      // Newly returned numChunks should be the same.
+      openStreamInternal();
     }
+    // For retried open stream if openStream rpc is failed.
     if (streamHandle == null) {
-      OpenStream openBlocks =
-          new OpenStream(shuffleKey, location.getFileName(), startMapIndex, endMapIndex);
-      ByteBuffer response = client.sendRpcSync(openBlocks.toByteBuffer(), timeoutMs);
-      streamHandle = (StreamHandle) Message.decode(response);
+      openStreamInternal();
     }
     return client;
   }
 
+  private void openStreamInternal() {
+    OpenStream openBlocks =
+            new OpenStream(shuffleKey, location.getFileName(), startMapIndex, endMapIndex);
+    ByteBuffer response = client.sendRpcSync(openBlocks.toByteBuffer(), timeoutMs);
+    streamHandle = (StreamHandle) Message.decode(response);
+  }
+
   public long getStreamId() {
     return streamHandle.streamId;
   }
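
The Replica.java hunks above carry the core of the CELEBORN-50 fix: once the cached
channel goes inactive, the worker may already have dropped the old stream id, so the
client must be recreated and a fresh stream opened for it before any chunk is fetched.
The following minimal sketch illustrates that pattern only; Client, ClientFactory and
the stream-id handling are simplified stand-ins, not the real Celeborn transport classes.

    import java.io.IOException;

    class ReopenOnInactiveSketch {

      // Simplified stand-ins for the transport layer used in the hunk above.
      interface Client {
        boolean isActive();
        long openStream(String fileName) throws IOException; // returns a fresh stream id
      }

      interface ClientFactory {
        Client createClient(String host, int port) throws IOException;
      }

      private final ClientFactory factory;
      private final String host;
      private final int port;
      private final String fileName;

      private Client client;
      private Long streamId;

      ReopenOnInactiveSketch(ClientFactory factory, String host, int port, String fileName) {
        this.factory = factory;
        this.host = host;
        this.port = port;
        this.fileName = fileName;
      }

      synchronized Client getOrOpenStream() throws IOException {
        if (client == null || !client.isActive()) {
          // An inactive channel means the server may have removed the old stream,
          // so open a new stream for the new client as well.
          client = factory.createClient(host, port);
          streamId = client.openStream(fileName);
        }
        if (streamId == null) {
          // Covers a retry after a previously failed openStream RPC.
          streamId = client.openStream(fileName);
        }
        return client;
      }
    }

Opening the stream eagerly on reconnect avoids handing a stale stream id to a later
fetch, which is what produced the IllegalStateException described in CELEBORN-50.
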
diff --git a/client/src/main/java/com/aliyun/emr/rss/client/read/WorkerPartitionReader.java b/client/src/main/java/com/aliyun/emr/rss/client/read/WorkerPartitionReader.java
index 5cc28520..79b1bc9c 100644
--- a/client/src/main/java/com/aliyun/emr/rss/client/read/WorkerPartitionReader.java
+++ b/client/src/main/java/com/aliyun/emr/rss/client/read/WorkerPartitionReader.java
@@ -96,6 +96,7 @@ public class WorkerPartitionReader implements PartitionReader {
                   currentChunkIndex = 0;
                   returnedChunks = 0;
                   numChunks = client.openChunks();
+                  fetchChunks();
                 }
               }
             } catch (IOException e1) {
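
The single line added above makes the retry path re-request chunks after the stream has
been reopened; resetting the counters alone would otherwise leave the reader waiting for
chunks that were never requested on the new stream. A rough sketch of that flow, with
illustrative names only:

    class RetryAfterReopenSketch {

      // Illustrative stand-in for the client used by the reader.
      interface ChunkSource {
        int openChunks();           // (re)open the stream, returns the chunk count
        void fetchChunk(int index); // asynchronously request one chunk
      }

      private final ChunkSource source;
      private int numChunks;
      private int currentChunkIndex;
      private int returnedChunks;

      RetryAfterReopenSketch(ChunkSource source) {
        this.source = source;
        this.numChunks = source.openChunks();
      }

      void onFetchFailure() {
        // Reset progress and reopen the stream, then immediately re-issue the
        // fetch requests (the last call is what the fix above adds).
        currentChunkIndex = 0;
        returnedChunks = 0;
        numChunks = source.openChunks();
        fetchChunks();
      }

      private void fetchChunks() {
        for (int i = currentChunkIndex; i < numChunks; i++) {
          source.fetchChunk(i);
        }
      }
    }
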
diff --git a/common/src/main/java/com/aliyun/emr/rss/common/network/server/OneForOneStreamManager.java b/common/src/main/java/com/aliyun/emr/rss/common/network/server/OneForOneStreamManager.java
index 4412e100..ac25d03a 100644
--- a/common/src/main/java/com/aliyun/emr/rss/common/network/server/OneForOneStreamManager.java
+++ b/common/src/main/java/com/aliyun/emr/rss/common/network/server/OneForOneStreamManager.java
@@ -91,7 +91,7 @@ public class OneForOneStreamManager extends StreamManager {
       // Normally, when all chunks are returned to the client, the stream should be removed here.
       // But if there is a switch on the client side, it will not go here at this time, so we need
       // to remove the stream when the connection is terminated, and release the unused buffer.
-      logger.trace("Removing stream id {}", streamId);
+      logger.debug("Remove stream id {}", streamId);
       streams.remove(streamId);
     }
 
@@ -119,6 +119,7 @@ public class OneForOneStreamManager extends StreamManager {
     for (Map.Entry<Long, StreamState> entry: streams.entrySet()) {
       StreamState state = entry.getValue();
       if (state.associatedChannel == channel) {
+        logger.debug("Remove stream id {} of channel {}", entry.getKey(), channel.remoteAddress());
         streams.remove(entry.getKey());
       }
     }
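
The connectionTerminated hunk above adds a debug log for each stream dropped when its
channel closes, in line with the comment earlier in the file: streams the client
abandoned mid-read are only cleaned up once the connection terminates. A simplified
sketch of that cleanup, using a ConcurrentHashMap so entries can be removed while
iterating (the channel type parameter is a placeholder, not the Netty Channel):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    class StreamCleanupSketch<C> {

      static final class StreamState<C> {
        final C associatedChannel;
        StreamState(C channel) { this.associatedChannel = channel; }
      }

      private final Map<Long, StreamState<C>> streams = new ConcurrentHashMap<>();

      void registerStream(long streamId, C channel) {
        streams.put(streamId, new StreamState<>(channel));
      }

      void connectionTerminated(C channel) {
        for (Map.Entry<Long, StreamState<C>> entry : streams.entrySet()) {
          if (entry.getValue().associatedChannel == channel) {
            // Removing entries while iterating is safe on a ConcurrentHashMap.
            streams.remove(entry.getKey());
          }
        }
      }
    }
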
diff --git a/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/FetchHandler.scala b/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/FetchHandler.scala
index aeec80bf..ff100b8c 100644
--- a/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/FetchHandler.scala
+++ b/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/FetchHandler.scala
@@ -96,6 +96,9 @@ class FetchHandler(val conf: TransportConf) extends BaseMessageHandler with Logg
         if (fileInfo.numChunks == 0) {
           logDebug(s"StreamId $streamId fileName $fileName startMapIndex" +
             s" $startMapIndex endMapIndex $endMapIndex is empty.")
+        } else {
+          logDebug(s"StreamId $streamId fileName $fileName numChunks ${fileInfo.numChunks} " +
+            s"startMapIndex $startMapIndex endMapIndex $endMapIndex")
         }
         client.getChannel.writeAndFlush(new RpcResponse(request.requestId,
           new NioManagedBuffer(streamHandle.toByteBuffer)))