Posted to dev@celeborn.apache.org by GitBox <gi...@apache.org> on 2022/11/26 13:34:48 UTC

[GitHub] [incubator-celeborn] waitinfuture commented on a diff in pull request #993: [CELEBORN-55][FEATURE] Split maxReqsInFlight limitation into level of target worker

waitinfuture commented on code in PR #993:
URL: https://github.com/apache/incubator-celeborn/pull/993#discussion_r1032786581


##########
client/src/main/java/org/apache/celeborn/client/write/PushState.java:
##########
@@ -88,4 +96,20 @@ public boolean addBatchData(String addressPair, PartitionLocation loc, int batch
   public DataBatches takeDataBatches(String addressPair) {
     return batchesMap.remove(addressPair);
   }
+
+  public void addFlightBatches(int batchId, PartitionLocation loc) {
+    String addressPair = loc.hostAndPushPort();
+    Set<Integer> batchIdSetPerPair =
+        batchIdPerAddressPair.computeIfAbsent(addressPair, id -> new HashSet<>());
+    batchIdSetPerPair.add(batchId);
+  }
+
+  public void removeFlightBatches(int batchId, PartitionLocation loc) {

Review Comment:
   There can be concurrent calls to this method, but batchIdSetPerPair is not thread-safe.



##########
client/src/main/java/org/apache/celeborn/client/write/PushState.java:
##########
@@ -46,6 +46,14 @@ public PushState(CelebornConf conf) {
     pushBufferMaxSize = conf.pushBufferMaxSize();
   }
 
+  public ConcurrentHashMap<String, Set<Integer>> getBatchIdPerAddressPair() {
+    return batchIdPerAddressPair;
+  }
+
+  public Set<Integer> getBatchIdSetByAddressPair(String addressPair) {
+    return batchIdPerAddressPair.computeIfAbsent(addressPair, pair -> new HashSet<>());

Review Comment:
   I think we should use ConcurrentHashMap.newKeySet() here instead of new HashSet<>(), so that the returned set is thread-safe.
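
   A minimal sketch of what this suggestion could look like, assuming the field keeps its current type. The class name InFlightBatchTracker is a hypothetical stand-in for the relevant part of PushState, not code from the PR; only ConcurrentHashMap.newKeySet() and computeIfAbsent are standard JDK calls.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the in-flight tracking part of PushState.
class InFlightBatchTracker {
  private final ConcurrentHashMap<String, Set<Integer>> batchIdPerAddressPair =
      new ConcurrentHashMap<>();

  // ConcurrentHashMap.newKeySet() returns a Set backed by a ConcurrentHashMap,
  // so add/remove/contains on the returned set are safe under concurrent calls.
  Set<Integer> getBatchIdSetByAddressPair(String addressPair) {
    return batchIdPerAddressPair.computeIfAbsent(
        addressPair, pair -> ConcurrentHashMap.newKeySet());
  }
}
```

   With a plain HashSet, only the map lookup is thread-safe; concurrent add and remove calls on the same per-address set would still race.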



##########
client/src/main/java/org/apache/celeborn/client/write/PushState.java:
##########
@@ -88,4 +96,20 @@ public boolean addBatchData(String addressPair, PartitionLocation loc, int batch
   public DataBatches takeDataBatches(String addressPair) {
     return batchesMap.remove(addressPair);
   }
+
+  public void addFlightBatches(int batchId, PartitionLocation loc) {

Review Comment:
   I think it's better to pass hostAndPushPort as a parameter instead of loc.



##########
client/src/main/java/org/apache/celeborn/client/write/PushState.java:
##########
@@ -88,4 +96,20 @@ public boolean addBatchData(String addressPair, PartitionLocation loc, int batch
   public DataBatches takeDataBatches(String addressPair) {
     return batchesMap.remove(addressPair);
   }
+
+  public void addFlightBatches(int batchId, PartitionLocation loc) {
+    String addressPair = loc.hostAndPushPort();
+    Set<Integer> batchIdSetPerPair =
+        batchIdPerAddressPair.computeIfAbsent(addressPair, id -> new HashSet<>());
+    batchIdSetPerPair.add(batchId);
+  }
+
+  public void removeFlightBatches(int batchId, PartitionLocation loc) {

Review Comment:
   I think it's better to pass hostAndPushPort as a parameter instead of loc.
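
   One way the two methods might look with that change, as a sketch only. The class name FlightBatchSketch and the helper inFlightCount are hypothetical and exist just to make the example self-contained; the caller is assumed to pass the result of loc.hostAndPushPort().

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for PushState; not the code from the PR.
class FlightBatchSketch {
  private final ConcurrentHashMap<String, Set<Integer>> batchIdPerAddressPair =
      new ConcurrentHashMap<>();

  // Takes the address pair directly, so this class no longer needs PartitionLocation.
  void addFlightBatches(int batchId, String hostAndPushPort) {
    batchIdPerAddressPair
        .computeIfAbsent(hostAndPushPort, pair -> ConcurrentHashMap.newKeySet())
        .add(batchId);
  }

  void removeFlightBatches(int batchId, String hostAndPushPort) {
    Set<Integer> batchIds = batchIdPerAddressPair.get(hostAndPushPort);
    if (batchIds != null) {
      batchIds.remove(batchId);
    }
  }

  // Hypothetical helper: number of batches currently in flight to one worker.
  int inFlightCount(String hostAndPushPort) {
    Set<Integer> batchIds = batchIdPerAddressPair.get(hostAndPushPort);
    return batchIds == null ? 0 : batchIds.size();
  }
}
```

   A call site would then read something like addFlightBatches(batchId, loc.hostAndPushPort()), which keeps the address-pair lookup in one place at the caller.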



##########
client/src/main/java/org/apache/celeborn/client/write/PushState.java:
##########
@@ -88,4 +96,20 @@ public boolean addBatchData(String addressPair, PartitionLocation loc, int batch
   public DataBatches takeDataBatches(String addressPair) {
     return batchesMap.remove(addressPair);
   }
+
+  public void addFlightBatches(int batchId, PartitionLocation loc) {
+    String addressPair = loc.hostAndPushPort();
+    Set<Integer> batchIdSetPerPair =
+        batchIdPerAddressPair.computeIfAbsent(addressPair, id -> new HashSet<>());

Review Comment:
   Ditto: this should also use ConcurrentHashMap.newKeySet() rather than new HashSet<>().


