You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by et...@apache.org on 2022/11/23 03:56:13 UTC
[incubator-celeborn] branch main updated: [CELEBORN-47][IMPROVEMENT] Refine logs about tracking fetch chunk (#995)
This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 2b052288 [CELEBORN-47][IMPROVEMENT] Refine logs about tracking fetch chunk (#995)
2b052288 is described below
commit 2b05228871981b2fd0a249746797c333c6b168a6
Author: Angerszhuuuu <an...@gmail.com>
AuthorDate: Wed Nov 23 11:56:10 2022 +0800
[CELEBORN-47][IMPROVEMENT] Refine logs about tracking fetch chunk (#995)
---
.../apache/celeborn/client/read/ChunkClient.java | 11 ++++----
.../network/server/OneForOneStreamManager.java | 29 ++--------------------
.../common/network/server/StreamManager.java | 20 ---------------
.../service/deploy/worker/FetchHandler.scala | 3 +++
4 files changed, 11 insertions(+), 52 deletions(-)
diff --git a/client/src/main/java/org/apache/celeborn/client/read/ChunkClient.java b/client/src/main/java/org/apache/celeborn/client/read/ChunkClient.java
index d9458c7b..0baa5e66 100644
--- a/client/src/main/java/org/apache/celeborn/client/read/ChunkClient.java
+++ b/client/src/main/java/org/apache/celeborn/client/read/ChunkClient.java
@@ -186,11 +186,10 @@ public class ChunkClient {
client.fetchChunk(replica.getStreamId(), chunkIndex, callback);
} catch (Exception e) {
logger.error(
- "Exception raised while beginning fetch chunk {}{}.",
- chunkIndex,
- numTries > 0 ? " (after " + numTries + " retries)" : "",
+ "Exception raised while beginning fetch chunk "
+ + chunkIndex
+ + (numTries > 0 ? " (after " + numTries + " retries)" : ""),
e);
-
if (shouldRetry(e)) {
initiateRetry(chunkIndex, callback.currentNumTries);
} else {
@@ -255,7 +254,9 @@ public class ChunkClient {
if (shouldRetry(e)) {
initiateRetry(chunkIndex, this.currentNumTries);
} else {
- logger.error("Abandon to fetch chunk {} after {} tries.", chunkIndex, this.currentNumTries);
+ logger.error(
+ "Abandon to fetch chunk " + chunkIndex + " after " + this.currentNumTries + " tries.",
+ e);
callback.onFailure(chunkIndex, ChunkClient.this.location, e);
}
}
diff --git a/common/src/main/java/org/apache/celeborn/common/network/server/OneForOneStreamManager.java b/common/src/main/java/org/apache/celeborn/common/network/server/OneForOneStreamManager.java
index 838f7459..dec24f8e 100644
--- a/common/src/main/java/org/apache/celeborn/common/network/server/OneForOneStreamManager.java
+++ b/common/src/main/java/org/apache/celeborn/common/network/server/OneForOneStreamManager.java
@@ -25,8 +25,6 @@ import java.util.concurrent.atomic.AtomicLong;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -93,33 +91,20 @@ public class OneForOneStreamManager extends StreamManager {
// Normally, when all chunks are returned to the client, the stream should be removed here.
// But if there is a switch on the client side, it will not go here at this time, so we need
// to remove the stream when the connection is terminated, and release the unused buffer.
- logger.trace("Removing stream id {}", streamId);
+ logger.debug("Removing stream id {}", streamId);
streams.remove(streamId);
}
return nextChunk;
}
- public static String genStreamChunkId(long streamId, int chunkId) {
- return String.format("%d_%d", streamId, chunkId);
- }
-
- // Parse streamChunkId to be stream id and chunk id. This is used when fetch remote chunk as a
- // stream.
- public static Pair<Long, Integer> parseStreamChunkId(String streamChunkId) {
- String[] array = streamChunkId.split("_");
- assert array.length == 2 : "Stream id and chunk index should be specified.";
- long streamId = Long.parseLong(array[0]);
- int chunkIndex = Integer.parseInt(array[1]);
- return ImmutablePair.of(streamId, chunkIndex);
- }
-
@Override
public void connectionTerminated(Channel channel) {
// Close all streams which have been associated with the channel.
for (Map.Entry<Long, StreamState> entry : streams.entrySet()) {
StreamState state = entry.getValue();
if (state.associatedChannel == channel) {
+ logger.debug("Remove stream id {} of channel {}", entry.getKey(), channel.remoteAddress());
streams.remove(entry.getKey());
}
}
@@ -133,11 +118,6 @@ public class OneForOneStreamManager extends StreamManager {
}
}
- @Override
- public void streamBeingSent(String streamId) {
- chunkBeingSent(parseStreamChunkId(streamId).getLeft());
- }
-
@Override
public void chunkSent(long streamId) {
StreamState streamState = streams.get(streamId);
@@ -146,11 +126,6 @@ public class OneForOneStreamManager extends StreamManager {
}
}
- @Override
- public void streamSent(String streamId) {
- chunkSent(OneForOneStreamManager.parseStreamChunkId(streamId).getLeft());
- }
-
@Override
public long chunksBeingTransferred() {
long sum = 0L;
diff --git a/common/src/main/java/org/apache/celeborn/common/network/server/StreamManager.java b/common/src/main/java/org/apache/celeborn/common/network/server/StreamManager.java
index b73f31c3..f48ebcc9 100644
--- a/common/src/main/java/org/apache/celeborn/common/network/server/StreamManager.java
+++ b/common/src/main/java/org/apache/celeborn/common/network/server/StreamManager.java
@@ -45,20 +45,6 @@ public abstract class StreamManager {
*/
public abstract ManagedBuffer getChunk(long streamId, int chunkIndex, int offset, int len);
- /**
- * Called in response to a stream() request. The returned data is streamed to the client through a
- * single TCP connection.
- *
- * <p>Note the <code>streamId</code> argument is not related to the similarly named argument in
- * the {@link #getChunk(long, int, int, int)} method.
- *
- * @param streamId id of a stream that has been previously registered with the StreamManager.
- * @return A managed buffer for the stream, or null if the stream was not found.
- */
- public ManagedBuffer openStream(String streamId) {
- throw new UnsupportedOperationException();
- }
-
/**
* Indicates that the given channel has been terminated. After this occurs, we are guaranteed not
* to read from the associated streams again, so any state can be cleaned up.
@@ -73,12 +59,6 @@ public abstract class StreamManager {
/** Called when start sending a chunk. */
public void chunkBeingSent(long streamId) {}
- /** Called when start sending a stream. */
- public void streamBeingSent(String streamId) {}
-
/** Called when a chunk is successfully sent. */
public void chunkSent(long streamId) {}
-
- /** Called when a stream is successfully sent. */
- public void streamSent(String streamId) {}
}
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
index 57eaae64..f587ae4e 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
@@ -104,6 +104,9 @@ class FetchHandler(val conf: TransportConf) extends BaseMessageHandler with Logg
if (fileInfo.numChunks() == 0) {
logDebug(s"StreamId $streamId fileName $fileName startMapIndex" +
s" $startMapIndex endMapIndex $endMapIndex is empty.")
+ } else {
+ logDebug(s"StreamId $streamId fileName $fileName numChunks ${fileInfo.numChunks()} " +
+ s"startMapIndex $startMapIndex endMapIndex $endMapIndex")
}
client.getChannel.writeAndFlush(new RpcResponse(
request.requestId,