Posted to dev@celeborn.apache.org by GitBox <gi...@apache.org> on 2022/11/22 12:47:17 UTC

[GitHub] [incubator-celeborn] zy-jordan opened a new pull request, #993: Split max reqs in flight

zy-jordan opened a new pull request, #993:
URL: https://github.com/apache/incubator-celeborn/pull/993

   # [FEATURE] Split maxReqsInFlight limitation into level of target worker
   
   ### What changes were proposed in this pull request?
   Split the maxReqsInFlight limit to the level of the target worker, i.e. apply it separately to every address pair (the target worker's host and push port) instead of globally.
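A minimal sketch of what a per-worker limit could mean, assuming in-flight requests are counted per `host:pushPort` string. The class `PerWorkerInFlightLimiter` and its methods are hypothetical illustrations, not Celeborn's API.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper (not part of Celeborn): counts in-flight push requests
// separately for each target worker, keyed by its "host:pushPort" string.
class PerWorkerInFlightLimiter {
  private final int maxReqsInFlightPerWorker;
  private final ConcurrentHashMap<String, AtomicInteger> inFlightPerWorker =
      new ConcurrentHashMap<>();

  PerWorkerInFlightLimiter(int maxReqsInFlightPerWorker) {
    this.maxReqsInFlightPerWorker = maxReqsInFlightPerWorker;
  }

  // Reserves a slot for one more request to the given worker. Returns false,
  // without reserving anything, if that worker has already reached its limit.
  boolean tryAcquire(String hostAndPushPort) {
    AtomicInteger counter =
        inFlightPerWorker.computeIfAbsent(hostAndPushPort, k -> new AtomicInteger());
    while (true) {
      int current = counter.get();
      if (current >= maxReqsInFlightPerWorker) {
        return false;
      }
      if (counter.compareAndSet(current, current + 1)) {
        return true;
      }
    }
  }

  // Frees a slot once a push to the given worker succeeds or fails.
  void release(String hostAndPushPort) {
    AtomicInteger counter = inFlightPerWorker.get(hostAndPushPort);
    if (counter != null) {
      counter.decrementAndGet();
    }
  }

  int inFlightCount(String hostAndPushPort) {
    AtomicInteger counter = inFlightPerWorker.get(hostAndPushPort);
    return counter == null ? 0 : counter.get();
  }
}
```

With a limit of 2, a stalled worker such as `10.0.0.1:9092` rejects a third request while `10.0.0.2:9092` still accepts new ones, which is the behavior the global limit cannot provide.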
   
   ### Why are the changes needed?
   If pushing data to one worker stalls, the global limit also blocks pushes to the other, healthy workers.
   
   ### What are the items that need reviewer attention?
   
   
   ### Related issues.
   #992 
   
   ### Related pull requests.
   
   
   ### How was this patch tested?
   
   
   /cc @AngersZhuuuu 
   
   /assign @AngersZhuuuu 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-celeborn] waitinfuture commented on pull request #993: Split max reqs in flight

Posted by GitBox <gi...@apache.org>.
waitinfuture commented on PR #993:
URL: https://github.com/apache/incubator-celeborn/pull/993#issuecomment-1323761434

   Please migrate the issue to JIRA: https://issues.apache.org/jira/projects/CELEBORN/issues




[GitHub] [incubator-celeborn] waitinfuture commented on a diff in pull request #993: [CELEBORN-55][FEATURE] Split maxReqsInFlight limitation into level of target worker

Posted by GitBox <gi...@apache.org>.
waitinfuture commented on code in PR #993:
URL: https://github.com/apache/incubator-celeborn/pull/993#discussion_r1032786798


##########
client/src/main/java/org/apache/celeborn/client/write/PushState.java:
##########
@@ -88,4 +96,20 @@ public boolean addBatchData(String addressPair, PartitionLocation loc, int batch
   public DataBatches takeDataBatches(String addressPair) {
     return batchesMap.remove(addressPair);
   }
+
+  public void addFlightBatches(int batchId, PartitionLocation loc) {
+    String addressPair = loc.hostAndPushPort();
+    Set<Integer> batchIdSetPerPair =
+        batchIdPerAddressPair.computeIfAbsent(addressPair, id -> new HashSet<>());
+    batchIdSetPerPair.add(batchId);
+  }
+
+  public void removeFlightBatches(int batchId, PartitionLocation loc) {

Review Comment:
   ditto






[GitHub] [incubator-celeborn] zy-jordan commented on a diff in pull request #993: [CELEBORN-55][FEATURE] Split maxReqsInFlight limitation into level of target worker

Posted by GitBox <gi...@apache.org>.
zy-jordan commented on code in PR #993:
URL: https://github.com/apache/incubator-celeborn/pull/993#discussion_r1034441784


##########
client/src/main/java/org/apache/celeborn/client/write/PushState.java:
##########
@@ -88,4 +96,20 @@ public boolean addBatchData(String addressPair, PartitionLocation loc, int batch
   public DataBatches takeDataBatches(String addressPair) {
     return batchesMap.remove(addressPair);
   }
+
+  public void addFlightBatches(int batchId, PartitionLocation loc) {
+    String addressPair = loc.hostAndPushPort();
+    Set<Integer> batchIdSetPerPair =
+        batchIdPerAddressPair.computeIfAbsent(addressPair, id -> new HashSet<>());
+    batchIdSetPerPair.add(batchId);
+  }
+
+  public void removeFlightBatches(int batchId, PartitionLocation loc) {

Review Comment:
   Hello, @waitinfuture. batchIdPerAddressPair is a ConcurrentHashMap, but its values (batchIdSetPerPair) are HashSets. If I use ConcurrentHashMap.newKeySet() for the values instead, will it be thread-safe?
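A sketch of the newKeySet() variant, assuming a simplified stand-in for PushState's bookkeeping; `InFlightBatchTracker` and its method names are hypothetical. `computeIfAbsent` on a ConcurrentHashMap is atomic, and `ConcurrentHashMap.newKeySet()` returns a concurrent Set, so individual add/remove calls on one worker's set are safe from multiple threads. Compound actions (e.g. "check size, then add") would still need extra coordination. The sketch also takes the hostAndPushPort string as the parameter, as suggested in review.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for the in-flight bookkeeping in PushState.
class InFlightBatchTracker {
  // hostAndPushPort of the target worker -> ids of batches still in flight.
  private final ConcurrentHashMap<String, Set<Integer>> batchIdsPerAddressPair =
      new ConcurrentHashMap<>();

  void addFlightBatch(int batchId, String hostAndPushPort) {
    // newKeySet() gives a thread-safe Set backed by a ConcurrentHashMap,
    // unlike HashSet, so concurrent add/remove on the same set is safe.
    batchIdsPerAddressPair
        .computeIfAbsent(hostAndPushPort, k -> ConcurrentHashMap.newKeySet())
        .add(batchId);
  }

  void removeFlightBatch(int batchId, String hostAndPushPort) {
    Set<Integer> ids = batchIdsPerAddressPair.get(hostAndPushPort);
    if (ids != null) { // no set yet means nothing was ever added for this worker
      ids.remove(batchId);
    }
  }

  int inFlightCount(String hostAndPushPort) {
    Set<Integer> ids = batchIdsPerAddressPair.get(hostAndPushPort);
    return ids == null ? 0 : ids.size();
  }
}
```

The sketch deliberately leaves empty sets in the map. Dropping them on removal would require the add path to run inside `compute` as well; otherwise a concurrent remove could discard a set that another thread is about to add a batch id to.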





[GitHub] [incubator-celeborn] codecov[bot] commented on pull request #993: [CELEBORN-55][FEATURE] Split maxReqsInFlight limitation into level of target worker

Posted by GitBox <gi...@apache.org>.
codecov[bot] commented on PR #993:
URL: https://github.com/apache/incubator-celeborn/pull/993#issuecomment-1357353245

   # [Codecov](https://codecov.io/gh/apache/incubator-celeborn/pull/993?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#993](https://codecov.io/gh/apache/incubator-celeborn/pull/993?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (79b31fd) into [main](https://codecov.io/gh/apache/incubator-celeborn/commit/13769f0f0a2401aacc66162a2f6006816a175ca6?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (13769f0) will **increase** coverage by `0.24%`.
   > The diff coverage is `8.13%`.
   
   ```diff
   @@             Coverage Diff              @@
   ##               main     #993      +/-   ##
   ============================================
   + Coverage     25.08%   25.32%   +0.24%     
   - Complexity      770      788      +18     
   ============================================
     Files           215      214       -1     
     Lines         18208    18118      -90     
     Branches       2024     2050      +26     
   ============================================
   + Hits           4566     4586      +20     
   + Misses        13331    13223     -108     
   + Partials        311      309       -2     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-celeborn/pull/993?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...pache/celeborn/client/read/DfsPartitionReader.java](https://codecov.io/gh/apache/incubator-celeborn/pull/993/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9jZWxlYm9ybi9jbGllbnQvcmVhZC9EZnNQYXJ0aXRpb25SZWFkZXIuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...rg/apache/celeborn/client/write/DataPushQueue.java](https://codecov.io/gh/apache/incubator-celeborn/pull/993/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9jZWxlYm9ybi9jbGllbnQvd3JpdGUvRGF0YVB1c2hRdWV1ZS5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...a/org/apache/celeborn/client/write/DataPusher.java](https://codecov.io/gh/apache/incubator-celeborn/pull/993/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9jZWxlYm9ybi9jbGllbnQvd3JpdGUvRGF0YVB1c2hlci5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...pache/celeborn/client/ChangePartitionManager.scala](https://codecov.io/gh/apache/incubator-celeborn/pull/993/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvY2VsZWJvcm4vY2xpZW50L0NoYW5nZVBhcnRpdGlvbk1hbmFnZXIuc2NhbGE=) | `0.00% <0.00%> (ø)` | |
   | [...ala/org/apache/celeborn/client/CommitManager.scala](https://codecov.io/gh/apache/incubator-celeborn/pull/993/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvY2VsZWJvcm4vY2xpZW50L0NvbW1pdE1hbmFnZXIuc2NhbGE=) | `0.00% <0.00%> (ø)` | |
   | [.../org/apache/celeborn/client/LifecycleManager.scala](https://codecov.io/gh/apache/incubator-celeborn/pull/993/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvY2VsZWJvcm4vY2xpZW50L0xpZmVjeWNsZU1hbmFnZXIuc2NhbGE=) | `0.00% <0.00%> (ø)` | |
   | [...java/org/apache/celeborn/common/meta/FileInfo.java](https://codecov.io/gh/apache/incubator-celeborn/pull/993/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9jZWxlYm9ybi9jb21tb24vbWV0YS9GaWxlSW5mby5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...e/celeborn/common/meta/PartitionLocationInfo.scala](https://codecov.io/gh/apache/incubator-celeborn/pull/993/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvY2VsZWJvcm4vY29tbW9uL21ldGEvUGFydGl0aW9uTG9jYXRpb25JbmZvLnNjYWxh) | `0.00% <0.00%> (ø)` | |
   | [...born/common/protocol/message/ControlMessages.scala](https://codecov.io/gh/apache/incubator-celeborn/pull/993/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvY2VsZWJvcm4vY29tbW9uL3Byb3RvY29sL21lc3NhZ2UvQ29udHJvbE1lc3NhZ2VzLnNjYWxh) | `0.14% <0.00%> (+0.01%)` | :arrow_up: |
   | [...he/celeborn/common/quota/DefaultQuotaManager.scala](https://codecov.io/gh/apache/incubator-celeborn/pull/993/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvY2VsZWJvcm4vY29tbW9uL3F1b3RhL0RlZmF1bHRRdW90YU1hbmFnZXIuc2NhbGE=) | `0.00% <0.00%> (ø)` | |
   | ... and [22 more](https://codecov.io/gh/apache/incubator-celeborn/pull/993/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-celeborn] waitinfuture commented on pull request #993: [CELEBORN-55][FEATURE] Split maxReqsInFlight limitation into level of target worker

Posted by GitBox <gi...@apache.org>.
waitinfuture commented on PR #993:
URL: https://github.com/apache/incubator-celeborn/pull/993#issuecomment-1350368355

   Hi @zy-jordan, it seems the code for items 2, 3 and 4 has not been submitted yet.




[GitHub] [incubator-celeborn] AngersZhuuuu commented on a diff in pull request #993: [CELEBORN-55][FEATURE] Split maxReqsInFlight limitation into level of target worker

Posted by GitBox <gi...@apache.org>.
AngersZhuuuu commented on code in PR #993:
URL: https://github.com/apache/incubator-celeborn/pull/993#discussion_r1031986108


##########
client/src/main/java/org/apache/celeborn/client/write/PushState.java:
##########
@@ -88,4 +97,22 @@ public boolean addBatchData(String addressPair, PartitionLocation loc, int batch
   public DataBatches takeDataBatches(String addressPair) {
     return batchesMap.remove(addressPair);
   }
+
+  public void addFlightBatches(int batchId, PartitionLocation loc) {
+    String addressPair = loc.hostAndPushPort();
+    Set<Integer> batchIdSetPerPair =
+        batchIdPerAddressPair.computeIfAbsent(addressPair, id -> new HashSet<>());
+    batchIdSetPerPair.add(batchId);
+  }
+
+  public void removeFlightBatches(int batchId, PartitionLocation loc) {
+    String hostAndPushPort = loc.hostAndPushPort();
+    Set<Integer> batchIdSetPerPair = batchIdPerAddressPair.get(hostAndPushPort);
+    if (Objects.nonNull(batchIdSetPerPair)) {

Review Comment:
   In which case can loc be null?





[GitHub] [incubator-celeborn] zy-jordan closed pull request #993: [CELEBORN-55][FEATURE] Split maxReqsInFlight limitation into level of target worker

Posted by GitBox <gi...@apache.org>.
zy-jordan closed pull request #993: [CELEBORN-55][FEATURE] Split maxReqsInFlight limitation into level of target worker
URL: https://github.com/apache/incubator-celeborn/pull/993




[GitHub] [incubator-celeborn] waitinfuture commented on pull request #993: [CELEBORN-55][FEATURE] Split maxReqsInFlight limitation into level of target worker

Posted by GitBox <gi...@apache.org>.
waitinfuture commented on PR #993:
URL: https://github.com/apache/incubator-celeborn/pull/993#issuecomment-1328047717

   I think we should also update the documentation and default value of celeborn.push.maxReqsInFlight, since its meaning changes from a global limit to a per-worker limit.




[GitHub] [incubator-celeborn] waitinfuture commented on pull request #993: [CELEBORN-55][FEATURE] Split maxReqsInFlight limitation into level of target worker

Posted by GitBox <gi...@apache.org>.
waitinfuture commented on PR #993:
URL: https://github.com/apache/incubator-celeborn/pull/993#issuecomment-1328049291

   I'm afraid this PR doesn't do what we want: limitMaxInFlight on one worker may still block the pusher, because DataPusher is single-threaded and pushes the data in workingQueue sequentially (see DataPusher).
   
   cc @AngersZhuuuu 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-celeborn] AngersZhuuuu commented on a diff in pull request #993: [CELEBORN-55][FEATURE] Split maxReqsInFlight limitation into level of target worker

Posted by GitBox <gi...@apache.org>.
AngersZhuuuu commented on code in PR #993:
URL: https://github.com/apache/incubator-celeborn/pull/993#discussion_r1031149784


##########
client/src/main/java/org/apache/celeborn/client/write/PushState.java:
##########
@@ -30,14 +31,17 @@
 
 import org.apache.celeborn.common.CelebornConf;
 import org.apache.celeborn.common.protocol.PartitionLocation;
+import org.apache.celeborn.common.util.Utils;
 
 public class PushState {
   private static final Logger logger = LoggerFactory.getLogger(PushState.class);
 
   private int pushBufferMaxSize;
 
   public final AtomicInteger batchId = new AtomicInteger();
-  public final ConcurrentHashMap<Integer, PartitionLocation> inFlightBatches =
+  private final ConcurrentHashMap<Integer, PartitionLocation> inFlightBatches =
+      new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, Set<Integer>> batchIdPerAddressPair =
       new ConcurrentHashMap<>();

Review Comment:
   Why not just
   ```
   // master location's hostAndPushPort -> batch ids
   private final ConcurrentHashMap<String, Set<Integer>> batchIdsPerAddressPair =
         new ConcurrentHashMap<>();
   ```



##########
client/src/main/java/org/apache/celeborn/client/write/PushState.java:
##########
@@ -88,4 +100,21 @@ public boolean addBatchData(String addressPair, PartitionLocation loc, int batch
   public DataBatches takeDataBatches(String addressPair) {
     return batchesMap.remove(addressPair);
   }
+
+  public void addFlightBatches(int batchId, PartitionLocation loc) {
+    String addressPair = Utils.genAddressPair(loc);
+    Set<Integer> batchIdSetPerPair =
+        batchIdPerAddressPair.computeIfAbsent(addressPair, id -> new HashSet<>());
+    batchIdSetPerPair.add(batchId);
+    inFlightBatches.put(batchId, loc);
+  }
+
+  public void removeFlightBatches(int batchId) {
+    PartitionLocation loc = inFlightBatches.remove(batchId);
+    String addressPair = Utils.genAddressPair(loc);

Review Comment:
   Celeborn only pushes data to the master location; the worker holding the master location then pushes the data on to the slave location's worker. So it seems we don't need to check whether its peer is null here?



##########
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java:
##########
@@ -379,13 +371,52 @@ private void limitMaxInFlight(String mapKey, PushState pushState, int limit) thr
 
     if (times <= 0) {
       logger.error(
-          "After waiting for {} ms, there are still {} batches in flight for map {}, "
+          "After waiting for {} ms, there are still {} batches in flight for map {} and addressPair {}, "
               + "which exceeds the limit {}.",
           timeoutMs,
-          inFlightBatches.size(),
+          batchIdSet.size(),
           mapKey,
+          addressPair,
           limit);
-      logger.error("Map: {} in flight batches: {}", mapKey, inFlightBatches);
+      logger.error(
+          "Map: {} with addressPair {} in flight batches: {}", mapKey, addressPair, batchIdSet);
+      throw new IOException("wait timeout for task " + mapKey, pushState.exception.get());
+    }
+    if (pushState.exception.get() != null) {
+      throw pushState.exception.get();
+    }
+  }
+
+  private void limitZeroInFlight(String mapKey, PushState pushState) throws IOException {
+    if (pushState.exception.get() != null) {
+      throw pushState.exception.get();
+    }
+    long timeoutMs = conf.pushLimitInFlightTimeoutMs();
+    long delta = conf.pushLimitInFlightSleepDeltaMs();
+    long times = timeoutMs / delta;
+    ConcurrentHashMap<Integer, PartitionLocation> inFlightBatches = pushState.getInFlightBatches();
+
+    try {
+      while (times > 0) {
+        if (inFlightBatches.size() == 0) {

Review Comment:
   `pushState.batchIdsPerAddress.values.map(_.asScala.sum).asScala.sum`?
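For the `inFlightBatches.size() == 0` check in `limitZeroInFlight`, the quantity that matters is the total number of batch ids in flight across all workers, i.e. the sum of the per-worker set sizes (summing the set elements would add up the batch ids themselves). A Java sketch, assuming the `ConcurrentHashMap<String, Set<Integer>>` layout from the diff; `InFlightCounts.totalInFlight` is a hypothetical helper name.

```java
import java.util.Map;
import java.util.Set;

final class InFlightCounts {
  private InFlightCounts() {}

  // Sums the number of in-flight batch ids over every target worker.
  // Iteration over a ConcurrentHashMap is weakly consistent, so the result
  // is a snapshot that may not reflect updates made during the call.
  static int totalInFlight(Map<String, Set<Integer>> batchIdsPerAddressPair) {
    return batchIdsPerAddressPair.values().stream().mapToInt(Set::size).sum();
  }
}
```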



##########
client/src/main/java/org/apache/celeborn/client/write/PushState.java:
##########
@@ -30,14 +31,17 @@
 
 import org.apache.celeborn.common.CelebornConf;
 import org.apache.celeborn.common.protocol.PartitionLocation;
+import org.apache.celeborn.common.util.Utils;
 
 public class PushState {
   private static final Logger logger = LoggerFactory.getLogger(PushState.class);
 
   private int pushBufferMaxSize;
 
   public final AtomicInteger batchId = new AtomicInteger();
-  public final ConcurrentHashMap<Integer, PartitionLocation> inFlightBatches =
+  private final ConcurrentHashMap<Integer, PartitionLocation> inFlightBatches =

Review Comment:
   I think we can remove this.





[GitHub] [incubator-celeborn] AngersZhuuuu commented on pull request #993: [CELEBORN-55][FEATURE] Split maxReqsInFlight limitation into level of target worker

Posted by GitBox <gi...@apache.org>.
AngersZhuuuu commented on PR #993:
URL: https://github.com/apache/incubator-celeborn/pull/993#issuecomment-1340424445

   > I'm afraid this pr doesn't do what we want, limitMaxInFlight on one worker can block pushing to other workers. DataPusher is single-threaded and sequentially pushes data in the workingQueue, see DataPusher.
   > 
   > cc @AngersZhuuuu
   
   I checked the code, and this limitation does exist.




[GitHub] [incubator-celeborn] zy-jordan commented on pull request #993: [CELEBORN-55][FEATURE] Split maxReqsInFlight limitation into level of target worker

Posted by GitBox <gi...@apache.org>.
zy-jordan commented on PR #993:
URL: https://github.com/apache/incubator-celeborn/pull/993#issuecomment-1346340220

   # DataPusher
   1. Create one PushTask queue per worker.
   2. Use a single push-data thread that takes tasks from each `worker's push queue` in turn.
   3. Check the `max request in flight` limit before taking a task from a queue; if the limit is reached, move on to the next worker's push queue.
   4. Keep the `max request in flight` counter together with its `worker's push queue` by encapsulating both in one object.
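The four steps above can be sketched as a single pusher thread that round-robins over per-worker queues and skips any worker whose in-flight count has reached the limit. Everything here (`WorkerPushQueue`, `RoundRobinPusher`, `pushOnce`, and the `BiConsumer` standing in for the actual network push) is hypothetical and only illustrates the proposed scheduling, not Celeborn's DataPusher.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

// Step 4: a worker's pending push tasks and its in-flight counter in one object.
class WorkerPushQueue {
  final String hostAndPushPort;
  final Queue<String> tasks = new ConcurrentLinkedQueue<>();
  final AtomicInteger inFlight = new AtomicInteger();

  WorkerPushQueue(String hostAndPushPort) {
    this.hostAndPushPort = hostAndPushPort;
  }
}

class RoundRobinPusher {
  private final int maxReqsInFlight;
  // Step 1: one push queue per target worker.
  private final ConcurrentHashMap<String, WorkerPushQueue> queues = new ConcurrentHashMap<>();

  RoundRobinPusher(int maxReqsInFlight) {
    this.maxReqsInFlight = maxReqsInFlight;
  }

  // Called by producers to enqueue a task for a worker.
  void submit(String hostAndPushPort, String task) {
    queues.computeIfAbsent(hostAndPushPort, WorkerPushQueue::new).tasks.add(task);
  }

  // Called from the push callback once a request to this worker completes.
  void onPushFinished(String hostAndPushPort) {
    WorkerPushQueue q = queues.get(hostAndPushPort);
    if (q != null) {
      q.inFlight.decrementAndGet();
    }
  }

  // Steps 2 and 3: one pass of the single pusher thread over all workers.
  // A worker at its limit is skipped rather than waited on, so it cannot
  // block pushes to healthy workers. Returns the tasks pushed in this pass.
  List<String> pushOnce(BiConsumer<String, String> push) {
    List<String> pushed = new ArrayList<>();
    for (WorkerPushQueue q : queues.values()) {
      // Only this thread increments inFlight; callbacks only decrement it,
      // so this check can only be conservative.
      if (q.inFlight.get() >= maxReqsInFlight) {
        continue;
      }
      String task = q.tasks.poll();
      if (task == null) {
        continue; // nothing queued for this worker
      }
      q.inFlight.incrementAndGet();
      push.accept(q.hostAndPushPort, task);
      pushed.add(task);
    }
    return pushed;
  }
}
```

Each pass pushes at most one task per worker; a real loop would call `pushOnce` repeatedly and back off briefly when a pass pushes nothing.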
   
   @waitinfuture @AngersZhuuuu 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-celeborn] AngersZhuuuu commented on pull request #993: [CELEBORN-55][FEATURE] Split maxReqsInFlight limitation into level of target worker

Posted by GitBox <gi...@apache.org>.
AngersZhuuuu commented on PR #993:
URL: https://github.com/apache/incubator-celeborn/pull/993#issuecomment-1328043133

   ping @waitinfuture 




[GitHub] [incubator-celeborn] waitinfuture commented on pull request #993: [CELEBORN-55][FEATURE] Split maxReqsInFlight limitation into level of target worker

Posted by GitBox <gi...@apache.org>.
waitinfuture commented on PR #993:
URL: https://github.com/apache/incubator-celeborn/pull/993#issuecomment-1328047987

   Sorry for the delay. I have submitted several comments; thanks for the PR! @zy-jordan @AngersZhuuuu 




[GitHub] [incubator-celeborn] zy-jordan commented on a diff in pull request #993: [CELEBORN-55][FEATURE] Split maxReqsInFlight limitation into level of target worker

Posted by GitBox <gi...@apache.org>.
zy-jordan commented on code in PR #993:
URL: https://github.com/apache/incubator-celeborn/pull/993#discussion_r1032036926


##########
client/src/main/java/org/apache/celeborn/client/write/PushState.java:
##########
@@ -88,4 +97,22 @@ public boolean addBatchData(String addressPair, PartitionLocation loc, int batch
   public DataBatches takeDataBatches(String addressPair) {
     return batchesMap.remove(addressPair);
   }
+
+  public void addFlightBatches(int batchId, PartitionLocation loc) {
+    String addressPair = loc.hostAndPushPort();
+    Set<Integer> batchIdSetPerPair =
+        batchIdPerAddressPair.computeIfAbsent(addressPair, id -> new HashSet<>());
+    batchIdSetPerPair.add(batchId);
+  }
+
+  public void removeFlightBatches(int batchId, PartitionLocation loc) {
+    String hostAndPushPort = loc.hostAndPushPort();
+    Set<Integer> batchIdSetPerPair = batchIdPerAddressPair.get(hostAndPushPort);
+    if (Objects.nonNull(batchIdSetPerPair)) {

Review Comment:
   Technically, loc will never be null; the check is just a coding habit of mine.





[GitHub] [incubator-celeborn] waitinfuture commented on a diff in pull request #993: [CELEBORN-55][FEATURE] Split maxReqsInFlight limitation into level of target worker

Posted by GitBox <gi...@apache.org>.
waitinfuture commented on code in PR #993:
URL: https://github.com/apache/incubator-celeborn/pull/993#discussion_r1032786581


##########
client/src/main/java/org/apache/celeborn/client/write/PushState.java:
##########
@@ -88,4 +96,20 @@ public boolean addBatchData(String addressPair, PartitionLocation loc, int batch
   public DataBatches takeDataBatches(String addressPair) {
     return batchesMap.remove(addressPair);
   }
+
+  public void addFlightBatches(int batchId, PartitionLocation loc) {
+    String addressPair = loc.hostAndPushPort();
+    Set<Integer> batchIdSetPerPair =
+        batchIdPerAddressPair.computeIfAbsent(addressPair, id -> new HashSet<>());
+    batchIdSetPerPair.add(batchId);
+  }
+
+  public void removeFlightBatches(int batchId, PartitionLocation loc) {

Review Comment:
   There can be concurrent calls to this method, but batchIdSetPerPair is not thread-safe.



##########
client/src/main/java/org/apache/celeborn/client/write/PushState.java:
##########
@@ -46,6 +46,14 @@ public PushState(CelebornConf conf) {
     pushBufferMaxSize = conf.pushBufferMaxSize();
   }
 
+  public ConcurrentHashMap<String, Set<Integer>> getBatchIdPerAddressPair() {
+    return batchIdPerAddressPair;
+  }
+
+  public Set<Integer> getBatchIdSetByAddressPair(String addressPair) {
+    return batchIdPerAddressPair.computeIfAbsent(addressPair, pair -> new HashSet<>());

Review Comment:
   I think we should use ConcurrentHashMap.newKeySet()



##########
client/src/main/java/org/apache/celeborn/client/write/PushState.java:
##########
@@ -88,4 +96,20 @@ public boolean addBatchData(String addressPair, PartitionLocation loc, int batch
   public DataBatches takeDataBatches(String addressPair) {
     return batchesMap.remove(addressPair);
   }
+
+  public void addFlightBatches(int batchId, PartitionLocation loc) {

Review Comment:
   I think it's better to pass hostAndPushPort as the parameter instead of loc.



##########
client/src/main/java/org/apache/celeborn/client/write/PushState.java:
##########
@@ -88,4 +96,20 @@ public boolean addBatchData(String addressPair, PartitionLocation loc, int batch
   public DataBatches takeDataBatches(String addressPair) {
     return batchesMap.remove(addressPair);
   }
+
+  public void addFlightBatches(int batchId, PartitionLocation loc) {
+    String addressPair = loc.hostAndPushPort();
+    Set<Integer> batchIdSetPerPair =
+        batchIdPerAddressPair.computeIfAbsent(addressPair, id -> new HashSet<>());
+    batchIdSetPerPair.add(batchId);
+  }
+
+  public void removeFlightBatches(int batchId, PartitionLocation loc) {

Review Comment:
   I think it's better to pass hostAndPushPort as the parameter instead of loc.



##########
client/src/main/java/org/apache/celeborn/client/write/PushState.java:
##########
@@ -88,4 +96,20 @@ public boolean addBatchData(String addressPair, PartitionLocation loc, int batch
   public DataBatches takeDataBatches(String addressPair) {
     return batchesMap.remove(addressPair);
   }
+
+  public void addFlightBatches(int batchId, PartitionLocation loc) {
+    String addressPair = loc.hostAndPushPort();
+    Set<Integer> batchIdSetPerPair =
+        batchIdPerAddressPair.computeIfAbsent(addressPair, id -> new HashSet<>());

Review Comment:
   ditto





[GitHub] [incubator-celeborn] zy-jordan commented on a diff in pull request #993: [CELEBORN-55][FEATURE] Split maxReqsInFlight limitation into level of target worker

Posted by GitBox <gi...@apache.org>.
zy-jordan commented on code in PR #993:
URL: https://github.com/apache/incubator-celeborn/pull/993#discussion_r1034447640


##########
client/src/main/java/org/apache/celeborn/client/write/PushState.java:
##########
@@ -88,4 +96,20 @@ public boolean addBatchData(String addressPair, PartitionLocation loc, int batch
   public DataBatches takeDataBatches(String addressPair) {
     return batchesMap.remove(addressPair);
   }
+
+  public void addFlightBatches(int batchId, PartitionLocation loc) {

Review Comment:
   Hello, @waitinfuture. Excuse me, could you tell me why passing a String is better than passing an object reference (loc)?


