You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by zh...@apache.org on 2023/03/09 12:00:55 UTC

[incubator-celeborn] branch main updated: [CELEBORN-391][Flink] Refine register/release synchronization (#1321)

This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new ec745e36d [CELEBORN-391][Flink] Refine register/release synchronization (#1321)
ec745e36d is described below

commit ec745e36d108fa90504d87f6b04c9226f8bb5ca2
Author: Shuang <lv...@gmail.com>
AuthorDate: Thu Mar 9 20:00:50 2023 +0800

    [CELEBORN-391][Flink] Refine register/release synchronization (#1321)
---
 .../common/network/server/BufferStreamManager.java | 27 ++++++++++++++--------
 1 file changed, 17 insertions(+), 10 deletions(-)

diff --git a/common/src/main/java/org/apache/celeborn/common/network/server/BufferStreamManager.java b/common/src/main/java/org/apache/celeborn/common/network/server/BufferStreamManager.java
index ec90cfe12..2e4919ca4 100644
--- a/common/src/main/java/org/apache/celeborn/common/network/server/BufferStreamManager.java
+++ b/common/src/main/java/org/apache/celeborn/common/network/server/BufferStreamManager.java
@@ -111,30 +111,31 @@ public class BufferStreamManager {
     long streamId = nextStreamId.getAndIncrement();
     streams.put(streamId, new StreamState(channel, fileInfo.getBufferSize()));
     logger.debug("Register stream start streamId: {}, fileInfo: {}", streamId, fileInfo);
+    MapDataPartition mapDataPartition;
     synchronized (activeMapPartitions) {
-      MapDataPartition mapDataPartition = activeMapPartitions.get(fileInfo);
+      mapDataPartition = activeMapPartitions.get(fileInfo);
       if (mapDataPartition == null) {
         mapDataPartition = new MapDataPartition(fileInfo);
         activeMapPartitions.put(fileInfo, mapDataPartition);
       }
       mapDataPartition.addStream(streamId);
-      addCredit(initialCredit, streamId);
-      servingStreams.put(streamId, mapDataPartition);
-      // response streamId to channel first
-      callback.accept(streamId);
-      mapDataPartition.setupDataPartitionReader(startSubIndex, endSubIndex, streamId, channel);
     }
 
+    addCredit(initialCredit, streamId);
+    servingStreams.put(streamId, mapDataPartition);
+    // response streamId to channel first
+    callback.accept(streamId);
+    mapDataPartition.setupDataPartitionReader(startSubIndex, endSubIndex, streamId, channel);
+
     logger.debug("Register stream streamId: {}, fileInfo: {}", streamId, fileInfo);
 
     return streamId;
   }
 
-  public void addCredit(int numCredit, long streamId) {
+  private void addCredit(MapDataPartition mapDataPartition, int numCredit, long streamId) {
     logger.debug("streamId: {}, add credit: {}", streamId, numCredit);
     try {
-      MapDataPartition mapDataPartition = servingStreams.get(streamId);
-      if (mapDataPartition != null) {
+      if (mapDataPartition != null && numCredit > 0) {
         mapDataPartition.addReaderCredit(numCredit, streamId);
       }
     } catch (Throwable e) {
@@ -142,6 +143,11 @@ public class BufferStreamManager {
     }
   }
 
+  public void addCredit(int numCredit, long streamId) {
+    MapDataPartition mapDataPartition = servingStreams.get(streamId);
+    addCredit(mapDataPartition, numCredit, streamId);
+  }
+
   public void connectionTerminated(Channel channel) {
     for (Map.Entry<Long, StreamState> entry : streams.entrySet()) {
       if (entry.getValue().getAssociatedChannel() == channel) {
@@ -191,7 +197,8 @@ public class BufferStreamManager {
     if (streams.containsKey(streamId)) {
       MapDataPartition mapDataPartition = servingStreams.get(streamId);
       if (mapDataPartition != null) {
-        if (mapDataPartition.releaseStream(streamId)) {
+        if (mapDataPartition.releaseStream(streamId)
+            && mapDataPartition.activeStreamIds.isEmpty()) {
           synchronized (activeMapPartitions) {
             if (mapDataPartition.activeStreamIds.isEmpty()) {
               mapDataPartition.close();