Posted to issues@celeborn.apache.org by "FMX (via GitHub)" <gi...@apache.org> on 2023/04/03 09:42:51 UTC

[GitHub] [incubator-celeborn] FMX opened a new pull request, #1407: [CELEBORN-431][FLINK] Support dynamic buffer allocation in reading map partition.

FMX opened a new pull request, #1407:
URL: https://github.com/apache/incubator-celeborn/pull/1407

   ### What changes were proposed in this pull request?
   1. Support dynamic buffer allocation when reading a map partition.
   2. Do not fail the Flink task when the channel becomes inactive.
   3. Add new metrics for read buffers.
   4. Update the Grafana dashboard.
   
   
   ### Why are the changes needed?
   Ditto.
   
   
   ### Does this PR introduce _any_ user-facing change?
   NO.
   
   
   ### How was this patch tested?
   UT and cluster test.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-celeborn] waitinfuture commented on a diff in pull request #1407: [CELEBORN-431][FLINK] Support dynamic buffer allocation in reading map partition.

Posted by "waitinfuture (via GitHub)" <gi...@apache.org>.
waitinfuture commented on code in PR #1407:
URL: https://github.com/apache/incubator-celeborn/pull/1407#discussion_r1160688629


##########
common/src/main/java/org/apache/celeborn/common/network/server/memory/MemoryManager.java:
##########
@@ -69,37 +71,27 @@ public class MemoryManager {
   private boolean underPressure;
   private final AtomicBoolean trimInProcess = new AtomicBoolean(false);
 
-  // For buffer stream
+  // For credit stream
   private final AtomicLong readBufferCounter = new AtomicLong(0);
   private long readBufferThreshold = 0;
-  private final ReadBufferDispatcher readBufferDispatcher;
+  private long readBufferTarget = 0;
+  private ReadBufferDispatcher readBufferDispatcher;
+  private List<ReadBufferTargetChangeListener> readBufferTargetChangeListeners;
+  private Object readBufferTargetChangeLock = new Object();

Review Comment:
   readBufferTargetChangeLock is unnecessary; just lock on readBufferTargetChangeListeners.
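
   A minimal sketch of what locking on the list itself could look like (ReadBufferTargetChangeListener and its onChange method are stand-ins, since the diff does not show the real interface; the field needs to be final so it can safely serve as the monitor):

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Sketch: synchronize on the listener list itself instead of keeping a
   // separate readBufferTargetChangeLock object.
   class ReadBufferTargetListeners {
     // Stand-in for MemoryManager.ReadBufferTargetChangeListener; method name is assumed.
     interface ReadBufferTargetChangeListener {
       void onChange(long newReadBufferTarget);
     }

     // final, so the field can double as the monitor
     private final List<ReadBufferTargetChangeListener> readBufferTargetChangeListeners =
         new ArrayList<>();

     void addReadBufferTargetChangeListener(ReadBufferTargetChangeListener listener) {
       synchronized (readBufferTargetChangeListeners) {
         readBufferTargetChangeListeners.add(listener);
       }
     }

     void removeReadBufferTargetChangeListener(ReadBufferTargetChangeListener listener) {
       synchronized (readBufferTargetChangeListeners) {
         readBufferTargetChangeListeners.remove(listener);
       }
     }

     void notifyReadBufferTargetChange(long newTarget) {
       synchronized (readBufferTargetChangeListeners) {
         for (ReadBufferTargetChangeListener listener : readBufferTargetChangeListeners) {
           listener.onChange(newTarget);
         }
       }
     }
   }
   ```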



##########
common/src/main/java/org/apache/celeborn/common/network/server/memory/ReadBufferDispatcher.java:
##########
@@ -49,51 +54,47 @@ public void addBufferRequest(ReadBufferRequest request) {
   public void recycle(ByteBuf buf) {
     int bufferSize = buf.capacity();
     int refCnt = buf.refCnt();
-    if (refCnt != 1) {
-      logger.error("recycle buffer refCnt: {} not equal to 1!", buf.refCnt());
-    }
+    // If a reader failed, related read buffers will have more than one reference count
     if (refCnt > 0) {
       buf.release(refCnt);
     }
+    allocatedReadBuffers.decrement();
     memoryManager.changeReadBufferCounter(-1 * bufferSize);
   }
 
   @Override
   public void run() {
-    while (true) {
+    while (!stopFlag) {
       ReadBufferRequest request = null;
       try {
-        request = requests.poll(500, TimeUnit.MILLISECONDS);
+        request = requests.poll(1000, TimeUnit.MILLISECONDS);
       } catch (InterruptedException e) {
         logger.info("Buffer dispatcher is closing");
       }
 
       if (request != null) {
+        long start = System.nanoTime();
         List<ByteBuf> buffers = new ArrayList<>();
         int bufferSize = request.getBufferSize();
-        while (buffers.size() < request.getMin()) {
+        while (buffers.size() < request.getNumber()) {
           if (memoryManager.readBufferAvailable(bufferSize)) {
             memoryManager.changeReadBufferCounter(bufferSize);
             ByteBuf buf = readBufferAllocator.buffer(bufferSize, bufferSize);
             buffers.add(buf);
+            allocatedReadBuffers.increment();
           } else {
             try {
               // If dispatcher can not allocate minimum buffers, it will wait here until necessary
               // buffers are get.
-              Thread.sleep(3);
+              Thread.sleep(this.readBufferAllocationWait);
             } catch (InterruptedException e) {
               logger.info("Buffer dispatcher is closing");
-              request.getBufferListener().notifyBuffers(null, e);
               return;

Review Comment:
   do not return here
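
   A rough sketch of the pattern this suggests, with the request and allocation details stubbed out: on interrupt the dispatcher stops waiting but stays alive, so the outer loop re-checks its stop flag and the pending request is still answered.

   ```java
   import java.util.concurrent.LinkedBlockingQueue;
   import java.util.concurrent.TimeUnit;

   // Illustrative only: on interrupt, leave the allocation wait but keep the
   // dispatcher thread alive so the pending request is still answered and the
   // outer loop decides when to exit via its stop flag.
   class DispatcherLoopSketch implements Runnable {
     private final LinkedBlockingQueue<Runnable> requests = new LinkedBlockingQueue<>();
     private volatile boolean stopFlag = false;

     @Override
     public void run() {
       while (!stopFlag) {
         Runnable request = null;
         try {
           request = requests.poll(1000, TimeUnit.MILLISECONDS);
         } catch (InterruptedException e) {
           // fall through; the while condition decides whether to exit
         }
         if (request == null) {
           continue;
         }
         boolean interrupted = false;
         while (!enoughBuffersAllocated() && !interrupted) {
           try {
             Thread.sleep(50); // wait for memory to become available
           } catch (InterruptedException e) {
             // do not return here: stop waiting, but let the request be completed
             interrupted = true;
           }
         }
         request.run(); // stand-in for notifying the request's buffer listener
       }
     }

     private boolean enoughBuffersAllocated() {
       return true; // placeholder for the memoryManager.readBufferAvailable(...) check
     }
   }
   ```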



##########
common/src/main/java/org/apache/celeborn/common/network/server/CreditStreamManager.java:
##########
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.server;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.netty.channel.Channel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.meta.FileInfo;
+import org.apache.celeborn.common.network.server.memory.MemoryManager;
+import org.apache.celeborn.common.util.JavaUtils;
+
+public class CreditStreamManager {
+  private static final Logger logger = LoggerFactory.getLogger(CreditStreamManager.class);
+  private final AtomicLong nextStreamId;
+  private final ConcurrentHashMap<Long, StreamState> streams;
+  private final ConcurrentHashMap<FileInfo, MapDataPartition> activeMapPartitions;
+  private final HashMap<String, ExecutorService> storageFetcherPool = new HashMap<>();
+  private int definedMinReadBuffers;

Review Comment:
   definedMinReadBuffers -> minReadBuffers, and other places



##########
common/src/main/java/org/apache/celeborn/common/network/server/CreditStreamManager.java:
##########
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.server;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.netty.channel.Channel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.meta.FileInfo;
+import org.apache.celeborn.common.network.server.memory.MemoryManager;
+import org.apache.celeborn.common.util.JavaUtils;
+
+public class CreditStreamManager {
+  private static final Logger logger = LoggerFactory.getLogger(CreditStreamManager.class);
+  private final AtomicLong nextStreamId;
+  private final ConcurrentHashMap<Long, StreamState> streams;
+  private final ConcurrentHashMap<FileInfo, MapDataPartition> activeMapPartitions;
+  private final HashMap<String, ExecutorService> storageFetcherPool = new HashMap<>();
+  private int definedMinReadBuffers;
+  private int definedMaxReadBuffers;
+  private int threadsPerMountPoint;
+  private int readAheadMin;

Review Comment:
   readAheadMin -> minBuffersToTriggerRead, and other places



##########
common/src/main/java/org/apache/celeborn/common/network/server/CreditStreamManager.java:
##########
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.server;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.netty.channel.Channel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.meta.FileInfo;
+import org.apache.celeborn.common.network.server.memory.MemoryManager;
+import org.apache.celeborn.common.util.JavaUtils;
+
+public class CreditStreamManager {
+  private static final Logger logger = LoggerFactory.getLogger(CreditStreamManager.class);
+  private final AtomicLong nextStreamId;
+  private final ConcurrentHashMap<Long, StreamState> streams;
+  private final ConcurrentHashMap<FileInfo, MapDataPartition> activeMapPartitions;
+  private final HashMap<String, ExecutorService> storageFetcherPool = new HashMap<>();
+  private int definedMinReadBuffers;
+  private int definedMaxReadBuffers;
+  private int threadsPerMountPoint;
+  private int readAheadMin;
+  private final BlockingQueue<DelayedStreamId> recycleStreamIds = new DelayQueue<>();
+
+  @GuardedBy("lock")
+  private volatile Thread recycleThread;
+
+  private final Object lock = new Object();
+
+  public CreditStreamManager(
+      int minReadBuffers, int maxReadBuffers, int threadsPerMountpoint, int readAheadMin) {
+    nextStreamId = new AtomicLong((long) new Random().nextInt(Integer.MAX_VALUE) * 1000);
+    streams = JavaUtils.newConcurrentHashMap();
+    activeMapPartitions = JavaUtils.newConcurrentHashMap();
+    definedMinReadBuffers = minReadBuffers;
+    definedMaxReadBuffers = maxReadBuffers;
+    threadsPerMountPoint = threadsPerMountpoint;
+    this.readAheadMin = readAheadMin;
+    MemoryManager.instance().setCreditStreamManager(this);
+    logger.debug(
+        "Initialize buffer stream manager with {} {} {}",
+        definedMinReadBuffers,
+        definedMaxReadBuffers,
+        threadsPerMountpoint);
+  }
+
+  public long registerStream(
+      Consumer<Long> callback,
+      Channel channel,
+      int initialCredit,
+      int startSubIndex,
+      int endSubIndex,
+      FileInfo fileInfo)
+      throws IOException {
+    long streamId = nextStreamId.getAndIncrement();
+    StreamState streamState = new StreamState(channel, fileInfo.getBufferSize());
+    streams.put(streamId, streamState);
+    logger.debug(
+        "Register stream start from {}, streamId: {}, fileInfo: {}",
+        channel.remoteAddress(),
+        streamId,
+        fileInfo);
+    MapDataPartition mapDataPartition;
+    synchronized (activeMapPartitions) {
+      mapDataPartition = activeMapPartitions.get(fileInfo);
+      if (mapDataPartition == null) {
+        mapDataPartition =
+            new MapDataPartition(
+                definedMinReadBuffers,
+                definedMaxReadBuffers,
+                storageFetcherPool,
+                threadsPerMountPoint,
+                fileInfo,
+                id -> recycleStream(id),
+                readAheadMin);
+        activeMapPartitions.put(fileInfo, mapDataPartition);
+      }
+      mapDataPartition.addStream(streamId);
+    }
+
+    addCredit(initialCredit, streamId);
+    streamState.setMapDataPartition(mapDataPartition);
+    MemoryManager.instance().addReadBufferTargetChangeListener(mapDataPartition);

Review Comment:
   This will add mapDataPartition as a listener each time a stream is opened. It needs to be moved into the block above, where mapDataPartition is null.
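
   Roughly what the suggested placement looks like (types are simplified stand-ins; only the position of the listener registration matters):

   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;

   // Sketch: register the partition as a read-buffer-target listener only on the
   // branch that creates it, so opening more streams on the same file does not
   // register the same partition again.
   class StreamRegistrationSketch {
     interface ListenerRegistry {
       void addReadBufferTargetChangeListener(Object listener);
     }

     static class Partition {
       void addStream(long streamId) {
         // track the stream id, as MapDataPartition#addStream does in the PR
       }
     }

     private final Map<String, Partition> activeMapPartitions = new ConcurrentHashMap<>();
     private final ListenerRegistry memoryManager;

     StreamRegistrationSketch(ListenerRegistry memoryManager) {
       this.memoryManager = memoryManager;
     }

     void registerStream(String fileKey, long streamId) {
       Partition partition;
       synchronized (activeMapPartitions) {
         partition = activeMapPartitions.get(fileKey);
         if (partition == null) {
           partition = new Partition();
           activeMapPartitions.put(fileKey, partition);
           // moved here: registered exactly once, when the partition is first created
           memoryManager.addReadBufferTargetChangeListener(partition);
         }
         partition.addStream(streamId);
       }
       // ... credits and reader setup continue as in the PR
     }
   }
   ```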



##########
common/src/main/java/org/apache/celeborn/common/network/server/CreditStreamManager.java:
##########
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.server;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.netty.channel.Channel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.meta.FileInfo;
+import org.apache.celeborn.common.network.server.memory.MemoryManager;
+import org.apache.celeborn.common.util.JavaUtils;
+
+public class CreditStreamManager {
+  private static final Logger logger = LoggerFactory.getLogger(CreditStreamManager.class);
+  private final AtomicLong nextStreamId;
+  private final ConcurrentHashMap<Long, StreamState> streams;
+  private final ConcurrentHashMap<FileInfo, MapDataPartition> activeMapPartitions;
+  private final HashMap<String, ExecutorService> storageFetcherPool = new HashMap<>();
+  private int definedMinReadBuffers;
+  private int definedMaxReadBuffers;
+  private int threadsPerMountPoint;
+  private int readAheadMin;
+  private final BlockingQueue<DelayedStreamId> recycleStreamIds = new DelayQueue<>();
+
+  @GuardedBy("lock")
+  private volatile Thread recycleThread;
+
+  private final Object lock = new Object();
+
+  public CreditStreamManager(
+      int minReadBuffers, int maxReadBuffers, int threadsPerMountpoint, int readAheadMin) {
+    nextStreamId = new AtomicLong((long) new Random().nextInt(Integer.MAX_VALUE) * 1000);
+    streams = JavaUtils.newConcurrentHashMap();
+    activeMapPartitions = JavaUtils.newConcurrentHashMap();
+    definedMinReadBuffers = minReadBuffers;
+    definedMaxReadBuffers = maxReadBuffers;
+    threadsPerMountPoint = threadsPerMountpoint;
+    this.readAheadMin = readAheadMin;
+    MemoryManager.instance().setCreditStreamManager(this);
+    logger.debug(
+        "Initialize buffer stream manager with {} {} {}",
+        definedMinReadBuffers,
+        definedMaxReadBuffers,
+        threadsPerMountpoint);
+  }
+
+  public long registerStream(
+      Consumer<Long> callback,
+      Channel channel,
+      int initialCredit,
+      int startSubIndex,
+      int endSubIndex,
+      FileInfo fileInfo)
+      throws IOException {
+    long streamId = nextStreamId.getAndIncrement();
+    StreamState streamState = new StreamState(channel, fileInfo.getBufferSize());
+    streams.put(streamId, streamState);
+    logger.debug(
+        "Register stream start from {}, streamId: {}, fileInfo: {}",
+        channel.remoteAddress(),
+        streamId,
+        fileInfo);
+    MapDataPartition mapDataPartition;
+    synchronized (activeMapPartitions) {
+      mapDataPartition = activeMapPartitions.get(fileInfo);
+      if (mapDataPartition == null) {
+        mapDataPartition =
+            new MapDataPartition(
+                definedMinReadBuffers,
+                definedMaxReadBuffers,
+                storageFetcherPool,
+                threadsPerMountPoint,
+                fileInfo,
+                id -> recycleStream(id),
+                readAheadMin);
+        activeMapPartitions.put(fileInfo, mapDataPartition);
+      }
+      mapDataPartition.addStream(streamId);
+    }
+
+    addCredit(initialCredit, streamId);
+    streamState.setMapDataPartition(mapDataPartition);
+    MemoryManager.instance().addReadBufferTargetChangeListener(mapDataPartition);
+    // response streamId to channel first
+    callback.accept(streamId);
+    mapDataPartition.setupDataPartitionReader(startSubIndex, endSubIndex, streamId, channel);
+
+    logger.debug("Register stream streamId: {}, fileInfo: {}", streamId, fileInfo);
+
+    return streamId;
+  }
+
+  private void addCredit(MapDataPartition mapDataPartition, int numCredit, long streamId) {
+    logger.debug("streamId: {}, add credit: {}", streamId, numCredit);
+    try {
+      if (mapDataPartition != null && numCredit > 0) {
+        mapDataPartition.addReaderCredit(numCredit, streamId);
+      }
+    } catch (Throwable e) {
+      logger.error("streamId: {}, add credit end: {}", streamId, numCredit);
+    }
+  }
+
+  public void addCredit(int numCredit, long streamId) {
+    MapDataPartition mapDataPartition = streams.get(streamId).getMapDataPartition();
+    addCredit(mapDataPartition, numCredit, streamId);
+  }
+
+  public void connectionTerminated(Channel channel) {
+    for (Map.Entry<Long, StreamState> entry : streams.entrySet()) {
+      if (entry.getValue().getAssociatedChannel() == channel) {
+        logger.info("connection closed, clean streamId: {}", entry.getKey());
+        recycleStream(entry.getKey());
+      }
+    }
+  }
+
+  public void notifyStreamEndByClient(long streamId) {
+    recycleStream(streamId);
+  }
+
+  public void recycleStream(long streamId) {
+    recycleStreamIds.add(new DelayedStreamId(streamId));
+    startRecycleThread(); // lazy start thread
+  }
+
+  @VisibleForTesting
+  public int numStreamStates() {
+    return streams.size();
+  }
+
+  @VisibleForTesting
+  public int numRecycleStreams() {
+    return recycleStreamIds.size();
+  }
+
+  @VisibleForTesting
+  public ConcurrentHashMap<Long, StreamState> getStreams() {
+    return streams;
+  }
+
+  private void startRecycleThread() {
+    if (recycleThread == null) {
+      synchronized (lock) {
+        if (recycleThread == null) {
+          recycleThread =
+              new Thread(
+                  () -> {
+                    while (true) {
+                      try {
+                        DelayedStreamId delayedStreamId = recycleStreamIds.take();
+                        cleanResource(delayedStreamId.streamId);
+                      } catch (Throwable e) {
+                        logger.warn(e.getMessage(), e);
+                      }
+                    }
+                  },
+                  "recycle-thread");
+          recycleThread.setDaemon(true);
+          recycleThread.start();
+
+          logger.info("start stream recycle thread");
+        }
+      }
+    }
+  }
+
+  public void cleanResource(Long streamId) {
+    logger.debug("received clean stream: {}", streamId);
+    if (streams.containsKey(streamId)) {
+      MapDataPartition mapDataPartition = streams.get(streamId).getMapDataPartition();
+      if (mapDataPartition != null) {
+        if (mapDataPartition.releaseStream(streamId)) {
+          StreamState state = streams.remove(streamId);
+          MemoryManager.instance()
+              .removeReadBufferTargetChangeListener(state.getMapDataPartition());

Review Comment:
   Should not remove the MapDataPartition listener here, because there might be other streams still using it. I think MapDataPartition and the stream reader got mixed up here; please re-check.
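
   One way to express that intent, as a simplified sketch where the listener is dropped only when the partition releases its last stream (the types and the boolean contract of releaseStream are assumptions for illustration):

   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;

   // Illustrative only: drop the partition-level listener when the last stream of
   // the partition is released, not on every stream cleanup.
   class StreamCleanupSketch {
     interface ListenerRegistry {
       void removeReadBufferTargetChangeListener(Object listener);
     }

     static class Partition {
       // assumed contract: returns true only when this was the partition's last
       // stream and the partition itself is now released
       boolean releaseStream(long streamId) {
         return false;
       }
     }

     private final Map<Long, Partition> streamToPartition = new ConcurrentHashMap<>();
     private final ListenerRegistry memoryManager;

     StreamCleanupSketch(ListenerRegistry memoryManager) {
       this.memoryManager = memoryManager;
     }

     void cleanResource(long streamId) {
       Partition partition = streamToPartition.remove(streamId);
       if (partition != null && partition.releaseStream(streamId)) {
         // only now is it safe to remove the listener; other streams no longer depend on it
         memoryManager.removeReadBufferTargetChangeListener(partition);
       }
     }
   }
   ```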



##########
common/src/main/java/org/apache/celeborn/common/network/server/CreditStreamManager.java:
##########
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.server;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.netty.channel.Channel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.meta.FileInfo;
+import org.apache.celeborn.common.network.server.memory.MemoryManager;
+import org.apache.celeborn.common.util.JavaUtils;
+
+public class CreditStreamManager {
+  private static final Logger logger = LoggerFactory.getLogger(CreditStreamManager.class);
+  private final AtomicLong nextStreamId;
+  private final ConcurrentHashMap<Long, StreamState> streams;
+  private final ConcurrentHashMap<FileInfo, MapDataPartition> activeMapPartitions;
+  private final HashMap<String, ExecutorService> storageFetcherPool = new HashMap<>();
+  private int definedMinReadBuffers;
+  private int definedMaxReadBuffers;
+  private int threadsPerMountPoint;
+  private int readAheadMin;
+  private final BlockingQueue<DelayedStreamId> recycleStreamIds = new DelayQueue<>();
+
+  @GuardedBy("lock")
+  private volatile Thread recycleThread;
+
+  private final Object lock = new Object();
+
+  public CreditStreamManager(
+      int minReadBuffers, int maxReadBuffers, int threadsPerMountpoint, int readAheadMin) {
+    nextStreamId = new AtomicLong((long) new Random().nextInt(Integer.MAX_VALUE) * 1000);
+    streams = JavaUtils.newConcurrentHashMap();
+    activeMapPartitions = JavaUtils.newConcurrentHashMap();
+    definedMinReadBuffers = minReadBuffers;
+    definedMaxReadBuffers = maxReadBuffers;
+    threadsPerMountPoint = threadsPerMountpoint;
+    this.readAheadMin = readAheadMin;
+    MemoryManager.instance().setCreditStreamManager(this);
+    logger.debug(
+        "Initialize buffer stream manager with {} {} {}",
+        definedMinReadBuffers,
+        definedMaxReadBuffers,
+        threadsPerMountpoint);
+  }
+
+  public long registerStream(
+      Consumer<Long> callback,
+      Channel channel,
+      int initialCredit,
+      int startSubIndex,
+      int endSubIndex,
+      FileInfo fileInfo)
+      throws IOException {
+    long streamId = nextStreamId.getAndIncrement();
+    StreamState streamState = new StreamState(channel, fileInfo.getBufferSize());
+    streams.put(streamId, streamState);
+    logger.debug(
+        "Register stream start from {}, streamId: {}, fileInfo: {}",
+        channel.remoteAddress(),
+        streamId,
+        fileInfo);
+    MapDataPartition mapDataPartition;
+    synchronized (activeMapPartitions) {
+      mapDataPartition = activeMapPartitions.get(fileInfo);
+      if (mapDataPartition == null) {
+        mapDataPartition =
+            new MapDataPartition(
+                definedMinReadBuffers,
+                definedMaxReadBuffers,
+                storageFetcherPool,
+                threadsPerMountPoint,
+                fileInfo,
+                id -> recycleStream(id),
+                readAheadMin);
+        activeMapPartitions.put(fileInfo, mapDataPartition);
+      }
+      mapDataPartition.addStream(streamId);
+    }
+
+    addCredit(initialCredit, streamId);
+    streamState.setMapDataPartition(mapDataPartition);

Review Comment:
   This line should precede addCredit, since addCredit gets the MapDataPartition from streamState. Just put this line before mapDataPartition.addStream(streamId);
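
   That is, with identifiers as in the quoted diff (shown only to make the required ordering explicit, not as the final code):

   ```java
   // inside registerStream(...), after mapDataPartition has been resolved or created:
   streamState.setMapDataPartition(mapDataPartition); // attach the partition first
   mapDataPartition.addStream(streamId);
   // ...
   addCredit(initialCredit, streamId); // now streams.get(streamId).getMapDataPartition() is non-null
   ```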



##########
common/src/main/java/org/apache/celeborn/common/network/server/memory/ReadBufferDispatcher.java:
##########
@@ -49,51 +54,47 @@ public void addBufferRequest(ReadBufferRequest request) {
   public void recycle(ByteBuf buf) {
     int bufferSize = buf.capacity();
     int refCnt = buf.refCnt();
-    if (refCnt != 1) {
-      logger.error("recycle buffer refCnt: {} not equal to 1!", buf.refCnt());
-    }
+    // If a reader failed, related read buffers will have more than one reference count
     if (refCnt > 0) {
       buf.release(refCnt);
     }
+    allocatedReadBuffers.decrement();
     memoryManager.changeReadBufferCounter(-1 * bufferSize);
   }
 
   @Override
   public void run() {
-    while (true) {
+    while (!stopFlag) {
       ReadBufferRequest request = null;
       try {
-        request = requests.poll(500, TimeUnit.MILLISECONDS);
+        request = requests.poll(1000, TimeUnit.MILLISECONDS);
       } catch (InterruptedException e) {
         logger.info("Buffer dispatcher is closing");
       }
 
       if (request != null) {
+        long start = System.nanoTime();
         List<ByteBuf> buffers = new ArrayList<>();
         int bufferSize = request.getBufferSize();
-        while (buffers.size() < request.getMin()) {
+        while (buffers.size() < request.getNumber()) {
           if (memoryManager.readBufferAvailable(bufferSize)) {
             memoryManager.changeReadBufferCounter(bufferSize);
             ByteBuf buf = readBufferAllocator.buffer(bufferSize, bufferSize);
             buffers.add(buf);
+            allocatedReadBuffers.increment();
           } else {
             try {
               // If dispatcher can not allocate minimum buffers, it will wait here until necessary

Review Comment:
   minimum buffers -> requested buffers





[GitHub] [incubator-celeborn] waitinfuture commented on pull request #1407: [CELEBORN-431][FLINK] Support dynamic buffer allocation in reading map partition.

Posted by "waitinfuture (via GitHub)" <gi...@apache.org>.
waitinfuture commented on PR #1407:
URL: https://github.com/apache/incubator-celeborn/pull/1407#issuecomment-1495973876

   The memory management of the MapPartition reader is quite complex; could you open a CPIP describing its detailed design?




[GitHub] [incubator-celeborn] FMX commented on a diff in pull request #1407: [CELEBORN-431][FLINK] Support dynamic buffer allocation in reading map partition.

Posted by "FMX (via GitHub)" <gi...@apache.org>.
FMX commented on code in PR #1407:
URL: https://github.com/apache/incubator-celeborn/pull/1407#discussion_r1161417951


##########
common/src/main/java/org/apache/celeborn/common/network/server/MapDataPartition.java:
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.server;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Consumer;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.meta.FileInfo;
+import org.apache.celeborn.common.network.server.memory.BufferQueue;
+import org.apache.celeborn.common.network.server.memory.BufferRecycler;
+import org.apache.celeborn.common.network.server.memory.MemoryManager;
+import org.apache.celeborn.common.network.server.memory.ReadBufferRequest;
+import org.apache.celeborn.common.util.JavaUtils;
+
+// this means active data partition
+class MapDataPartition implements MemoryManager.ReadBufferTargetChangeListener {
+  public static final Logger logger = LoggerFactory.getLogger(MapDataPartition.class);
+  private final List<Long> activeStreamIds = new ArrayList<>();
+  private final FileInfo fileInfo;
+  private final ExecutorService readExecutor;
+  private final ConcurrentHashMap<Long, MapDataPartitionReader> readers =
+      JavaUtils.newConcurrentHashMap();
+  private FileChannel dataFileChanel;
+  private FileChannel indexChannel;
+  private volatile boolean isReleased = false;
+  private final BufferQueue bufferQueue = new BufferQueue();
+  private boolean bufferQueueInitialized = false;
+  private MemoryManager memoryManager = MemoryManager.instance();
+  private Consumer<Long> recycleStream;
+  private volatile long localMemoryTarget = 0;
+  private volatile int localBuffersTarget = 0;
+  private volatile int inFlightBuffers = 0;
+  private Object applyBufferLock = new Object();
+  private long definedMinReadAheadMemory;
+  private long definedMaxReadAheadMemory;
+  private int readAheadMin;
+
+  public MapDataPartition(
+      int definedMinReadBuffers,
+      int definedMaxReadBuffers,
+      HashMap<String, ExecutorService> storageFetcherPool,
+      int threadsPerMountPoint,
+      FileInfo fileInfo,
+      Consumer<Long> recycleStream,
+      int readAheadMin)
+      throws FileNotFoundException {
+    this.recycleStream = recycleStream;
+    this.fileInfo = fileInfo;
+    int bufferSize = fileInfo.getBufferSize();
+
+    definedMinReadAheadMemory = definedMinReadBuffers * bufferSize;
+    definedMaxReadAheadMemory = definedMaxReadBuffers * bufferSize;
+
+    updateLocalTarget(
+        localMemoryTarget,
+        fileInfo.getFileSize(),
+        definedMinReadAheadMemory,
+        definedMaxReadAheadMemory);
+
+    logger.debug(
+        "read map partition {} with {} {} {}",
+        fileInfo.getFilePath(),
+        localMemoryTarget,
+        localBuffersTarget,
+        fileInfo.getBufferSize());
+
+    this.readAheadMin = readAheadMin;
+
+    readExecutor =
+        storageFetcherPool.computeIfAbsent(
+            fileInfo.getMountPoint(),
+            k ->
+                Executors.newFixedThreadPool(
+                    threadsPerMountPoint,
+                    new ThreadFactoryBuilder()
+                        .setNameFormat(fileInfo.getMountPoint() + "-reader-thread-%d")
+                        .setUncaughtExceptionHandler(
+                            (t1, t2) -> {
+                              logger.warn("StorageFetcherPool thread:{}:{}", t1, t2);
+                            })
+                        .build()));
+    this.dataFileChanel = new FileInputStream(fileInfo.getFile()).getChannel();
+    this.indexChannel = new FileInputStream(fileInfo.getIndexPath()).getChannel();
+  }
+
+  private synchronized void updateLocalTarget(

Review Comment:
   Reject. The buffer size is variable. 





[GitHub] [incubator-celeborn] FMX commented on a diff in pull request #1407: [CELEBORN-431][FLINK] Support dynamic buffer allocation in reading map partition.

Posted by "FMX (via GitHub)" <gi...@apache.org>.
FMX commented on code in PR #1407:
URL: https://github.com/apache/incubator-celeborn/pull/1407#discussion_r1161420108


##########
common/src/main/java/org/apache/celeborn/common/network/server/MapDataPartition.java:
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.server;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Consumer;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.meta.FileInfo;
+import org.apache.celeborn.common.network.server.memory.BufferQueue;
+import org.apache.celeborn.common.network.server.memory.BufferRecycler;
+import org.apache.celeborn.common.network.server.memory.MemoryManager;
+import org.apache.celeborn.common.network.server.memory.ReadBufferRequest;
+import org.apache.celeborn.common.util.JavaUtils;
+
+// this means active data partition
+class MapDataPartition implements MemoryManager.ReadBufferTargetChangeListener {
+  public static final Logger logger = LoggerFactory.getLogger(MapDataPartition.class);
+  private final List<Long> activeStreamIds = new ArrayList<>();
+  private final FileInfo fileInfo;
+  private final ExecutorService readExecutor;
+  private final ConcurrentHashMap<Long, MapDataPartitionReader> readers =
+      JavaUtils.newConcurrentHashMap();
+  private FileChannel dataFileChanel;
+  private FileChannel indexChannel;
+  private volatile boolean isReleased = false;
+  private final BufferQueue bufferQueue = new BufferQueue();
+  private boolean bufferQueueInitialized = false;
+  private MemoryManager memoryManager = MemoryManager.instance();
+  private Consumer<Long> recycleStream;
+  private volatile long localMemoryTarget = 0;
+  private volatile int localBuffersTarget = 0;
+  private volatile int inFlightBuffers = 0;
+  private Object applyBufferLock = new Object();
+  private long definedMinReadAheadMemory;
+  private long definedMaxReadAheadMemory;
+  private int readAheadMin;
+
+  public MapDataPartition(
+      int definedMinReadBuffers,
+      int definedMaxReadBuffers,
+      HashMap<String, ExecutorService> storageFetcherPool,
+      int threadsPerMountPoint,
+      FileInfo fileInfo,
+      Consumer<Long> recycleStream,
+      int readAheadMin)
+      throws FileNotFoundException {
+    this.recycleStream = recycleStream;
+    this.fileInfo = fileInfo;
+    int bufferSize = fileInfo.getBufferSize();
+
+    definedMinReadAheadMemory = definedMinReadBuffers * bufferSize;
+    definedMaxReadAheadMemory = definedMaxReadBuffers * bufferSize;
+
+    updateLocalTarget(
+        localMemoryTarget,
+        fileInfo.getFileSize(),
+        definedMinReadAheadMemory,
+        definedMaxReadAheadMemory);
+
+    logger.debug(
+        "read map partition {} with {} {} {}",
+        fileInfo.getFilePath(),
+        localMemoryTarget,
+        localBuffersTarget,
+        fileInfo.getBufferSize());
+
+    this.readAheadMin = readAheadMin;
+
+    readExecutor =
+        storageFetcherPool.computeIfAbsent(
+            fileInfo.getMountPoint(),
+            k ->
+                Executors.newFixedThreadPool(
+                    threadsPerMountPoint,
+                    new ThreadFactoryBuilder()
+                        .setNameFormat(fileInfo.getMountPoint() + "-reader-thread-%d")
+                        .setUncaughtExceptionHandler(
+                            (t1, t2) -> {
+                              logger.warn("StorageFetcherPool thread:{}:{}", t1, t2);
+                            })
+                        .build()));
+    this.dataFileChanel = new FileInputStream(fileInfo.getFile()).getChannel();
+    this.indexChannel = new FileInputStream(fileInfo.getIndexPath()).getChannel();
+  }
+
+  private synchronized void updateLocalTarget(
+      long nTarget, long fileSize, long definedMinReadAheadMemory, long definedMaxReadAheadMemory) {
+    long target = nTarget;
+    if (target < definedMinReadAheadMemory) {
+      target = definedMinReadAheadMemory;
+    }
+    if (target > definedMaxReadAheadMemory) {
+      target = definedMaxReadAheadMemory;
+    }
+    if (target > fileSize) {
+      target = fileSize;
+    }
+    localBuffersTarget = (int) Math.ceil(target * 1.0 / fileInfo.getBufferSize());
+    localMemoryTarget = target;
+  }
+
+  public synchronized void setupDataPartitionReader(
+      int startSubIndex, int endSubIndex, long streamId, Channel channel) {
+    MapDataPartitionReader mapDataPartitionReader =
+        new MapDataPartitionReader(
+            startSubIndex,
+            endSubIndex,
+            fileInfo,
+            streamId,
+            channel,
+            () -> recycleStream.accept(streamId));
+    readers.put(streamId, mapDataPartitionReader);
+
+    // allocate resources when the first reader is registered
+    if (!bufferQueueInitialized) {
+      memoryManager.requestReadBuffers(
+          new ReadBufferRequest(
+              localBuffersTarget,
+              fileInfo.getBufferSize(),
+              (allocatedBuffers, throwable) -> onBuffer(allocatedBuffers)));
+      bufferQueueInitialized = true;
+    } else {
+      triggerRead();
+    }
+  }
+
+  // Read logic is executed on another thread.
+  public void onBuffer(List<ByteBuf> buffers) {
+    if (isReleased || bufferQueue.isReleased()) {
+      buffers.forEach(memoryManager::recycleReadBuffer);
+      return;
+    }
+
+    try {
+      bufferQueue.add(buffers);
+      inFlightBuffers -= buffers.size();
+    } catch (Exception e) {
+      // this branch means that this bufferQueue is closed
+      buffers.forEach(memoryManager::recycleReadBuffer);
+      return;
+    }
+
+    if (bufferQueue.size() > Math.min(localBuffersTarget / 2 + 1, readAheadMin)) {
+      triggerRead();
+    }
+  }
+
+  public void recycle(ByteBuf buffer) {
+    if (bufferQueue.numBuffersOccupied() > localBuffersTarget) {
+      bufferQueue.recycle(buffer);
+    } else {
+      buffer.clear();
+      bufferQueue.add(buffer);
+    }
+    if (isReleased || readers.isEmpty() || bufferQueue.isReleased()) {
+      return;
+    }
+
+    if (bufferQueue.size() > Math.min(localBuffersTarget / 2 + 1, readAheadMin)) {
+      triggerRead();
+    }
+
+    applyNewBuffers();
+  }
+
+  private void applyNewBuffers() {

Review Comment:
   NO.





[GitHub] [incubator-celeborn] RexXiong commented on a diff in pull request #1407: [CELEBORN-431][FLINK] Support dynamic buffer allocation in reading map partition.

Posted by "RexXiong (via GitHub)" <gi...@apache.org>.
RexXiong commented on code in PR #1407:
URL: https://github.com/apache/incubator-celeborn/pull/1407#discussion_r1155935381


##########
common/src/main/java/org/apache/celeborn/common/network/server/BufferStreamManager.java:
##########
@@ -319,28 +329,71 @@ public synchronized void setupDataPartitionReader(
       streamReaders.put(streamId, dataPartitionReader);
 
       // create initial buffers for read
-      if (allocateResources && buffers == null) {
+      if (allocateResources && bufferQueue.size() == 0) {
         memoryManager.requestReadBuffers(
-            minReadBuffers,
-            maxReadBuffers,
-            fileInfo.getBufferSize(),
-            (allocatedBuffers, throwable) ->
-                MapDataPartition.this.onBuffer(new LinkedBlockingDeque<>(allocatedBuffers)));
+            new ReadBufferRequest(
+                minReadBuffers,
+                maxReadBuffers,
+                fileInfo.getBufferSize(),
+                (allocatedBuffers, throwable) -> onBuffer(allocatedBuffers)));
       } else {
         triggerRead();
       }
     }
 
     // Read logic is executed on another thread.
-    public void onBuffer(Queue<ByteBuf> buffers) {
-      this.buffers = buffers;
+    public void onBuffer(List<ByteBuf> buffers) {
+      if (isReleased || bufferQueue.isReleased()) {

Review Comment:
   Would the newly applied buffers be leaked here?
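
   A minimal sketch of the leak-avoidance this question points at, with recycleToMemoryManager standing in for memoryManager::recycleReadBuffer from the diff:

   ```java
   import java.util.List;
   import java.util.function.Consumer;

   import io.netty.buffer.ByteBuf;

   // Illustrative only: if the partition (or its buffer queue) was released before
   // the requested buffers arrive, hand every buffer straight back to the memory
   // manager instead of dropping the list.
   class OnBufferSketch {
     private volatile boolean released = false;
     private final Consumer<ByteBuf> recycleToMemoryManager;

     OnBufferSketch(Consumer<ByteBuf> recycleToMemoryManager) {
       this.recycleToMemoryManager = recycleToMemoryManager;
     }

     void onBuffer(List<ByteBuf> buffers) {
       if (released) {
         buffers.forEach(recycleToMemoryManager); // recycle late-arriving buffers, do not leak them
         return;
       }
       // ... otherwise enqueue the buffers and trigger a read, as in the PR
     }

     void release() {
       released = true;
     }
   }
   ```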



##########
common/src/main/java/org/apache/celeborn/common/network/server/memory/BufferQueue.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.server.memory;
+
+import static org.apache.commons.crypto.utils.Utils.checkArgument;
+import static org.apache.hadoop.shaded.com.google.common.base.Preconditions.checkState;
+
+import java.util.Collection;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import io.netty.buffer.ByteBuf;
+
+@NotThreadSafe

Review Comment:
   ThreadSafe?



##########
common/src/test/java/org/apache/celeborn/common/network/server/BufferStreamManagerSuiteJ.java:
##########
@@ -106,6 +106,8 @@ public void testStreamRegisterAndCleanup() throws Exception {
     numInFlightRequests.decrementAndGet();
     bufferStreamManager.connectionTerminated(channel);
     timeOutOrMeetCondition(() -> bufferStreamManager.numRecycleStreams() == 0);
+    // Let delayed queue element timeout
+    Thread.sleep(1000);

Review Comment:
   Can use timeOutOrMeetCondition to replace the sleep.
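
   For example (the condition is an assumption about what the sleep waits for, i.e. the delayed recycle draining the remaining stream state, and a numStreamStates-style accessor as in CreditStreamManager is assumed to be available on the manager):

   ```java
   // instead of: Thread.sleep(1000);
   timeOutOrMeetCondition(() -> bufferStreamManager.numStreamStates() == 0);
   ```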



##########
common/src/main/java/org/apache/celeborn/common/network/server/BufferStreamManager.java:
##########
@@ -319,28 +329,71 @@ public synchronized void setupDataPartitionReader(
       streamReaders.put(streamId, dataPartitionReader);

Review Comment:
   streamReaders can replace readers, or readers should skip or remove entries whose reader has already been released.



##########
common/src/main/java/org/apache/celeborn/common/network/server/BufferStreamManager.java:
##########
@@ -415,7 +467,7 @@ public boolean releaseStream(Long streamId) {
       DataPartitionReader dataPartitionReader = streamReaders.get(streamId);
       dataPartitionReader.release();
       if (dataPartitionReader.isFinished()) {
-        logger.debug("release all for stream: {}", streamId);
+        logger.info("release all for stream: {}", streamId);

Review Comment:
   keep debug level





[GitHub] [incubator-celeborn] waitinfuture commented on a diff in pull request #1407: [CELEBORN-431][FLINK] Support dynamic buffer allocation in reading map partition.

Posted by "waitinfuture (via GitHub)" <gi...@apache.org>.
waitinfuture commented on code in PR #1407:
URL: https://github.com/apache/incubator-celeborn/pull/1407#discussion_r1157218201


##########
common/src/main/java/org/apache/celeborn/common/network/server/memory/ReadBufferDispatcher.java:
##########
@@ -63,24 +69,26 @@ public void run() {
     while (true) {
       ReadBufferRequest request = null;
       try {
-        request = requests.poll(500, TimeUnit.MILLISECONDS);
+        request = requests.poll(1000, TimeUnit.MILLISECONDS);
       } catch (InterruptedException e) {
         logger.info("Buffer dispatcher is closing");
       }
 
       if (request != null) {
+        long start = System.nanoTime();
         List<ByteBuf> buffers = new ArrayList<>();
         int bufferSize = request.getBufferSize();
         while (buffers.size() < request.getMin()) {
           if (memoryManager.readBufferAvailable(bufferSize)) {
             memoryManager.changeReadBufferCounter(bufferSize);
             ByteBuf buf = readBufferAllocator.buffer(bufferSize, bufferSize);
             buffers.add(buf);
+            allocatedReadBuffers.increment();
           } else {
             try {
               // If dispatcher can not allocate minimum buffers, it will wait here until necessary
               // buffers are get.
-              Thread.sleep(3);
+              Thread.sleep(1);

Review Comment:
   1ms seems too short, perhaps 50ms



##########
common/src/main/java/org/apache/celeborn/common/network/server/BufferStreamManager.java:
##########
@@ -313,74 +321,118 @@ public synchronized void setupDataPartitionReader(
               streamId,
               channel,
               () -> recycleStream(streamId));
-      // allocate resources when the first reader is registered
-      boolean allocateResources = readers.isEmpty();
-      readers.add(dataPartitionReader);
-      streamReaders.put(streamId, dataPartitionReader);
+      readers.put(streamId, dataPartitionReader);
 
-      // create initial buffers for read
-      if (allocateResources && buffers == null) {
+      // allocate resources when the first reader is registered
+      if (firstReaderRegister.compareAndSet(true, false)) {
         memoryManager.requestReadBuffers(
-            minReadBuffers,
-            maxReadBuffers,
-            fileInfo.getBufferSize(),
-            (allocatedBuffers, throwable) ->
-                MapDataPartition.this.onBuffer(new LinkedBlockingDeque<>(allocatedBuffers)));
+            new ReadBufferRequest(
+                minReadBuffers,
+                maxReadBuffers,
+                fileInfo.getBufferSize(),
+                (allocatedBuffers, throwable) -> onBuffer(allocatedBuffers)));
       } else {
         triggerRead();
       }
     }
 
     // Read logic is executed on another thread.
-    public void onBuffer(Queue<ByteBuf> buffers) {
-      this.buffers = buffers;
+    public void onBuffer(List<ByteBuf> buffers) {
+      if (isReleased || bufferQueue.isReleased()) {
+        buffers.forEach(memoryManager::recycleReadBuffer);
+        return;
+      }
+
+      try {
+        bufferQueue.add(buffers);
+      } catch (Exception e) {
+        // this branch means that this bufferQueue is closed
+        buffers.forEach(memoryManager::recycleReadBuffer);
+        return;
+      }
+
       triggerRead();
+      isWaitingResources.set(false);
+      logger.debug(
+          "MapDataPartition set isWaitingResources to {} by on buffer", isWaitingResources.get());
     }
 
-    public void recycle(ByteBuf buffer, Queue<ByteBuf> bufferQueue) {
-      buffer.clear();
-      bufferQueue.add(buffer);
-      triggerRead();
+    public void recycle(ByteBuf buffer) {
+      bufferQueue.recycle(buffer);
+      if (isReleased || readers.isEmpty() || bufferQueue.isReleased()) {
+        return;
+      }
+
+      if (bufferQueue.size() > 0) {
+        triggerRead();
+      }
+
+      applyNewBuffers();
+    }
+
+    private void applyNewBuffers() {
+      logger.debug(
+          "try to apply new buffers {} {} {} {} {}",
+          isWaitingResources.get(),
+          bufferQueue.numBuffersOccupied(),
+          bufferQueue.size(),
+          readers.size(),
+          bufferQueue.numBuffersOccupied() + minReadBuffers);

Review Comment:
   Just print minReadBuffers since numBuffersOccupied is already printed



##########
common/src/main/java/org/apache/celeborn/common/network/server/memory/ReadBufferDispatcher.java:
##########
@@ -63,24 +69,26 @@ public void run() {
     while (true) {
       ReadBufferRequest request = null;
       try {
-        request = requests.poll(500, TimeUnit.MILLISECONDS);
+        request = requests.poll(1000, TimeUnit.MILLISECONDS);
       } catch (InterruptedException e) {
         logger.info("Buffer dispatcher is closing");
       }
 
       if (request != null) {
+        long start = System.nanoTime();
         List<ByteBuf> buffers = new ArrayList<>();
         int bufferSize = request.getBufferSize();
         while (buffers.size() < request.getMin()) {
           if (memoryManager.readBufferAvailable(bufferSize)) {
             memoryManager.changeReadBufferCounter(bufferSize);
             ByteBuf buf = readBufferAllocator.buffer(bufferSize, bufferSize);
             buffers.add(buf);
+            allocatedReadBuffers.increment();

Review Comment:
   Will allocatedReadBuffers overflow, since it keeps incrementing?



##########
common/src/main/java/org/apache/celeborn/common/network/server/memory/ReadBufferDispatcher.java:
##########
@@ -34,27 +35,32 @@ public class ReadBufferDispatcher extends Thread {
   private final LinkedBlockingQueue<ReadBufferRequest> requests = new LinkedBlockingQueue<>();
   private final MemoryManager memoryManager;
   private final PooledByteBufAllocator readBufferAllocator;
+  private final LongAdder allocatedReadBuffers = new LongAdder();
 
   public ReadBufferDispatcher(MemoryManager memoryManager) {
     readBufferAllocator = NettyUtils.createPooledByteBufAllocator(true, true, 1);
     this.memoryManager = memoryManager;
     this.setName("Read-Buffer-Dispatcher");
+    this.setUncaughtExceptionHandler(
+        (t, e) -> {
+          logger.error("Buffer dispatcher thread failed", e);
+          readBufferAllocator.trimCurrentThreadCache();
+        });
     this.start();
   }
 
   public void addBufferRequest(ReadBufferRequest request) {
-    requests.add(request);
+    requests.offer(request);

Review Comment:
   Shall we check the return value, since offer may return false?
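
   A small sketch of the defensive check being asked about (note that offer() on an unbounded LinkedBlockingQueue only fails if its Integer.MAX_VALUE capacity is ever reached, but the check makes the intent explicit if the queue later gains a bound):

   ```java
   import java.util.concurrent.LinkedBlockingQueue;

   // Illustrative only: fail loudly instead of silently dropping a read buffer request.
   class AddBufferRequestSketch<T> {
     private final LinkedBlockingQueue<T> requests = new LinkedBlockingQueue<>();

     void addBufferRequest(T request) {
       if (!requests.offer(request)) {
         throw new IllegalStateException("Read buffer request queue rejected a request");
       }
     }
   }
   ```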



##########
client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteBufferStreamReader.java:
##########
@@ -23,6 +23,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.celeborn.common.exception.CelebornIOException;

Review Comment:
   It's strange to put the memory package under the network package; how about moving it under common? cc @RexXiong 





[GitHub] [incubator-celeborn] waitinfuture commented on a diff in pull request #1407: [CELEBORN-431][FLINK] Support dynamic buffer allocation in reading map partition.

Posted by "waitinfuture (via GitHub)" <gi...@apache.org>.
waitinfuture commented on code in PR #1407:
URL: https://github.com/apache/incubator-celeborn/pull/1407#discussion_r1160599019


##########
common/src/main/java/org/apache/celeborn/common/network/server/memory/ReadBufferDispatcher.java:
##########
@@ -49,12 +58,11 @@ public void addBufferRequest(ReadBufferRequest request) {
   public void recycle(ByteBuf buf) {
     int bufferSize = buf.capacity();
     int refCnt = buf.refCnt();
-    if (refCnt != 1) {
-      logger.error("recycle buffer refCnt: {} not equal to 1!", buf.refCnt());
-    }
+    // If a reader failed, related read buffers will have more than one reference count
     if (refCnt > 0) {
       buf.release(refCnt);

Review Comment:
   It's OK since it calls ```buf.release(refCnt);```





+      int readAheadMin)
+      throws FileNotFoundException {
+    this.recycleStream = recycleStream;
+    this.fileInfo = fileInfo;
+    int bufferSize = fileInfo.getBufferSize();
+
+    definedMinReadAheadMemory = definedMinReadBuffers * bufferSize;
+    definedMaxReadAheadMemory = definedMaxReadBuffers * bufferSize;
+
+    updateLocalTarget(
+        localMemoryTarget,
+        fileInfo.getFileSize(),
+        definedMinReadAheadMemory,
+        definedMaxReadAheadMemory);
+
+    logger.debug(
+        "read map partition {} with {} {} {}",
+        fileInfo.getFilePath(),
+        localMemoryTarget,
+        localBuffersTarget,
+        fileInfo.getBufferSize());
+
+    this.readAheadMin = readAheadMin;
+
+    readExecutor =
+        storageFetcherPool.computeIfAbsent(
+            fileInfo.getMountPoint(),
+            k ->
+                Executors.newFixedThreadPool(
+                    threadsPerMountPoint,
+                    new ThreadFactoryBuilder()
+                        .setNameFormat(fileInfo.getMountPoint() + "-reader-thread-%d")
+                        .setUncaughtExceptionHandler(
+                            (t1, t2) -> {
+                              logger.warn("StorageFetcherPool thread:{}:{}", t1, t2);
+                            })
+                        .build()));
+    this.dataFileChanel = new FileInputStream(fileInfo.getFile()).getChannel();
+    this.indexChannel = new FileInputStream(fileInfo.getIndexPath()).getChannel();
+  }
+
+  private synchronized void updateLocalTarget(
+      long nTarget, long fileSize, long definedMinReadAheadMemory, long definedMaxReadAheadMemory) {
+    long target = nTarget;
+    if (target < definedMinReadAheadMemory) {
+      target = definedMinReadAheadMemory;
+    }
+    if (target > definedMaxReadAheadMemory) {
+      target = definedMaxReadAheadMemory;
+    }
+    if (target > fileSize) {
+      target = fileSize;
+    }
+    localBuffersTarget = (int) Math.ceil(target * 1.0 / fileInfo.getBufferSize());
+    localMemoryTarget = target;
+  }
+
+  public synchronized void setupDataPartitionReader(
+      int startSubIndex, int endSubIndex, long streamId, Channel channel) {
+    MapDataPartitionReader mapDataPartitionReader =
+        new MapDataPartitionReader(
+            startSubIndex,
+            endSubIndex,
+            fileInfo,
+            streamId,
+            channel,
+            () -> recycleStream.accept(streamId));
+    readers.put(streamId, mapDataPartitionReader);
+
+    // allocate resources when the first reader is registered
+    if (!bufferQueueInitialized) {
+      memoryManager.requestReadBuffers(
+          new ReadBufferRequest(
+              localBuffersTarget,
+              fileInfo.getBufferSize(),
+              (allocatedBuffers, throwable) -> onBuffer(allocatedBuffers)));
+      bufferQueueInitialized = true;
+    } else {
+      triggerRead();
+    }
+  }
+
+  // Read logic is executed on another thread.
+  public void onBuffer(List<ByteBuf> buffers) {
+    if (isReleased || bufferQueue.isReleased()) {
+      buffers.forEach(memoryManager::recycleReadBuffer);
+      return;
+    }
+
+    try {
+      bufferQueue.add(buffers);
+      inFlightBuffers -= buffers.size();
+    } catch (Exception e) {
+      // this branch means that this bufferQueue is closed
+      buffers.forEach(memoryManager::recycleReadBuffer);
+      return;
+    }
+
+    if (bufferQueue.size() > Math.min(localBuffersTarget / 2 + 1, readAheadMin)) {
+      triggerRead();
+    }
+  }
+
+  public void recycle(ByteBuf buffer) {
+    if (bufferQueue.numBuffersOccupied() > localBuffersTarget) {
+      bufferQueue.recycle(buffer);
+    } else {
+      buffer.clear();
+      bufferQueue.add(buffer);
+    }
+    if (isReleased || readers.isEmpty() || bufferQueue.isReleased()) {
+      return;
+    }
+
+    if (bufferQueue.size() > Math.min(localBuffersTarget / 2 + 1, readAheadMin)) {
+      triggerRead();
+    }
+
+    applyNewBuffers();
+  }
+
+  private void applyNewBuffers() {

Review Comment:
   NO.



##########
common/src/main/java/org/apache/celeborn/common/network/server/MapDataPartition.java:
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.server;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Consumer;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.meta.FileInfo;
+import org.apache.celeborn.common.network.server.memory.BufferQueue;
+import org.apache.celeborn.common.network.server.memory.BufferRecycler;
+import org.apache.celeborn.common.network.server.memory.MemoryManager;
+import org.apache.celeborn.common.network.server.memory.ReadBufferRequest;
+import org.apache.celeborn.common.util.JavaUtils;
+
+// this means active data partition
+class MapDataPartition implements MemoryManager.ReadBufferTargetChangeListener {
+  public static final Logger logger = LoggerFactory.getLogger(MapDataPartition.class);
+  private final List<Long> activeStreamIds = new ArrayList<>();
+  private final FileInfo fileInfo;
+  private final ExecutorService readExecutor;
+  private final ConcurrentHashMap<Long, MapDataPartitionReader> readers =
+      JavaUtils.newConcurrentHashMap();
+  private FileChannel dataFileChanel;
+  private FileChannel indexChannel;
+  private volatile boolean isReleased = false;
+  private final BufferQueue bufferQueue = new BufferQueue();
+  private boolean bufferQueueInitialized = false;
+  private MemoryManager memoryManager = MemoryManager.instance();
+  private Consumer<Long> recycleStream;
+  private volatile long localMemoryTarget = 0;
+  private volatile int localBuffersTarget = 0;

Review Comment:
   NO.





[GitHub] [incubator-celeborn] FMX commented on a diff in pull request #1407: [CELEBORN-431][FLINK] Support dynamic buffer allocation in reading map partition.

Posted by "FMX (via GitHub)" <gi...@apache.org>.
FMX commented on code in PR #1407:
URL: https://github.com/apache/incubator-celeborn/pull/1407#discussion_r1161420043


##########
common/src/main/java/org/apache/celeborn/common/network/server/MapDataPartition.java:
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.server;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Consumer;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.meta.FileInfo;
+import org.apache.celeborn.common.network.server.memory.BufferQueue;
+import org.apache.celeborn.common.network.server.memory.BufferRecycler;
+import org.apache.celeborn.common.network.server.memory.MemoryManager;
+import org.apache.celeborn.common.network.server.memory.ReadBufferRequest;
+import org.apache.celeborn.common.util.JavaUtils;
+
+// this means active data partition
+class MapDataPartition implements MemoryManager.ReadBufferTargetChangeListener {
+  public static final Logger logger = LoggerFactory.getLogger(MapDataPartition.class);
+  private final List<Long> activeStreamIds = new ArrayList<>();
+  private final FileInfo fileInfo;
+  private final ExecutorService readExecutor;
+  private final ConcurrentHashMap<Long, MapDataPartitionReader> readers =
+      JavaUtils.newConcurrentHashMap();
+  private FileChannel dataFileChanel;
+  private FileChannel indexChannel;
+  private volatile boolean isReleased = false;
+  private final BufferQueue bufferQueue = new BufferQueue();
+  private boolean bufferQueueInitialized = false;
+  private MemoryManager memoryManager = MemoryManager.instance();
+  private Consumer<Long> recycleStream;
+  private volatile long localMemoryTarget = 0;
+  private volatile int localBuffersTarget = 0;

Review Comment:
   NO.





[GitHub] [incubator-celeborn] FMX commented on a diff in pull request #1407: [CELEBORN-431][FLINK] Support dynamic buffer allocation in reading map partition.

Posted by "FMX (via GitHub)" <gi...@apache.org>.
FMX commented on code in PR #1407:
URL: https://github.com/apache/incubator-celeborn/pull/1407#discussion_r1156638352


##########
common/src/main/java/org/apache/celeborn/common/network/server/BufferStreamManager.java:
##########
@@ -319,28 +329,71 @@ public synchronized void setupDataPartitionReader(
       streamReaders.put(streamId, dataPartitionReader);
 
       // create initial buffers for read
-      if (allocateResources && buffers == null) {
+      if (allocateResources && bufferQueue.size() == 0) {
         memoryManager.requestReadBuffers(
-            minReadBuffers,
-            maxReadBuffers,
-            fileInfo.getBufferSize(),
-            (allocatedBuffers, throwable) ->
-                MapDataPartition.this.onBuffer(new LinkedBlockingDeque<>(allocatedBuffers)));
+            new ReadBufferRequest(
+                minReadBuffers,
+                maxReadBuffers,
+                fileInfo.getBufferSize(),
+                (allocatedBuffers, throwable) -> onBuffer(allocatedBuffers)));
       } else {
         triggerRead();
       }
     }
 
     // Read logic is executed on another thread.
-    public void onBuffer(Queue<ByteBuf> buffers) {
-      this.buffers = buffers;
+    public void onBuffer(List<ByteBuf> buffers) {
+      if (isReleased || bufferQueue.isReleased()) {

Review Comment:
   It won't. If the map partition is released, the applied buffers will be released as well. If the onBuffer method is called before the map partition is released, then the release will handle the cleanup. Check the close method.





[GitHub] [incubator-celeborn] RexXiong commented on a diff in pull request #1407: [CELEBORN-431][FLINK] Support dynamic buffer allocation in reading map partition.

Posted by "RexXiong (via GitHub)" <gi...@apache.org>.
RexXiong commented on code in PR #1407:
URL: https://github.com/apache/incubator-celeborn/pull/1407#discussion_r1156950248


##########
common/src/main/java/org/apache/celeborn/common/network/server/BufferStreamManager.java:
##########
@@ -313,74 +321,125 @@ public synchronized void setupDataPartitionReader(
               streamId,
               channel,
               () -> recycleStream(streamId));
-      // allocate resources when the first reader is registered
-      boolean allocateResources = readers.isEmpty();
-      readers.add(dataPartitionReader);
-      streamReaders.put(streamId, dataPartitionReader);
+      readers.put(streamId, dataPartitionReader);
 
-      // create initial buffers for read
-      if (allocateResources && buffers == null) {
+      // allocate resources when the first reader is registered
+      if (firstReaderRegister.compareAndSet(true, false)) {
         memoryManager.requestReadBuffers(
-            minReadBuffers,
-            maxReadBuffers,
-            fileInfo.getBufferSize(),
-            (allocatedBuffers, throwable) ->
-                MapDataPartition.this.onBuffer(new LinkedBlockingDeque<>(allocatedBuffers)));
+            new ReadBufferRequest(
+                minReadBuffers,
+                maxReadBuffers,
+                fileInfo.getBufferSize(),
+                (allocatedBuffers, throwable) -> onBuffer(allocatedBuffers)));
       } else {
         triggerRead();
       }
     }
 
     // Read logic is executed on another thread.
-    public void onBuffer(Queue<ByteBuf> buffers) {
-      this.buffers = buffers;
+    public void onBuffer(List<ByteBuf> buffers) {
+      if (isReleased || bufferQueue.isReleased()) {
+        buffers.forEach(memoryManager::recycleReadBuffer);
+        return;
+      }
+
+      try {
+        bufferQueue.add(buffers);
+      } catch (Exception e) {
+        // this branch means that this bufferQueue is closed
+        buffers.forEach(memoryManager::recycleReadBuffer);
+        return;
+      }
+
       triggerRead();
+      isWaitingResources.set(false);
+      logger.debug(
+          "MapDataPartition set isWaitingResources to {} by on buffer", isWaitingResources.get());
     }
 
-    public void recycle(ByteBuf buffer, Queue<ByteBuf> bufferQueue) {
-      buffer.clear();
-      bufferQueue.add(buffer);
-      triggerRead();
+    public void recycle(ByteBuf buffer) {
+      bufferQueue.recycle(buffer);
+      if (isReleased || readers.isEmpty() || bufferQueue.isReleased()) {
+        return;
+      }
+
+      if (bufferQueue.size() > 0) {
+        triggerRead();
+      }
+
+      applyNewBuffers();
+    }
+
+    private void applyNewBuffers() {
+      logger.debug(
+          "try to apply new buffers {} {} {} {} {}",
+          isWaitingResources.get(),
+          bufferQueue.numBuffersOccupied(),
+          bufferQueue.size(),
+          readers.size(),
+          bufferQueue.numBuffersOccupied() + minReadBuffers);
+      if (bufferQueue.size() < minReadBuffers
+          && !readers.isEmpty()
+          && bufferQueue.numBuffersOccupied() + minReadBuffers <= maxReadBuffers
+          && isWaitingResources.compareAndSet(false, true)) {
+        logger.debug(
+            "apply new buffers while current buffer queue size {} with read count {} for "
+                + "map data partition {} with active stream id count {}",
+            bufferQueue.size(),
+            readers.size(),
+            this,
+            activeStreamIds.size());
+
+        memoryManager.requestReadBuffers(
+            new ReadBufferRequest(
+                minReadBuffers,
+                maxReadBuffers,
+                fileInfo.getBufferSize(),
+                (allocatedBuffers, throwable) -> onBuffer(allocatedBuffers)));
+      }
     }
 
     public synchronized void readBuffers() {
       if (isReleased) {
-        // some read executor task may already be submitted to the threadpool
+        // some read executor task may already be submitted to the thread pool
         return;
       }
 
       try {
-        PriorityQueue<DataPartitionReader> sortedReaders = new PriorityQueue<>(readers);
-        for (DataPartitionReader reader : readers) {
+        // make sure that all reader are open
+        PriorityQueue<DataPartitionReader> sortedReaders = new PriorityQueue<>(readers.values());
+        for (DataPartitionReader reader : readers.values()) {
           reader.open(dataFileChanel, indexChannel);
         }
-        while (buffers != null && buffers.size() > 0 && !sortedReaders.isEmpty()) {
-          BufferRecycler bufferRecycler =
-              new BufferRecycler(memoryManager, (buffer) -> this.recycle(buffer, buffers));
+        while (bufferQueue.size() > 0 && !sortedReaders.isEmpty()) {
+          BufferRecycler bufferRecycler = new BufferRecycler(MapDataPartition.this::recycle);
           DataPartitionReader reader = sortedReaders.poll();
           try {
-            if (!reader.readAndSend(buffers, bufferRecycler)) {
-              readers.remove(reader);
+            if (!reader.readAndSend(bufferQueue, bufferRecycler)) {
+              // this reader has finished, recycle stream id
+              if (reader.isFinished()) {
+                reader.recycle();
+              }
             }
           } catch (Throwable e) {
             logger.error("reader exception, reader: {}, message: {}", reader, e.getMessage(), e);
-            readers.remove(reader);
+            // this reader failed , recycle stream id
             reader.recycleOnError(e);
           }
         }
       } catch (Throwable e) {
         logger.error("Fatal: failed to read partition data. {}", e.getMessage(), e);
-        for (DataPartitionReader reader : readers) {
+        for (DataPartitionReader reader : readers.values()) {
           reader.recycleOnError(e);
         }
-
-        readers.clear();
+        // this map data partition failed, recycle all readers
+        readers.values().forEach(reader -> reader.recycle());

Review Comment:
   Just remove this; the readers are already recycled at line 432.



##########
common/src/main/java/org/apache/celeborn/common/network/server/BufferStreamManager.java:
##########
@@ -313,74 +321,125 @@ public synchronized void setupDataPartitionReader(
               streamId,
               channel,
               () -> recycleStream(streamId));
-      // allocate resources when the first reader is registered
-      boolean allocateResources = readers.isEmpty();
-      readers.add(dataPartitionReader);
-      streamReaders.put(streamId, dataPartitionReader);
+      readers.put(streamId, dataPartitionReader);
 
-      // create initial buffers for read
-      if (allocateResources && buffers == null) {
+      // allocate resources when the first reader is registered
+      if (firstReaderRegister.compareAndSet(true, false)) {
         memoryManager.requestReadBuffers(
-            minReadBuffers,
-            maxReadBuffers,
-            fileInfo.getBufferSize(),
-            (allocatedBuffers, throwable) ->
-                MapDataPartition.this.onBuffer(new LinkedBlockingDeque<>(allocatedBuffers)));
+            new ReadBufferRequest(
+                minReadBuffers,
+                maxReadBuffers,
+                fileInfo.getBufferSize(),
+                (allocatedBuffers, throwable) -> onBuffer(allocatedBuffers)));
       } else {
         triggerRead();
       }
     }
 
     // Read logic is executed on another thread.
-    public void onBuffer(Queue<ByteBuf> buffers) {
-      this.buffers = buffers;
+    public void onBuffer(List<ByteBuf> buffers) {
+      if (isReleased || bufferQueue.isReleased()) {
+        buffers.forEach(memoryManager::recycleReadBuffer);
+        return;
+      }
+
+      try {
+        bufferQueue.add(buffers);
+      } catch (Exception e) {
+        // this branch means that this bufferQueue is closed
+        buffers.forEach(memoryManager::recycleReadBuffer);
+        return;
+      }
+
       triggerRead();
+      isWaitingResources.set(false);
+      logger.debug(
+          "MapDataPartition set isWaitingResources to {} by on buffer", isWaitingResources.get());
     }
 
-    public void recycle(ByteBuf buffer, Queue<ByteBuf> bufferQueue) {
-      buffer.clear();
-      bufferQueue.add(buffer);
-      triggerRead();
+    public void recycle(ByteBuf buffer) {
+      bufferQueue.recycle(buffer);
+      if (isReleased || readers.isEmpty() || bufferQueue.isReleased()) {
+        return;
+      }
+
+      if (bufferQueue.size() > 0) {
+        triggerRead();
+      }
+
+      applyNewBuffers();
+    }
+
+    private void applyNewBuffers() {
+      logger.debug(
+          "try to apply new buffers {} {} {} {} {}",
+          isWaitingResources.get(),
+          bufferQueue.numBuffersOccupied(),
+          bufferQueue.size(),
+          readers.size(),
+          bufferQueue.numBuffersOccupied() + minReadBuffers);
+      if (bufferQueue.size() < minReadBuffers
+          && !readers.isEmpty()
+          && bufferQueue.numBuffersOccupied() + minReadBuffers <= maxReadBuffers
+          && isWaitingResources.compareAndSet(false, true)) {
+        logger.debug(
+            "apply new buffers while current buffer queue size {} with read count {} for "
+                + "map data partition {} with active stream id count {}",
+            bufferQueue.size(),
+            readers.size(),
+            this,
+            activeStreamIds.size());
+
+        memoryManager.requestReadBuffers(
+            new ReadBufferRequest(
+                minReadBuffers,
+                maxReadBuffers,
+                fileInfo.getBufferSize(),
+                (allocatedBuffers, throwable) -> onBuffer(allocatedBuffers)));
+      }
     }
 
     public synchronized void readBuffers() {
       if (isReleased) {
-        // some read executor task may already be submitted to the threadpool
+        // some read executor task may already be submitted to the thread pool
         return;
       }
 
       try {
-        PriorityQueue<DataPartitionReader> sortedReaders = new PriorityQueue<>(readers);
-        for (DataPartitionReader reader : readers) {
+        // make sure that all reader are open
+        PriorityQueue<DataPartitionReader> sortedReaders = new PriorityQueue<>(readers.values());
+        for (DataPartitionReader reader : readers.values()) {
           reader.open(dataFileChanel, indexChannel);
         }
-        while (buffers != null && buffers.size() > 0 && !sortedReaders.isEmpty()) {
-          BufferRecycler bufferRecycler =
-              new BufferRecycler(memoryManager, (buffer) -> this.recycle(buffer, buffers));
+        while (bufferQueue.size() > 0 && !sortedReaders.isEmpty()) {
+          BufferRecycler bufferRecycler = new BufferRecycler(MapDataPartition.this::recycle);
           DataPartitionReader reader = sortedReaders.poll();
           try {
-            if (!reader.readAndSend(buffers, bufferRecycler)) {
-              readers.remove(reader);
+            if (!reader.readAndSend(bufferQueue, bufferRecycler)) {
+              // this reader has finished, recycle stream id
+              if (reader.isFinished()) {

Review Comment:
   We can remove this, as a finished reader will close itself.





[GitHub] [incubator-celeborn] FMX commented on a diff in pull request #1407: [CELEBORN-431][FLINK] Support dynamic buffer allocation in reading map partition.

Posted by "FMX (via GitHub)" <gi...@apache.org>.
FMX commented on code in PR #1407:
URL: https://github.com/apache/incubator-celeborn/pull/1407#discussion_r1159213820


##########
common/src/main/java/org/apache/celeborn/common/network/server/memory/ReadBufferDispatcher.java:
##########
@@ -63,24 +69,26 @@ public void run() {
     while (true) {
       ReadBufferRequest request = null;
       try {
-        request = requests.poll(500, TimeUnit.MILLISECONDS);
+        request = requests.poll(1000, TimeUnit.MILLISECONDS);
       } catch (InterruptedException e) {
         logger.info("Buffer dispatcher is closing");
       }
 
       if (request != null) {
+        long start = System.nanoTime();
         List<ByteBuf> buffers = new ArrayList<>();
         int bufferSize = request.getBufferSize();
         while (buffers.size() < request.getMin()) {
           if (memoryManager.readBufferAvailable(bufferSize)) {
             memoryManager.changeReadBufferCounter(bufferSize);
             ByteBuf buf = readBufferAllocator.buffer(bufferSize, bufferSize);
             buffers.add(buf);
+            allocatedReadBuffers.increment();

Review Comment:
   It will decrement when a buffer is recycled.
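
   A minimal sketch of the allocate/recycle accounting described here (illustrative names, not the PR's exact code): the adder tracks buffers currently allocated rather than a monotonically growing total, and a signed 64-bit counter would need on the order of 9.2e18 net increments to overflow.
   ```java
   import java.util.concurrent.atomic.LongAdder;

   // Hedged sketch: increment per allocation, decrement per recycle.
   class AllocationCounterSketch {
     private final LongAdder allocatedReadBuffers = new LongAdder();

     void onAllocate() {
       allocatedReadBuffers.increment();
     }

     void onRecycle() {
       allocatedReadBuffers.decrement();
     }

     long currentlyAllocated() {
       return allocatedReadBuffers.sum();  // allocations minus recycles
     }
   }
   ```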





[GitHub] [incubator-celeborn] waitinfuture commented on a diff in pull request #1407: [CELEBORN-431][FLINK] Support dynamic buffer allocation in reading map partition.

Posted by "waitinfuture (via GitHub)" <gi...@apache.org>.
waitinfuture commented on code in PR #1407:
URL: https://github.com/apache/incubator-celeborn/pull/1407#discussion_r1161554794


##########
common/src/main/java/org/apache/celeborn/common/network/server/MapDataPartitionReader.java:
##########
@@ -178,55 +178,53 @@ private void addBuffer(ByteBuf buffer, boolean hasRemaining, Recycler bufferRecy
       isClosed = !hasRemaining;

Review Comment:
   IMO we should not change isClosed here; readData already checks and sets it.
   ```
       if (!hasRemaining) {
         closeReader();
       }
   ```



##########
common/src/main/java/org/apache/celeborn/common/network/server/MapDataPartitionReader.java:
##########
@@ -178,55 +178,53 @@ private void addBuffer(ByteBuf buffer, boolean hasRemaining, Recycler bufferRecy
       isClosed = !hasRemaining;

Review Comment:
   And at line 176, I think it's not necessary to check isClosed, because addBuffer is only called when there is remaining data to read. Also, I think checking isError is unnecessary, because when isError is set to true, isReleased is also true. I think the code can be modified as follows:
   ``` 
   private void addBuffer(ByteBuf buffer, BufferRecycler bufferRecycler) {
       if (buffer == null) {
         return;
       }
       synchronized (lock) {
         if (!isReleased) {
           buffersToSend.add(new RecyclableBuffer(buffer, bufferRecycler));
         } else {
           bufferRecycler.recycle(buffer);
           throw new RuntimeException("Partition reader has been failed or finished.", errorCause);
         }
       }
     }
   
   ```



##########
common/src/main/java/org/apache/celeborn/common/network/server/MapDataPartitionReader.java:
##########
@@ -178,55 +178,53 @@ private void addBuffer(ByteBuf buffer, boolean hasRemaining, Recycler bufferRecy
       isClosed = !hasRemaining;

Review Comment:
   isClosed is not precise; what it really means is that all the data needed by this reader has been read out, but not necessarily sent. So I think it's better to rename it to addDataRead.



##########
common/src/main/java/org/apache/celeborn/common/network/server/MapDataPartitionReader.java:
##########
@@ -178,55 +178,53 @@ private void addBuffer(ByteBuf buffer, boolean hasRemaining, Recycler bufferRecy
       isClosed = !hasRemaining;

Review Comment:
   Also, we should rename isError to errorNotified.





[GitHub] [incubator-celeborn] waitinfuture commented on a diff in pull request #1407: [CELEBORN-431][FLINK] Support dynamic buffer allocation in reading map partition.

Posted by "waitinfuture (via GitHub)" <gi...@apache.org>.
waitinfuture commented on code in PR #1407:
URL: https://github.com/apache/incubator-celeborn/pull/1407#discussion_r1160601889


##########
client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteBufferStreamReader.java:
##########
@@ -23,6 +23,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.celeborn.common.exception.CelebornIOException;

Review Comment:
   Ok, let's do it there





[GitHub] [incubator-celeborn] FMX commented on a diff in pull request #1407: [CELEBORN-431][FLINK] Support dynamic buffer allocation in reading map partition.

Posted by "FMX (via GitHub)" <gi...@apache.org>.
FMX commented on code in PR #1407:
URL: https://github.com/apache/incubator-celeborn/pull/1407#discussion_r1156645584


##########
common/src/main/java/org/apache/celeborn/common/network/server/BufferStreamManager.java:
##########
@@ -319,28 +329,71 @@ public synchronized void setupDataPartitionReader(
       streamReaders.put(streamId, dataPartitionReader);

Review Comment:
   I think this change can be done in a refactoring pull request, not in this one.





[GitHub] [incubator-celeborn] waitinfuture commented on a diff in pull request #1407: [CELEBORN-431][FLINK] Support dynamic buffer allocation in reading map partition.

Posted by "waitinfuture (via GitHub)" <gi...@apache.org>.
waitinfuture commented on code in PR #1407:
URL: https://github.com/apache/incubator-celeborn/pull/1407#discussion_r1159295030


##########
common/src/main/java/org/apache/celeborn/common/network/server/memory/BufferQueue.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.server.memory;
+
+import static org.apache.commons.crypto.utils.Utils.checkArgument;
+import static org.apache.hadoop.shaded.com.google.common.base.Preconditions.checkState;
+
+import java.util.Collection;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.annotation.Nullable;
+
+import io.netty.buffer.ByteBuf;
+
+public class BufferQueue {
+
+  private final Queue<ByteBuf> buffers = new ConcurrentLinkedQueue<>();
+
+  private final MemoryManager memoryManager = MemoryManager.instance();
+
+  /** Number of buffers occupied by this buffer queue (added but still not recycled). */
+  private final AtomicInteger numBuffersOccupied = new AtomicInteger();
+
+  /** Whether this buffer queue is released or not. */
+  private volatile boolean isReleased = false;
+
+  public BufferQueue() {}
+
+  /** Returns the number of available buffers in this buffer queue. */
+  public int size() {
+    return buffers.size();
+  }
+
+  /**
+   * Returns an available buffer from this buffer queue or returns null if no buffer is available
+   * currently.
+   */
+  @Nullable
+  public ByteBuf poll() {
+    return buffers.poll();
+  }
+
+  /**
+   * Adds a collection of available buffers to this buffer queue and will throw exception if this
+   * buffer queue has been released.
+   */
+  public synchronized void add(Collection<ByteBuf> availableBuffers) {
+    checkArgument(availableBuffers != null, "Must be not null.");
+    checkState(!isReleased, "Buffer queue has been released.");
+
+    buffers.addAll(availableBuffers);
+    numBuffersOccupied.addAndGet(availableBuffers.size());
+  }
+
+  public int numBuffersOccupied() {
+    return numBuffersOccupied.get();
+  }
+
+  public void recycleAll() {

Review Comment:
   I think we'd better not define this method since it's only called in release(), which is a synchronized method. IMO recycleAll should not be called elsewhere. Just put the method body into release().
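
   A hedged sketch of folding the recycling loop into a synchronized release(), with a generic buffer type and a callback standing in for the memory manager (illustrative, not the PR's code):
   ```java
   import java.util.Queue;
   import java.util.concurrent.ConcurrentLinkedQueue;
   import java.util.function.Consumer;

   // Sketch only: release() marks the queue released and hands every remaining
   // buffer back to the recycler in one synchronized step.
   class BufferQueueReleaseSketch<B> {
     private final Queue<B> buffers = new ConcurrentLinkedQueue<>();
     private volatile boolean isReleased = false;

     public synchronized void release(Consumer<B> recycler) {
       isReleased = true;
       B buffer;
       while ((buffer = buffers.poll()) != null) {
         recycler.accept(buffer);
       }
     }

     public boolean isReleased() {
       return isReleased;
     }
   }
   ```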



##########
common/src/main/java/org/apache/celeborn/common/network/server/memory/ReadBufferDispatcher.java:
##########
@@ -49,12 +58,11 @@ public void addBufferRequest(ReadBufferRequest request) {
   public void recycle(ByteBuf buf) {
     int bufferSize = buf.capacity();
     int refCnt = buf.refCnt();
-    if (refCnt != 1) {
-      logger.error("recycle buffer refCnt: {} not equal to 1!", buf.refCnt());
-    }
+    // If a reader failed, related read buffers will have more than one reference count
     if (refCnt > 0) {
       buf.release(refCnt);

Review Comment:
   Can we guarantee that after the release here the refCnt becomes zero? If not, it seems there will be a memory leak.



##########
common/src/main/java/org/apache/celeborn/common/network/server/memory/ReadBufferDispatcher.java:
##########
@@ -34,11 +35,19 @@ public class ReadBufferDispatcher extends Thread {
   private final LinkedBlockingQueue<ReadBufferRequest> requests = new LinkedBlockingQueue<>();
   private final MemoryManager memoryManager;
   private final PooledByteBufAllocator readBufferAllocator;
+  private final LongAdder allocatedReadBuffers = new LongAdder();
+  private final long readBufferAllocationWait;
 
-  public ReadBufferDispatcher(MemoryManager memoryManager) {
+  public ReadBufferDispatcher(MemoryManager memoryManager, long readBufferAllocationWait) {
+    this.readBufferAllocationWait = readBufferAllocationWait;
     readBufferAllocator = NettyUtils.createPooledByteBufAllocator(true, true, 1);
     this.memoryManager = memoryManager;
     this.setName("Read-Buffer-Dispatcher");
+    this.setUncaughtExceptionHandler(

Review Comment:
   In which situations will there be an uncaught exception? If one occurs, I think we need to recover from it, or the dispatcher will stop working.
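
   One possible recovery shape, as a hedged sketch (the PR's handler only logs and trims the allocator's thread cache; restarting is just an option raised here, and the names are illustrative):
   ```java
   // Illustrative only: spawn a replacement thread from the uncaught exception
   // handler so buffer requests keep being served after an unexpected failure.
   public class RestartingDispatcherSketch {
     static Thread startDispatcher(Runnable dispatchLoop) {
       Thread t = new Thread(dispatchLoop, "Read-Buffer-Dispatcher");
       t.setUncaughtExceptionHandler(
           (thread, error) -> {
             System.err.println("Dispatcher thread failed, restarting: " + error);
             startDispatcher(dispatchLoop);
           });
       t.start();
       return t;
     }
   }
   ```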



##########
common/src/main/java/org/apache/celeborn/common/network/server/memory/MemoryManager.java:
##########
@@ -360,8 +363,8 @@ public long getPausePushDataCounter() {
     return pausePushDataCounter.sum();
   }
 
-  public void requestReadBuffers(int min, int max, int bufferSize, ReadBufferListener listener) {
-    readBufferDispatcher.addBufferRequest(new ReadBufferRequest(min, max, bufferSize, listener));
+  public void requestReadBuffers(ReadBufferRequest request) {

Review Comment:
   At line 358, IMO it's better to return readBufferCounter.get() instead of readBufferCounter, to avoid exposing readBufferCounter to other places.



##########
common/src/main/java/org/apache/celeborn/common/network/server/memory/ReadBufferRequest.java:
##########
@@ -25,7 +25,8 @@ public class ReadBufferRequest {
   private final int bufferSize;
   private final ReadBufferListener readBufferListener;
 
-  ReadBufferRequest(int min, int max, int bufferSize, ReadBufferListener readBufferListener) {
+  public ReadBufferRequest(

Review Comment:
   Better to rename min/max to minBuffers/maxBuffers in this class.



##########
common/src/main/java/org/apache/celeborn/common/network/server/memory/ReadBufferDispatcher.java:
##########
@@ -63,24 +71,26 @@ public void run() {
     while (true) {

Review Comment:
   I think it's better to use a stop flag instead of while (true), and we should add a close() method that sets the flag to true, called from the Worker's close().
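
   A hedged sketch of that shape (illustrative; the request type is a placeholder): the loop exits only when close() flips a volatile flag, and an InterruptedException is logged and ignored rather than treated as shutdown.
   ```java
   import java.util.concurrent.LinkedBlockingQueue;
   import java.util.concurrent.TimeUnit;

   // Sketch only: a dispatcher loop governed by a stop flag instead of while (true).
   public class StoppableDispatcherSketch extends Thread {
     private final LinkedBlockingQueue<Runnable> requests = new LinkedBlockingQueue<>();
     private volatile boolean stopped = false;

     @Override
     public void run() {
       while (!stopped) {
         Runnable request = null;
         try {
           request = requests.poll(1000, TimeUnit.MILLISECONDS);
         } catch (InterruptedException e) {
           // log and keep looping; close() decides when to exit
         }
         if (request != null) {
           request.run();
         }
       }
     }

     public void close() {
       stopped = true;  // called from the worker's close()
     }
   }
   ```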



##########
common/src/main/java/org/apache/celeborn/common/network/server/BufferStreamManager.java:
##########
@@ -313,74 +321,118 @@ public synchronized void setupDataPartitionReader(
               streamId,
               channel,
               () -> recycleStream(streamId));
-      // allocate resources when the first reader is registered
-      boolean allocateResources = readers.isEmpty();
-      readers.add(dataPartitionReader);
-      streamReaders.put(streamId, dataPartitionReader);
+      readers.put(streamId, dataPartitionReader);
 
-      // create initial buffers for read
-      if (allocateResources && buffers == null) {
+      // allocate resources when the first reader is registered
+      if (!bufferQueueInitialized) {
         memoryManager.requestReadBuffers(
-            minReadBuffers,
-            maxReadBuffers,
-            fileInfo.getBufferSize(),
-            (allocatedBuffers, throwable) ->
-                MapDataPartition.this.onBuffer(new LinkedBlockingDeque<>(allocatedBuffers)));
+            new ReadBufferRequest(
+                minReadBuffers,

Review Comment:
   In the default config, minReadBuffers is 16m and maxReadBuffers is 32m, which is bad for cases where the file size is smaller than 16m or 32m. IMO we need to add a fileSize to FileInfo and ensure minBuffers and maxBuffers do not exceed the fileSize.
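
   A hedged sketch of the clamping this suggests, in the spirit of the updateLocalTarget logic added later in this PR (method and parameter names here are illustrative):
   ```java
   // Sketch only: never reserve more read-ahead memory than the file actually holds.
   public class ReadAheadTargetSketch {
     static long clampTarget(long requested, long minMemory, long maxMemory, long fileSize) {
       long target = Math.max(requested, minMemory);
       target = Math.min(target, maxMemory);
       return Math.min(target, fileSize);
     }

     static int buffersFor(long targetMemory, int bufferSize) {
       return (int) Math.ceil(targetMemory * 1.0 / bufferSize);
     }
   }
   ```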



##########
common/src/main/java/org/apache/celeborn/common/network/server/BufferStreamManager.java:
##########
@@ -313,74 +321,118 @@ public synchronized void setupDataPartitionReader(
               streamId,
               channel,
               () -> recycleStream(streamId));
-      // allocate resources when the first reader is registered
-      boolean allocateResources = readers.isEmpty();
-      readers.add(dataPartitionReader);
-      streamReaders.put(streamId, dataPartitionReader);
+      readers.put(streamId, dataPartitionReader);
 
-      // create initial buffers for read
-      if (allocateResources && buffers == null) {
+      // allocate resources when the first reader is registered
+      if (!bufferQueueInitialized) {
         memoryManager.requestReadBuffers(
-            minReadBuffers,
-            maxReadBuffers,
-            fileInfo.getBufferSize(),
-            (allocatedBuffers, throwable) ->
-                MapDataPartition.this.onBuffer(new LinkedBlockingDeque<>(allocatedBuffers)));
+            new ReadBufferRequest(
+                minReadBuffers,
+                maxReadBuffers,
+                fileInfo.getBufferSize(),
+                (allocatedBuffers, throwable) -> onBuffer(allocatedBuffers)));
+        bufferQueueInitialized = true;
       } else {
         triggerRead();
       }
     }
 
     // Read logic is executed on another thread.
-    public void onBuffer(Queue<ByteBuf> buffers) {
-      this.buffers = buffers;
+    public void onBuffer(List<ByteBuf> buffers) {
+      if (isReleased || bufferQueue.isReleased()) {
+        buffers.forEach(memoryManager::recycleReadBuffer);
+        return;
+      }
+
+      try {
+        bufferQueue.add(buffers);
+      } catch (Exception e) {
+        // this branch means that this bufferQueue is closed
+        buffers.forEach(memoryManager::recycleReadBuffer);
+        return;
+      }
+
       triggerRead();
+      isWaitingResources.set(false);
+      logger.debug(
+          "MapDataPartition set isWaitingResources to {} by on buffer", isWaitingResources.get());
     }
 
-    public void recycle(ByteBuf buffer, Queue<ByteBuf> bufferQueue) {
-      buffer.clear();
-      bufferQueue.add(buffer);
-      triggerRead();
+    public void recycle(ByteBuf buffer) {
+      bufferQueue.recycle(buffer);
+      if (isReleased || readers.isEmpty() || bufferQueue.isReleased()) {
+        return;
+      }
+
+      if (bufferQueue.size() > 0) {
+        triggerRead();
+      }
+
+      applyNewBuffers();
+    }
+
+    private void applyNewBuffers() {
+      logger.debug(
+          "try to apply new buffers {} {} {} {}",
+          isWaitingResources.get(),
+          bufferQueue.numBuffersOccupied(),
+          bufferQueue.size(),
+          readers.size());
+      if (bufferQueue.size() < minReadBuffers
+          && !readers.isEmpty()
+          && bufferQueue.numBuffersOccupied() + minReadBuffers <= maxReadBuffers
+          && isWaitingResources.compareAndSet(false, true)) {
+        logger.debug(
+            "apply new buffers while current buffer queue size {} with read count {} for "
+                + "map data partition {} with active stream id count {}",
+            bufferQueue.size(),
+            readers.size(),
+            this,
+            activeStreamIds.size());
+
+        memoryManager.requestReadBuffers(
+            new ReadBufferRequest(

Review Comment:
   ditto



##########
common/src/main/java/org/apache/celeborn/common/network/server/BufferStreamManager.java:
##########
@@ -426,21 +478,19 @@ public boolean releaseStream(Long streamId) {
     }
 
     public void close() {
-      logger.info("release map data partition {}", fileInfo);
+      logger.debug("release map data partition {}", fileInfo);
 
       IOUtils.closeQuietly(dataFileChanel);
       IOUtils.closeQuietly(indexChannel);
-
-      if (this.buffers != null) {
-        for (ByteBuf buffer : this.buffers) {
-          memoryManager.recycleReadBuffer(buffer);
-        }
-      }
-
-      this.buffers = null;
+      bufferQueue.release();
 
       isReleased = true;
     }
+
+    @Override
+    public String toString() {
+      return "MapDataPartition{" + "fileInfo=" + fileInfo.getFilePath() + '}';
+    }
   }
 
   class StorageFetcherPool {

Review Comment:
   I think we don't need this class. Just move executorPools and getExecutorPool() into BufferStreamManager.
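
   As a hedged sketch of the suggestion (names illustrative), the per-mount-point pools can live directly in the stream manager as a map plus a small lookup helper:
   ```java
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;

   // Sketch only: lazily create one fixed-size pool per mount point.
   class ExecutorPoolHolderSketch {
     private final ConcurrentHashMap<String, ExecutorService> executorPools =
         new ConcurrentHashMap<>();

     ExecutorService getExecutorPool(String mountPoint, int threadsPerMountPoint) {
       return executorPools.computeIfAbsent(
           mountPoint, k -> Executors.newFixedThreadPool(threadsPerMountPoint));
     }
   }
   ```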



##########
common/src/main/java/org/apache/celeborn/common/network/server/memory/ReadBufferDispatcher.java:
##########
@@ -63,24 +71,26 @@ public void run() {
     while (true) {
       ReadBufferRequest request = null;
       try {
-        request = requests.poll(500, TimeUnit.MILLISECONDS);
+        request = requests.poll(1000, TimeUnit.MILLISECONDS);
       } catch (InterruptedException e) {
         logger.info("Buffer dispatcher is closing");
       }
 
       if (request != null) {
+        long start = System.nanoTime();
         List<ByteBuf> buffers = new ArrayList<>();
         int bufferSize = request.getBufferSize();
         while (buffers.size() < request.getMin()) {
           if (memoryManager.readBufferAvailable(bufferSize)) {
             memoryManager.changeReadBufferCounter(bufferSize);
             ByteBuf buf = readBufferAllocator.buffer(bufferSize, bufferSize);
             buffers.add(buf);
+            allocatedReadBuffers.increment();
           } else {
             try {
               // If dispatcher can not allocate minimum buffers, it will wait here until necessary
               // buffers are get.
-              Thread.sleep(3);
+              Thread.sleep(this.readBufferAllocationWait);
             } catch (InterruptedException e) {
               logger.info("Buffer dispatcher is closing");
               request.getBufferListener().notifyBuffers(null, e);

Review Comment:
   I don't think we should return when interrupted. I also wonder when it would be interrupted; better to just log and ignore the exception, and let the stop flag control when to exit.



##########
common/src/main/java/org/apache/celeborn/common/network/server/memory/WrappedDataBuffer.java:
##########
@@ -23,18 +23,14 @@ public class WrappedDataBuffer {
 
   public final ByteBuf byteBuf;
 
-  public final Recycler bufferRecycler;
+  public final BufferRecycler bufferRecycler;
 
-  public WrappedDataBuffer(ByteBuf byteBuf, Recycler bufferRecycler) {
+  public WrappedDataBuffer(ByteBuf byteBuf, BufferRecycler bufferRecycler) {

Review Comment:
   Better to rename this class to RecyclableBuffer



##########
common/src/main/java/org/apache/celeborn/common/network/server/BufferStreamManager.java:
##########
@@ -236,6 +232,18 @@ public void cleanResource(Long streamId) {
     }
   }
 
+  public long getStreamsCount() {
+    return streams.size();
+  }
+
+  public long getActiveServingStreamCount() {
+    return servingStreams.size();

Review Comment:
   We should put MapDataPartition into StreamState and remove servingStreams
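
   A hedged sketch of the shape this suggests (generic types stand in for Channel and MapDataPartition): once the partition handle lives in the per-stream state, a separate servingStreams map is no longer needed.
   ```java
   // Sketch only: per-stream state that carries its MapDataPartition directly.
   class StreamStateSketch<C, P> {
     private final C channel;
     private final P mapDataPartition;

     StreamStateSketch(C channel, P mapDataPartition) {
       this.channel = channel;
       this.mapDataPartition = mapDataPartition;
     }

     C channel() {
       return channel;
     }

     P mapDataPartition() {
       return mapDataPartition;
     }
   }
   ```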





[GitHub] [incubator-celeborn] RexXiong commented on a diff in pull request #1407: [CELEBORN-431][FLINK] Support dynamic buffer allocation in reading map partition.

Posted by "RexXiong (via GitHub)" <gi...@apache.org>.
RexXiong commented on code in PR #1407:
URL: https://github.com/apache/incubator-celeborn/pull/1407#discussion_r1156703581


##########
common/src/main/java/org/apache/celeborn/common/network/server/BufferStreamManager.java:
##########
@@ -319,28 +329,79 @@ public synchronized void setupDataPartitionReader(
       streamReaders.put(streamId, dataPartitionReader);
 
       // create initial buffers for read
-      if (allocateResources && buffers == null) {
+      if (allocateResources && bufferQueue.size() == 0) {
         memoryManager.requestReadBuffers(
-            minReadBuffers,
-            maxReadBuffers,
-            fileInfo.getBufferSize(),
-            (allocatedBuffers, throwable) ->
-                MapDataPartition.this.onBuffer(new LinkedBlockingDeque<>(allocatedBuffers)));
+            new ReadBufferRequest(
+                minReadBuffers,
+                maxReadBuffers,
+                fileInfo.getBufferSize(),
+                (allocatedBuffers, throwable) -> onBuffer(allocatedBuffers)));
       } else {
         triggerRead();
       }
     }
 
     // Read logic is executed on another thread.
-    public void onBuffer(Queue<ByteBuf> buffers) {
-      this.buffers = buffers;
+    public void onBuffer(List<ByteBuf> buffers) {
+      if (isReleased || bufferQueue.isReleased()) {
+        buffers.forEach(memoryManager::recycleReadBuffer);
+        return;
+      }
+
+      try {
+        bufferQueue.add(buffers);
+      } catch (Exception e) {
+        // this branch means that this bufferQueue is closed
+        buffers.forEach(memoryManager::recycleReadBuffer);
+        return;
+      }
+
       triggerRead();
+      isWaitingResources.compareAndSet(true, false);

Review Comment:
   Set it to false directly; also, it's better to move this before adding the buffers.
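
   A hedged sketch of the ordering being suggested (field names illustrative): clear the waiting flag before publishing the buffers, so a concurrent recycle() that still sees a low queue can request more immediately.
   ```java
   import java.util.List;
   import java.util.Queue;
   import java.util.concurrent.ConcurrentLinkedQueue;
   import java.util.concurrent.atomic.AtomicBoolean;

   // Sketch only: plain set(false) before adding, instead of compareAndSet after.
   class OnBufferOrderingSketch<B> {
     private final AtomicBoolean isWaitingResources = new AtomicBoolean(false);
     private final Queue<B> bufferQueue = new ConcurrentLinkedQueue<>();

     void onBuffer(List<B> buffers) {
       isWaitingResources.set(false);
       bufferQueue.addAll(buffers);
     }
   }
   ```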





[GitHub] [incubator-celeborn] codecov[bot] commented on pull request #1407: [CELEBORN-431][FLINK] Support dynamic buffer allocation in reading map partition.

Posted by "codecov[bot] (via GitHub)" <gi...@apache.org>.
codecov[bot] commented on PR #1407:
URL: https://github.com/apache/incubator-celeborn/pull/1407#issuecomment-1494022931

   ## [Codecov](https://codecov.io/gh/apache/incubator-celeborn/pull/1407?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#1407](https://codecov.io/gh/apache/incubator-celeborn/pull/1407?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (3a50ffc) into [main](https://codecov.io/gh/apache/incubator-celeborn/commit/2c3005ad5bbdcbe09a995f6ef62834ac8672c882?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (2c3005a) will **increase** coverage by `15.10%`.
   > The diff coverage is `39.56%`.
   
   ```diff
   @@              Coverage Diff              @@
   ##               main    #1407       +/-   ##
   =============================================
   + Coverage     29.89%   44.99%   +15.10%     
   =============================================
     Files           223      165       -58     
     Lines         18679    10485     -8194     
     Branches       1989     1064      -925     
   =============================================
   - Hits           5583     4717      -866     
   + Misses        12705     5428     -7277     
   + Partials        391      340       -51     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-celeborn/pull/1407?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...ommon/network/server/memory/WrappedDataBuffer.java](https://codecov.io/gh/apache/incubator-celeborn/pull/1407?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9jZWxlYm9ybi9jb21tb24vbmV0d29yay9zZXJ2ZXIvbWVtb3J5L1dyYXBwZWREYXRhQnVmZmVyLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...orn/common/network/server/DataPartitionReader.java](https://codecov.io/gh/apache/incubator-celeborn/pull/1407?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9jZWxlYm9ybi9jb21tb24vbmV0d29yay9zZXJ2ZXIvRGF0YVBhcnRpdGlvblJlYWRlci5qYXZh) | `23.92% <4.00%> (-0.19%)` | :arrow_down: |
   | [...orn/common/network/server/BufferStreamManager.java](https://codecov.io/gh/apache/incubator-celeborn/pull/1407?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9jZWxlYm9ybi9jb21tb24vbmV0d29yay9zZXJ2ZXIvQnVmZmVyU3RyZWFtTWFuYWdlci5qYXZh) | `69.48% <38.60%> (-7.04%)` | :arrow_down: |
   | [...rn/common/network/server/memory/MemoryManager.java](https://codecov.io/gh/apache/incubator-celeborn/pull/1407?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9jZWxlYm9ybi9jb21tb24vbmV0d29yay9zZXJ2ZXIvbWVtb3J5L01lbW9yeU1hbmFnZXIuamF2YQ==) | `50.29% <50.00%> (+4.36%)` | :arrow_up: |
   | [...born/common/network/server/memory/BufferQueue.java](https://codecov.io/gh/apache/incubator-celeborn/pull/1407?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9jZWxlYm9ybi9jb21tb24vbmV0d29yay9zZXJ2ZXIvbWVtb3J5L0J1ZmZlclF1ZXVlLmphdmE=) | `54.84% <54.84%> (ø)` | |
   | [...on/network/server/memory/ReadBufferDispatcher.java](https://codecov.io/gh/apache/incubator-celeborn/pull/1407?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9jZWxlYm9ybi9jb21tb24vbmV0d29yay9zZXJ2ZXIvbWVtb3J5L1JlYWRCdWZmZXJEaXNwYXRjaGVyLmphdmE=) | `60.66% <60.00%> (+3.52%)` | :arrow_up: |
   | [...n/common/network/server/memory/BufferRecycler.java](https://codecov.io/gh/apache/incubator-celeborn/pull/1407?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9jZWxlYm9ybi9jb21tb24vbmV0d29yay9zZXJ2ZXIvbWVtb3J5L0J1ZmZlclJlY3ljbGVyLmphdmE=) | `60.00% <100.00%> (+10.00%)` | :arrow_up: |
   | [...ommon/network/server/memory/ReadBufferRequest.java](https://codecov.io/gh/apache/incubator-celeborn/pull/1407?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9jZWxlYm9ybi9jb21tb24vbmV0d29yay9zZXJ2ZXIvbWVtb3J5L1JlYWRCdWZmZXJSZXF1ZXN0LmphdmE=) | `91.67% <100.00%> (ø)` | |
   | [...cala/org/apache/celeborn/common/CelebornConf.scala](https://codecov.io/gh/apache/incubator-celeborn/pull/1407?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvY2VsZWJvcm4vY29tbW9uL0NlbGVib3JuQ29uZi5zY2FsYQ==) | `86.29% <100.00%> (+4.69%)` | :arrow_up: |
   
   ... and [124 files with indirect coverage changes](https://codecov.io/gh/apache/incubator-celeborn/pull/1407/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   




[GitHub] [incubator-celeborn] FMX commented on a diff in pull request #1407: [CELEBORN-431][FLINK] Support dynamic buffer allocation in reading map partition.

Posted by "FMX (via GitHub)" <gi...@apache.org>.
FMX commented on code in PR #1407:
URL: https://github.com/apache/incubator-celeborn/pull/1407#discussion_r1161591266


##########
common/src/main/java/org/apache/celeborn/common/network/server/MapDataPartitionReader.java:
##########
@@ -178,55 +178,53 @@ private void addBuffer(ByteBuf buffer, boolean hasRemaining, Recycler bufferRecy
       isClosed = !hasRemaining;

Review Comment:
   Maybe renaming isClosed to readFinished would be more intuitive.





[GitHub] [incubator-celeborn] waitinfuture commented on a diff in pull request #1407: [CELEBORN-431][FLINK] Support dynamic buffer allocation in reading map partition.

Posted by "waitinfuture (via GitHub)" <gi...@apache.org>.
waitinfuture commented on code in PR #1407:
URL: https://github.com/apache/incubator-celeborn/pull/1407#discussion_r1162589822


##########
common/src/main/java/org/apache/celeborn/common/network/server/memory/BufferQueue.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.server.memory;
+
+import java.util.Collection;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.annotation.Nullable;
+
+import io.netty.buffer.ByteBuf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// Assume that max-managed memory for a MapDataPartition is (2^31 * buffersize)
+public class BufferQueue {
+  public static final Logger logger = LoggerFactory.getLogger(BufferQueue.class);
+
+  private final Queue<ByteBuf> buffers = new ConcurrentLinkedQueue<>();
+
+  private final MemoryManager memoryManager = MemoryManager.instance();
+
+  /** Number of buffers occupied by this buffer queue (added but still not recycled). */
+  private final AtomicInteger numBuffersOccupied = new AtomicInteger();
+
+  private final AtomicInteger pendingRequestBuffers = new AtomicInteger();
+
+  /** Whether this buffer queue is released or not. */
+  private volatile boolean isReleased = false;
+
+  private volatile int localBuffersTarget = 0;
+
+  /** Returns the number of available buffers in this buffer queue. */
+  public int size() {
+    return buffers.size();
+  }
+
+  /**
+   * Returns an available buffer from this buffer queue or returns null if no buffer is available
+   * currently.
+   */
+  @Nullable
+  public ByteBuf poll() {
+    return buffers.poll();
+  }
+
+  /**
+   * Adds a collection of available buffers to this buffer queue and will throw exception if this
+   * buffer queue has been released.
+   */
+  public synchronized void add(Collection<ByteBuf> availableBuffers) {
+    if (!isReleased) {
+      buffers.addAll(availableBuffers);
+      numBuffersOccupied.addAndGet(availableBuffers.size());
+      pendingRequestBuffers.addAndGet(-1 * availableBuffers.size());
+    } else {
+      for (ByteBuf availableBuffer : availableBuffers) {
+        memoryManager.recycleReadBuffer(availableBuffer);
+      }
+    }
+  }
+
+  public void recycle(ByteBuf buffer) {
+    if (isReleased || numBuffersOccupied.get() > localBuffersTarget) {
+      recycleToGlobalPool(buffer);
+    } else {
+      recycleToLocalPool(buffer);
+    }
+  }
+
+  public synchronized void recycleToGlobalPool(ByteBuf buffer) {
+    numBuffersOccupied.decrementAndGet();
+    memoryManager.recycleReadBuffer(buffer);
+  }
+
+  public void recycleToLocalPool(ByteBuf buffer) {
+    buffer.clear();
+    buffer.retain();
+    buffers.add(buffer);
+  }
+
+  // free unused buffer to the main pool if possible
+  public void trim() {
+    while (numBuffersOccupied.get() > localBuffersTarget) {
+      ByteBuf buffer = poll();
+      if (buffer != null) {
+        recycleToGlobalPool(buffer);

Review Comment:
   Better to move ```recycleToGlobalPool``` out of the while loop, because ```recycleToGlobalPool``` is synchronized; we would also need to change its argument to a list of ByteBuf.
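   A minimal sketch of that batching, reusing the field names from the quoted diff; the list-taking ```recycleToGlobalPool``` overload is hypothetical and not part of this PR:

   ```java
   // Drain idle buffers first, then hand the whole batch back to the global pool
   // with a single lock acquisition instead of one synchronized call per buffer.
   public void trim() {
     List<ByteBuf> buffersToFree = new ArrayList<>();
     while (numBuffersOccupied.get() > localBuffersTarget) {
       ByteBuf buffer = poll();
       if (buffer == null) {
         break; // no idle buffers left to release
       }
       buffersToFree.add(buffer);
       numBuffersOccupied.decrementAndGet();
     }
     if (!buffersToFree.isEmpty()) {
       recycleToGlobalPool(buffersToFree); // hypothetical batched overload
     }
   }

   // Hypothetical overload: one synchronized section per batch.
   public synchronized void recycleToGlobalPool(List<ByteBuf> buffersToFree) {
     buffersToFree.forEach(memoryManager::recycleReadBuffer);
   }
   ```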



##########
common/src/main/java/org/apache/celeborn/common/network/server/memory/BufferQueue.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.server.memory;
+
+import java.util.Collection;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.annotation.Nullable;
+
+import io.netty.buffer.ByteBuf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// Assume that max-managed memory for a MapDataPartition is (2^31 * buffersize)
+public class BufferQueue {
+  public static final Logger logger = LoggerFactory.getLogger(BufferQueue.class);
+
+  private final Queue<ByteBuf> buffers = new ConcurrentLinkedQueue<>();
+
+  private final MemoryManager memoryManager = MemoryManager.instance();
+
+  /** Number of buffers occupied by this buffer queue (added but still not recycled). */
+  private final AtomicInteger numBuffersOccupied = new AtomicInteger();
+
+  private final AtomicInteger pendingRequestBuffers = new AtomicInteger();
+
+  /** Whether this buffer queue is released or not. */
+  private volatile boolean isReleased = false;
+
+  private volatile int localBuffersTarget = 0;
+
+  /** Returns the number of available buffers in this buffer queue. */
+  public int size() {
+    return buffers.size();
+  }
+
+  /**
+   * Returns an available buffer from this buffer queue or returns null if no buffer is available
+   * currently.
+   */
+  @Nullable
+  public ByteBuf poll() {
+    return buffers.poll();
+  }
+
+  /**
+   * Adds a collection of available buffers to this buffer queue and will throw exception if this

Review Comment:
   The comment is not precise; it no longer throws an exception.





[GitHub] [incubator-celeborn] waitinfuture commented on a diff in pull request #1407: [CELEBORN-431][FLINK] Support dynamic buffer allocation in reading map partition.

Posted by "waitinfuture (via GitHub)" <gi...@apache.org>.
waitinfuture commented on code in PR #1407:
URL: https://github.com/apache/incubator-celeborn/pull/1407#discussion_r1163489730


##########
common/src/main/java/org/apache/celeborn/common/network/server/memory/BufferQueue.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.server.memory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.annotation.Nullable;
+
+import io.netty.buffer.ByteBuf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// Assume that max-managed memory for a MapDataPartition is (2^31 * buffersize)
+public class BufferQueue {
+  public static final Logger logger = LoggerFactory.getLogger(BufferQueue.class);
+
+  private final Queue<ByteBuf> buffers = new ConcurrentLinkedQueue<>();
+
+  private final MemoryManager memoryManager = MemoryManager.instance();
+
+  /** Number of buffers occupied by this buffer queue (added but still not recycled). */
+  private final AtomicInteger numBuffersOccupied = new AtomicInteger();
+
+  private final AtomicInteger pendingRequestBuffers = new AtomicInteger();
+
+  /** Whether this buffer queue is released or not. */
+  private volatile boolean isReleased = false;
+
+  private volatile int localBuffersTarget = 0;
+
+  /** Returns the number of available buffers in this buffer queue. */
+  public int size() {
+    return buffers.size();
+  }
+
+  /**
+   * Returns an available buffer from this buffer queue or returns null if no buffer is available
+   * currently.
+   */
+  @Nullable
+  public ByteBuf poll() {
+    return buffers.poll();
+  }
+
+  /**
+   * Add buffers and increment numBufferOccupied. Free all buffers to global memory pool if this
+   * buffer queue is released.
+   */
+  public synchronized void add(Collection<ByteBuf> availableBuffers) {
+    if (!isReleased) {
+      buffers.addAll(availableBuffers);
+      numBuffersOccupied.addAndGet(availableBuffers.size());
+      pendingRequestBuffers.addAndGet(-1 * availableBuffers.size());
+    } else {
+      for (ByteBuf availableBuffer : availableBuffers) {
+        memoryManager.recycleReadBuffer(availableBuffer);
+      }
+    }
+  }
+
+  public void recycle(ByteBuf buffer) {
+    if (isReleased || numBuffersOccupied.get() > localBuffersTarget) {
+      recycleToGlobalPool(buffer);
+    } else {
+      recycleToLocalPool(buffer);
+    }
+  }
+
+  public synchronized void recycleToGlobalPool(ByteBuf buffer) {
+    numBuffersOccupied.decrementAndGet();
+    memoryManager.recycleReadBuffer(buffer);
+  }
+
+  public synchronized void recycleToLocalPool(ByteBuf buffer) {

Review Comment:
   IMO it is inefficient to synchronize ```recycleToLocalPool``` and ```recycleToGlobalPool```, especially ```recycleToLocalPool```, because it is a high-frequency operation.
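   One way to do that, sketched against the quoted diff: the backing ```buffers``` queue is a ConcurrentLinkedQueue, so the hot local-pool path needs no monitor at all, and only the colder global-pool path keeps it:

   ```java
   // Hot path: lock-free, the concurrent queue already provides thread safety.
   public void recycleToLocalPool(ByteBuf buffer) {
     buffer.clear();
     buffers.add(buffer);
   }

   // Cold path: keep the monitor only where the release bookkeeping still needs it.
   public synchronized void recycleToGlobalPool(ByteBuf buffer) {
     numBuffersOccupied.decrementAndGet();
     memoryManager.recycleReadBuffer(buffer);
   }
   ```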





[GitHub] [incubator-celeborn] FMX commented on a diff in pull request #1407: [CELEBORN-431][FLINK] Support dynamic buffer allocation in reading map partition.

Posted by "FMX (via GitHub)" <gi...@apache.org>.
FMX commented on code in PR #1407:
URL: https://github.com/apache/incubator-celeborn/pull/1407#discussion_r1161538034


##########
common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java:
##########
@@ -46,6 +46,7 @@ public class FileInfo {
   // members for MapPartition
   private int bufferSize;
   private int numSubpartitions;
+  private long fileSize;

Review Comment:
   Yes, it should. I forgot about the worker's graceful shutdown.





[GitHub] [incubator-celeborn] waitinfuture commented on a diff in pull request #1407: [CELEBORN-431][FLINK] Support dynamic buffer allocation in reading map partition.

Posted by "waitinfuture (via GitHub)" <gi...@apache.org>.
waitinfuture commented on code in PR #1407:
URL: https://github.com/apache/incubator-celeborn/pull/1407#discussion_r1162781752


##########
common/src/main/java/org/apache/celeborn/common/network/server/CreditStreamManager.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.server;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.netty.channel.Channel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.meta.FileInfo;
+import org.apache.celeborn.common.network.server.memory.MemoryManager;
+import org.apache.celeborn.common.util.JavaUtils;
+
+public class CreditStreamManager {
+  private static final Logger logger = LoggerFactory.getLogger(CreditStreamManager.class);
+  private final AtomicLong nextStreamId;
+  private final ConcurrentHashMap<Long, StreamState> streams;
+  private final ConcurrentHashMap<FileInfo, MapDataPartition> activeMapPartitions;
+  private final HashMap<String, ExecutorService> storageFetcherPool = new HashMap<>();
+  private int minReadBuffers;
+  private int maxReadBuffers;
+  private int threadsPerMountPoint;
+  private int minBuffersToTriggerRead;
+  private final BlockingQueue<DelayedStreamId> recycleStreamIds = new DelayQueue<>();
+
+  @GuardedBy("lock")
+  private volatile Thread recycleThread;
+
+  private final Object lock = new Object();
+
+  public CreditStreamManager(
+      int minReadBuffers,
+      int maxReadBuffers,
+      int threadsPerMountpoint,
+      int minBuffersToTriggerRead) {
+    nextStreamId = new AtomicLong((long) new Random().nextInt(Integer.MAX_VALUE) * 1000);
+    streams = JavaUtils.newConcurrentHashMap();
+    activeMapPartitions = JavaUtils.newConcurrentHashMap();
+    this.minReadBuffers = minReadBuffers;
+    this.maxReadBuffers = maxReadBuffers;
+    threadsPerMountPoint = threadsPerMountpoint;
+    this.minBuffersToTriggerRead = minBuffersToTriggerRead;
+    MemoryManager.instance().setCreditStreamManager(this);
+    logger.debug(
+        "Initialize buffer stream manager with {} {} {}",
+        this.minReadBuffers,
+        this.maxReadBuffers,
+        threadsPerMountpoint);
+  }
+
+  public long registerStream(
+      Consumer<Long> callback,
+      Channel channel,
+      int initialCredit,
+      int startSubIndex,
+      int endSubIndex,
+      FileInfo fileInfo)
+      throws IOException {
+    long streamId = nextStreamId.getAndIncrement();
+    logger.debug(
+        "Register stream start from {}, streamId: {}, fileInfo: {}",
+        channel.remoteAddress(),
+        streamId,
+        fileInfo);
+    synchronized (activeMapPartitions) {
+      MapDataPartition mapDataPartition = activeMapPartitions.get(fileInfo);
+      if (mapDataPartition == null) {
+        mapDataPartition =
+            new MapDataPartition(
+                minReadBuffers,
+                maxReadBuffers,
+                storageFetcherPool,
+                threadsPerMountPoint,
+                fileInfo,
+                id -> recycleStream(id),
+                minBuffersToTriggerRead);
+        activeMapPartitions.put(fileInfo, mapDataPartition);
+      }
+      StreamState streamState =
+          new StreamState(channel, fileInfo.getBufferSize(), mapDataPartition);
+      streams.put(streamId, streamState);
+      mapDataPartition.setupDataPartitionReader(startSubIndex, endSubIndex, streamId, channel);
+    }
+
+    addCredit(initialCredit, streamId);
+
+    callback.accept(streamId);

Review Comment:
   callback.accept(streamId) must precede addCredit(initialCredit, streamId); otherwise the worker might send ReadData before the StreamHandle if initialCredit is more than 0.
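   In other words, a sketch of just the ordering, using the names from the quoted diff:

   ```java
   // Tell the client about the stream before any data can be pushed for it.
   callback.accept(streamId);          // client learns the streamId (StreamHandle) first
   addCredit(initialCredit, streamId); // only then start consuming the initial credit
   ```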





[GitHub] [incubator-celeborn] FMX commented on a diff in pull request #1407: [CELEBORN-431][FLINK] Support dynamic buffer allocation in reading map partition.

Posted by "FMX (via GitHub)" <gi...@apache.org>.
FMX commented on code in PR #1407:
URL: https://github.com/apache/incubator-celeborn/pull/1407#discussion_r1156955101


##########
common/src/main/java/org/apache/celeborn/common/network/server/BufferStreamManager.java:
##########
@@ -313,74 +321,125 @@ public synchronized void setupDataPartitionReader(
               streamId,
               channel,
               () -> recycleStream(streamId));
-      // allocate resources when the first reader is registered
-      boolean allocateResources = readers.isEmpty();
-      readers.add(dataPartitionReader);
-      streamReaders.put(streamId, dataPartitionReader);
+      readers.put(streamId, dataPartitionReader);
 
-      // create initial buffers for read
-      if (allocateResources && buffers == null) {
+      // allocate resources when the first reader is registered
+      if (firstReaderRegister.compareAndSet(true, false)) {
         memoryManager.requestReadBuffers(
-            minReadBuffers,
-            maxReadBuffers,
-            fileInfo.getBufferSize(),
-            (allocatedBuffers, throwable) ->
-                MapDataPartition.this.onBuffer(new LinkedBlockingDeque<>(allocatedBuffers)));
+            new ReadBufferRequest(
+                minReadBuffers,
+                maxReadBuffers,
+                fileInfo.getBufferSize(),
+                (allocatedBuffers, throwable) -> onBuffer(allocatedBuffers)));
       } else {
         triggerRead();
       }
     }
 
     // Read logic is executed on another thread.
-    public void onBuffer(Queue<ByteBuf> buffers) {
-      this.buffers = buffers;
+    public void onBuffer(List<ByteBuf> buffers) {
+      if (isReleased || bufferQueue.isReleased()) {
+        buffers.forEach(memoryManager::recycleReadBuffer);
+        return;
+      }
+
+      try {
+        bufferQueue.add(buffers);
+      } catch (Exception e) {
+        // this branch means that this bufferQueue is closed
+        buffers.forEach(memoryManager::recycleReadBuffer);
+        return;
+      }
+
       triggerRead();
+      isWaitingResources.set(false);
+      logger.debug(
+          "MapDataPartition set isWaitingResources to {} by on buffer", isWaitingResources.get());
     }
 
-    public void recycle(ByteBuf buffer, Queue<ByteBuf> bufferQueue) {
-      buffer.clear();
-      bufferQueue.add(buffer);
-      triggerRead();
+    public void recycle(ByteBuf buffer) {
+      bufferQueue.recycle(buffer);
+      if (isReleased || readers.isEmpty() || bufferQueue.isReleased()) {
+        return;
+      }
+
+      if (bufferQueue.size() > 0) {
+        triggerRead();
+      }
+
+      applyNewBuffers();
+    }
+
+    private void applyNewBuffers() {
+      logger.debug(
+          "try to apply new buffers {} {} {} {} {}",
+          isWaitingResources.get(),
+          bufferQueue.numBuffersOccupied(),
+          bufferQueue.size(),
+          readers.size(),
+          bufferQueue.numBuffersOccupied() + minReadBuffers);
+      if (bufferQueue.size() < minReadBuffers
+          && !readers.isEmpty()
+          && bufferQueue.numBuffersOccupied() + minReadBuffers <= maxReadBuffers
+          && isWaitingResources.compareAndSet(false, true)) {
+        logger.debug(
+            "apply new buffers while current buffer queue size {} with read count {} for "
+                + "map data partition {} with active stream id count {}",
+            bufferQueue.size(),
+            readers.size(),
+            this,
+            activeStreamIds.size());
+
+        memoryManager.requestReadBuffers(
+            new ReadBufferRequest(
+                minReadBuffers,
+                maxReadBuffers,
+                fileInfo.getBufferSize(),
+                (allocatedBuffers, throwable) -> onBuffer(allocatedBuffers)));
+      }
     }
 
     public synchronized void readBuffers() {
       if (isReleased) {
-        // some read executor task may already be submitted to the threadpool
+        // some read executor task may already be submitted to the thread pool
         return;
       }
 
       try {
-        PriorityQueue<DataPartitionReader> sortedReaders = new PriorityQueue<>(readers);
-        for (DataPartitionReader reader : readers) {
+        // make sure that all reader are open
+        PriorityQueue<DataPartitionReader> sortedReaders = new PriorityQueue<>(readers.values());
+        for (DataPartitionReader reader : readers.values()) {
           reader.open(dataFileChanel, indexChannel);
         }
-        while (buffers != null && buffers.size() > 0 && !sortedReaders.isEmpty()) {
-          BufferRecycler bufferRecycler =
-              new BufferRecycler(memoryManager, (buffer) -> this.recycle(buffer, buffers));
+        while (bufferQueue.size() > 0 && !sortedReaders.isEmpty()) {
+          BufferRecycler bufferRecycler = new BufferRecycler(MapDataPartition.this::recycle);
           DataPartitionReader reader = sortedReaders.poll();
           try {
-            if (!reader.readAndSend(buffers, bufferRecycler)) {
-              readers.remove(reader);
+            if (!reader.readAndSend(bufferQueue, bufferRecycler)) {
+              // this reader has finished, recycle stream id
+              if (reader.isFinished()) {

Review Comment:
   We cannot remove this, because readAndSend returning false does not mean the data has been sent; it only means the data is in memory.





[GitHub] [incubator-celeborn] waitinfuture commented on a diff in pull request #1407: [CELEBORN-431][FLINK] Support dynamic buffer allocation in reading map partition.

Posted by "waitinfuture (via GitHub)" <gi...@apache.org>.
waitinfuture commented on code in PR #1407:
URL: https://github.com/apache/incubator-celeborn/pull/1407#discussion_r1159448930


##########
common/src/main/java/org/apache/celeborn/common/network/server/BufferStreamManager.java:
##########
@@ -52,8 +44,7 @@
 import org.slf4j.LoggerFactory;
 
 import org.apache.celeborn.common.meta.FileInfo;
-import org.apache.celeborn.common.network.server.memory.BufferRecycler;
-import org.apache.celeborn.common.network.server.memory.MemoryManager;
+import org.apache.celeborn.common.network.server.memory.*;
 import org.apache.celeborn.common.util.JavaUtils;
 
 public class BufferStreamManager {

Review Comment:
   Should declare recycleStreamIds as DelayQueue<DelayedStreamId> instead of BlockingQueue<DelayedStreamId> to make it clearer.
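   For example, a one-line sketch (DelayedStreamId is the type already used in the diff):

   ```java
   // Declaring the concrete type documents the delay semantics at the call sites
   // and avoids casting when DelayQueue-specific behaviour is relied on.
   private final DelayQueue<DelayedStreamId> recycleStreamIds = new DelayQueue<>();
   ```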



##########
common/src/main/java/org/apache/celeborn/common/network/server/BufferStreamManager.java:
##########
@@ -313,74 +321,118 @@ public synchronized void setupDataPartitionReader(
               streamId,
               channel,
               () -> recycleStream(streamId));
-      // allocate resources when the first reader is registered
-      boolean allocateResources = readers.isEmpty();
-      readers.add(dataPartitionReader);
-      streamReaders.put(streamId, dataPartitionReader);
+      readers.put(streamId, dataPartitionReader);
 
-      // create initial buffers for read
-      if (allocateResources && buffers == null) {
+      // allocate resources when the first reader is registered
+      if (!bufferQueueInitialized) {
         memoryManager.requestReadBuffers(
-            minReadBuffers,
-            maxReadBuffers,
-            fileInfo.getBufferSize(),
-            (allocatedBuffers, throwable) ->
-                MapDataPartition.this.onBuffer(new LinkedBlockingDeque<>(allocatedBuffers)));
+            new ReadBufferRequest(
+                minReadBuffers,
+                maxReadBuffers,
+                fileInfo.getBufferSize(),
+                (allocatedBuffers, throwable) -> onBuffer(allocatedBuffers)));
+        bufferQueueInitialized = true;
       } else {
         triggerRead();
       }
     }
 
     // Read logic is executed on another thread.
-    public void onBuffer(Queue<ByteBuf> buffers) {
-      this.buffers = buffers;
+    public void onBuffer(List<ByteBuf> buffers) {
+      if (isReleased || bufferQueue.isReleased()) {
+        buffers.forEach(memoryManager::recycleReadBuffer);
+        return;
+      }
+
+      try {
+        bufferQueue.add(buffers);
+      } catch (Exception e) {
+        // this branch means that this bufferQueue is closed
+        buffers.forEach(memoryManager::recycleReadBuffer);
+        return;
+      }
+
       triggerRead();
+      isWaitingResources.set(false);
+      logger.debug(
+          "MapDataPartition set isWaitingResources to {} by on buffer", isWaitingResources.get());
     }
 
-    public void recycle(ByteBuf buffer, Queue<ByteBuf> bufferQueue) {
-      buffer.clear();
-      bufferQueue.add(buffer);
-      triggerRead();
+    public void recycle(ByteBuf buffer) {
+      bufferQueue.recycle(buffer);
+      if (isReleased || readers.isEmpty() || bufferQueue.isReleased()) {
+        return;
+      }
+
+      if (bufferQueue.size() > 0) {
+        triggerRead();
+      }
+
+      applyNewBuffers();
+    }
+
+    private void applyNewBuffers() {

Review Comment:
   IMO this method is not logically correct. It should do the following:
   check whether bufferQueue.numBuffersOccupied is less than minReadBuffers; if so, request (min - numBuffersOccupied, max - numBuffersOccupied) buffers.
   Also, this check should be driven by a dedicated thread instead of being called from recycle, because recycle is invoked from multiple threads.
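   A sketch of the suggested sizing, using the fields that appear in the quoted diff (an illustration of the review suggestion, not the code that was merged):

   ```java
   private void applyNewBuffers() {
     int occupied = bufferQueue.numBuffersOccupied();
     // Only top up when the partition holds fewer than minReadBuffers in total,
     // and request just the gap between what is occupied and the min/max bounds.
     if (occupied < minReadBuffers && isWaitingResources.compareAndSet(false, true)) {
       memoryManager.requestReadBuffers(
           new ReadBufferRequest(
               minReadBuffers - occupied,
               maxReadBuffers - occupied,
               fileInfo.getBufferSize(),
               (allocatedBuffers, throwable) -> onBuffer(allocatedBuffers)));
     }
   }
   ```

   Driving this check from a single periodic thread, as suggested, also avoids having to reason about concurrent callers coming in from recycle.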



##########
common/src/main/java/org/apache/celeborn/common/network/server/BufferStreamManager.java:
##########
@@ -283,18 +291,18 @@ public String toString() {
   protected class MapDataPartition {

Review Comment:
   We should put MapDataPartition into an individual file.



##########
common/src/main/java/org/apache/celeborn/common/network/server/memory/ReadBufferDispatcher.java:
##########
@@ -49,12 +58,11 @@ public void addBufferRequest(ReadBufferRequest request) {
   public void recycle(ByteBuf buf) {
     int bufferSize = buf.capacity();
     int refCnt = buf.refCnt();
-    if (refCnt != 1) {
-      logger.error("recycle buffer refCnt: {} not equal to 1!", buf.refCnt());
-    }
+    // If a reader failed, related read buffers will have more than one reference count
     if (refCnt > 0) {
       buf.release(refCnt);

Review Comment:
   In your comment ```If a reader failed, related read buffers will have more than one reference count```, the ref count can be more than 1; if that happens, calling release once will not bring the refCnt to 0, and that causes a memory leak.





[GitHub] [incubator-celeborn] RexXiong commented on a diff in pull request #1407: [CELEBORN-431][FLINK] Support dynamic buffer allocation in reading map partition.

Posted by "RexXiong (via GitHub)" <gi...@apache.org>.
RexXiong commented on code in PR #1407:
URL: https://github.com/apache/incubator-celeborn/pull/1407#discussion_r1162846012


##########
common/src/main/java/org/apache/celeborn/common/network/server/MapDataPartition.java:
##########
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.server;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.util.HashMap;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Consumer;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.meta.FileInfo;
+import org.apache.celeborn.common.network.server.memory.BufferQueue;
+import org.apache.celeborn.common.network.server.memory.BufferRecycler;
+import org.apache.celeborn.common.network.server.memory.MemoryManager;
+import org.apache.celeborn.common.network.server.memory.ReadBufferRequest;
+import org.apache.celeborn.common.util.JavaUtils;
+
+// this means active data partition
+class MapDataPartition implements MemoryManager.ReadBufferTargetChangeListener {
+  public static final Logger logger = LoggerFactory.getLogger(MapDataPartition.class);
+  private final FileInfo fileInfo;
+  private final ExecutorService readExecutor;
+  private final ConcurrentHashMap<Long, MapDataPartitionReader> readers =
+      JavaUtils.newConcurrentHashMap();
+  private FileChannel dataFileChanel;
+  private FileChannel indexChannel;
+  private long indexSize;
+  private volatile boolean isReleased = false;
+  private final BufferQueue bufferQueue = new BufferQueue();
+  private boolean bufferQueueInitialized = false;
+  private MemoryManager memoryManager = MemoryManager.instance();
+  private Consumer<Long> recycleStream;
+  private int minReadBuffers;
+  private int maxReadBuffers;
+  private int fileBuffers;
+  private int minBuffersToTriggerRead;
+
+  public MapDataPartition(
+      int minReadBuffers,
+      int maxReadBuffers,
+      HashMap<String, ExecutorService> storageFetcherPool,
+      int threadsPerMountPoint,
+      FileInfo fileInfo,
+      Consumer<Long> recycleStream,
+      int minBuffersToTriggerRead)
+      throws IOException {
+    this.recycleStream = recycleStream;
+    this.fileInfo = fileInfo;
+
+    this.minReadBuffers = minReadBuffers;
+    this.maxReadBuffers = maxReadBuffers;
+    this.fileBuffers = (int) Math.ceil(fileInfo.getFileSize() * 1.0 / fileInfo.getBufferSize());
+
+    updateBuffersTarget((this.minReadBuffers + this.maxReadBuffers) / 2 + 1);
+
+    logger.debug(
+        "read map partition {} with {} {} {}",
+        fileInfo.getFilePath(),
+        bufferQueue.getLocalBuffersTarget(),
+        fileInfo.getBufferSize());
+
+    this.minBuffersToTriggerRead = minBuffersToTriggerRead;
+
+    readExecutor =
+        storageFetcherPool.computeIfAbsent(
+            fileInfo.getMountPoint(),
+            k ->
+                Executors.newFixedThreadPool(
+                    threadsPerMountPoint,
+                    new ThreadFactoryBuilder()
+                        .setNameFormat(fileInfo.getMountPoint() + "-reader-thread-%d")
+                        .setUncaughtExceptionHandler(
+                            (t1, t2) -> {
+                              logger.warn("StorageFetcherPool thread:{}:{}", t1, t2);
+                            })
+                        .build()));
+    this.dataFileChanel = new FileInputStream(fileInfo.getFile()).getChannel();
+    this.indexChannel = new FileInputStream(fileInfo.getIndexPath()).getChannel();
+    this.indexSize = indexChannel.size();
+
+    MemoryManager.instance().addReadBufferTargetChangeListener(this);
+  }
+
+  private synchronized void updateBuffersTarget(int buffersTarget) {
+    int currentBuffersTarget = buffersTarget;
+    if (currentBuffersTarget < minReadBuffers) {
+      currentBuffersTarget = minReadBuffers;
+    }
+    if (currentBuffersTarget > maxReadBuffers) {
+      currentBuffersTarget = maxReadBuffers;
+    }
+    if (currentBuffersTarget > fileBuffers) {
+      currentBuffersTarget = fileBuffers;
+    }
+    bufferQueue.setLocalBuffersTarget(currentBuffersTarget);
+  }
+
+  public void setupDataPartitionReader(
+      int startSubIndex, int endSubIndex, long streamId, Channel channel) {
+    MapDataPartitionReader mapDataPartitionReader =
+        new MapDataPartitionReader(
+            startSubIndex,
+            endSubIndex,
+            fileInfo,
+            streamId,
+            channel,
+            () -> recycleStream.accept(streamId));
+    readers.put(streamId, mapDataPartitionReader);
+
+    // allocate resources when the first reader is registered
+    if (!bufferQueueInitialized) {
+      memoryManager.requestReadBuffers(

Review Comment:
   Every time you request new buffers, you need to increment the pending buffers at the same time.
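   A sketch of what that could look like at the request site; ```incrementPendingRequestBuffers``` is a hypothetical helper on BufferQueue (in the diff ```pendingRequestBuffers``` is only decremented in ```add```):

   ```java
   if (!bufferQueueInitialized) {
     // Account for the buffers we are about to ask for, so that
     // occupied + pending always reflects everything this partition may still receive.
     bufferQueue.incrementPendingRequestBuffers(minReadBuffers); // hypothetical helper
     memoryManager.requestReadBuffers(
         new ReadBufferRequest(
             minReadBuffers,
             maxReadBuffers,
             fileInfo.getBufferSize(),
             (allocatedBuffers, throwable) -> onBuffer(allocatedBuffers)));
     bufferQueueInitialized = true;
   }
   ```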



##########
common/src/main/java/org/apache/celeborn/common/network/server/MapDataPartition.java:
##########
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.server;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.util.HashMap;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Consumer;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.meta.FileInfo;
+import org.apache.celeborn.common.network.server.memory.BufferQueue;
+import org.apache.celeborn.common.network.server.memory.BufferRecycler;
+import org.apache.celeborn.common.network.server.memory.MemoryManager;
+import org.apache.celeborn.common.network.server.memory.ReadBufferRequest;
+import org.apache.celeborn.common.util.JavaUtils;
+
+// this means active data partition
+class MapDataPartition implements MemoryManager.ReadBufferTargetChangeListener {
+  public static final Logger logger = LoggerFactory.getLogger(MapDataPartition.class);
+  private final FileInfo fileInfo;
+  private final ExecutorService readExecutor;
+  private final ConcurrentHashMap<Long, MapDataPartitionReader> readers =
+      JavaUtils.newConcurrentHashMap();
+  private FileChannel dataFileChanel;
+  private FileChannel indexChannel;
+  private long indexSize;
+  private volatile boolean isReleased = false;
+  private final BufferQueue bufferQueue = new BufferQueue();
+  private boolean bufferQueueInitialized = false;
+  private MemoryManager memoryManager = MemoryManager.instance();
+  private Consumer<Long> recycleStream;
+  private int minReadBuffers;
+  private int maxReadBuffers;
+  private int fileBuffers;
+  private int minBuffersToTriggerRead;
+
+  public MapDataPartition(
+      int minReadBuffers,
+      int maxReadBuffers,
+      HashMap<String, ExecutorService> storageFetcherPool,
+      int threadsPerMountPoint,
+      FileInfo fileInfo,
+      Consumer<Long> recycleStream,
+      int minBuffersToTriggerRead)
+      throws IOException {
+    this.recycleStream = recycleStream;
+    this.fileInfo = fileInfo;
+
+    this.minReadBuffers = minReadBuffers;
+    this.maxReadBuffers = maxReadBuffers;
+    this.fileBuffers = (int) Math.ceil(fileInfo.getFileSize() * 1.0 / fileInfo.getBufferSize());
+
+    updateBuffersTarget((this.minReadBuffers + this.maxReadBuffers) / 2 + 1);
+
+    logger.debug(
+        "read map partition {} with {} {} {}",
+        fileInfo.getFilePath(),
+        bufferQueue.getLocalBuffersTarget(),
+        fileInfo.getBufferSize());
+
+    this.minBuffersToTriggerRead = minBuffersToTriggerRead;
+
+    readExecutor =
+        storageFetcherPool.computeIfAbsent(
+            fileInfo.getMountPoint(),
+            k ->
+                Executors.newFixedThreadPool(
+                    threadsPerMountPoint,
+                    new ThreadFactoryBuilder()
+                        .setNameFormat(fileInfo.getMountPoint() + "-reader-thread-%d")
+                        .setUncaughtExceptionHandler(
+                            (t1, t2) -> {
+                              logger.warn("StorageFetcherPool thread:{}:{}", t1, t2);
+                            })
+                        .build()));
+    this.dataFileChanel = new FileInputStream(fileInfo.getFile()).getChannel();
+    this.indexChannel = new FileInputStream(fileInfo.getIndexPath()).getChannel();
+    this.indexSize = indexChannel.size();
+
+    MemoryManager.instance().addReadBufferTargetChangeListener(this);
+  }
+
+  private synchronized void updateBuffersTarget(int buffersTarget) {
+    int currentBuffersTarget = buffersTarget;
+    if (currentBuffersTarget < minReadBuffers) {
+      currentBuffersTarget = minReadBuffers;
+    }
+    if (currentBuffersTarget > maxReadBuffers) {
+      currentBuffersTarget = maxReadBuffers;
+    }
+    if (currentBuffersTarget > fileBuffers) {

Review Comment:
   When data is broadcast to downstream, the file size may be smaller, but we actually need more file buffers, probably readersize * fileBuffers, so we need to know the subpartition number. Besides, we also need extra buffers for EndOfDataEvent/PartitionEvent (these need buffers too). Maybe we should just remove fileBuffers this time?





[GitHub] [incubator-celeborn] RexXiong commented on a diff in pull request #1407: [CELEBORN-431][FLINK] Support dynamic buffer allocation in reading map partition.

Posted by "RexXiong (via GitHub)" <gi...@apache.org>.
RexXiong commented on code in PR #1407:
URL: https://github.com/apache/incubator-celeborn/pull/1407#discussion_r1162878848


##########
common/src/main/java/org/apache/celeborn/common/network/server/memory/BufferQueue.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.server.memory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.annotation.Nullable;
+
+import io.netty.buffer.ByteBuf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// Assume that max-managed memory for a MapDataPartition is (2^31 * buffersize)
+public class BufferQueue {
+  public static final Logger logger = LoggerFactory.getLogger(BufferQueue.class);
+
+  private final Queue<ByteBuf> buffers = new ConcurrentLinkedQueue<>();
+
+  private final MemoryManager memoryManager = MemoryManager.instance();
+
+  /** Number of buffers occupied by this buffer queue (added but still not recycled). */
+  private final AtomicInteger numBuffersOccupied = new AtomicInteger();
+
+  private final AtomicInteger pendingRequestBuffers = new AtomicInteger();
+
+  /** Whether this buffer queue is released or not. */
+  private volatile boolean isReleased = false;
+
+  private volatile int localBuffersTarget = 0;
+
+  /** Returns the number of available buffers in this buffer queue. */
+  public int size() {
+    return buffers.size();
+  }
+
+  /**
+   * Returns an available buffer from this buffer queue or returns null if no buffer is available
+   * currently.
+   */
+  @Nullable
+  public ByteBuf poll() {
+    return buffers.poll();
+  }
+
+  /**
+   * Add buffers and increment numBufferOccupied. Free all buffers to global memory pool if this
+   * buffer queue is released.
+   */
+  public synchronized void add(Collection<ByteBuf> availableBuffers) {
+    if (!isReleased) {
+      buffers.addAll(availableBuffers);
+      numBuffersOccupied.addAndGet(availableBuffers.size());
+      pendingRequestBuffers.addAndGet(-1 * availableBuffers.size());
+    } else {
+      for (ByteBuf availableBuffer : availableBuffers) {
+        memoryManager.recycleReadBuffer(availableBuffer);
+      }
+    }
+  }
+
+  public void recycle(ByteBuf buffer) {
+    if (isReleased || numBuffersOccupied.get() > localBuffersTarget) {
+      recycleToGlobalPool(buffer);
+    } else {
+      recycleToLocalPool(buffer);
+    }
+  }
+
+  public synchronized void recycleToGlobalPool(ByteBuf buffer) {
+    numBuffersOccupied.decrementAndGet();
+    memoryManager.recycleReadBuffer(buffer);
+  }
+
+  public synchronized void recycleToLocalPool(ByteBuf buffer) {
+    buffer.clear();
+    buffers.add(buffer);
+  }
+
+  // free unused buffer to the main pool if possible
+  public synchronized void trim() {
+    List<ByteBuf> buffersToFree = new ArrayList<>();
+    while (numBuffersOccupied.get() > localBuffersTarget) {
+      ByteBuf buffer = poll();
+      if (buffer != null) {
+        buffersToFree.add(buffer);
+        numBuffersOccupied.decrementAndGet();
+      } else {
+        // there are no unused buffers here
+        break;
+      }
+    }
+
+    if (!buffersToFree.isEmpty()) {
+      buffersToFree.forEach(memoryManager::recycleReadBuffer);
+    }
+  }
+
+  /**
+   * Releases this buffer queue and recycles all available buffers. After released, no buffer can be
+   * added to or polled from this buffer queue.
+   */
+  public synchronized void release() {
+    isReleased = true;
+    buffers.forEach(this::recycleToGlobalPool);
+    buffers.clear();
+    pendingRequestBuffers.set(0);
+    numBuffersOccupied.set(0);
+  }
+
+  /** Returns true is this buffer queue has been released. */
+  public boolean isReleased() {
+    return isReleased;
+  }
+
+  public int getLocalBuffersTarget() {
+    return localBuffersTarget;
+  }
+
+  public void setLocalBuffersTarget(int localBuffersTarget) {
+    this.localBuffersTarget = localBuffersTarget;
+  }
+
+  public synchronized void tryApplyNewBuffers(

Review Comment:
   Only synchronize when new buffers can probably be applied.
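   For instance, a cheap unsynchronized pre-check before taking the monitor; the signature below is illustrative, since the rest of ```tryApplyNewBuffers``` is not shown in the diff:

   ```java
   public void tryApplyNewBuffers(int numReaders, java.util.function.IntConsumer requester) {
     if (numReaders == 0) {
       return;
     }
     // Fast path: skip the lock entirely when the queue is already at its target.
     if (numBuffersOccupied.get() + pendingRequestBuffers.get() >= localBuffersTarget) {
       return;
     }
     synchronized (this) {
       int inUse = numBuffersOccupied.get() + pendingRequestBuffers.get();
       int needed = localBuffersTarget - inUse;
       if (!isReleased && needed > 0) {
         pendingRequestBuffers.addAndGet(needed);
         requester.accept(needed); // caller turns this into an actual ReadBufferRequest
       }
     }
   }
   ```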





[GitHub] [incubator-celeborn] RexXiong commented on a diff in pull request #1407: [CELEBORN-431][FLINK] Support dynamic buffer allocation in reading map partition.

Posted by "RexXiong (via GitHub)" <gi...@apache.org>.
RexXiong commented on code in PR #1407:
URL: https://github.com/apache/incubator-celeborn/pull/1407#discussion_r1156703581


##########
common/src/main/java/org/apache/celeborn/common/network/server/BufferStreamManager.java:
##########
@@ -319,28 +329,79 @@ public synchronized void setupDataPartitionReader(
       streamReaders.put(streamId, dataPartitionReader);
 
       // create initial buffers for read
-      if (allocateResources && buffers == null) {
+      if (allocateResources && bufferQueue.size() == 0) {
         memoryManager.requestReadBuffers(
-            minReadBuffers,
-            maxReadBuffers,
-            fileInfo.getBufferSize(),
-            (allocatedBuffers, throwable) ->
-                MapDataPartition.this.onBuffer(new LinkedBlockingDeque<>(allocatedBuffers)));
+            new ReadBufferRequest(
+                minReadBuffers,
+                maxReadBuffers,
+                fileInfo.getBufferSize(),
+                (allocatedBuffers, throwable) -> onBuffer(allocatedBuffers)));
       } else {
         triggerRead();
       }
     }
 
     // Read logic is executed on another thread.
-    public void onBuffer(Queue<ByteBuf> buffers) {
-      this.buffers = buffers;
+    public void onBuffer(List<ByteBuf> buffers) {
+      if (isReleased || bufferQueue.isReleased()) {
+        buffers.forEach(memoryManager::recycleReadBuffer);
+        return;
+      }
+
+      try {
+        bufferQueue.add(buffers);
+      } catch (Exception e) {
+        // this branch means that this bufferQueue is closed
+        buffers.forEach(memoryManager::recycleReadBuffer);
+        return;
+      }
+
       triggerRead();
+      isWaitingResources.compareAndSet(true, false);

Review Comment:
   Set it to false directly.





[GitHub] [incubator-celeborn] FMX merged pull request #1407: [CELEBORN-431][FLINK] Support dynamic buffer allocation in reading map partition.

Posted by "FMX (via GitHub)" <gi...@apache.org>.
FMX merged PR #1407:
URL: https://github.com/apache/incubator-celeborn/pull/1407




[GitHub] [incubator-celeborn] FMX commented on a diff in pull request #1407: [CELEBORN-431][FLINK] Support dynamic buffer allocation in reading map partition.

Posted by "FMX (via GitHub)" <gi...@apache.org>.
FMX commented on code in PR #1407:
URL: https://github.com/apache/incubator-celeborn/pull/1407#discussion_r1161418913


##########
common/src/main/java/org/apache/celeborn/common/network/server/MapDataPartition.java:
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.server;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Consumer;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.meta.FileInfo;
+import org.apache.celeborn.common.network.server.memory.BufferQueue;
+import org.apache.celeborn.common.network.server.memory.BufferRecycler;
+import org.apache.celeborn.common.network.server.memory.MemoryManager;
+import org.apache.celeborn.common.network.server.memory.ReadBufferRequest;
+import org.apache.celeborn.common.util.JavaUtils;
+
+// this means active data partition
+class MapDataPartition implements MemoryManager.ReadBufferTargetChangeListener {
+  public static final Logger logger = LoggerFactory.getLogger(MapDataPartition.class);
+  private final List<Long> activeStreamIds = new ArrayList<>();
+  private final FileInfo fileInfo;
+  private final ExecutorService readExecutor;
+  private final ConcurrentHashMap<Long, MapDataPartitionReader> readers =
+      JavaUtils.newConcurrentHashMap();
+  private FileChannel dataFileChanel;
+  private FileChannel indexChannel;
+  private volatile boolean isReleased = false;
+  private final BufferQueue bufferQueue = new BufferQueue();
+  private boolean bufferQueueInitialized = false;
+  private MemoryManager memoryManager = MemoryManager.instance();
+  private Consumer<Long> recycleStream;
+  private volatile long localMemoryTarget = 0;
+  private volatile int localBuffersTarget = 0;
+  private volatile int inFlightBuffers = 0;

Review Comment:
   There is no need to modify this.





[GitHub] [incubator-celeborn] waitinfuture commented on a diff in pull request #1407: [CELEBORN-431][FLINK] Support dynamic buffer allocation in reading map partition.

Posted by "waitinfuture (via GitHub)" <gi...@apache.org>.
waitinfuture commented on code in PR #1407:
URL: https://github.com/apache/incubator-celeborn/pull/1407#discussion_r1162781752


##########
common/src/main/java/org/apache/celeborn/common/network/server/CreditStreamManager.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.server;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.netty.channel.Channel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.meta.FileInfo;
+import org.apache.celeborn.common.network.server.memory.MemoryManager;
+import org.apache.celeborn.common.util.JavaUtils;
+
+public class CreditStreamManager {
+  private static final Logger logger = LoggerFactory.getLogger(CreditStreamManager.class);
+  private final AtomicLong nextStreamId;
+  private final ConcurrentHashMap<Long, StreamState> streams;
+  private final ConcurrentHashMap<FileInfo, MapDataPartition> activeMapPartitions;
+  private final HashMap<String, ExecutorService> storageFetcherPool = new HashMap<>();
+  private int minReadBuffers;
+  private int maxReadBuffers;
+  private int threadsPerMountPoint;
+  private int minBuffersToTriggerRead;
+  private final BlockingQueue<DelayedStreamId> recycleStreamIds = new DelayQueue<>();
+
+  @GuardedBy("lock")
+  private volatile Thread recycleThread;
+
+  private final Object lock = new Object();
+
+  public CreditStreamManager(
+      int minReadBuffers,
+      int maxReadBuffers,
+      int threadsPerMountpoint,
+      int minBuffersToTriggerRead) {
+    nextStreamId = new AtomicLong((long) new Random().nextInt(Integer.MAX_VALUE) * 1000);
+    streams = JavaUtils.newConcurrentHashMap();
+    activeMapPartitions = JavaUtils.newConcurrentHashMap();
+    this.minReadBuffers = minReadBuffers;
+    this.maxReadBuffers = maxReadBuffers;
+    threadsPerMountPoint = threadsPerMountpoint;
+    this.minBuffersToTriggerRead = minBuffersToTriggerRead;
+    MemoryManager.instance().setCreditStreamManager(this);
+    logger.debug(
+        "Initialize buffer stream manager with {} {} {}",
+        this.minReadBuffers,
+        this.maxReadBuffers,
+        threadsPerMountpoint);
+  }
+
+  public long registerStream(
+      Consumer<Long> callback,
+      Channel channel,
+      int initialCredit,
+      int startSubIndex,
+      int endSubIndex,
+      FileInfo fileInfo)
+      throws IOException {
+    long streamId = nextStreamId.getAndIncrement();
+    logger.debug(
+        "Register stream start from {}, streamId: {}, fileInfo: {}",
+        channel.remoteAddress(),
+        streamId,
+        fileInfo);
+    synchronized (activeMapPartitions) {
+      MapDataPartition mapDataPartition = activeMapPartitions.get(fileInfo);
+      if (mapDataPartition == null) {
+        mapDataPartition =
+            new MapDataPartition(
+                minReadBuffers,
+                maxReadBuffers,
+                storageFetcherPool,
+                threadsPerMountPoint,
+                fileInfo,
+                id -> recycleStream(id),
+                minBuffersToTriggerRead);
+        activeMapPartitions.put(fileInfo, mapDataPartition);
+      }
+      StreamState streamState =
+          new StreamState(channel, fileInfo.getBufferSize(), mapDataPartition);
+      streams.put(streamId, streamState);
+      mapDataPartition.setupDataPartitionReader(startSubIndex, endSubIndex, streamId, channel);
+    }
+
+    addCredit(initialCredit, streamId);
+
+    callback.accept(streamId);

Review Comment:
   callback.accept(streamId) must precede addCredit(initialCredit, streamId), or the worker might send ReadData before the StreamHandle.
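   A minimal sketch of the suggested ordering, reusing the two calls shown in the diff above (not the final implementation):
   ```
   // Send the StreamHandle to the client first, then grant the initial credit,
   // so ReadData frames can never be sent before the client knows the stream.
   callback.accept(streamId);
   addCredit(initialCredit, streamId);
   ```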





[GitHub] [incubator-celeborn] FMX commented on a diff in pull request #1407: [CELEBORN-431][FLINK] Support dynamic buffer allocation in reading map partition.

Posted by "FMX (via GitHub)" <gi...@apache.org>.
FMX commented on code in PR #1407:
URL: https://github.com/apache/incubator-celeborn/pull/1407#discussion_r1159235434


##########
client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteBufferStreamReader.java:
##########
@@ -23,6 +23,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.celeborn.common.exception.CelebornIOException;

Review Comment:
   How about addressing this comment in another refactoring PR (#1387)?





[GitHub] [incubator-celeborn] FMX commented on a diff in pull request #1407: [CELEBORN-431][FLINK] Support dynamic buffer allocation in reading map partition.

Posted by "FMX (via GitHub)" <gi...@apache.org>.
FMX commented on code in PR #1407:
URL: https://github.com/apache/incubator-celeborn/pull/1407#discussion_r1161584793


##########
common/src/main/java/org/apache/celeborn/common/network/server/MapDataPartitionReader.java:
##########
@@ -178,55 +178,53 @@ private void addBuffer(ByteBuf buffer, boolean hasRemaining, Recycler bufferRecy
       isClosed = !hasRemaining;

Review Comment:
   > 
   
   Agree with that.





[GitHub] [incubator-celeborn] FMX commented on a diff in pull request #1407: [CELEBORN-431][FLINK] Support dynamic buffer allocation in reading map partition.

Posted by "FMX (via GitHub)" <gi...@apache.org>.
FMX commented on code in PR #1407:
URL: https://github.com/apache/incubator-celeborn/pull/1407#discussion_r1161418913


##########
common/src/main/java/org/apache/celeborn/common/network/server/MapDataPartition.java:
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.server;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Consumer;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.meta.FileInfo;
+import org.apache.celeborn.common.network.server.memory.BufferQueue;
+import org.apache.celeborn.common.network.server.memory.BufferRecycler;
+import org.apache.celeborn.common.network.server.memory.MemoryManager;
+import org.apache.celeborn.common.network.server.memory.ReadBufferRequest;
+import org.apache.celeborn.common.util.JavaUtils;
+
+// this means active data partition
+class MapDataPartition implements MemoryManager.ReadBufferTargetChangeListener {
+  public static final Logger logger = LoggerFactory.getLogger(MapDataPartition.class);
+  private final List<Long> activeStreamIds = new ArrayList<>();
+  private final FileInfo fileInfo;
+  private final ExecutorService readExecutor;
+  private final ConcurrentHashMap<Long, MapDataPartitionReader> readers =
+      JavaUtils.newConcurrentHashMap();
+  private FileChannel dataFileChanel;
+  private FileChannel indexChannel;
+  private volatile boolean isReleased = false;
+  private final BufferQueue bufferQueue = new BufferQueue();
+  private boolean bufferQueueInitialized = false;
+  private MemoryManager memoryManager = MemoryManager.instance();
+  private Consumer<Long> recycleStream;
+  private volatile long localMemoryTarget = 0;
+  private volatile int localBuffersTarget = 0;
+  private volatile int inFlightBuffers = 0;

Review Comment:
   There is no need to modify this.





[GitHub] [incubator-celeborn] FMX commented on a diff in pull request #1407: [CELEBORN-431][FLINK] Support dynamic buffer allocation in reading map partition.

Posted by "FMX (via GitHub)" <gi...@apache.org>.
FMX commented on code in PR #1407:
URL: https://github.com/apache/incubator-celeborn/pull/1407#discussion_r1159428319


##########
common/src/main/java/org/apache/celeborn/common/network/server/memory/ReadBufferDispatcher.java:
##########
@@ -49,12 +58,11 @@ public void addBufferRequest(ReadBufferRequest request) {
   public void recycle(ByteBuf buf) {
     int bufferSize = buf.capacity();
     int refCnt = buf.refCnt();
-    if (refCnt != 1) {
-      logger.error("recycle buffer refCnt: {} not equal to 1!", buf.refCnt());
-    }
+    // If a reader failed, related read buffers will have more than one reference count
     if (refCnt > 0) {
       buf.release(refCnt);

Review Comment:
   Yes, this method ensures that any buffer that reaches it has its reference count released to zero. This method is the end of a buffer's lifecycle.





[GitHub] [incubator-celeborn] waitinfuture commented on a diff in pull request #1407: [CELEBORN-431][FLINK] Support dynamic buffer allocation in reading map partition.

Posted by "waitinfuture (via GitHub)" <gi...@apache.org>.
waitinfuture commented on code in PR #1407:
URL: https://github.com/apache/incubator-celeborn/pull/1407#discussion_r1161390019


##########
common/src/main/java/org/apache/celeborn/common/network/server/MapDataPartition.java:
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.server;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Consumer;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.meta.FileInfo;
+import org.apache.celeborn.common.network.server.memory.BufferQueue;
+import org.apache.celeborn.common.network.server.memory.BufferRecycler;
+import org.apache.celeborn.common.network.server.memory.MemoryManager;
+import org.apache.celeborn.common.network.server.memory.ReadBufferRequest;
+import org.apache.celeborn.common.util.JavaUtils;
+
+// this means active data partition
+class MapDataPartition implements MemoryManager.ReadBufferTargetChangeListener {
+  public static final Logger logger = LoggerFactory.getLogger(MapDataPartition.class);
+  private final List<Long> activeStreamIds = new ArrayList<>();
+  private final FileInfo fileInfo;
+  private final ExecutorService readExecutor;
+  private final ConcurrentHashMap<Long, MapDataPartitionReader> readers =
+      JavaUtils.newConcurrentHashMap();
+  private FileChannel dataFileChanel;
+  private FileChannel indexChannel;
+  private volatile boolean isReleased = false;
+  private final BufferQueue bufferQueue = new BufferQueue();
+  private boolean bufferQueueInitialized = false;
+  private MemoryManager memoryManager = MemoryManager.instance();
+  private Consumer<Long> recycleStream;
+  private volatile long localMemoryTarget = 0;
+  private volatile int localBuffersTarget = 0;
+  private volatile int inFlightBuffers = 0;
+  private Object applyBufferLock = new Object();
+  private long definedMinReadAheadMemory;

Review Comment:
   Use minReadAheadBuffers (a buffer count) instead of a memory size in bytes.
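   A minimal sketch of the renamed fields, assuming the limits are kept as plain buffer counts (names mirror this suggestion, not the merged code):
   ```
   // Read-ahead limits expressed as buffer counts; byte sizes are derived
   // from fileInfo.getBufferSize() where needed instead of being stored.
   private int minReadAheadBuffers;
   private int maxReadAheadBuffers;
   ```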



##########
common/src/main/java/org/apache/celeborn/common/network/server/MapDataPartition.java:
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.server;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Consumer;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.meta.FileInfo;
+import org.apache.celeborn.common.network.server.memory.BufferQueue;
+import org.apache.celeborn.common.network.server.memory.BufferRecycler;
+import org.apache.celeborn.common.network.server.memory.MemoryManager;
+import org.apache.celeborn.common.network.server.memory.ReadBufferRequest;
+import org.apache.celeborn.common.util.JavaUtils;
+
+// this means active data partition
+class MapDataPartition implements MemoryManager.ReadBufferTargetChangeListener {
+  public static final Logger logger = LoggerFactory.getLogger(MapDataPartition.class);
+  private final List<Long> activeStreamIds = new ArrayList<>();
+  private final FileInfo fileInfo;
+  private final ExecutorService readExecutor;
+  private final ConcurrentHashMap<Long, MapDataPartitionReader> readers =
+      JavaUtils.newConcurrentHashMap();
+  private FileChannel dataFileChanel;
+  private FileChannel indexChannel;
+  private volatile boolean isReleased = false;
+  private final BufferQueue bufferQueue = new BufferQueue();
+  private boolean bufferQueueInitialized = false;
+  private MemoryManager memoryManager = MemoryManager.instance();
+  private Consumer<Long> recycleStream;
+  private volatile long localMemoryTarget = 0;

Review Comment:
   No need to define this member; localBuffersTarget is sufficient.



##########
common/src/main/java/org/apache/celeborn/common/network/server/MapDataPartition.java:
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.server;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Consumer;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.meta.FileInfo;
+import org.apache.celeborn.common.network.server.memory.BufferQueue;
+import org.apache.celeborn.common.network.server.memory.BufferRecycler;
+import org.apache.celeborn.common.network.server.memory.MemoryManager;
+import org.apache.celeborn.common.network.server.memory.ReadBufferRequest;
+import org.apache.celeborn.common.util.JavaUtils;
+
+// this means active data partition
+class MapDataPartition implements MemoryManager.ReadBufferTargetChangeListener {
+  public static final Logger logger = LoggerFactory.getLogger(MapDataPartition.class);
+  private final List<Long> activeStreamIds = new ArrayList<>();
+  private final FileInfo fileInfo;
+  private final ExecutorService readExecutor;
+  private final ConcurrentHashMap<Long, MapDataPartitionReader> readers =
+      JavaUtils.newConcurrentHashMap();
+  private FileChannel dataFileChanel;
+  private FileChannel indexChannel;
+  private volatile boolean isReleased = false;
+  private final BufferQueue bufferQueue = new BufferQueue();
+  private boolean bufferQueueInitialized = false;
+  private MemoryManager memoryManager = MemoryManager.instance();
+  private Consumer<Long> recycleStream;
+  private volatile long localMemoryTarget = 0;
+  private volatile int localBuffersTarget = 0;
+  private volatile int inFlightBuffers = 0;
+  private Object applyBufferLock = new Object();
+  private long definedMinReadAheadMemory;
+  private long definedMaxReadAheadMemory;
+  private int readAheadMin;
+
+  public MapDataPartition(
+      int definedMinReadBuffers,
+      int definedMaxReadBuffers,
+      HashMap<String, ExecutorService> storageFetcherPool,
+      int threadsPerMountPoint,
+      FileInfo fileInfo,
+      Consumer<Long> recycleStream,
+      int readAheadMin)
+      throws FileNotFoundException {
+    this.recycleStream = recycleStream;
+    this.fileInfo = fileInfo;
+    int bufferSize = fileInfo.getBufferSize();
+
+    definedMinReadAheadMemory = definedMinReadBuffers * bufferSize;
+    definedMaxReadAheadMemory = definedMaxReadBuffers * bufferSize;
+

Review Comment:
   localBuffersTarget should be initialized here as ```(max + min) / 2```
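   A minimal sketch of that initialization, assuming the constructor keeps the definedMinReadBuffers/definedMaxReadBuffers parameters shown above:
   ```
   // Start halfway between the configured bounds; the MemoryManager can move
   // the target up or down later through the read-buffer-target listener.
   this.localBuffersTarget = (definedMinReadBuffers + definedMaxReadBuffers) / 2;
   ```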



##########
common/src/main/java/org/apache/celeborn/common/network/server/MapDataPartition.java:
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.server;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Consumer;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.meta.FileInfo;
+import org.apache.celeborn.common.network.server.memory.BufferQueue;
+import org.apache.celeborn.common.network.server.memory.BufferRecycler;
+import org.apache.celeborn.common.network.server.memory.MemoryManager;
+import org.apache.celeborn.common.network.server.memory.ReadBufferRequest;
+import org.apache.celeborn.common.util.JavaUtils;
+
+// this means active data partition
+class MapDataPartition implements MemoryManager.ReadBufferTargetChangeListener {
+  public static final Logger logger = LoggerFactory.getLogger(MapDataPartition.class);
+  private final List<Long> activeStreamIds = new ArrayList<>();
+  private final FileInfo fileInfo;
+  private final ExecutorService readExecutor;
+  private final ConcurrentHashMap<Long, MapDataPartitionReader> readers =
+      JavaUtils.newConcurrentHashMap();
+  private FileChannel dataFileChanel;
+  private FileChannel indexChannel;
+  private volatile boolean isReleased = false;
+  private final BufferQueue bufferQueue = new BufferQueue();
+  private boolean bufferQueueInitialized = false;
+  private MemoryManager memoryManager = MemoryManager.instance();
+  private Consumer<Long> recycleStream;
+  private volatile long localMemoryTarget = 0;
+  private volatile int localBuffersTarget = 0;
+  private volatile int inFlightBuffers = 0;
+  private Object applyBufferLock = new Object();
+  private long definedMinReadAheadMemory;
+  private long definedMaxReadAheadMemory;
+  private int readAheadMin;
+
+  public MapDataPartition(
+      int definedMinReadBuffers,
+      int definedMaxReadBuffers,
+      HashMap<String, ExecutorService> storageFetcherPool,
+      int threadsPerMountPoint,
+      FileInfo fileInfo,
+      Consumer<Long> recycleStream,
+      int readAheadMin)
+      throws FileNotFoundException {
+    this.recycleStream = recycleStream;
+    this.fileInfo = fileInfo;
+    int bufferSize = fileInfo.getBufferSize();
+
+    definedMinReadAheadMemory = definedMinReadBuffers * bufferSize;
+    definedMaxReadAheadMemory = definedMaxReadBuffers * bufferSize;
+
+    updateLocalTarget(
+        localMemoryTarget,
+        fileInfo.getFileSize(),
+        definedMinReadAheadMemory,
+        definedMaxReadAheadMemory);
+
+    logger.debug(
+        "read map partition {} with {} {} {}",
+        fileInfo.getFilePath(),
+        localMemoryTarget,
+        localBuffersTarget,
+        fileInfo.getBufferSize());
+
+    this.readAheadMin = readAheadMin;
+
+    readExecutor =
+        storageFetcherPool.computeIfAbsent(
+            fileInfo.getMountPoint(),
+            k ->
+                Executors.newFixedThreadPool(
+                    threadsPerMountPoint,
+                    new ThreadFactoryBuilder()
+                        .setNameFormat(fileInfo.getMountPoint() + "-reader-thread-%d")
+                        .setUncaughtExceptionHandler(
+                            (t1, t2) -> {
+                              logger.warn("StorageFetcherPool thread:{}:{}", t1, t2);
+                            })
+                        .build()));
+    this.dataFileChanel = new FileInputStream(fileInfo.getFile()).getChannel();
+    this.indexChannel = new FileInputStream(fileInfo.getIndexPath()).getChannel();
+  }
+
+  private synchronized void updateLocalTarget(

Review Comment:
   This method should be modified since we use min/max buffer counts instead of memory sizes, for example:
   ```
     private synchronized void updateLocalTarget(int newTarget, int fileBuffers) {
       if (newTarget < minReadAheadBuffers) {
         newTarget = minReadAheadBuffers;
       }
       if (newTarget > maxReadAheadBuffers) {
         newTarget = maxReadAheadBuffers;
       }
       if (newTarget > fileBuffers) {
         newTarget = fileBuffers;
       }
       localBuffersTarget = newTarget;
     }
   ```



##########
common/src/main/java/org/apache/celeborn/common/network/server/MapDataPartition.java:
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.server;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Consumer;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.meta.FileInfo;
+import org.apache.celeborn.common.network.server.memory.BufferQueue;
+import org.apache.celeborn.common.network.server.memory.BufferRecycler;
+import org.apache.celeborn.common.network.server.memory.MemoryManager;
+import org.apache.celeborn.common.network.server.memory.ReadBufferRequest;
+import org.apache.celeborn.common.util.JavaUtils;
+
+// this means active data partition
+class MapDataPartition implements MemoryManager.ReadBufferTargetChangeListener {
+  public static final Logger logger = LoggerFactory.getLogger(MapDataPartition.class);
+  private final List<Long> activeStreamIds = new ArrayList<>();
+  private final FileInfo fileInfo;
+  private final ExecutorService readExecutor;
+  private final ConcurrentHashMap<Long, MapDataPartitionReader> readers =
+      JavaUtils.newConcurrentHashMap();
+  private FileChannel dataFileChanel;
+  private FileChannel indexChannel;
+  private volatile boolean isReleased = false;
+  private final BufferQueue bufferQueue = new BufferQueue();
+  private boolean bufferQueueInitialized = false;
+  private MemoryManager memoryManager = MemoryManager.instance();
+  private Consumer<Long> recycleStream;
+  private volatile long localMemoryTarget = 0;
+  private volatile int localBuffersTarget = 0;
+  private volatile int inFlightBuffers = 0;
+  private Object applyBufferLock = new Object();
+  private long definedMinReadAheadMemory;
+  private long definedMaxReadAheadMemory;
+  private int readAheadMin;
+
+  public MapDataPartition(
+      int definedMinReadBuffers,
+      int definedMaxReadBuffers,
+      HashMap<String, ExecutorService> storageFetcherPool,
+      int threadsPerMountPoint,
+      FileInfo fileInfo,
+      Consumer<Long> recycleStream,
+      int readAheadMin)
+      throws FileNotFoundException {
+    this.recycleStream = recycleStream;
+    this.fileInfo = fileInfo;
+    int bufferSize = fileInfo.getBufferSize();
+
+    definedMinReadAheadMemory = definedMinReadBuffers * bufferSize;
+    definedMaxReadAheadMemory = definedMaxReadBuffers * bufferSize;
+
+    updateLocalTarget(
+        localMemoryTarget,
+        fileInfo.getFileSize(),
+        definedMinReadAheadMemory,
+        definedMaxReadAheadMemory);
+
+    logger.debug(
+        "read map partition {} with {} {} {}",
+        fileInfo.getFilePath(),
+        localMemoryTarget,
+        localBuffersTarget,
+        fileInfo.getBufferSize());
+
+    this.readAheadMin = readAheadMin;
+
+    readExecutor =
+        storageFetcherPool.computeIfAbsent(
+            fileInfo.getMountPoint(),
+            k ->
+                Executors.newFixedThreadPool(
+                    threadsPerMountPoint,
+                    new ThreadFactoryBuilder()
+                        .setNameFormat(fileInfo.getMountPoint() + "-reader-thread-%d")
+                        .setUncaughtExceptionHandler(
+                            (t1, t2) -> {
+                              logger.warn("StorageFetcherPool thread:{}:{}", t1, t2);
+                            })
+                        .build()));
+    this.dataFileChanel = new FileInputStream(fileInfo.getFile()).getChannel();
+    this.indexChannel = new FileInputStream(fileInfo.getIndexPath()).getChannel();
+  }
+
+  private synchronized void updateLocalTarget(
+      long nTarget, long fileSize, long definedMinReadAheadMemory, long definedMaxReadAheadMemory) {
+    long target = nTarget;
+    if (target < definedMinReadAheadMemory) {
+      target = definedMinReadAheadMemory;
+    }
+    if (target > definedMaxReadAheadMemory) {
+      target = definedMaxReadAheadMemory;
+    }
+    if (target > fileSize) {
+      target = fileSize;
+    }
+    localBuffersTarget = (int) Math.ceil(target * 1.0 / fileInfo.getBufferSize());
+    localMemoryTarget = target;
+  }
+
+  public synchronized void setupDataPartitionReader(
+      int startSubIndex, int endSubIndex, long streamId, Channel channel) {
+    MapDataPartitionReader mapDataPartitionReader =
+        new MapDataPartitionReader(
+            startSubIndex,
+            endSubIndex,
+            fileInfo,
+            streamId,
+            channel,
+            () -> recycleStream.accept(streamId));
+    readers.put(streamId, mapDataPartitionReader);
+
+    // allocate resources when the first reader is registered
+    if (!bufferQueueInitialized) {
+      memoryManager.requestReadBuffers(
+          new ReadBufferRequest(
+              localBuffersTarget,
+              fileInfo.getBufferSize(),
+              (allocatedBuffers, throwable) -> onBuffer(allocatedBuffers)));
+      bufferQueueInitialized = true;
+    } else {
+      triggerRead();
+    }
+  }
+
+  // Read logic is executed on another thread.
+  public void onBuffer(List<ByteBuf> buffers) {
+    if (isReleased || bufferQueue.isReleased()) {
+      buffers.forEach(memoryManager::recycleReadBuffer);
+      return;
+    }
+
+    try {
+      bufferQueue.add(buffers);
+      inFlightBuffers -= buffers.size();
+    } catch (Exception e) {
+      // this branch means that this bufferQueue is closed
+      buffers.forEach(memoryManager::recycleReadBuffer);
+      return;
+    }
+
+    if (bufferQueue.size() > Math.min(localBuffersTarget / 2 + 1, readAheadMin)) {
+      triggerRead();
+    }
+  }
+
+  public void recycle(ByteBuf buffer) {
+    if (bufferQueue.numBuffersOccupied() > localBuffersTarget) {
+      bufferQueue.recycle(buffer);
+    } else {
+      buffer.clear();
+      bufferQueue.add(buffer);
+    }
+    if (isReleased || readers.isEmpty() || bufferQueue.isReleased()) {
+      return;
+    }
+
+    if (bufferQueue.size() > Math.min(localBuffersTarget / 2 + 1, readAheadMin)) {

Review Comment:
   ditto



##########
common/src/main/java/org/apache/celeborn/common/network/server/MapDataPartition.java:
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.server;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Consumer;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.meta.FileInfo;
+import org.apache.celeborn.common.network.server.memory.BufferQueue;
+import org.apache.celeborn.common.network.server.memory.BufferRecycler;
+import org.apache.celeborn.common.network.server.memory.MemoryManager;
+import org.apache.celeborn.common.network.server.memory.ReadBufferRequest;
+import org.apache.celeborn.common.util.JavaUtils;
+
+// this means active data partition
+class MapDataPartition implements MemoryManager.ReadBufferTargetChangeListener {
+  public static final Logger logger = LoggerFactory.getLogger(MapDataPartition.class);
+  private final List<Long> activeStreamIds = new ArrayList<>();
+  private final FileInfo fileInfo;
+  private final ExecutorService readExecutor;
+  private final ConcurrentHashMap<Long, MapDataPartitionReader> readers =
+      JavaUtils.newConcurrentHashMap();
+  private FileChannel dataFileChanel;
+  private FileChannel indexChannel;
+  private volatile boolean isReleased = false;
+  private final BufferQueue bufferQueue = new BufferQueue();
+  private boolean bufferQueueInitialized = false;
+  private MemoryManager memoryManager = MemoryManager.instance();
+  private Consumer<Long> recycleStream;
+  private volatile long localMemoryTarget = 0;
+  private volatile int localBuffersTarget = 0;
+  private volatile int inFlightBuffers = 0;
+  private Object applyBufferLock = new Object();
+  private long definedMinReadAheadMemory;
+  private long definedMaxReadAheadMemory;
+  private int readAheadMin;
+
+  public MapDataPartition(
+      int definedMinReadBuffers,
+      int definedMaxReadBuffers,
+      HashMap<String, ExecutorService> storageFetcherPool,
+      int threadsPerMountPoint,
+      FileInfo fileInfo,
+      Consumer<Long> recycleStream,
+      int readAheadMin)
+      throws FileNotFoundException {
+    this.recycleStream = recycleStream;
+    this.fileInfo = fileInfo;
+    int bufferSize = fileInfo.getBufferSize();
+
+    definedMinReadAheadMemory = definedMinReadBuffers * bufferSize;
+    definedMaxReadAheadMemory = definedMaxReadBuffers * bufferSize;
+
+    updateLocalTarget(
+        localMemoryTarget,
+        fileInfo.getFileSize(),
+        definedMinReadAheadMemory,
+        definedMaxReadAheadMemory);
+
+    logger.debug(
+        "read map partition {} with {} {} {}",
+        fileInfo.getFilePath(),
+        localMemoryTarget,
+        localBuffersTarget,
+        fileInfo.getBufferSize());
+
+    this.readAheadMin = readAheadMin;
+
+    readExecutor =
+        storageFetcherPool.computeIfAbsent(
+            fileInfo.getMountPoint(),
+            k ->
+                Executors.newFixedThreadPool(
+                    threadsPerMountPoint,
+                    new ThreadFactoryBuilder()
+                        .setNameFormat(fileInfo.getMountPoint() + "-reader-thread-%d")
+                        .setUncaughtExceptionHandler(
+                            (t1, t2) -> {
+                              logger.warn("StorageFetcherPool thread:{}:{}", t1, t2);
+                            })
+                        .build()));
+    this.dataFileChanel = new FileInputStream(fileInfo.getFile()).getChannel();
+    this.indexChannel = new FileInputStream(fileInfo.getIndexPath()).getChannel();
+  }
+
+  private synchronized void updateLocalTarget(
+      long nTarget, long fileSize, long definedMinReadAheadMemory, long definedMaxReadAheadMemory) {
+    long target = nTarget;
+    if (target < definedMinReadAheadMemory) {
+      target = definedMinReadAheadMemory;
+    }
+    if (target > definedMaxReadAheadMemory) {
+      target = definedMaxReadAheadMemory;
+    }
+    if (target > fileSize) {
+      target = fileSize;
+    }
+    localBuffersTarget = (int) Math.ceil(target * 1.0 / fileInfo.getBufferSize());
+    localMemoryTarget = target;
+  }
+
+  public synchronized void setupDataPartitionReader(
+      int startSubIndex, int endSubIndex, long streamId, Channel channel) {
+    MapDataPartitionReader mapDataPartitionReader =
+        new MapDataPartitionReader(
+            startSubIndex,
+            endSubIndex,
+            fileInfo,
+            streamId,
+            channel,
+            () -> recycleStream.accept(streamId));
+    readers.put(streamId, mapDataPartitionReader);
+
+    // allocate resources when the first reader is registered
+    if (!bufferQueueInitialized) {
+      memoryManager.requestReadBuffers(
+          new ReadBufferRequest(
+              localBuffersTarget,
+              fileInfo.getBufferSize(),
+              (allocatedBuffers, throwable) -> onBuffer(allocatedBuffers)));
+      bufferQueueInitialized = true;
+    } else {
+      triggerRead();
+    }
+  }
+
+  // Read logic is executed on another thread.
+  public void onBuffer(List<ByteBuf> buffers) {
+    if (isReleased || bufferQueue.isReleased()) {
+      buffers.forEach(memoryManager::recycleReadBuffer);
+      return;
+    }
+
+    try {
+      bufferQueue.add(buffers);
+      inFlightBuffers -= buffers.size();
+    } catch (Exception e) {
+      // this branch means that this bufferQueue is closed
+      buffers.forEach(memoryManager::recycleReadBuffer);
+      return;
+    }
+
+    if (bufferQueue.size() > Math.min(localBuffersTarget / 2 + 1, readAheadMin)) {
+      triggerRead();
+    }
+  }
+
+  public void recycle(ByteBuf buffer) {
+    if (bufferQueue.numBuffersOccupied() > localBuffersTarget) {
+      bufferQueue.recycle(buffer);
+    } else {
+      buffer.clear();
+      bufferQueue.add(buffer);
+    }
+    if (isReleased || readers.isEmpty() || bufferQueue.isReleased()) {
+      return;
+    }
+
+    if (bufferQueue.size() > Math.min(localBuffersTarget / 2 + 1, readAheadMin)) {
+      triggerRead();
+    }
+
+    applyNewBuffers();
+  }
+
+  private void applyNewBuffers() {

Review Comment:
   This method should be implemented in BufferQueue, and the consistency of numOccupied and inFlight should be maintained inside BufferQueue. For example, we can make applyNewBuffers a synchronized method of BufferQueue.
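   A minimal sketch of moving that logic into BufferQueue, assuming the queue also owns the pending-request counter; the method name, the pendingRequestBuffers field, and the ReadBufferListener callback type are placeholders, not the actual API:
   ```
   // Inside BufferQueue: request only the buffers that are still missing, with
   // occupied + pending counted under the same lock so they stay consistent.
   public synchronized void tryApplyNewBuffers(
       int buffersTarget, int bufferSize, ReadBufferListener listener) {
     int missing = buffersTarget - numBuffersOccupied() - pendingRequestBuffers;
     if (missing > 0) {
       pendingRequestBuffers += missing;
       MemoryManager.instance()
           .requestReadBuffers(new ReadBufferRequest(missing, bufferSize, listener));
     }
   }
   ```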



##########
common/src/main/java/org/apache/celeborn/common/network/server/CreditStreamManager.java:
##########
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.server;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.netty.channel.Channel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.meta.FileInfo;
+import org.apache.celeborn.common.network.server.memory.MemoryManager;
+import org.apache.celeborn.common.util.JavaUtils;
+
+public class CreditStreamManager {
+  private static final Logger logger = LoggerFactory.getLogger(CreditStreamManager.class);
+  private final AtomicLong nextStreamId;
+  private final ConcurrentHashMap<Long, StreamState> streams;
+  private final ConcurrentHashMap<FileInfo, MapDataPartition> activeMapPartitions;
+  private final HashMap<String, ExecutorService> storageFetcherPool = new HashMap<>();
+  private int definedMinReadBuffers;
+  private int definedMaxReadBuffers;
+  private int threadsPerMountPoint;
+  private int readAheadMin;
+  private final BlockingQueue<DelayedStreamId> recycleStreamIds = new DelayQueue<>();
+
+  @GuardedBy("lock")
+  private volatile Thread recycleThread;
+
+  private final Object lock = new Object();
+
+  public CreditStreamManager(
+      int minReadBuffers, int maxReadBuffers, int threadsPerMountpoint, int readAheadMin) {
+    nextStreamId = new AtomicLong((long) new Random().nextInt(Integer.MAX_VALUE) * 1000);
+    streams = JavaUtils.newConcurrentHashMap();
+    activeMapPartitions = JavaUtils.newConcurrentHashMap();
+    definedMinReadBuffers = minReadBuffers;
+    definedMaxReadBuffers = maxReadBuffers;
+    threadsPerMountPoint = threadsPerMountpoint;
+    this.readAheadMin = readAheadMin;
+    MemoryManager.instance().setCreditStreamManager(this);
+    logger.debug(
+        "Initialize buffer stream manager with {} {} {}",
+        definedMinReadBuffers,
+        definedMaxReadBuffers,
+        threadsPerMountpoint);
+  }
+
+  public long registerStream(
+      Consumer<Long> callback,
+      Channel channel,
+      int initialCredit,
+      int startSubIndex,
+      int endSubIndex,
+      FileInfo fileInfo)
+      throws IOException {
+    long streamId = nextStreamId.getAndIncrement();
+    StreamState streamState = new StreamState(channel, fileInfo.getBufferSize());
+    streams.put(streamId, streamState);
+    logger.debug(
+        "Register stream start from {}, streamId: {}, fileInfo: {}",
+        channel.remoteAddress(),
+        streamId,
+        fileInfo);
+    MapDataPartition mapDataPartition;
+    synchronized (activeMapPartitions) {
+      mapDataPartition = activeMapPartitions.get(fileInfo);
+      if (mapDataPartition == null) {
+        mapDataPartition =
+            new MapDataPartition(
+                definedMinReadBuffers,
+                definedMaxReadBuffers,
+                storageFetcherPool,
+                threadsPerMountPoint,
+                fileInfo,
+                id -> recycleStream(id),
+                readAheadMin);
+        activeMapPartitions.put(fileInfo, mapDataPartition);
+      }
+      mapDataPartition.addStream(streamId);
+    }
+
+    addCredit(initialCredit, streamId);
+    streamState.setMapDataPartition(mapDataPartition);

Review Comment:
   IMO we should not expose setMapDataPartition in StreamState; mapDataPartition should be passed as an argument to the StreamState constructor instead.
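   A minimal sketch of the constructor-injection alternative, matching the three-argument StreamState constructor that already appears in the later diff of this thread:
   ```
   // Pass the partition at construction time instead of via a setter, so a
   // StreamState can never be observed without its MapDataPartition.
   StreamState streamState =
       new StreamState(channel, fileInfo.getBufferSize(), mapDataPartition);
   streams.put(streamId, streamState);
   ```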



##########
common/src/main/java/org/apache/celeborn/common/network/server/MapDataPartition.java:
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.server;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Consumer;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.meta.FileInfo;
+import org.apache.celeborn.common.network.server.memory.BufferQueue;
+import org.apache.celeborn.common.network.server.memory.BufferRecycler;
+import org.apache.celeborn.common.network.server.memory.MemoryManager;
+import org.apache.celeborn.common.network.server.memory.ReadBufferRequest;
+import org.apache.celeborn.common.util.JavaUtils;
+
+// this means active data partition
+class MapDataPartition implements MemoryManager.ReadBufferTargetChangeListener {
+  public static final Logger logger = LoggerFactory.getLogger(MapDataPartition.class);
+  private final List<Long> activeStreamIds = new ArrayList<>();
+  private final FileInfo fileInfo;
+  private final ExecutorService readExecutor;
+  private final ConcurrentHashMap<Long, MapDataPartitionReader> readers =
+      JavaUtils.newConcurrentHashMap();
+  private FileChannel dataFileChanel;
+  private FileChannel indexChannel;
+  private volatile boolean isReleased = false;
+  private final BufferQueue bufferQueue = new BufferQueue();
+  private boolean bufferQueueInitialized = false;
+  private MemoryManager memoryManager = MemoryManager.instance();
+  private Consumer<Long> recycleStream;
+  private volatile long localMemoryTarget = 0;
+  private volatile int localBuffersTarget = 0;
+  private volatile int inFlightBuffers = 0;

Review Comment:
   Rename inFlightBuffers to pendingRequestBuffers and add a comment explaining the variable. It should also be an AtomicInteger, since multiple threads can modify it.
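   A minimal sketch of the rename with a thread-safe counter (requires java.util.concurrent.atomic.AtomicInteger; the comment wording is illustrative):
   ```
   // Number of read buffers already requested from the MemoryManager but not
   // yet handed back to this partition's BufferQueue; updated from both the
   // recycle path and the allocation callback, hence the atomic type.
   private final AtomicInteger pendingRequestBuffers = new AtomicInteger(0);
   ```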



##########
common/src/main/java/org/apache/celeborn/common/network/server/MapDataPartition.java:
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.server;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Consumer;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.meta.FileInfo;
+import org.apache.celeborn.common.network.server.memory.BufferQueue;
+import org.apache.celeborn.common.network.server.memory.BufferRecycler;
+import org.apache.celeborn.common.network.server.memory.MemoryManager;
+import org.apache.celeborn.common.network.server.memory.ReadBufferRequest;
+import org.apache.celeborn.common.util.JavaUtils;
+
+// this means active data partition
+class MapDataPartition implements MemoryManager.ReadBufferTargetChangeListener {
+  public static final Logger logger = LoggerFactory.getLogger(MapDataPartition.class);
+  private final List<Long> activeStreamIds = new ArrayList<>();
+  private final FileInfo fileInfo;
+  private final ExecutorService readExecutor;
+  private final ConcurrentHashMap<Long, MapDataPartitionReader> readers =
+      JavaUtils.newConcurrentHashMap();
+  private FileChannel dataFileChanel;
+  private FileChannel indexChannel;
+  private volatile boolean isReleased = false;
+  private final BufferQueue bufferQueue = new BufferQueue();
+  private boolean bufferQueueInitialized = false;
+  private MemoryManager memoryManager = MemoryManager.instance();
+  private Consumer<Long> recycleStream;
+  private volatile long localMemoryTarget = 0;
+  private volatile int localBuffersTarget = 0;
+  private volatile int inFlightBuffers = 0;
+  private Object applyBufferLock = new Object();
+  private long definedMinReadAheadMemory;
+  private long definedMaxReadAheadMemory;
+  private int readAheadMin;
+
+  public MapDataPartition(
+      int definedMinReadBuffers,
+      int definedMaxReadBuffers,
+      HashMap<String, ExecutorService> storageFetcherPool,
+      int threadsPerMountPoint,
+      FileInfo fileInfo,
+      Consumer<Long> recycleStream,
+      int readAheadMin)
+      throws FileNotFoundException {
+    this.recycleStream = recycleStream;
+    this.fileInfo = fileInfo;
+    int bufferSize = fileInfo.getBufferSize();
+
+    definedMinReadAheadMemory = definedMinReadBuffers * bufferSize;
+    definedMaxReadAheadMemory = definedMaxReadBuffers * bufferSize;
+
+    updateLocalTarget(
+        localMemoryTarget,
+        fileInfo.getFileSize(),
+        definedMinReadAheadMemory,
+        definedMaxReadAheadMemory);
+
+    logger.debug(
+        "read map partition {} with {} {} {}",
+        fileInfo.getFilePath(),
+        localMemoryTarget,
+        localBuffersTarget,
+        fileInfo.getBufferSize());
+
+    this.readAheadMin = readAheadMin;
+
+    readExecutor =
+        storageFetcherPool.computeIfAbsent(
+            fileInfo.getMountPoint(),
+            k ->
+                Executors.newFixedThreadPool(
+                    threadsPerMountPoint,
+                    new ThreadFactoryBuilder()
+                        .setNameFormat(fileInfo.getMountPoint() + "-reader-thread-%d")
+                        .setUncaughtExceptionHandler(
+                            (t1, t2) -> {
+                              logger.warn("StorageFetcherPool thread:{}:{}", t1, t2);
+                            })
+                        .build()));
+    this.dataFileChanel = new FileInputStream(fileInfo.getFile()).getChannel();
+    this.indexChannel = new FileInputStream(fileInfo.getIndexPath()).getChannel();
+  }
+
+  private synchronized void updateLocalTarget(
+      long nTarget, long fileSize, long definedMinReadAheadMemory, long definedMaxReadAheadMemory) {
+    long target = nTarget;
+    if (target < definedMinReadAheadMemory) {
+      target = definedMinReadAheadMemory;
+    }
+    if (target > definedMaxReadAheadMemory) {
+      target = definedMaxReadAheadMemory;
+    }
+    if (target > fileSize) {
+      target = fileSize;
+    }
+    localBuffersTarget = (int) Math.ceil(target * 1.0 / fileInfo.getBufferSize());
+    localMemoryTarget = target;
+  }
+
+  public synchronized void setupDataPartitionReader(
+      int startSubIndex, int endSubIndex, long streamId, Channel channel) {
+    MapDataPartitionReader mapDataPartitionReader =
+        new MapDataPartitionReader(
+            startSubIndex,
+            endSubIndex,
+            fileInfo,
+            streamId,
+            channel,
+            () -> recycleStream.accept(streamId));
+    readers.put(streamId, mapDataPartitionReader);
+
+    // allocate resources when the first reader is registered
+    if (!bufferQueueInitialized) {
+      memoryManager.requestReadBuffers(
+          new ReadBufferRequest(
+              localBuffersTarget,
+              fileInfo.getBufferSize(),
+              (allocatedBuffers, throwable) -> onBuffer(allocatedBuffers)));
+      bufferQueueInitialized = true;
+    } else {
+      triggerRead();
+    }
+  }
+
+  // Read logic is executed on another thread.
+  public void onBuffer(List<ByteBuf> buffers) {
+    if (isReleased || bufferQueue.isReleased()) {
+      buffers.forEach(memoryManager::recycleReadBuffer);
+      return;
+    }
+
+    try {
+      bufferQueue.add(buffers);
+      inFlightBuffers -= buffers.size();
+    } catch (Exception e) {
+      // this branch means that this bufferQueue is closed
+      buffers.forEach(memoryManager::recycleReadBuffer);
+      return;
+    }
+
+    if (bufferQueue.size() > Math.min(localBuffersTarget / 2 + 1, readAheadMin)) {

Review Comment:
   Should use ```>=```, or it may fail to trigger a read in the corner case where the target is 1.
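
   For illustration, a minimal standalone sketch of that corner case (the variable names mirror the diff above; the concrete readAheadMin value is only an assumption):

   ```java
   public class TriggerThresholdDemo {
     public static void main(String[] args) {
       int localBuffersTarget = 1; // the smallest possible target
       int readAheadMin = 8;       // assumed config value
       // Threshold used before triggering a read: min(target / 2 + 1, readAheadMin) == 1
       int threshold = Math.min(localBuffersTarget / 2 + 1, readAheadMin);

       // The queue never holds more than localBuffersTarget buffers, so size() tops out at 1.
       int queueSize = 1;
       System.out.println(queueSize > threshold);  // false -> read is never triggered
       System.out.println(queueSize >= threshold); // true  -> read is triggered
     }
   }
   ```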



##########
common/src/main/java/org/apache/celeborn/common/network/server/MapDataPartition.java:
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.server;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Consumer;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.meta.FileInfo;
+import org.apache.celeborn.common.network.server.memory.BufferQueue;
+import org.apache.celeborn.common.network.server.memory.BufferRecycler;
+import org.apache.celeborn.common.network.server.memory.MemoryManager;
+import org.apache.celeborn.common.network.server.memory.ReadBufferRequest;
+import org.apache.celeborn.common.util.JavaUtils;
+
+// this means active data partition
+class MapDataPartition implements MemoryManager.ReadBufferTargetChangeListener {
+  public static final Logger logger = LoggerFactory.getLogger(MapDataPartition.class);
+  private final List<Long> activeStreamIds = new ArrayList<>();
+  private final FileInfo fileInfo;
+  private final ExecutorService readExecutor;
+  private final ConcurrentHashMap<Long, MapDataPartitionReader> readers =
+      JavaUtils.newConcurrentHashMap();
+  private FileChannel dataFileChanel;
+  private FileChannel indexChannel;
+  private volatile boolean isReleased = false;
+  private final BufferQueue bufferQueue = new BufferQueue();
+  private boolean bufferQueueInitialized = false;
+  private MemoryManager memoryManager = MemoryManager.instance();
+  private Consumer<Long> recycleStream;
+  private volatile long localMemoryTarget = 0;
+  private volatile int localBuffersTarget = 0;
+  private volatile int inFlightBuffers = 0;
+  private Object applyBufferLock = new Object();
+  private long definedMinReadAheadMemory;
+  private long definedMaxReadAheadMemory;

Review Comment:
   ditto



##########
common/src/main/java/org/apache/celeborn/common/network/server/MapDataPartition.java:
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.server;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Consumer;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.meta.FileInfo;
+import org.apache.celeborn.common.network.server.memory.BufferQueue;
+import org.apache.celeborn.common.network.server.memory.BufferRecycler;
+import org.apache.celeborn.common.network.server.memory.MemoryManager;
+import org.apache.celeborn.common.network.server.memory.ReadBufferRequest;
+import org.apache.celeborn.common.util.JavaUtils;
+
+// this means active data partition
+class MapDataPartition implements MemoryManager.ReadBufferTargetChangeListener {
+  public static final Logger logger = LoggerFactory.getLogger(MapDataPartition.class);
+  private final List<Long> activeStreamIds = new ArrayList<>();
+  private final FileInfo fileInfo;
+  private final ExecutorService readExecutor;
+  private final ConcurrentHashMap<Long, MapDataPartitionReader> readers =
+      JavaUtils.newConcurrentHashMap();
+  private FileChannel dataFileChanel;
+  private FileChannel indexChannel;
+  private volatile boolean isReleased = false;
+  private final BufferQueue bufferQueue = new BufferQueue();
+  private boolean bufferQueueInitialized = false;
+  private MemoryManager memoryManager = MemoryManager.instance();
+  private Consumer<Long> recycleStream;
+  private volatile long localMemoryTarget = 0;
+  private volatile int localBuffersTarget = 0;
+  private volatile int inFlightBuffers = 0;
+  private Object applyBufferLock = new Object();
+  private long definedMinReadAheadMemory;
+  private long definedMaxReadAheadMemory;
+  private int readAheadMin;
+
+  public MapDataPartition(
+      int definedMinReadBuffers,
+      int definedMaxReadBuffers,
+      HashMap<String, ExecutorService> storageFetcherPool,
+      int threadsPerMountPoint,
+      FileInfo fileInfo,
+      Consumer<Long> recycleStream,
+      int readAheadMin)
+      throws FileNotFoundException {
+    this.recycleStream = recycleStream;
+    this.fileInfo = fileInfo;
+    int bufferSize = fileInfo.getBufferSize();
+
+    definedMinReadAheadMemory = definedMinReadBuffers * bufferSize;
+    definedMaxReadAheadMemory = definedMaxReadBuffers * bufferSize;
+
+    updateLocalTarget(
+        localMemoryTarget,
+        fileInfo.getFileSize(),
+        definedMinReadAheadMemory,
+        definedMaxReadAheadMemory);
+
+    logger.debug(
+        "read map partition {} with {} {} {}",
+        fileInfo.getFilePath(),
+        localMemoryTarget,
+        localBuffersTarget,
+        fileInfo.getBufferSize());
+
+    this.readAheadMin = readAheadMin;
+
+    readExecutor =
+        storageFetcherPool.computeIfAbsent(
+            fileInfo.getMountPoint(),
+            k ->
+                Executors.newFixedThreadPool(
+                    threadsPerMountPoint,
+                    new ThreadFactoryBuilder()
+                        .setNameFormat(fileInfo.getMountPoint() + "-reader-thread-%d")
+                        .setUncaughtExceptionHandler(
+                            (t1, t2) -> {
+                              logger.warn("StorageFetcherPool thread:{}:{}", t1, t2);
+                            })
+                        .build()));
+    this.dataFileChanel = new FileInputStream(fileInfo.getFile()).getChannel();
+    this.indexChannel = new FileInputStream(fileInfo.getIndexPath()).getChannel();
+  }
+
+  private synchronized void updateLocalTarget(
+      long nTarget, long fileSize, long definedMinReadAheadMemory, long definedMaxReadAheadMemory) {
+    long target = nTarget;
+    if (target < definedMinReadAheadMemory) {
+      target = definedMinReadAheadMemory;
+    }
+    if (target > definedMaxReadAheadMemory) {
+      target = definedMaxReadAheadMemory;
+    }
+    if (target > fileSize) {
+      target = fileSize;
+    }
+    localBuffersTarget = (int) Math.ceil(target * 1.0 / fileInfo.getBufferSize());
+    localMemoryTarget = target;
+  }
+
+  public synchronized void setupDataPartitionReader(
+      int startSubIndex, int endSubIndex, long streamId, Channel channel) {
+    MapDataPartitionReader mapDataPartitionReader =
+        new MapDataPartitionReader(
+            startSubIndex,
+            endSubIndex,
+            fileInfo,
+            streamId,
+            channel,
+            () -> recycleStream.accept(streamId));
+    readers.put(streamId, mapDataPartitionReader);
+
+    // allocate resources when the first reader is registered
+    if (!bufferQueueInitialized) {
+      memoryManager.requestReadBuffers(
+          new ReadBufferRequest(
+              localBuffersTarget,
+              fileInfo.getBufferSize(),
+              (allocatedBuffers, throwable) -> onBuffer(allocatedBuffers)));
+      bufferQueueInitialized = true;
+    } else {
+      triggerRead();
+    }
+  }
+
+  // Read logic is executed on another thread.
+  public void onBuffer(List<ByteBuf> buffers) {
+    if (isReleased || bufferQueue.isReleased()) {
+      buffers.forEach(memoryManager::recycleReadBuffer);
+      return;
+    }
+
+    try {
+      bufferQueue.add(buffers);
+      inFlightBuffers -= buffers.size();
+    } catch (Exception e) {
+      // this branch means that this bufferQueue is closed
+      buffers.forEach(memoryManager::recycleReadBuffer);
+      return;
+    }
+
+    if (bufferQueue.size() > Math.min(localBuffersTarget / 2 + 1, readAheadMin)) {
+      triggerRead();
+    }
+  }
+
+  public void recycle(ByteBuf buffer) {
+    if (bufferQueue.numBuffersOccupied() > localBuffersTarget) {
+      bufferQueue.recycle(buffer);
+    } else {
+      buffer.clear();
+      bufferQueue.add(buffer);
+    }
+    if (isReleased || readers.isEmpty() || bufferQueue.isReleased()) {

Review Comment:
   Should move this block to the start of the method and recycle the buffer directly if the condition is true.
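
   Roughly, a sketch of just that reordering (the rest of the method stays as it is):

   ```java
   public void recycle(ByteBuf buffer) {
     // Check the terminal states first and hand the buffer straight back,
     // before touching the local pool or triggering more reads.
     if (isReleased || readers.isEmpty() || bufferQueue.isReleased()) {
       memoryManager.recycleReadBuffer(buffer); // or a BufferQueue helper that returns it to the global pool
       return;
     }
     // ... existing local-pool / trigger-read logic ...
   }
   ```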



##########
common/src/main/java/org/apache/celeborn/common/network/server/MapDataPartition.java:
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.server;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Consumer;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.meta.FileInfo;
+import org.apache.celeborn.common.network.server.memory.BufferQueue;
+import org.apache.celeborn.common.network.server.memory.BufferRecycler;
+import org.apache.celeborn.common.network.server.memory.MemoryManager;
+import org.apache.celeborn.common.network.server.memory.ReadBufferRequest;
+import org.apache.celeborn.common.util.JavaUtils;
+
+// this means active data partition
+class MapDataPartition implements MemoryManager.ReadBufferTargetChangeListener {
+  public static final Logger logger = LoggerFactory.getLogger(MapDataPartition.class);
+  private final List<Long> activeStreamIds = new ArrayList<>();
+  private final FileInfo fileInfo;
+  private final ExecutorService readExecutor;
+  private final ConcurrentHashMap<Long, MapDataPartitionReader> readers =
+      JavaUtils.newConcurrentHashMap();
+  private FileChannel dataFileChanel;
+  private FileChannel indexChannel;
+  private volatile boolean isReleased = false;
+  private final BufferQueue bufferQueue = new BufferQueue();
+  private boolean bufferQueueInitialized = false;
+  private MemoryManager memoryManager = MemoryManager.instance();
+  private Consumer<Long> recycleStream;
+  private volatile long localMemoryTarget = 0;
+  private volatile int localBuffersTarget = 0;
+  private volatile int inFlightBuffers = 0;
+  private Object applyBufferLock = new Object();
+  private long definedMinReadAheadMemory;
+  private long definedMaxReadAheadMemory;
+  private int readAheadMin;
+
+  public MapDataPartition(
+      int definedMinReadBuffers,
+      int definedMaxReadBuffers,
+      HashMap<String, ExecutorService> storageFetcherPool,
+      int threadsPerMountPoint,
+      FileInfo fileInfo,
+      Consumer<Long> recycleStream,
+      int readAheadMin)
+      throws FileNotFoundException {
+    this.recycleStream = recycleStream;
+    this.fileInfo = fileInfo;
+    int bufferSize = fileInfo.getBufferSize();
+
+    definedMinReadAheadMemory = definedMinReadBuffers * bufferSize;
+    definedMaxReadAheadMemory = definedMaxReadBuffers * bufferSize;
+
+    updateLocalTarget(
+        localMemoryTarget,
+        fileInfo.getFileSize(),
+        definedMinReadAheadMemory,
+        definedMaxReadAheadMemory);
+
+    logger.debug(
+        "read map partition {} with {} {} {}",
+        fileInfo.getFilePath(),
+        localMemoryTarget,
+        localBuffersTarget,
+        fileInfo.getBufferSize());
+
+    this.readAheadMin = readAheadMin;
+
+    readExecutor =
+        storageFetcherPool.computeIfAbsent(
+            fileInfo.getMountPoint(),
+            k ->
+                Executors.newFixedThreadPool(
+                    threadsPerMountPoint,
+                    new ThreadFactoryBuilder()
+                        .setNameFormat(fileInfo.getMountPoint() + "-reader-thread-%d")
+                        .setUncaughtExceptionHandler(
+                            (t1, t2) -> {
+                              logger.warn("StorageFetcherPool thread:{}:{}", t1, t2);
+                            })
+                        .build()));
+    this.dataFileChanel = new FileInputStream(fileInfo.getFile()).getChannel();
+    this.indexChannel = new FileInputStream(fileInfo.getIndexPath()).getChannel();
+  }
+
+  private synchronized void updateLocalTarget(
+      long nTarget, long fileSize, long definedMinReadAheadMemory, long definedMaxReadAheadMemory) {
+    long target = nTarget;
+    if (target < definedMinReadAheadMemory) {
+      target = definedMinReadAheadMemory;
+    }
+    if (target > definedMaxReadAheadMemory) {
+      target = definedMaxReadAheadMemory;
+    }
+    if (target > fileSize) {
+      target = fileSize;
+    }
+    localBuffersTarget = (int) Math.ceil(target * 1.0 / fileInfo.getBufferSize());
+    localMemoryTarget = target;
+  }
+
+  public synchronized void setupDataPartitionReader(
+      int startSubIndex, int endSubIndex, long streamId, Channel channel) {
+    MapDataPartitionReader mapDataPartitionReader =
+        new MapDataPartitionReader(
+            startSubIndex,
+            endSubIndex,
+            fileInfo,
+            streamId,
+            channel,
+            () -> recycleStream.accept(streamId));
+    readers.put(streamId, mapDataPartitionReader);
+
+    // allocate resources when the first reader is registered
+    if (!bufferQueueInitialized) {
+      memoryManager.requestReadBuffers(
+          new ReadBufferRequest(
+              localBuffersTarget,
+              fileInfo.getBufferSize(),
+              (allocatedBuffers, throwable) -> onBuffer(allocatedBuffers)));
+      bufferQueueInitialized = true;
+    } else {
+      triggerRead();
+    }
+  }
+
+  // Read logic is executed on another thread.
+  public void onBuffer(List<ByteBuf> buffers) {
+    if (isReleased || bufferQueue.isReleased()) {
+      buffers.forEach(memoryManager::recycleReadBuffer);
+      return;
+    }
+
+    try {

Review Comment:
   try-catch is unnecessary
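
   i.e. something like this sketch, assuming BufferQueue.add itself recycles the incoming buffers when the queue has already been released (as the BufferQueue version later in this thread does):

   ```java
   public void onBuffer(List<ByteBuf> buffers) {
     if (isReleased || bufferQueue.isReleased()) {
       buffers.forEach(memoryManager::recycleReadBuffer);
       return;
     }
     // add() handles the released case internally, so no try-catch is needed here.
     bufferQueue.add(buffers);
     inFlightBuffers -= buffers.size();

     if (bufferQueue.size() >= Math.min(localBuffersTarget / 2 + 1, readAheadMin)) {
       triggerRead();
     }
   }
   ```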



##########
common/src/main/java/org/apache/celeborn/common/network/server/MapDataPartition.java:
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.server;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Consumer;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.meta.FileInfo;
+import org.apache.celeborn.common.network.server.memory.BufferQueue;
+import org.apache.celeborn.common.network.server.memory.BufferRecycler;
+import org.apache.celeborn.common.network.server.memory.MemoryManager;
+import org.apache.celeborn.common.network.server.memory.ReadBufferRequest;
+import org.apache.celeborn.common.util.JavaUtils;
+
+// this means active data partition
+class MapDataPartition implements MemoryManager.ReadBufferTargetChangeListener {
+  public static final Logger logger = LoggerFactory.getLogger(MapDataPartition.class);
+  private final List<Long> activeStreamIds = new ArrayList<>();

Review Comment:
   activeStreamIds and readers are redundant; activeStreamIds should be removed.
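
   A sketch of what dropping it could look like, deriving the active stream ids from the readers map instead (accessor names assumed):

   ```java
   // readers already maps streamId -> reader, so it can double as the set of active streams.
   public Set<Long> getActiveStreamIds() {
     return readers.keySet();
   }

   public void removeStream(Long streamId) {
     readers.remove(streamId);
   }
   ```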



##########
common/src/main/java/org/apache/celeborn/common/network/server/MapDataPartition.java:
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.server;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Consumer;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.meta.FileInfo;
+import org.apache.celeborn.common.network.server.memory.BufferQueue;
+import org.apache.celeborn.common.network.server.memory.BufferRecycler;
+import org.apache.celeborn.common.network.server.memory.MemoryManager;
+import org.apache.celeborn.common.network.server.memory.ReadBufferRequest;
+import org.apache.celeborn.common.util.JavaUtils;
+
+// this means active data partition
+class MapDataPartition implements MemoryManager.ReadBufferTargetChangeListener {
+  public static final Logger logger = LoggerFactory.getLogger(MapDataPartition.class);
+  private final List<Long> activeStreamIds = new ArrayList<>();
+  private final FileInfo fileInfo;
+  private final ExecutorService readExecutor;
+  private final ConcurrentHashMap<Long, MapDataPartitionReader> readers =
+      JavaUtils.newConcurrentHashMap();
+  private FileChannel dataFileChanel;
+  private FileChannel indexChannel;
+  private volatile boolean isReleased = false;
+  private final BufferQueue bufferQueue = new BufferQueue();
+  private boolean bufferQueueInitialized = false;
+  private MemoryManager memoryManager = MemoryManager.instance();
+  private Consumer<Long> recycleStream;
+  private volatile long localMemoryTarget = 0;
+  private volatile int localBuffersTarget = 0;

Review Comment:
   Should move this variable into BufferQueue, and let BufferQueue ensure consistency.
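
   A minimal sketch of that shape (roughly what the BufferQueue posted later in this thread ends up doing; the method names are assumptions):

   ```java
   import java.util.concurrent.atomic.AtomicInteger;

   public class BufferQueue {
     // Target number of buffers this queue tries to keep for its partition.
     private volatile int localBuffersTarget = 0;
     // Buffers handed out by this queue and not yet recycled.
     private final AtomicInteger numBuffersOccupied = new AtomicInteger();

     public void setLocalBuffersTarget(int target) {
       this.localBuffersTarget = target;
     }

     public boolean aboveTarget() {
       return numBuffersOccupied.get() > localBuffersTarget;
     }
   }
   ```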



##########
common/src/main/java/org/apache/celeborn/common/network/server/MapDataPartition.java:
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.server;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Consumer;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.meta.FileInfo;
+import org.apache.celeborn.common.network.server.memory.BufferQueue;
+import org.apache.celeborn.common.network.server.memory.BufferRecycler;
+import org.apache.celeborn.common.network.server.memory.MemoryManager;
+import org.apache.celeborn.common.network.server.memory.ReadBufferRequest;
+import org.apache.celeborn.common.util.JavaUtils;
+
+// this means active data partition
+class MapDataPartition implements MemoryManager.ReadBufferTargetChangeListener {
+  public static final Logger logger = LoggerFactory.getLogger(MapDataPartition.class);
+  private final List<Long> activeStreamIds = new ArrayList<>();
+  private final FileInfo fileInfo;
+  private final ExecutorService readExecutor;
+  private final ConcurrentHashMap<Long, MapDataPartitionReader> readers =
+      JavaUtils.newConcurrentHashMap();
+  private FileChannel dataFileChanel;
+  private FileChannel indexChannel;
+  private volatile boolean isReleased = false;
+  private final BufferQueue bufferQueue = new BufferQueue();
+  private boolean bufferQueueInitialized = false;
+  private MemoryManager memoryManager = MemoryManager.instance();
+  private Consumer<Long> recycleStream;
+  private volatile long localMemoryTarget = 0;
+  private volatile int localBuffersTarget = 0;
+  private volatile int inFlightBuffers = 0;
+  private Object applyBufferLock = new Object();
+  private long definedMinReadAheadMemory;
+  private long definedMaxReadAheadMemory;
+  private int readAheadMin;
+
+  public MapDataPartition(
+      int definedMinReadBuffers,
+      int definedMaxReadBuffers,
+      HashMap<String, ExecutorService> storageFetcherPool,
+      int threadsPerMountPoint,
+      FileInfo fileInfo,
+      Consumer<Long> recycleStream,
+      int readAheadMin)
+      throws FileNotFoundException {
+    this.recycleStream = recycleStream;
+    this.fileInfo = fileInfo;
+    int bufferSize = fileInfo.getBufferSize();
+
+    definedMinReadAheadMemory = definedMinReadBuffers * bufferSize;
+    definedMaxReadAheadMemory = definedMaxReadBuffers * bufferSize;
+
+    updateLocalTarget(
+        localMemoryTarget,
+        fileInfo.getFileSize(),
+        definedMinReadAheadMemory,
+        definedMaxReadAheadMemory);
+
+    logger.debug(
+        "read map partition {} with {} {} {}",
+        fileInfo.getFilePath(),
+        localMemoryTarget,
+        localBuffersTarget,
+        fileInfo.getBufferSize());
+
+    this.readAheadMin = readAheadMin;
+
+    readExecutor =
+        storageFetcherPool.computeIfAbsent(
+            fileInfo.getMountPoint(),
+            k ->
+                Executors.newFixedThreadPool(
+                    threadsPerMountPoint,
+                    new ThreadFactoryBuilder()
+                        .setNameFormat(fileInfo.getMountPoint() + "-reader-thread-%d")
+                        .setUncaughtExceptionHandler(
+                            (t1, t2) -> {
+                              logger.warn("StorageFetcherPool thread:{}:{}", t1, t2);
+                            })
+                        .build()));
+    this.dataFileChanel = new FileInputStream(fileInfo.getFile()).getChannel();
+    this.indexChannel = new FileInputStream(fileInfo.getIndexPath()).getChannel();
+  }
+
+  private synchronized void updateLocalTarget(
+      long nTarget, long fileSize, long definedMinReadAheadMemory, long definedMaxReadAheadMemory) {
+    long target = nTarget;
+    if (target < definedMinReadAheadMemory) {
+      target = definedMinReadAheadMemory;
+    }
+    if (target > definedMaxReadAheadMemory) {
+      target = definedMaxReadAheadMemory;
+    }
+    if (target > fileSize) {
+      target = fileSize;
+    }
+    localBuffersTarget = (int) Math.ceil(target * 1.0 / fileInfo.getBufferSize());
+    localMemoryTarget = target;
+  }
+
+  public synchronized void setupDataPartitionReader(
+      int startSubIndex, int endSubIndex, long streamId, Channel channel) {
+    MapDataPartitionReader mapDataPartitionReader =
+        new MapDataPartitionReader(
+            startSubIndex,
+            endSubIndex,
+            fileInfo,
+            streamId,
+            channel,
+            () -> recycleStream.accept(streamId));
+    readers.put(streamId, mapDataPartitionReader);
+
+    // allocate resources when the first reader is registered
+    if (!bufferQueueInitialized) {
+      memoryManager.requestReadBuffers(
+          new ReadBufferRequest(
+              localBuffersTarget,
+              fileInfo.getBufferSize(),
+              (allocatedBuffers, throwable) -> onBuffer(allocatedBuffers)));
+      bufferQueueInitialized = true;
+    } else {
+      triggerRead();
+    }
+  }
+
+  // Read logic is executed on another thread.
+  public void onBuffer(List<ByteBuf> buffers) {
+    if (isReleased || bufferQueue.isReleased()) {
+      buffers.forEach(memoryManager::recycleReadBuffer);
+      return;
+    }
+
+    try {
+      bufferQueue.add(buffers);
+      inFlightBuffers -= buffers.size();
+    } catch (Exception e) {
+      // this branch means that this bufferQueue is closed
+      buffers.forEach(memoryManager::recycleReadBuffer);
+      return;
+    }
+
+    if (bufferQueue.size() > Math.min(localBuffersTarget / 2 + 1, readAheadMin)) {
+      triggerRead();
+    }
+  }
+
+  public void recycle(ByteBuf buffer) {
+    if (bufferQueue.numBuffersOccupied() > localBuffersTarget) {
+      bufferQueue.recycle(buffer);
+    } else {
+      buffer.clear();
+      bufferQueue.add(buffer);

Review Comment:
   Remove the BufferQueue.add method and let bufferQueue.recycle decide whether the buffer should be added back to buffers or returned to the MemoryManager.
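
   That would leave the caller with something like this sketch (recycleToGlobalPool stands for returning the buffer straight to the MemoryManager, as in the BufferQueue version later in this thread):

   ```java
   public void recycle(ByteBuf buffer) {
     if (isReleased || readers.isEmpty() || bufferQueue.isReleased()) {
       // Partition is done: hand the buffer straight back to the MemoryManager.
       bufferQueue.recycleToGlobalPool(buffer);
       return;
     }
     // recycle() itself decides between the local pool and the global pool.
     bufferQueue.recycle(buffer);

     if (bufferQueue.size() >= Math.min(localBuffersTarget / 2 + 1, readAheadMin)) {
       triggerRead();
     }
     applyNewBuffers();
   }
   ```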





[GitHub] [incubator-celeborn] FMX commented on a diff in pull request #1407: [CELEBORN-431][FLINK] Support dynamic buffer allocation in reading map partition.

Posted by "FMX (via GitHub)" <gi...@apache.org>.
FMX commented on code in PR #1407:
URL: https://github.com/apache/incubator-celeborn/pull/1407#discussion_r1161417951


##########
common/src/main/java/org/apache/celeborn/common/network/server/MapDataPartition.java:
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.server;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Consumer;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.meta.FileInfo;
+import org.apache.celeborn.common.network.server.memory.BufferQueue;
+import org.apache.celeborn.common.network.server.memory.BufferRecycler;
+import org.apache.celeborn.common.network.server.memory.MemoryManager;
+import org.apache.celeborn.common.network.server.memory.ReadBufferRequest;
+import org.apache.celeborn.common.util.JavaUtils;
+
+// this means active data partition
+class MapDataPartition implements MemoryManager.ReadBufferTargetChangeListener {
+  public static final Logger logger = LoggerFactory.getLogger(MapDataPartition.class);
+  private final List<Long> activeStreamIds = new ArrayList<>();
+  private final FileInfo fileInfo;
+  private final ExecutorService readExecutor;
+  private final ConcurrentHashMap<Long, MapDataPartitionReader> readers =
+      JavaUtils.newConcurrentHashMap();
+  private FileChannel dataFileChanel;
+  private FileChannel indexChannel;
+  private volatile boolean isReleased = false;
+  private final BufferQueue bufferQueue = new BufferQueue();
+  private boolean bufferQueueInitialized = false;
+  private MemoryManager memoryManager = MemoryManager.instance();
+  private Consumer<Long> recycleStream;
+  private volatile long localMemoryTarget = 0;
+  private volatile int localBuffersTarget = 0;
+  private volatile int inFlightBuffers = 0;
+  private Object applyBufferLock = new Object();
+  private long definedMinReadAheadMemory;
+  private long definedMaxReadAheadMemory;
+  private int readAheadMin;
+
+  public MapDataPartition(
+      int definedMinReadBuffers,
+      int definedMaxReadBuffers,
+      HashMap<String, ExecutorService> storageFetcherPool,
+      int threadsPerMountPoint,
+      FileInfo fileInfo,
+      Consumer<Long> recycleStream,
+      int readAheadMin)
+      throws FileNotFoundException {
+    this.recycleStream = recycleStream;
+    this.fileInfo = fileInfo;
+    int bufferSize = fileInfo.getBufferSize();
+
+    definedMinReadAheadMemory = definedMinReadBuffers * bufferSize;
+    definedMaxReadAheadMemory = definedMaxReadBuffers * bufferSize;
+
+    updateLocalTarget(
+        localMemoryTarget,
+        fileInfo.getFileSize(),
+        definedMinReadAheadMemory,
+        definedMaxReadAheadMemory);
+
+    logger.debug(
+        "read map partition {} with {} {} {}",
+        fileInfo.getFilePath(),
+        localMemoryTarget,
+        localBuffersTarget,
+        fileInfo.getBufferSize());
+
+    this.readAheadMin = readAheadMin;
+
+    readExecutor =
+        storageFetcherPool.computeIfAbsent(
+            fileInfo.getMountPoint(),
+            k ->
+                Executors.newFixedThreadPool(
+                    threadsPerMountPoint,
+                    new ThreadFactoryBuilder()
+                        .setNameFormat(fileInfo.getMountPoint() + "-reader-thread-%d")
+                        .setUncaughtExceptionHandler(
+                            (t1, t2) -> {
+                              logger.warn("StorageFetcherPool thread:{}:{}", t1, t2);
+                            })
+                        .build()));
+    this.dataFileChanel = new FileInputStream(fileInfo.getFile()).getChannel();
+    this.indexChannel = new FileInputStream(fileInfo.getIndexPath()).getChannel();
+  }
+
+  private synchronized void updateLocalTarget(

Review Comment:
   Reject. The buffer size is variable.





[GitHub] [incubator-celeborn] waitinfuture commented on a diff in pull request #1407: [CELEBORN-431][FLINK] Support dynamic buffer allocation in reading map partition.

Posted by "waitinfuture (via GitHub)" <gi...@apache.org>.
waitinfuture commented on code in PR #1407:
URL: https://github.com/apache/incubator-celeborn/pull/1407#discussion_r1161523104


##########
common/src/main/java/org/apache/celeborn/common/network/server/MapDataPartition.java:
##########
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.server;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Consumer;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.meta.FileInfo;
+import org.apache.celeborn.common.network.server.memory.BufferQueue;
+import org.apache.celeborn.common.network.server.memory.BufferRecycler;
+import org.apache.celeborn.common.network.server.memory.MemoryManager;
+import org.apache.celeborn.common.network.server.memory.ReadBufferRequest;
+import org.apache.celeborn.common.util.JavaUtils;
+
+// this means active data partition
+class MapDataPartition implements MemoryManager.ReadBufferTargetChangeListener {
+  public static final Logger logger = LoggerFactory.getLogger(MapDataPartition.class);
+  private final List<Long> activeStreamIds = new ArrayList<>();
+  private final FileInfo fileInfo;
+  private final ExecutorService readExecutor;
+  private final ConcurrentHashMap<Long, MapDataPartitionReader> readers =
+      JavaUtils.newConcurrentHashMap();
+  private FileChannel dataFileChanel;
+  private FileChannel indexChannel;
+  private volatile boolean isReleased = false;
+  private final BufferQueue bufferQueue = new BufferQueue();
+  private boolean bufferQueueInitialized = false;
+  private MemoryManager memoryManager = MemoryManager.instance();
+  private Consumer<Long> recycleStream;
+  private volatile int localBuffersTarget = 0;
+  private volatile int pendingRequestBuffers = 0;
+  private Object applyBufferLock = new Object();
+  private long minReadAheadMemory;
+  private long maxReadAheadMemory;
+  private int minBuffersToTriggerRead;
+
+  public MapDataPartition(
+      int minReadBuffers,
+      int maxReadBuffers,
+      HashMap<String, ExecutorService> storageFetcherPool,
+      int threadsPerMountPoint,
+      FileInfo fileInfo,
+      Consumer<Long> recycleStream,
+      int minBuffersToTriggerRead)
+      throws FileNotFoundException {
+    this.recycleStream = recycleStream;
+    this.fileInfo = fileInfo;
+    int bufferSize = fileInfo.getBufferSize();
+
+    minReadAheadMemory = minReadBuffers * bufferSize;
+    maxReadAheadMemory = maxReadBuffers * bufferSize;
+
+    updateBuffersTarget(0, fileInfo.getFileSize());
+
+    logger.debug(
+        "read map partition {} with {} {} {}",
+        fileInfo.getFilePath(),
+        localBuffersTarget,
+        fileInfo.getBufferSize());
+
+    this.minBuffersToTriggerRead = minBuffersToTriggerRead;
+
+    readExecutor =
+        storageFetcherPool.computeIfAbsent(
+            fileInfo.getMountPoint(),
+            k ->
+                Executors.newFixedThreadPool(
+                    threadsPerMountPoint,
+                    new ThreadFactoryBuilder()
+                        .setNameFormat(fileInfo.getMountPoint() + "-reader-thread-%d")
+                        .setUncaughtExceptionHandler(
+                            (t1, t2) -> {
+                              logger.warn("StorageFetcherPool thread:{}:{}", t1, t2);
+                            })
+                        .build()));
+    this.dataFileChanel = new FileInputStream(fileInfo.getFile()).getChannel();
+    this.indexChannel = new FileInputStream(fileInfo.getIndexPath()).getChannel();
+  }
+
+  private synchronized void updateBuffersTarget(long memoryTarget, long fileSize) {
+    long currentMemoryTarget = memoryTarget;
+    if (currentMemoryTarget < minReadAheadMemory) {
+      currentMemoryTarget = minReadAheadMemory;
+    }
+    if (currentMemoryTarget > maxReadAheadMemory) {
+      currentMemoryTarget = maxReadAheadMemory;
+    }
+    if (currentMemoryTarget > fileSize) {
+      currentMemoryTarget = fileSize;
+    }
+    localBuffersTarget = (int) Math.ceil(currentMemoryTarget * 1.0 / fileInfo.getBufferSize());
+  }
+
+  public synchronized void setupDataPartitionReader(
+      int startSubIndex, int endSubIndex, long streamId, Channel channel) {
+    MapDataPartitionReader mapDataPartitionReader =
+        new MapDataPartitionReader(
+            startSubIndex,
+            endSubIndex,
+            fileInfo,
+            streamId,
+            channel,
+            () -> recycleStream.accept(streamId));
+    readers.put(streamId, mapDataPartitionReader);
+
+    // allocate resources when the first reader is registered
+    if (!bufferQueueInitialized) {
+      memoryManager.requestReadBuffers(
+          new ReadBufferRequest(
+              localBuffersTarget,
+              fileInfo.getBufferSize(),
+              (allocatedBuffers, throwable) -> onBuffer(allocatedBuffers)));
+      bufferQueueInitialized = true;
+    } else {
+      triggerRead();
+    }
+  }
+
+  // Read logic is executed on another thread.
+  public void onBuffer(List<ByteBuf> buffers) {
+    if (isReleased || bufferQueue.isReleased()) {
+      buffers.forEach(memoryManager::recycleReadBuffer);
+      return;
+    }
+
+    try {
+      bufferQueue.add(buffers);
+      pendingRequestBuffers -= buffers.size();
+    } catch (Exception e) {
+      // this branch means that this bufferQueue is closed
+      buffers.forEach(memoryManager::recycleReadBuffer);
+      return;
+    }
+
+    if (bufferQueue.size() >= Math.min(localBuffersTarget / 2 + 1, minBuffersToTriggerRead)) {
+      triggerRead();
+    }
+  }
+
+  public void recycle(ByteBuf buffer) {
+    if (isReleased || readers.isEmpty() || bufferQueue.isReleased()) {
+      bufferQueue.recycleDirectly(buffer);
+      return;
+    }
+    if (bufferQueue.numBuffersOccupied() > localBuffersTarget) {
+      bufferQueue.recycle(buffer);
+    } else {
+      buffer.clear();
+      bufferQueue.add(buffer);
+    }
+
+    if (bufferQueue.size() >= Math.min(localBuffersTarget / 2 + 1, minBuffersToTriggerRead)) {
+      triggerRead();
+    }
+
+    applyNewBuffers();
+  }
+
+  private void applyNewBuffers() {
+    logger.debug(
+        "try to apply new buffers  {} {} {} {}",
+        bufferQueue.numBuffersOccupied(),
+        bufferQueue.size(),
+        readers.size(),
+        localBuffersTarget);
+    synchronized (applyBufferLock) {
+      if (!readers.isEmpty()
+          && bufferQueue.numBuffersOccupied() + pendingRequestBuffers < localBuffersTarget) {
+        int newBuffersCount =
+            (localBuffersTarget - bufferQueue.numBuffersOccupied() - pendingRequestBuffers);
+        logger.debug(
+            "apply new buffers {} while current buffer queue size {} with read count {} for "
+                + "map data partition {} with active stream id count {}",
+            newBuffersCount,
+            bufferQueue.numBuffersOccupied(),
+            readers.size(),
+            this,
+            activeStreamIds.size());
+
+        pendingRequestBuffers += newBuffersCount;
+        memoryManager.requestReadBuffers(
+            new ReadBufferRequest(
+                newBuffersCount,
+                fileInfo.getBufferSize(),
+                (allocatedBuffers, throwable) -> onBuffer(allocatedBuffers)));
+      }
+    }
+  }
+
+  public synchronized void readBuffers() {
+    if (isReleased) {
+      // some read executor task may already be submitted to the thread pool
+      return;
+    }
+
+    try {
+      // make sure that all reader are open
+      PriorityQueue<MapDataPartitionReader> sortedReaders = new PriorityQueue<>(readers.values());
+      for (MapDataPartitionReader reader : readers.values()) {
+        reader.open(dataFileChanel, indexChannel);
+      }
+      while (bufferQueue.size() > 0 && !sortedReaders.isEmpty()) {
+        BufferRecycler bufferRecycler = new BufferRecycler(MapDataPartition.this::recycle);
+        MapDataPartitionReader reader = sortedReaders.poll();
+        try {
+          reader.readData(bufferQueue, bufferRecycler);
+        } catch (Throwable e) {
+          logger.error("reader exception, reader: {}, message: {}", reader, e.getMessage(), e);
+          // this reader failed , recycle stream id
+          reader.recycleOnError(e);
+        }
+      }
+    } catch (Throwable e) {
+      logger.error("Fatal: failed to read partition data. {}", e.getMessage(), e);
+      for (MapDataPartitionReader reader : readers.values()) {
+        reader.recycleOnError(e);
+      }
+    }
+  }
+
+  // for one reader only the associated channel can access
+  public void addReaderCredit(int numCredit, long streamId) {
+    MapDataPartitionReader streamReader = getStreamReader(streamId);
+    if (streamReader != null) {
+      streamReader.addCredit(numCredit);
+      readExecutor.submit(() -> streamReader.sendData());
+    }
+  }
+
+  public void triggerRead() {
+    // Key for IO schedule.
+    readExecutor.submit(() -> readBuffers());
+  }
+
+  public void addStream(Long streamId) {
+    synchronized (activeStreamIds) {
+      activeStreamIds.add(streamId);
+    }
+  }
+
+  public void removeStream(Long streamId) {
+    synchronized (activeStreamIds) {
+      activeStreamIds.remove(streamId);
+      readers.remove(streamId);
+    }
+  }
+
+  public MapDataPartitionReader getStreamReader(long streamId) {
+    return readers.get(streamId);
+  }
+
+  public boolean releaseStream(Long streamId) {
+    MapDataPartitionReader mapDataPartitionReader = readers.get(streamId);
+    mapDataPartitionReader.release();
+    if (mapDataPartitionReader.isFinished()) {
+      logger.debug("release all for stream: {}", streamId);
+      removeStream(streamId);
+      return true;
+    }
+
+    return false;
+  }
+
+  public void close() {
+    logger.debug("release map data partition {}", fileInfo);
+
+    IOUtils.closeQuietly(dataFileChanel);
+    IOUtils.closeQuietly(indexChannel);
+    bufferQueue.release();
+
+    isReleased = true;
+  }
+
+  @Override
+  public String toString() {
+    return "MapDataPartition{" + "fileInfo=" + fileInfo.getFilePath() + '}';
+  }
+
+  public List<Long> getActiveStreamIds() {
+    return activeStreamIds;
+  }
+
+  public ConcurrentHashMap<Long, MapDataPartitionReader> getReaders() {

Review Comment:
   This method seems unused.



##########
common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java:
##########
@@ -46,6 +46,7 @@ public class FileInfo {
   // members for MapPartition
   private int bufferSize;
   private int numSubpartitions;
+  private long fileSize;

Review Comment:
   Shall we also modify ```PbSerDeUtils.toPbFileInfo``` and ```PbSerDeUtils.fromPbFileInfo``` to include fileSize?



##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala:
##########
@@ -192,11 +202,11 @@ class FetchHandler(val conf: TransportConf) extends BaseMessageHandler with Logg
   }
 
   def handleEndStreamFromClient(client: TransportClient, req: BufferStreamEnd): Unit = {

Review Comment:
   client is not used; shall we remove it from the argument list?



##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala:
##########
@@ -192,11 +202,11 @@ class FetchHandler(val conf: TransportConf) extends BaseMessageHandler with Logg
   }
 
   def handleEndStreamFromClient(client: TransportClient, req: BufferStreamEnd): Unit = {
-    bufferStreamManager.notifyStreamEndByClient(req.getStreamId)
+    creditStreamManager.notifyStreamEndByClient(req.getStreamId)
   }
 
   def handleReadAddCredit(client: TransportClient, req: ReadAddCredit): Unit = {

Review Comment:
   ditto





[GitHub] [incubator-celeborn] waitinfuture commented on a diff in pull request #1407: [CELEBORN-431][FLINK] Support dynamic buffer allocation in reading map partition.

Posted by "waitinfuture (via GitHub)" <gi...@apache.org>.
waitinfuture commented on code in PR #1407:
URL: https://github.com/apache/incubator-celeborn/pull/1407#discussion_r1161693221


##########
common/src/main/java/org/apache/celeborn/common/network/server/memory/BufferQueue.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.server.memory;
+
+import java.util.Collection;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.annotation.Nullable;
+
+import io.netty.buffer.ByteBuf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// Assume that max-managed memory for a MapDataPartition is (2^31 * buffersize)
+public class BufferQueue {
+  public static final Logger logger = LoggerFactory.getLogger(BufferQueue.class);
+
+  private final Queue<ByteBuf> buffers = new ConcurrentLinkedQueue<>();
+
+  private final MemoryManager memoryManager = MemoryManager.instance();
+
+  /** Number of buffers occupied by this buffer queue (added but still not recycled). */
+  private final AtomicInteger numBuffersOccupied = new AtomicInteger();
+
+  private final AtomicInteger pendingRequestBuffers = new AtomicInteger();
+
+  /** Whether this buffer queue is released or not. */
+  private volatile boolean isReleased = false;
+
+  private volatile int localBuffersTarget = 0;
+
+  public BufferQueue() {}
+
+  /** Returns the number of available buffers in this buffer queue. */
+  public int size() {
+    return buffers.size();
+  }
+
+  /**
+   * Returns an available buffer from this buffer queue or returns null if no buffer is available
+   * currently.
+   */
+  @Nullable
+  public ByteBuf poll() {
+    return buffers.poll();
+  }
+
+  /**
+   * Adds a collection of available buffers to this buffer queue and will throw exception if this
+   * buffer queue has been released.
+   */
+  public synchronized void add(Collection<ByteBuf> availableBuffers) {
+    if (!isReleased) {
+      buffers.addAll(availableBuffers);
+      numBuffersOccupied.addAndGet(availableBuffers.size());
+      pendingRequestBuffers.addAndGet(-1 * availableBuffers.size());
+    } else {
+      for (ByteBuf availableBuffer : availableBuffers) {
+        memoryManager.recycleReadBuffer(availableBuffer);
+      }
+    }
+  }
+
+  public void recycle(ByteBuf buffer) {
+    if (isReleased) {
+      recycleToGlobalPool(buffer);
+    }
+    if (numBuffersOccupied.get() > localBuffersTarget) {

Review Comment:
   Merge this if with the one above.
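
   i.e. (this also avoids recycling the same buffer twice when the queue is released):

   ```java
   public void recycle(ByteBuf buffer) {
     if (isReleased || numBuffersOccupied.get() > localBuffersTarget) {
       recycleToGlobalPool(buffer);
     } else {
       recycleToLocalPool(buffer);
     }
   }
   ```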



##########
common/src/main/java/org/apache/celeborn/common/network/server/memory/BufferQueue.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.server.memory;
+
+import java.util.Collection;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.annotation.Nullable;
+
+import io.netty.buffer.ByteBuf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// Assume that max-managed memory for a MapDataPartition is (2^31 * buffersize)
+public class BufferQueue {
+  public static final Logger logger = LoggerFactory.getLogger(BufferQueue.class);
+
+  private final Queue<ByteBuf> buffers = new ConcurrentLinkedQueue<>();
+
+  private final MemoryManager memoryManager = MemoryManager.instance();
+
+  /** Number of buffers occupied by this buffer queue (added but still not recycled). */
+  private final AtomicInteger numBuffersOccupied = new AtomicInteger();
+
+  private final AtomicInteger pendingRequestBuffers = new AtomicInteger();
+
+  /** Whether this buffer queue is released or not. */
+  private volatile boolean isReleased = false;
+
+  private volatile int localBuffersTarget = 0;
+
+  public BufferQueue() {}
+
+  /** Returns the number of available buffers in this buffer queue. */
+  public int size() {
+    return buffers.size();
+  }
+
+  /**
+   * Returns an available buffer from this buffer queue or returns null if no buffer is available
+   * currently.
+   */
+  @Nullable
+  public ByteBuf poll() {
+    return buffers.poll();
+  }
+
+  /**
+   * Adds a collection of available buffers to this buffer queue and will throw exception if this
+   * buffer queue has been released.
+   */
+  public synchronized void add(Collection<ByteBuf> availableBuffers) {
+    if (!isReleased) {
+      buffers.addAll(availableBuffers);
+      numBuffersOccupied.addAndGet(availableBuffers.size());
+      pendingRequestBuffers.addAndGet(-1 * availableBuffers.size());
+    } else {
+      for (ByteBuf availableBuffer : availableBuffers) {
+        memoryManager.recycleReadBuffer(availableBuffer);
+      }
+    }
+  }
+
+  public void recycle(ByteBuf buffer) {
+    if (isReleased) {
+      recycleToGlobalPool(buffer);
+    }
+    if (numBuffersOccupied.get() > localBuffersTarget) {
+      recycleToGlobalPool(buffer);
+    } else {
+      recycleToLocalPool(buffer);
+    }
+  }
+
+  public void recycleToGlobalPool(ByteBuf buffer) {
+    numBuffersOccupied.decrementAndGet();
+    memoryManager.recycleReadBuffer(buffer);
+  }
+
+  public synchronized void recycleToLocalPool(ByteBuf buffer) {

Review Comment:
   synchronized seems unnecessary here.





[GitHub] [incubator-celeborn] waitinfuture commented on a diff in pull request #1407: [CELEBORN-431][FLINK] Support dynamic buffer allocation in reading map partition.

Posted by "waitinfuture (via GitHub)" <gi...@apache.org>.
waitinfuture commented on code in PR #1407:
URL: https://github.com/apache/incubator-celeborn/pull/1407#discussion_r1161718297


##########
common/src/main/java/org/apache/celeborn/common/network/server/memory/BufferQueue.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.server.memory;
+
+import java.util.Collection;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.annotation.Nullable;
+
+import io.netty.buffer.ByteBuf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// Assume that max-managed memory for a MapDataPartition is (2^31 * buffersize)
+public class BufferQueue {
+  public static final Logger logger = LoggerFactory.getLogger(BufferQueue.class);
+
+  private final Queue<ByteBuf> buffers = new ConcurrentLinkedQueue<>();
+
+  private final MemoryManager memoryManager = MemoryManager.instance();
+
+  /** Number of buffers occupied by this buffer queue (added but still not recycled). */
+  private final AtomicInteger numBuffersOccupied = new AtomicInteger();
+
+  private final AtomicInteger pendingRequestBuffers = new AtomicInteger();
+
+  /** Whether this buffer queue is released or not. */
+  private volatile boolean isReleased = false;
+
+  private volatile int localBuffersTarget = 0;
+
+  public BufferQueue() {}

Review Comment:
   Is this necessary?



##########
common/src/main/java/org/apache/celeborn/common/network/server/MapDataPartition.java:
##########
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.server;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.nio.channels.FileChannel;
+import java.util.HashMap;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Consumer;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.meta.FileInfo;
+import org.apache.celeborn.common.network.server.memory.BufferQueue;
+import org.apache.celeborn.common.network.server.memory.BufferRecycler;
+import org.apache.celeborn.common.network.server.memory.MemoryManager;
+import org.apache.celeborn.common.network.server.memory.ReadBufferRequest;
+import org.apache.celeborn.common.util.JavaUtils;
+
+// this means active data partition
+class MapDataPartition implements MemoryManager.ReadBufferTargetChangeListener {
+  public static final Logger logger = LoggerFactory.getLogger(MapDataPartition.class);
+  private final FileInfo fileInfo;
+  private final ExecutorService readExecutor;
+  private final ConcurrentHashMap<Long, MapDataPartitionReader> readers =
+      JavaUtils.newConcurrentHashMap();
+  private FileChannel dataFileChanel;
+  private FileChannel indexChannel;
+  private volatile boolean isReleased = false;
+  private final BufferQueue bufferQueue = new BufferQueue();
+  private boolean bufferQueueInitialized = false;
+  private MemoryManager memoryManager = MemoryManager.instance();
+  private Consumer<Long> recycleStream;
+  private int minReadBuffers;
+  private int maxReadBuffers;
+  private int fileBuffers;
+  private int minBuffersToTriggerRead;
+
+  public MapDataPartition(
+      int minReadBuffers,
+      int maxReadBuffers,
+      HashMap<String, ExecutorService> storageFetcherPool,
+      int threadsPerMountPoint,
+      FileInfo fileInfo,
+      Consumer<Long> recycleStream,
+      int minBuffersToTriggerRead)
+      throws FileNotFoundException {
+    this.recycleStream = recycleStream;
+    this.fileInfo = fileInfo;
+
+    this.minReadBuffers = minReadBuffers;
+    this.maxReadBuffers = maxReadBuffers;
+    this.fileBuffers = (int) Math.ceil(fileInfo.getFileSize() * 1.0 / fileInfo.getBufferSize());
+
+    updateBuffersTarget((this.minReadBuffers + this.maxReadBuffers) / 2 + 1);
+
+    logger.debug(
+        "read map partition {} with {} {} {}",
+        fileInfo.getFilePath(),
+        bufferQueue.getLocalBuffersTarget(),
+        fileInfo.getBufferSize());
+
+    this.minBuffersToTriggerRead = minBuffersToTriggerRead;
+
+    readExecutor =
+        storageFetcherPool.computeIfAbsent(
+            fileInfo.getMountPoint(),
+            k ->
+                Executors.newFixedThreadPool(
+                    threadsPerMountPoint,
+                    new ThreadFactoryBuilder()
+                        .setNameFormat(fileInfo.getMountPoint() + "-reader-thread-%d")
+                        .setUncaughtExceptionHandler(
+                            (t1, t2) -> {
+                              logger.warn("StorageFetcherPool thread:{}:{}", t1, t2);
+                            })
+                        .build()));
+    this.dataFileChanel = new FileInputStream(fileInfo.getFile()).getChannel();
+    this.indexChannel = new FileInputStream(fileInfo.getIndexPath()).getChannel();
+  }
+
+  private synchronized void updateBuffersTarget(int buffersTarget) {
+    int currentBuffersTarget = buffersTarget;
+    if (currentBuffersTarget < minReadBuffers) {
+      currentBuffersTarget = minReadBuffers;
+    }
+    if (currentBuffersTarget > maxReadBuffers) {
+      currentBuffersTarget = maxReadBuffers;
+    }
+    if (currentBuffersTarget > fileBuffers) {
+      currentBuffersTarget = fileBuffers;
+    }
+    bufferQueue.setLocalBuffersTarget(currentBuffersTarget);
+  }
+
+  public void setupDataPartitionReader(
+      int startSubIndex, int endSubIndex, long streamId, Channel channel) {
+    MapDataPartitionReader mapDataPartitionReader =
+        new MapDataPartitionReader(
+            startSubIndex,
+            endSubIndex,
+            fileInfo,
+            streamId,
+            channel,
+            () -> recycleStream.accept(streamId));
+    readers.put(streamId, mapDataPartitionReader);
+
+    // allocate resources when the first reader is registered
+    if (!bufferQueueInitialized) {
+      memoryManager.requestReadBuffers(
+          new ReadBufferRequest(
+              bufferQueue.getLocalBuffersTarget(),
+              fileInfo.getBufferSize(),
+              (allocatedBuffers, throwable) -> onBuffer(allocatedBuffers)));
+      bufferQueueInitialized = true;
+    } else {
+      triggerRead();
+    }
+  }
+
+  // Read logic is executed on another thread.
+  public void onBuffer(List<ByteBuf> buffers) {
+    if (isReleased || bufferQueue.isReleased()) {

Review Comment:
   IMO it's unnecessary to check ```bufferQueue.isReleased()``` here, since ```bufferQueue.add()``` already performs that check.
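   A hedged sketch of the simplification being suggested (not the merged code): ```BufferQueue.add()``` already recycles the buffers back to the ```MemoryManager``` when the queue has been released, so the caller only needs its own ```isReleased``` guard.

   ```java
   // Hypothetical simplified onBuffer(): BufferQueue.add() recycles the buffers
   // to the global pool itself when the queue has been released.
   public void onBuffer(List<ByteBuf> buffers) {
     if (isReleased) {
       buffers.forEach(memoryManager::recycleReadBuffer);
       return;
     }
     bufferQueue.add(buffers);
     if (bufferQueue.size()
         >= Math.min(bufferQueue.getLocalBuffersTarget() / 2 + 1, minBuffersToTriggerRead)) {
       triggerRead();
     }
   }
   ```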



##########
common/src/main/java/org/apache/celeborn/common/network/server/MapDataPartition.java:
##########
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.server;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.nio.channels.FileChannel;
+import java.util.HashMap;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Consumer;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.meta.FileInfo;
+import org.apache.celeborn.common.network.server.memory.BufferQueue;
+import org.apache.celeborn.common.network.server.memory.BufferRecycler;
+import org.apache.celeborn.common.network.server.memory.MemoryManager;
+import org.apache.celeborn.common.network.server.memory.ReadBufferRequest;
+import org.apache.celeborn.common.util.JavaUtils;
+
+// this means active data partition
+class MapDataPartition implements MemoryManager.ReadBufferTargetChangeListener {
+  public static final Logger logger = LoggerFactory.getLogger(MapDataPartition.class);
+  private final FileInfo fileInfo;
+  private final ExecutorService readExecutor;
+  private final ConcurrentHashMap<Long, MapDataPartitionReader> readers =
+      JavaUtils.newConcurrentHashMap();
+  private FileChannel dataFileChanel;
+  private FileChannel indexChannel;
+  private volatile boolean isReleased = false;
+  private final BufferQueue bufferQueue = new BufferQueue();
+  private boolean bufferQueueInitialized = false;
+  private MemoryManager memoryManager = MemoryManager.instance();
+  private Consumer<Long> recycleStream;
+  private int minReadBuffers;
+  private int maxReadBuffers;
+  private int fileBuffers;
+  private int minBuffersToTriggerRead;
+
+  public MapDataPartition(
+      int minReadBuffers,
+      int maxReadBuffers,
+      HashMap<String, ExecutorService> storageFetcherPool,
+      int threadsPerMountPoint,
+      FileInfo fileInfo,
+      Consumer<Long> recycleStream,
+      int minBuffersToTriggerRead)
+      throws FileNotFoundException {
+    this.recycleStream = recycleStream;
+    this.fileInfo = fileInfo;
+
+    this.minReadBuffers = minReadBuffers;
+    this.maxReadBuffers = maxReadBuffers;
+    this.fileBuffers = (int) Math.ceil(fileInfo.getFileSize() * 1.0 / fileInfo.getBufferSize());
+
+    updateBuffersTarget((this.minReadBuffers + this.maxReadBuffers) / 2 + 1);
+
+    logger.debug(
+        "read map partition {} with {} {} {}",
+        fileInfo.getFilePath(),
+        bufferQueue.getLocalBuffersTarget(),
+        fileInfo.getBufferSize());
+
+    this.minBuffersToTriggerRead = minBuffersToTriggerRead;
+
+    readExecutor =
+        storageFetcherPool.computeIfAbsent(
+            fileInfo.getMountPoint(),
+            k ->
+                Executors.newFixedThreadPool(
+                    threadsPerMountPoint,
+                    new ThreadFactoryBuilder()
+                        .setNameFormat(fileInfo.getMountPoint() + "-reader-thread-%d")
+                        .setUncaughtExceptionHandler(
+                            (t1, t2) -> {
+                              logger.warn("StorageFetcherPool thread:{}:{}", t1, t2);
+                            })
+                        .build()));
+    this.dataFileChanel = new FileInputStream(fileInfo.getFile()).getChannel();
+    this.indexChannel = new FileInputStream(fileInfo.getIndexPath()).getChannel();
+  }
+
+  private synchronized void updateBuffersTarget(int buffersTarget) {
+    int currentBuffersTarget = buffersTarget;
+    if (currentBuffersTarget < minReadBuffers) {
+      currentBuffersTarget = minReadBuffers;
+    }
+    if (currentBuffersTarget > maxReadBuffers) {
+      currentBuffersTarget = maxReadBuffers;
+    }
+    if (currentBuffersTarget > fileBuffers) {
+      currentBuffersTarget = fileBuffers;
+    }
+    bufferQueue.setLocalBuffersTarget(currentBuffersTarget);
+  }
+
+  public void setupDataPartitionReader(
+      int startSubIndex, int endSubIndex, long streamId, Channel channel) {
+    MapDataPartitionReader mapDataPartitionReader =
+        new MapDataPartitionReader(
+            startSubIndex,
+            endSubIndex,
+            fileInfo,
+            streamId,
+            channel,
+            () -> recycleStream.accept(streamId));
+    readers.put(streamId, mapDataPartitionReader);
+
+    // allocate resources when the first reader is registered
+    if (!bufferQueueInitialized) {
+      memoryManager.requestReadBuffers(
+          new ReadBufferRequest(
+              bufferQueue.getLocalBuffersTarget(),
+              fileInfo.getBufferSize(),
+              (allocatedBuffers, throwable) -> onBuffer(allocatedBuffers)));
+      bufferQueueInitialized = true;
+    } else {
+      triggerRead();
+    }
+  }
+
+  // Read logic is executed on another thread.
+  public void onBuffer(List<ByteBuf> buffers) {
+    if (isReleased || bufferQueue.isReleased()) {
+      buffers.forEach(memoryManager::recycleReadBuffer);
+      return;
+    }
+
+    bufferQueue.add(buffers);
+
+    if (bufferQueue.size()
+        >= Math.min(bufferQueue.getLocalBuffersTarget() / 2 + 1, minBuffersToTriggerRead)) {
+      triggerRead();
+    }
+  }
+
+  public void recycle(ByteBuf buffer) {
+    if (isReleased || readers.isEmpty() || bufferQueue.isReleased()) {

Review Comment:
   IMO it's unnecessary to check ```bufferQueue.isReleased()``` here since if ```isReleased``` is true, it is guaranteed that bufferQueue is also released.
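   A minimal sketch of the suggested guard (hypothetical; the rest of the method is not visible in this hunk, so the body after the check is only illustrative), assuming that releasing the partition also releases its ```bufferQueue```:

   ```java
   // Hypothetical guard: isReleased is assumed to imply that bufferQueue has been
   // released as well, so the extra bufferQueue.isReleased() check can be dropped.
   public void recycle(ByteBuf buffer) {
     if (isReleased || readers.isEmpty()) {
       memoryManager.recycleReadBuffer(buffer);
       return;
     }
     bufferQueue.recycle(buffer);
   }
   ```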





[GitHub] [incubator-celeborn] FMX commented on a diff in pull request #1407: [CELEBORN-431][FLINK] Support dynamic buffer allocation in reading map partition.

Posted by "FMX (via GitHub)" <gi...@apache.org>.
FMX commented on code in PR #1407:
URL: https://github.com/apache/incubator-celeborn/pull/1407#discussion_r1156678506


##########
common/src/main/java/org/apache/celeborn/common/network/server/BufferStreamManager.java:
##########
@@ -319,28 +329,71 @@ public synchronized void setupDataPartitionReader(
       streamReaders.put(streamId, dataPartitionReader);
 
       // create initial buffers for read
-      if (allocateResources && buffers == null) {
+      if (allocateResources && bufferQueue.size() == 0) {
         memoryManager.requestReadBuffers(
-            minReadBuffers,
-            maxReadBuffers,
-            fileInfo.getBufferSize(),
-            (allocatedBuffers, throwable) ->
-                MapDataPartition.this.onBuffer(new LinkedBlockingDeque<>(allocatedBuffers)));
+            new ReadBufferRequest(
+                minReadBuffers,
+                maxReadBuffers,
+                fileInfo.getBufferSize(),
+                (allocatedBuffers, throwable) -> onBuffer(allocatedBuffers)));
       } else {
         triggerRead();
       }
     }
 
     // Read logic is executed on another thread.
-    public void onBuffer(Queue<ByteBuf> buffers) {
-      this.buffers = buffers;
+    public void onBuffer(List<ByteBuf> buffers) {
+      if (isReleased || bufferQueue.isReleased()) {

Review Comment:
   This might have a problem here; changed.





[GitHub] [incubator-celeborn] RexXiong commented on a diff in pull request #1407: [CELEBORN-431][FLINK] Support dynamic buffer allocation in reading map partition.

Posted by "RexXiong (via GitHub)" <gi...@apache.org>.
RexXiong commented on code in PR #1407:
URL: https://github.com/apache/incubator-celeborn/pull/1407#discussion_r1158530861


##########
common/src/main/java/org/apache/celeborn/common/network/server/BufferStreamManager.java:
##########
@@ -313,74 +321,118 @@ public synchronized void setupDataPartitionReader(
               streamId,
               channel,
               () -> recycleStream(streamId));
-      // allocate resources when the first reader is registered
-      boolean allocateResources = readers.isEmpty();
-      readers.add(dataPartitionReader);
-      streamReaders.put(streamId, dataPartitionReader);
+      readers.put(streamId, dataPartitionReader);
 
-      // create initial buffers for read
-      if (allocateResources && buffers == null) {
+      // allocate resources when the first reader is registered
+      if (firstReaderRegister.compareAndSet(true, false)) {

Review Comment:
   firstReaderRegister doesn't need to be an AtomicBoolean, since it is only accessed by one thread at a time. Also, renaming it to bufferInitialized (or bufferXXX) would better clarify its purpose.
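   For illustration only (the helper name below is invented, and this is not the final patch), a plain flag is enough when the setup path is entered by a single thread at a time; the MapDataPartition hunk earlier in this thread already follows this shape with ```bufferQueueInitialized```.

   ```java
   // Hypothetical sketch per the comment above: a plain boolean replaces the
   // AtomicBoolean, assuming single-threaded access to the setup path.
   private boolean bufferQueueInitialized = false;

   private void allocateInitialBuffersIfNeeded() {
     if (!bufferQueueInitialized) {
       bufferQueueInitialized = true;
       memoryManager.requestReadBuffers(
           new ReadBufferRequest(
               bufferQueue.getLocalBuffersTarget(),
               fileInfo.getBufferSize(),
               (allocatedBuffers, throwable) -> onBuffer(allocatedBuffers)));
     } else {
       triggerRead();
     }
   }
   ```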



##########
common/src/main/java/org/apache/celeborn/common/network/server/DataPartitionReader.java:
##########
@@ -112,7 +115,7 @@ public DataPartitionReader(
   }
 
   public void open(FileChannel dataFileChannel, FileChannel indexFileChannel) throws IOException {
-    if (!isOpen) {
+    if (isOpen.compareAndSet(false, true)) {

Review Comment:
   Seems this also doesn't need to be an AtomicBoolean.
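   A minimal sketch of the same point for ```DataPartitionReader.open()``` (hypothetical; assumes ```open()``` is only ever called from the single reader thread, and the existing open logic stays unchanged):

   ```java
   // Hypothetical sketch: a plain boolean is enough if open() is single-threaded.
   private boolean isOpen = false;

   public void open(FileChannel dataFileChannel, FileChannel indexFileChannel) throws IOException {
     if (!isOpen) {
       isOpen = true;
       // ... existing channel setup stays as-is ...
     }
   }
   ```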


