Posted to issues@celeborn.apache.org by "waitinfuture (via GitHub)" <gi...@apache.org> on 2023/02/02 13:09:06 UTC

[GitHub] [incubator-celeborn] waitinfuture commented on a diff in pull request #1200: [CELEBORN-267] reuse stream when client channel reconnected

waitinfuture commented on code in PR #1200:
URL: https://github.com/apache/incubator-celeborn/pull/1200#discussion_r1094502702


##########
common/src/main/java/org/apache/celeborn/common/network/server/ChunkStreamManager.java:
##########
@@ -41,14 +40,15 @@ public class ChunkStreamManager {
   private static final Logger logger = LoggerFactory.getLogger(ChunkStreamManager.class);
 
   private final AtomicLong nextStreamId;
+  // StreamId -> StreamState
   protected final ConcurrentHashMap<Long, StreamState> streams;
+  // ShuffleKey -> StreamId
+  protected final ConcurrentHashMap<String, Set<Long>> shuffleStreamIds;
 
   /** State of a single stream. */
   protected static class StreamState {
     final FileManagedBuffers buffers;
-
-    // The channel associated to the stream
-    final Channel associatedChannel;
+    final String shuffleKey;

Review Comment:
   Nice catch!



##########
common/src/main/java/org/apache/celeborn/common/network/server/ChunkStreamManager.java:
##########
@@ -41,14 +40,15 @@ public class ChunkStreamManager {
   private static final Logger logger = LoggerFactory.getLogger(ChunkStreamManager.class);
 
   private final AtomicLong nextStreamId;
+  // StreamId -> StreamState
   protected final ConcurrentHashMap<Long, StreamState> streams;
+  // ShuffleKey -> StreamId
+  protected final ConcurrentHashMap<String, Set<Long>> shuffleStreamIds;

Review Comment:
   I think we need a concurrent Set here.
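
   To illustrate the concern: the outer ConcurrentHashMap only protects the key-to-set mapping, so the per-shuffle Set<Long> must itself be thread-safe if several threads add stream ids for the same shuffle key. A minimal sketch, assuming a stand-in class named ShuffleStreamIndex (hypothetical, not the actual ChunkStreamManager):

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the shuffleStreamIds bookkeeping; not Celeborn code.
class ShuffleStreamIndex {
  private final AtomicLong nextStreamId = new AtomicLong(0);
  // ShuffleKey -> set of StreamIds. The value set is a concurrent set.
  private final ConcurrentHashMap<String, Set<Long>> shuffleStreamIds =
      new ConcurrentHashMap<>();

  long register(String shuffleKey) {
    long streamId = nextStreamId.getAndIncrement();
    // compute() runs atomically per key, so creating the set and adding the
    // id cannot interleave with a concurrent remove() of the same key.
    shuffleStreamIds.compute(
        shuffleKey,
        (key, value) -> {
          if (value == null) {
            value = ConcurrentHashMap.newKeySet();
          }
          value.add(streamId);
          return value;
        });
    return streamId;
  }

  Set<Long> streamIds(String shuffleKey) {
    Set<Long> ids = shuffleStreamIds.get(shuffleKey);
    return ids == null ? Collections.<Long>emptySet() : ids;
  }
}
```

   ConcurrentHashMap.newKeySet() is the JDK's built-in concurrent Set; a plain HashSet in the same position would not be safe to read or modify from multiple threads.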



##########
common/src/main/java/org/apache/celeborn/common/network/server/ChunkStreamManager.java:
##########
@@ -154,18 +149,47 @@ public long chunksBeingTransferred() {
    * <p>If an app ID is provided, only callers who've authenticated with the given app ID will be
    * allowed to fetch from this stream.
    *
-   * <p>This method also associates the stream with a single client connection, which is guaranteed
-   * to be the only reader of the stream. Once the connection is closed, the stream will never be
-   * used again, enabling cleanup by `connectionTerminated`.
+   * <p>This stream could be reused again when other channel of the client is reconnected. If a
+   * stream is not properly closed, it will eventually be cleaned up by `cleanupExpiredShuffleKey`.
    */
-  public long registerStream(FileManagedBuffers buffers, Channel channel) {
+  public long registerStream(String shuffleKey, FileManagedBuffers buffers) {
     long myStreamId = nextStreamId.getAndIncrement();
-    streams.put(myStreamId, new StreamState(buffers, channel));
+    streams.put(myStreamId, new StreamState(shuffleKey, buffers));
+    shuffleStreamIds.compute(
+        shuffleKey,
+        (key, value) -> {
+          if (value == null) {
+            value = ConcurrentHashMap.newKeySet();
+          }
+          value.add(myStreamId);
+          return value;
+        });
+
     return myStreamId;
   }
 
+  public Set<String> shuffleKeySet() {
+    return shuffleStreamIds.keySet();
+  }
+
+  public void cleanupExpiredShuffleKey(Set<String> expiredShuffleKeys) {
+    for (String expiredShuffleKey : expiredShuffleKeys) {
+      Set<Long> removedStreamId = shuffleStreamIds.remove(expiredShuffleKey);

Review Comment:
   Rename: removedStreamId -> expiredStreams
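
   For context, a sketch of how the cleanup could read with the suggested name. The quoted hunk is cut off after the remove() call, so the loop body below is an assumption, and StreamCleanupSketch with its String-valued streams map is a hypothetical stand-in for the real ChunkStreamManager and StreamState:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in; the real StreamState is replaced by a String here.
class StreamCleanupSketch {
  // StreamId -> stream state (simplified to the owning shuffle key)
  final ConcurrentHashMap<Long, String> streams = new ConcurrentHashMap<>();
  // ShuffleKey -> set of StreamIds
  final ConcurrentHashMap<String, Set<Long>> shuffleStreamIds = new ConcurrentHashMap<>();

  void register(String shuffleKey, long streamId) {
    streams.put(streamId, shuffleKey);
    shuffleStreamIds.compute(
        shuffleKey,
        (key, value) -> {
          if (value == null) {
            value = ConcurrentHashMap.newKeySet();
          }
          value.add(streamId);
          return value;
        });
  }

  void cleanupExpiredShuffleKey(Set<String> expiredShuffleKeys) {
    for (String expiredShuffleKey : expiredShuffleKeys) {
      // remove() returns the set of stream ids that belonged to the key,
      // or null if the key was never registered.
      Set<Long> expiredStreams = shuffleStreamIds.remove(expiredShuffleKey);
      if (expiredStreams != null) {
        for (Long streamId : expiredStreams) {
          streams.remove(streamId); // assumed: drop per-stream state as well
        }
      }
    }
  }
}
```

   The name expiredStreams describes what the set contains (all streams of the expired shuffle), whereas removedStreamId reads as a single id.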



##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala:
##########
@@ -230,6 +230,7 @@ private[celeborn] class Worker(
     val estimatedAppDiskUsage = new JHashMap[String, JLong]()
     activeShuffleKeys.addAll(partitionLocationInfo.shuffleKeySet)
     activeShuffleKeys.addAll(storageManager.shuffleKeySet())
+    activeShuffleKeys.addAll(fetchHandler.shuffleKeySet())

Review Comment:
   Is this necessary?


