You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by zh...@apache.org on 2023/03/09 12:00:55 UTC
[incubator-celeborn] branch main updated: [CELEBORN-391][Flink] Refine register/release synchronization (#1321)
This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new ec745e36d [CELEBORN-391][Flink] Refine register/release synchronization (#1321)
ec745e36d is described below
commit ec745e36d108fa90504d87f6b04c9226f8bb5ca2
Author: Shuang <lv...@gmail.com>
AuthorDate: Thu Mar 9 20:00:50 2023 +0800
[CELEBORN-391][Flink] Refine register/release synchronization (#1321)
---
.../common/network/server/BufferStreamManager.java | 27 ++++++++++++++--------
1 file changed, 17 insertions(+), 10 deletions(-)
diff --git a/common/src/main/java/org/apache/celeborn/common/network/server/BufferStreamManager.java b/common/src/main/java/org/apache/celeborn/common/network/server/BufferStreamManager.java
index ec90cfe12..2e4919ca4 100644
--- a/common/src/main/java/org/apache/celeborn/common/network/server/BufferStreamManager.java
+++ b/common/src/main/java/org/apache/celeborn/common/network/server/BufferStreamManager.java
@@ -111,30 +111,31 @@ public class BufferStreamManager {
long streamId = nextStreamId.getAndIncrement();
streams.put(streamId, new StreamState(channel, fileInfo.getBufferSize()));
logger.debug("Register stream start streamId: {}, fileInfo: {}", streamId, fileInfo);
+ MapDataPartition mapDataPartition;
synchronized (activeMapPartitions) {
- MapDataPartition mapDataPartition = activeMapPartitions.get(fileInfo);
+ mapDataPartition = activeMapPartitions.get(fileInfo);
if (mapDataPartition == null) {
mapDataPartition = new MapDataPartition(fileInfo);
activeMapPartitions.put(fileInfo, mapDataPartition);
}
mapDataPartition.addStream(streamId);
- addCredit(initialCredit, streamId);
- servingStreams.put(streamId, mapDataPartition);
- // response streamId to channel first
- callback.accept(streamId);
- mapDataPartition.setupDataPartitionReader(startSubIndex, endSubIndex, streamId, channel);
}
+ addCredit(initialCredit, streamId);
+ servingStreams.put(streamId, mapDataPartition);
+ // response streamId to channel first
+ callback.accept(streamId);
+ mapDataPartition.setupDataPartitionReader(startSubIndex, endSubIndex, streamId, channel);
+
logger.debug("Register stream streamId: {}, fileInfo: {}", streamId, fileInfo);
return streamId;
}
- public void addCredit(int numCredit, long streamId) {
+ private void addCredit(MapDataPartition mapDataPartition, int numCredit, long streamId) {
logger.debug("streamId: {}, add credit: {}", streamId, numCredit);
try {
- MapDataPartition mapDataPartition = servingStreams.get(streamId);
- if (mapDataPartition != null) {
+ if (mapDataPartition != null && numCredit > 0) {
mapDataPartition.addReaderCredit(numCredit, streamId);
}
} catch (Throwable e) {
@@ -142,6 +143,11 @@ public class BufferStreamManager {
}
}
+ public void addCredit(int numCredit, long streamId) {
+ MapDataPartition mapDataPartition = servingStreams.get(streamId);
+ addCredit(mapDataPartition, numCredit, streamId);
+ }
+
public void connectionTerminated(Channel channel) {
for (Map.Entry<Long, StreamState> entry : streams.entrySet()) {
if (entry.getValue().getAssociatedChannel() == channel) {
@@ -191,7 +197,8 @@ public class BufferStreamManager {
if (streams.containsKey(streamId)) {
MapDataPartition mapDataPartition = servingStreams.get(streamId);
if (mapDataPartition != null) {
- if (mapDataPartition.releaseStream(streamId)) {
+ if (mapDataPartition.releaseStream(streamId)
+ && mapDataPartition.activeStreamIds.isEmpty()) {
synchronized (activeMapPartitions) {
if (mapDataPartition.activeStreamIds.isEmpty()) {
mapDataPartition.close();