You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this copy.
Posted to commits@celeborn.apache.org by et...@apache.org on 2022/11/23 03:56:13 UTC

[incubator-celeborn] branch main updated: [CELEBORN-47][IMPROVEMENT] Refine logs about tracking fetch chunk (#995)

This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 2b052288 [CELEBORN-47][IMPROVEMENT] Refine logs about tracking fetch chunk (#995)
2b052288 is described below

commit 2b05228871981b2fd0a249746797c333c6b168a6
Author: Angerszhuuuu <an...@gmail.com>
AuthorDate: Wed Nov 23 11:56:10 2022 +0800

    [CELEBORN-47][IMPROVEMENT] Refine logs about tracking fetch chunk (#995)
---
 .../apache/celeborn/client/read/ChunkClient.java   | 11 ++++----
 .../network/server/OneForOneStreamManager.java     | 29 ++--------------------
 .../common/network/server/StreamManager.java       | 20 ---------------
 .../service/deploy/worker/FetchHandler.scala       |  3 +++
 4 files changed, 11 insertions(+), 52 deletions(-)

diff --git a/client/src/main/java/org/apache/celeborn/client/read/ChunkClient.java b/client/src/main/java/org/apache/celeborn/client/read/ChunkClient.java
index d9458c7b..0baa5e66 100644
--- a/client/src/main/java/org/apache/celeborn/client/read/ChunkClient.java
+++ b/client/src/main/java/org/apache/celeborn/client/read/ChunkClient.java
@@ -186,11 +186,10 @@ public class ChunkClient {
       client.fetchChunk(replica.getStreamId(), chunkIndex, callback);
     } catch (Exception e) {
       logger.error(
-          "Exception raised while beginning fetch chunk {}{}.",
-          chunkIndex,
-          numTries > 0 ? " (after " + numTries + " retries)" : "",
+          "Exception raised while beginning fetch chunk "
+              + chunkIndex
+              + (numTries > 0 ? " (after " + numTries + " retries)" : ""),
           e);
-
       if (shouldRetry(e)) {
         initiateRetry(chunkIndex, callback.currentNumTries);
       } else {
@@ -255,7 +254,9 @@ public class ChunkClient {
       if (shouldRetry(e)) {
         initiateRetry(chunkIndex, this.currentNumTries);
       } else {
-        logger.error("Abandon to fetch chunk {} after {} tries.", chunkIndex, this.currentNumTries);
+        logger.error(
+            "Abandon to fetch chunk " + chunkIndex + " after " + this.currentNumTries + " tries.",
+            e);
         callback.onFailure(chunkIndex, ChunkClient.this.location, e);
       }
     }
diff --git a/common/src/main/java/org/apache/celeborn/common/network/server/OneForOneStreamManager.java b/common/src/main/java/org/apache/celeborn/common/network/server/OneForOneStreamManager.java
index 838f7459..dec24f8e 100644
--- a/common/src/main/java/org/apache/celeborn/common/network/server/OneForOneStreamManager.java
+++ b/common/src/main/java/org/apache/celeborn/common/network/server/OneForOneStreamManager.java
@@ -25,8 +25,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import io.netty.channel.Channel;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -93,33 +91,20 @@ public class OneForOneStreamManager extends StreamManager {
       // Normally, when all chunks are returned to the client, the stream should be removed here.
       // But if there is a switch on the client side, it will not go here at this time, so we need
       // to remove the stream when the connection is terminated, and release the unused buffer.
-      logger.trace("Removing stream id {}", streamId);
+      logger.debug("Removing stream id {}", streamId);
       streams.remove(streamId);
     }
 
     return nextChunk;
   }
 
-  public static String genStreamChunkId(long streamId, int chunkId) {
-    return String.format("%d_%d", streamId, chunkId);
-  }
-
-  // Parse streamChunkId to be stream id and chunk id. This is used when fetch remote chunk as a
-  // stream.
-  public static Pair<Long, Integer> parseStreamChunkId(String streamChunkId) {
-    String[] array = streamChunkId.split("_");
-    assert array.length == 2 : "Stream id and chunk index should be specified.";
-    long streamId = Long.parseLong(array[0]);
-    int chunkIndex = Integer.parseInt(array[1]);
-    return ImmutablePair.of(streamId, chunkIndex);
-  }
-
   @Override
   public void connectionTerminated(Channel channel) {
     // Close all streams which have been associated with the channel.
     for (Map.Entry<Long, StreamState> entry : streams.entrySet()) {
       StreamState state = entry.getValue();
       if (state.associatedChannel == channel) {
+        logger.debug("Remove stream id {} of channel {}", entry.getKey(), channel.remoteAddress());
         streams.remove(entry.getKey());
       }
     }
@@ -133,11 +118,6 @@ public class OneForOneStreamManager extends StreamManager {
     }
   }
 
-  @Override
-  public void streamBeingSent(String streamId) {
-    chunkBeingSent(parseStreamChunkId(streamId).getLeft());
-  }
-
   @Override
   public void chunkSent(long streamId) {
     StreamState streamState = streams.get(streamId);
@@ -146,11 +126,6 @@ public class OneForOneStreamManager extends StreamManager {
     }
   }
 
-  @Override
-  public void streamSent(String streamId) {
-    chunkSent(OneForOneStreamManager.parseStreamChunkId(streamId).getLeft());
-  }
-
   @Override
   public long chunksBeingTransferred() {
     long sum = 0L;
diff --git a/common/src/main/java/org/apache/celeborn/common/network/server/StreamManager.java b/common/src/main/java/org/apache/celeborn/common/network/server/StreamManager.java
index b73f31c3..f48ebcc9 100644
--- a/common/src/main/java/org/apache/celeborn/common/network/server/StreamManager.java
+++ b/common/src/main/java/org/apache/celeborn/common/network/server/StreamManager.java
@@ -45,20 +45,6 @@ public abstract class StreamManager {
    */
   public abstract ManagedBuffer getChunk(long streamId, int chunkIndex, int offset, int len);
 
-  /**
-   * Called in response to a stream() request. The returned data is streamed to the client through a
-   * single TCP connection.
-   *
-   * <p>Note the <code>streamId</code> argument is not related to the similarly named argument in
-   * the {@link #getChunk(long, int, int, int)} method.
-   *
-   * @param streamId id of a stream that has been previously registered with the StreamManager.
-   * @return A managed buffer for the stream, or null if the stream was not found.
-   */
-  public ManagedBuffer openStream(String streamId) {
-    throw new UnsupportedOperationException();
-  }
-
   /**
    * Indicates that the given channel has been terminated. After this occurs, we are guaranteed not
    * to read from the associated streams again, so any state can be cleaned up.
@@ -73,12 +59,6 @@ public abstract class StreamManager {
   /** Called when start sending a chunk. */
   public void chunkBeingSent(long streamId) {}
 
-  /** Called when start sending a stream. */
-  public void streamBeingSent(String streamId) {}
-
   /** Called when a chunk is successfully sent. */
   public void chunkSent(long streamId) {}
-
-  /** Called when a stream is successfully sent. */
-  public void streamSent(String streamId) {}
 }
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
index 57eaae64..f587ae4e 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
@@ -104,6 +104,9 @@ class FetchHandler(val conf: TransportConf) extends BaseMessageHandler with Logg
           if (fileInfo.numChunks() == 0) {
             logDebug(s"StreamId $streamId fileName $fileName startMapIndex" +
               s" $startMapIndex endMapIndex $endMapIndex is empty.")
+          } else {
+            logDebug(s"StreamId $streamId fileName $fileName numChunks ${fileInfo.numChunks()} " +
+              s"startMapIndex $startMapIndex endMapIndex $endMapIndex")
           }
           client.getChannel.writeAndFlush(new RpcResponse(
             request.requestId,