Posted to issues@celeborn.apache.org by GitBox <gi...@apache.org> on 2022/12/19 10:18:56 UTC

[GitHub] [incubator-celeborn] zy-jordan opened a new pull request, #1102: split pushdata queue by every partitionId

zy-jordan opened a new pull request, #1102:
URL: https://github.com/apache/incubator-celeborn/pull/1102

   
   ### What changes were proposed in this pull request?
   Split the push data queue into one queue per partitionId.
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-celeborn] AngersZhuuuu commented on a diff in pull request #1102: [CELEBORN-55][FEATURE] Split maxReqsInFlight limitation into level of target worker

Posted by GitBox <gi...@apache.org>.
AngersZhuuuu commented on code in PR #1102:
URL: https://github.com/apache/incubator-celeborn/pull/1102#discussion_r1060455497


##########
client/src/main/java/org/apache/celeborn/client/write/InFlightTracker.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.client.write;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.CelebornConf;
+
+public class InFlightTracker {
+  private static final Logger logger = LoggerFactory.getLogger(InFlightTracker.class);
+
+  private final long timeoutMs;
+  private final long delta;
+  private final int maxInFlight;
+
+  private final PushState pushState;
+
+  public final AtomicInteger batchId = new AtomicInteger();
+  private final ConcurrentHashMap<String, Set<Integer>> batchIdPerAddressPair =
+      new ConcurrentHashMap<>();
+
+  public InFlightTracker(CelebornConf conf, PushState pushState) {
+    this.timeoutMs = conf.pushLimitInFlightTimeoutMs();
+    this.delta = conf.pushLimitInFlightSleepDeltaMs();
+    this.maxInFlight = conf.pushMaxReqsInFlight();
+    this.pushState = pushState;
+  }
+
+  public void addFlightBatches(int batchId, String hostAndPushPort) {

Review Comment:
   Should we add synchronization to methods like this one that modify `batchIdPerAddressPair`? @waitinfuture
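
The concern can be made concrete. `ConcurrentHashMap` makes each single call thread-safe, but a multi-step sequence (look up the set, remove an id, drop the set once it is empty) is not atomic as a whole, so a concurrent add could land in a set that is just being removed from the map. The following is a minimal sketch, not the PR's code, of one way to avoid an explicit lock by doing each update inside `compute` / `computeIfPresent`. The class name and the helper methods `inFlightCount` and `trackedAddresses` are hypothetical.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the tracker in the diff; not the PR's final code.
final class AtomicBatchTrackerSketch {
  private final ConcurrentHashMap<String, Set<Integer>> batchIdPerAddressPair =
      new ConcurrentHashMap<>();

  // compute() runs atomically for the key, so the insert cannot interleave
  // with a concurrent removal of the same address entry.
  void addFlightBatch(int batchId, String hostAndPushPort) {
    batchIdPerAddressPair.compute(
        hostAndPushPort,
        (address, ids) -> {
          Set<Integer> target = (ids == null) ? ConcurrentHashMap.newKeySet() : ids;
          target.add(batchId);
          return target;
        });
  }

  // Returning null from computeIfPresent() removes the mapping, so "remove the
  // id, then drop the empty set" happens as one atomic step. An unknown
  // address is simply ignored.
  void removeFlightBatch(int batchId, String hostAndPushPort) {
    batchIdPerAddressPair.computeIfPresent(
        hostAndPushPort,
        (address, ids) -> {
          ids.remove(batchId);
          return ids.isEmpty() ? null : ids;
        });
  }

  int inFlightCount(String hostAndPushPort) {
    Set<Integer> ids = batchIdPerAddressPair.get(hostAndPushPort);
    return ids == null ? 0 : ids.size();
  }

  int trackedAddresses() {
    return batchIdPerAddressPair.size();
  }
}
```

Whether this is preferable to a `synchronized` block depends on how often the map is touched from several threads; the sketch only shows that the per-key atomic operations of `ConcurrentHashMap` can cover the add and remove paths.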



##########
client/src/main/java/org/apache/celeborn/client/write/InFlightTracker.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.client.write;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.CelebornConf;
+
+public class InFlightTracker {
+  private static final Logger logger = LoggerFactory.getLogger(InFlightTracker.class);
+
+  private final long timeoutMs;
+  private final long delta;
+  private final int maxInFlight;
+
+  private final PushState pushState;
+
+  public final AtomicInteger batchId = new AtomicInteger();
+  private final ConcurrentHashMap<String, Set<Integer>> batchIdPerAddressPair =
+      new ConcurrentHashMap<>();
+
+  public InFlightTracker(CelebornConf conf, PushState pushState) {

Review Comment:
   Rename this to `InFlightRequestTracker`? The name should say what is in flight.



##########
client/src/main/java/org/apache/celeborn/client/write/DataPushQueue.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.client.write;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.ShuffleClient;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.protocol.PartitionLocation;
+import org.apache.celeborn.common.util.Utils;
+
+public class DataPushQueue {
+  private static final Logger logger = LoggerFactory.getLogger(DataPushQueue.class);
+
+  private final List<LinkedBlockingQueue<PushTask>> workingQueuePerPartition;
+  private final PushState pushState;
+  private final DataPusher dataPusher;
+
+  private final String appId;
+  private final int shuffleId;
+  private final int numMappers;
+  private final int numPartitions;
+  private final ShuffleClient client;
+  private int partitionIdIdx = 0;
+
+  public DataPushQueue(
+      CelebornConf conf,
+      DataPusher dataPusher,
+      ShuffleClient client,
+      String appId,
+      int shuffleId,
+      int mapId,
+      int attemptId,
+      int numMappers,
+      int numPartitions) {
+    this.appId = appId;
+    this.shuffleId = shuffleId;
+    this.numMappers = numMappers;
+    this.numPartitions = numPartitions;
+    this.client = client;
+    this.dataPusher = dataPusher;
+    final String mapKey = Utils.makeMapKey(shuffleId, mapId, attemptId);
+    this.pushState = client.getOrRegisterPushState(mapKey);
+    workingQueuePerPartition = new ArrayList<>(numPartitions);
+    final int capacity = conf.pushQueueCapacity();
+    for (int i = 0; i < numPartitions; i++) {
+      workingQueuePerPartition.add(new LinkedBlockingQueue<>(capacity));
+    }
+  }
+
+  public LinkedBlockingQueue<PushTask> takeAnyWorkingQueue() throws IOException {
+    while (!dataPusher.terminatedOrHasException()) {
+      int partitionId = nextPartitionId();
+      LinkedBlockingQueue<PushTask> pushTasks = workingQueuePerPartition.get(partitionId);
+      if (!pushTasks.isEmpty()) {
+        Map<Integer, PartitionLocation> partitionLocationMap =
+            client.getOrRegisterShuffle(appId, shuffleId, numMappers, numPartitions);
+        PartitionLocation loc = partitionLocationMap.get(partitionId);
+        boolean reachLimit = pushState.limitMaxInFlight(loc.hostAndPushPort());
+        if (!reachLimit) {
+          return pushTasks;
+        }
+      }
+    }
+    return null;
+  }
+
+  public LinkedBlockingQueue<PushTask> takeWorkingQueue(int partitionId) {
+    return workingQueuePerPartition.get(partitionId);
+  }
+
+  public void clear() {
+    workingQueuePerPartition.parallelStream().forEach(LinkedBlockingQueue::clear);
+    workingQueuePerPartition.clear();
+  }
+
+  private int nextPartitionId() {
+    return partitionIdIdx++ % numPartitions;

Review Comment:
   I don't understand why `nextPartitionId` is used like this. Can you explain more?



##########
client/src/main/java/org/apache/celeborn/client/write/InFlightTracker.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.client.write;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.CelebornConf;
+
+public class InFlightTracker {
+  private static final Logger logger = LoggerFactory.getLogger(InFlightTracker.class);
+
+  private final long timeoutMs;
+  private final long delta;
+  private final int maxInFlight;
+
+  private final PushState pushState;
+
+  public final AtomicInteger batchId = new AtomicInteger();
+  private final ConcurrentHashMap<String, Set<Integer>> batchIdPerAddressPair =
+      new ConcurrentHashMap<>();
+
+  public InFlightTracker(CelebornConf conf, PushState pushState) {
+    this.timeoutMs = conf.pushLimitInFlightTimeoutMs();
+    this.delta = conf.pushLimitInFlightSleepDeltaMs();
+    this.maxInFlight = conf.pushMaxReqsInFlight();
+    this.pushState = pushState;
+  }
+
+  public void addFlightBatches(int batchId, String hostAndPushPort) {
+    Set<Integer> batchIdSetPerPair =
+        batchIdPerAddressPair.computeIfAbsent(hostAndPushPort, id -> ConcurrentHashMap.newKeySet());
+    batchIdSetPerPair.add(batchId);
+  }
+
+  public void removeFlightBatches(int batchId, String hostAndPushPort) {
+    Set<Integer> batchIdSetPerPair = batchIdPerAddressPair.get(hostAndPushPort);
+    batchIdSetPerPair.remove(batchId);
+    if (batchIdSetPerPair.size() == 0) {
+      batchIdPerAddressPair.remove(hostAndPushPort);
+    }
+  }
+
+  public ConcurrentHashMap<String, Set<Integer>> getBatchIdPerAddressPair() {
+    return batchIdPerAddressPair;
+  }
+
+  public Set<Integer> getBatchIdSetByAddressPair(String addressPair) {
+    return batchIdPerAddressPair.computeIfAbsent(
+        addressPair, pair -> ConcurrentHashMap.newKeySet());
+  }
+
+  public boolean limitMaxInFlight(String hostAndPushPort) throws IOException {
+    if (pushState.exception.get() != null) {
+      throw pushState.exception.get();
+    }
+
+    Set<Integer> batchIdSet = getBatchIdSetByAddressPair(hostAndPushPort);
+    long times = timeoutMs / delta;
+    try {
+      while (times > 0) {
+        if (batchIdSet.size() <= maxInFlight) {
+          break;
+        }
+        if (pushState.exception.get() != null) {
+          throw pushState.exception.get();
+        }
+        Thread.sleep(delta);
+        times--;
+      }
+    } catch (InterruptedException e) {
+      pushState.exception.set(new IOException(e));
+    }
+
+    if (times <= 0) {
+      logger.warn(
+          "After waiting for {} ms, "
+              + "there are still {} batches in flight "
+              + "for hostAndPushPort {}, "
+              + "which exceeds the limit {}.",
+          timeoutMs,
+          batchIdSet.size(),
+          hostAndPushPort,
+          maxInFlight);
+    }
+
+    if (pushState.exception.get() != null) {
+      throw pushState.exception.get();
+    }
+
+    return times <= 0;
+  }
+
+  public boolean limitZeroInFlight() throws IOException {
+    if (pushState.exception.get() != null) {
+      throw pushState.exception.get();
+    }
+    long times = timeoutMs / delta;
+
+    try {
+      while (times > 0) {
+        if (batchIdPerAddressPair.size() == 0) {
+          break;
+        }
+        if (pushState.exception.get() != null) {
+          throw pushState.exception.get();
+        }
+        Thread.sleep(delta);
+        times--;
+      }
+    } catch (InterruptedException e) {
+      pushState.exception.set(new IOException(e));
+    }
+
+    if (times <= 0) {
+      logger.error(
+          "After waiting for {} ms, there are still {} batches in flight, expect 0 batches",
+          timeoutMs,
+          batchIdPerAddressPair.values().stream().map(Set::size).reduce(Integer::sum));
+    }
+
+    if (pushState.exception.get() != null) {
+      throw pushState.exception.get();
+    }
+
+    return times <= 0;
+  }
+
+  protected int nextBatchId() {
+    return batchId.getAndIncrement();

Review Comment:
   The original code uses `addAndGet`, but this uses `getAndIncrement`, which seems inconsistent.
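
For reference, the two `AtomicInteger` calls differ in which value they return. The sketch below assumes the original call was `addAndGet(1)`; the argument is not shown in this thread. The class and method names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class BatchIdCounterSketch {
  // getAndIncrement() returns the value held before the increment.
  static int firstIdWithGetAndIncrement(AtomicInteger counter) {
    return counter.getAndIncrement();
  }

  // addAndGet(1) returns the value held after the increment.
  static int firstIdWithAddAndGet(AtomicInteger counter) {
    return counter.addAndGet(1);
  }

  public static void main(String[] args) {
    AtomicInteger a = new AtomicInteger();
    AtomicInteger b = new AtomicInteger();
    System.out.println(firstIdWithGetAndIncrement(a)); // prints 0
    System.out.println(firstIdWithAddAndGet(b)); // prints 1
    System.out.println(a.get() + " " + b.get()); // prints "1 1"
  }
}
```

Both counters end at the same value; only the first id handed out differs (0 versus 1), which matters if any other code assumes a particular starting batch id.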



##########
client/src/main/java/org/apache/celeborn/client/write/DataPushQueue.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.client.write;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.ShuffleClient;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.protocol.PartitionLocation;
+import org.apache.celeborn.common.util.Utils;
+
+public class DataPushQueue {

Review Comment:
   Could you add a comment to the new class explaining why we need it?





[GitHub] [incubator-celeborn] zy-jordan commented on a diff in pull request #1102: [CELEBORN-55][FEATURE] Split maxReqsInFlight limitation into level of target worker

Posted by GitBox <gi...@apache.org>.
zy-jordan commented on code in PR #1102:
URL: https://github.com/apache/incubator-celeborn/pull/1102#discussion_r1065418643


##########
client/src/main/java/org/apache/celeborn/client/write/DataPushQueue.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.client.write;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.ShuffleClient;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.protocol.PartitionLocation;
+import org.apache.celeborn.common.util.Utils;
+
+public class DataPushQueue {

Review Comment:
   I will add it.



##########
client/src/main/java/org/apache/celeborn/client/write/InFlightTracker.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.client.write;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.CelebornConf;
+
+public class InFlightTracker {
+  private static final Logger logger = LoggerFactory.getLogger(InFlightTracker.class);
+
+  private final long timeoutMs;
+  private final long delta;
+  private final int maxInFlight;
+
+  private final PushState pushState;
+
+  public final AtomicInteger batchId = new AtomicInteger();
+  private final ConcurrentHashMap<String, Set<Integer>> batchIdPerAddressPair =
+      new ConcurrentHashMap<>();
+
+  public InFlightTracker(CelebornConf conf, PushState pushState) {

Review Comment:
   I will add it.





[GitHub] [incubator-celeborn] waitinfuture commented on a diff in pull request #1102: [CELEBORN-55][FEATURE] Split maxReqsInFlight limitation into level of target worker

Posted by GitBox <gi...@apache.org>.
waitinfuture commented on code in PR #1102:
URL: https://github.com/apache/incubator-celeborn/pull/1102#discussion_r1073075434


##########
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java:
##########
@@ -192,7 +192,7 @@ private void submitRetryPushData(
     } else if (mapperEnded(shuffleId, mapId, attemptId)) {
       logger.debug(
           "Retrying push data, but the mapper(map {} attempt {}) has ended.", mapId, attemptId);
-      pushState.removeBatch(batchId);
+      pushState.removeBatch(batchId, loc.hostAndPushPort());

Review Comment:
   What I mean is that we can refactor `hostAndPushPort()` in `PartitionLocation` to cache the string, like the following:
   ```
     private transient String _hostPushPort = null;
   
     public PartitionLocation(xxx) {
       ...
       this._hostPushPort = host + ":" + pushPort;
     }
   
     public String hostAndPushPort() {
       return _hostPushPort;
     }
   ```





[GitHub] [incubator-celeborn] zy-jordan commented on a diff in pull request #1102: [CELEBORN-55][FEATURE] Split maxReqsInFlight limitation into level of target worker

Posted by GitBox <gi...@apache.org>.
zy-jordan commented on code in PR #1102:
URL: https://github.com/apache/incubator-celeborn/pull/1102#discussion_r1065418443


##########
client/src/main/java/org/apache/celeborn/client/write/DataPushQueue.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.client.write;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.ShuffleClient;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.protocol.PartitionLocation;
+import org.apache.celeborn.common.util.Utils;
+
+public class DataPushQueue {
+  private static final Logger logger = LoggerFactory.getLogger(DataPushQueue.class);
+
+  private final List<LinkedBlockingQueue<PushTask>> workingQueuePerPartition;
+  private final PushState pushState;
+  private final DataPusher dataPusher;
+
+  private final String appId;
+  private final int shuffleId;
+  private final int numMappers;
+  private final int numPartitions;
+  private final ShuffleClient client;
+  private int partitionIdIdx = 0;
+
+  public DataPushQueue(
+      CelebornConf conf,
+      DataPusher dataPusher,
+      ShuffleClient client,
+      String appId,
+      int shuffleId,
+      int mapId,
+      int attemptId,
+      int numMappers,
+      int numPartitions) {
+    this.appId = appId;
+    this.shuffleId = shuffleId;
+    this.numMappers = numMappers;
+    this.numPartitions = numPartitions;
+    this.client = client;
+    this.dataPusher = dataPusher;
+    final String mapKey = Utils.makeMapKey(shuffleId, mapId, attemptId);
+    this.pushState = client.getOrRegisterPushState(mapKey);
+    workingQueuePerPartition = new ArrayList<>(numPartitions);
+    final int capacity = conf.pushQueueCapacity();
+    for (int i = 0; i < numPartitions; i++) {
+      workingQueuePerPartition.add(new LinkedBlockingQueue<>(capacity));
+    }
+  }
+
+  public LinkedBlockingQueue<PushTask> takeAnyWorkingQueue() throws IOException {
+    while (!dataPusher.terminatedOrHasException()) {
+      int partitionId = nextPartitionId();
+      LinkedBlockingQueue<PushTask> pushTasks = workingQueuePerPartition.get(partitionId);
+      if (!pushTasks.isEmpty()) {
+        Map<Integer, PartitionLocation> partitionLocationMap =
+            client.getOrRegisterShuffle(appId, shuffleId, numMappers, numPartitions);
+        PartitionLocation loc = partitionLocationMap.get(partitionId);
+        boolean reachLimit = pushState.limitMaxInFlight(loc.hostAndPushPort());
+        if (!reachLimit) {
+          return pushTasks;
+        }
+      }
+    }
+    return null;
+  }
+
+  public LinkedBlockingQueue<PushTask> takeWorkingQueue(int partitionId) {
+    return workingQueuePerPartition.get(partitionId);
+  }
+
+  public void clear() {
+    workingQueuePerPartition.parallelStream().forEach(LinkedBlockingQueue::clear);
+    workingQueuePerPartition.clear();
+  }
+
+  private int nextPartitionId() {
+    return partitionIdIdx++ % numPartitions;

Review Comment:
   `nextPartitionId` cycles through `workingQueuePerPartition` in round-robin order to pick up `PushTask`s.
   If a partition's worker has reached its in-flight limit, that partition is skipped and tried again on a later pass.
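
The round-robin scan described here can be sketched in isolation. This is a simplified illustration, not the PR's `DataPushQueue`: tasks are plain strings, the class and method names are hypothetical, and the limit check is a non-blocking predicate, whereas `limitMaxInFlight` in the diff waits up to a timeout. The sketch also uses `Math.floorMod`, which keeps the index non-negative if the `int` counter ever wraps; plain `%` in Java yields a negative result for a negative dividend.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.IntPredicate;

// Hypothetical, simplified stand-in for DataPushQueue.
final class RoundRobinQueuesSketch {
  private final List<LinkedBlockingQueue<String>> queues = new ArrayList<>();
  private int cursor = 0;

  RoundRobinQueuesSketch(int numPartitions) {
    for (int i = 0; i < numPartitions; i++) {
      queues.add(new LinkedBlockingQueue<>());
    }
  }

  void offer(int partitionId, String task) {
    queues.get(partitionId).offer(task);
  }

  // floorMod keeps the index in [0, size) even after the int counter wraps.
  private int nextPartitionId() {
    return Math.floorMod(cursor++, queues.size());
  }

  // Makes one full pass and returns the first partition that has queued tasks
  // and whose worker is not at its limit, or -1 if no partition qualifies.
  int pollReadyPartition(IntPredicate workerAtLimit) {
    for (int i = 0; i < queues.size(); i++) {
      int partitionId = nextPartitionId();
      if (!queues.get(partitionId).isEmpty() && !workerAtLimit.test(partitionId)) {
        return partitionId;
      }
    }
    return -1;
  }
}
```

Because the cursor persists between calls, a partition that was skipped while its worker was saturated is reached again on the next pass rather than being starved by lower-numbered partitions.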





[GitHub] [incubator-celeborn] zy-jordan commented on a diff in pull request #1102: [CELEBORN-55][FEATURE] Split maxReqsInFlight limitation into level of target worker

Posted by GitBox <gi...@apache.org>.
zy-jordan commented on code in PR #1102:
URL: https://github.com/apache/incubator-celeborn/pull/1102#discussion_r1071712247


##########
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java:
##########
@@ -192,7 +192,7 @@ private void submitRetryPushData(
     } else if (mapperEnded(shuffleId, mapId, attemptId)) {
       logger.debug(
           "Retrying push data, but the mapper(map {} attempt {}) has ended.", mapId, attemptId);
-      pushState.removeBatch(batchId);
+      pushState.removeBatch(batchId, loc.hostAndPushPort());

Review Comment:
   Otherwise, I would need a member that stores the (batchId -> worker) map, and that map could become very large.





[GitHub] [incubator-celeborn] waitinfuture commented on pull request #1102: [CELEBORN-55][FEATURE] Split maxReqsInFlight limitation into level of target worker

Posted by GitBox <gi...@apache.org>.
waitinfuture commented on PR #1102:
URL: https://github.com/apache/incubator-celeborn/pull/1102#issuecomment-1387014916

   Please run ./dev/reformat.




[GitHub] [incubator-celeborn] zy-jordan commented on pull request #1102: split pushdata queue by every partitionId

Posted by GitBox <gi...@apache.org>.
zy-jordan commented on PR #1102:
URL: https://github.com/apache/incubator-celeborn/pull/1102#issuecomment-1357418413

   Reopened from https://github.com/apache/incubator-celeborn/pull/993 (#993).




[GitHub] [incubator-celeborn] zy-jordan commented on a diff in pull request #1102: [CELEBORN-55][FEATURE] Split maxReqsInFlight limitation into level of target worker

Posted by GitBox <gi...@apache.org>.
zy-jordan commented on code in PR #1102:
URL: https://github.com/apache/incubator-celeborn/pull/1102#discussion_r1071711171


##########
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java:
##########
@@ -192,7 +192,7 @@ private void submitRetryPushData(
     } else if (mapperEnded(shuffleId, mapId, attemptId)) {
       logger.debug(
           "Retrying push data, but the mapper(map {} attempt {}) has ended.", mapId, attemptId);
-      pushState.removeBatch(batchId);
+      pushState.removeBatch(batchId, loc.hostAndPushPort());

Review Comment:
   In InFlightRequestTracker, `inflightBatchesPerAddress` is a ConcurrentHashMap<String, ConcurrentHashMap<Integer, BatchInfo>>, that is, (worker -> (batchId -> BatchInfo)). So I need the worker address in order to look up the batchId map.
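
   A minimal sketch of that nested layout, assuming a simplified tracker. The class name `InFlightSketch`, the `inFlightCount` helper, and the use of a String in place of `BatchInfo` are hypothetical and only illustrate why `removeBatch` takes the worker address:

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the tracker discussed above; not the PR's actual class.
class InFlightSketch {
  // worker address ("host:pushPort") -> (batchId -> batch info).
  // A String stands in for BatchInfo to keep the sketch self-contained.
  private final ConcurrentHashMap<String, ConcurrentHashMap<Integer, String>>
      inflightBatchesPerAddress = new ConcurrentHashMap<>();

  void addBatch(int batchId, String hostAndPushPort) {
    inflightBatchesPerAddress
        .computeIfAbsent(hostAndPushPort, k -> new ConcurrentHashMap<>())
        .put(batchId, "batch-" + batchId);
  }

  // The worker address selects the inner map, so no reverse
  // (batchId -> worker) index has to be kept in memory.
  void removeBatch(int batchId, String hostAndPushPort) {
    ConcurrentHashMap<Integer, String> batches = inflightBatchesPerAddress.get(hostAndPushPort);
    if (batches != null) {
      batches.remove(batchId);
    }
  }

  int inFlightCount(String hostAndPushPort) {
    ConcurrentHashMap<Integer, String> batches = inflightBatchesPerAddress.get(hostAndPushPort);
    return batches == null ? 0 : batches.size();
  }
}
```

   With this shape, the per-worker in-flight count is just the size of the inner map, which is what a per-worker limit needs to check.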





[GitHub] [incubator-celeborn] waitinfuture commented on a diff in pull request #1102: [CELEBORN-55][FEATURE] Split maxReqsInFlight limitation into level of target worker

Posted by GitBox <gi...@apache.org>.
waitinfuture commented on code in PR #1102:
URL: https://github.com/apache/incubator-celeborn/pull/1102#discussion_r1073066705


##########
client/src/main/java/org/apache/celeborn/client/write/InFlightRequestTracker.java:
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.client.write;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import io.netty.channel.ChannelFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.network.client.RpcResponseCallback;
+import org.apache.celeborn.common.protocol.message.StatusCode;
+
+/*
+ * This class is for track in flight request and limit request.
+ * */
+public class InFlightRequestTracker {
+  private static final Logger logger = LoggerFactory.getLogger(InFlightRequestTracker.class);
+
+  private final long timeoutMs;
+  private final long pushTimeout;

Review Comment:
   pushTimeout -> pushTimeoutMs



##########
client/src/main/java/org/apache/celeborn/client/write/DataPushQueue.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.client.write;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.ShuffleClient;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.protocol.PartitionLocation;
+import org.apache.celeborn.common.util.Utils;
+
+/*
+ * Queue for push data,
+ * it can take one PushTask whose worker inflight request is not reach limit,
+ * and it can add one PushTask.
+ *
+ * */
+public class DataPushQueue {
+  private static final Logger logger = LoggerFactory.getLogger(DataPushQueue.class);
+
+  private final long WAIT_TIME_NANOS = TimeUnit.MILLISECONDS.toNanos(500);
+
+  private final LinkedBlockingQueue<PushTask> workingQueue;
+  private final PushState pushState;
+  private final DataPusher dataPusher;
+  private final int maxInFlight;
+
+  private final String appId;
+  private final int shuffleId;
+  private final int numMappers;
+  private final int numPartitions;
+  private final ShuffleClient client;
+  private final Set<String> reachLimitWorker = new HashSet<>();
+
+  public DataPushQueue(
+      CelebornConf conf,
+      DataPusher dataPusher,
+      ShuffleClient client,
+      String appId,
+      int shuffleId,
+      int mapId,
+      int attemptId,
+      int numMappers,
+      int numPartitions) {
+    this.appId = appId;
+    this.shuffleId = shuffleId;
+    this.numMappers = numMappers;
+    this.numPartitions = numPartitions;
+    this.client = client;
+    this.dataPusher = dataPusher;
+    final String mapKey = Utils.makeMapKey(shuffleId, mapId, attemptId);
+    this.pushState = client.getPushState(mapKey);
+    this.maxInFlight = conf.pushMaxReqsInFlight();
+    final int capacity = conf.pushQueueCapacity();
+    workingQueue = new LinkedBlockingQueue<>(capacity);
+  }
+
+  public PushTask takePushTask() throws IOException {

Review Comment:
   We should add a comment noting that this method is not thread-safe.
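
   To illustrate why a single consumer is assumed, here is a rough sketch of "take the first task whose worker is below its in-flight limit". The names `TakeTaskSketch`, `Task`, and `takeFirstEligible` are hypothetical, and the in-flight counts are passed in as a plain map rather than read from `PushState`:

```java
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;

// Hypothetical sketch; class and field names are illustrative, not the PR's code.
class TakeTaskSketch {
  static final class Task {
    final int id;
    final String worker; // target worker address, e.g. "host:pushPort"

    Task(int id, String worker) {
      this.id = id;
      this.worker = worker;
    }
  }

  // Not thread-safe: the scan-then-remove sequence assumes one consumer thread.
  static Task takeFirstEligible(
      Queue<Task> queue, Map<String, Integer> inFlightPerWorker, int maxInFlight) {
    Iterator<Task> it = queue.iterator();
    while (it.hasNext()) {
      Task task = it.next();
      if (inFlightPerWorker.getOrDefault(task.worker, 0) < maxInFlight) {
        it.remove();
        return task;
      }
    }
    return null; // every queued task targets a worker that is at its limit
  }
}
```

   Two threads running this scan concurrently could both select the same task before either removes it, which is the hazard the requested comment would document.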



##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -1272,7 +1272,7 @@ object CelebornConf extends Logging {
       .withAlternative("rss.push.data.maxReqsInFlight")
       .categories("client")
       .version("0.2.0")
-      .doc("Amount of Netty in-flight requests. The maximum memory is " +
+      .doc("Amount of Netty in-flight requests per worker. The maximum memory is " +
         "`celeborn.push.maxReqsInFlight` * `celeborn.push.buffer.max.size` * " +
         "compression ratio(1 in worst case), default: 64Kib * 32 = 2Mib")
       .intConf

Review Comment:
   We can change the default value to 4



##########
client/src/main/java/org/apache/celeborn/client/write/InFlightRequestTracker.java:
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.client.write;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import io.netty.channel.ChannelFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.network.client.RpcResponseCallback;
+import org.apache.celeborn.common.protocol.message.StatusCode;
+
+/*
+ * This class is for track in flight request and limit request.
+ * */
+public class InFlightRequestTracker {
+  private static final Logger logger = LoggerFactory.getLogger(InFlightRequestTracker.class);
+
+  private final long timeoutMs;

Review Comment:
   timeoutMs -> waitInflightTimeoutMs





[GitHub] [incubator-celeborn] waitinfuture commented on pull request #1102: [CELEBORN-55][FEATURE] Split maxReqsInFlight limitation into level of target worker

Posted by GitBox <gi...@apache.org>.
waitinfuture commented on PR #1102:
URL: https://github.com/apache/incubator-celeborn/pull/1102#issuecomment-1381819684

   Hi @zy-jordan, thanks for your PR! I have some comments; feel free to ping me if further discussion is needed. cc @AngersZhuuuu @RexXiong




[GitHub] [incubator-celeborn] codecov[bot] commented on pull request #1102: [CELEBORN-55][FEATURE] Split maxReqsInFlight limitation into level of target worker

Posted by GitBox <gi...@apache.org>.
codecov[bot] commented on PR #1102:
URL: https://github.com/apache/incubator-celeborn/pull/1102#issuecomment-1357442415

   # [Codecov](https://codecov.io/gh/apache/incubator-celeborn/pull/1102?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#1102](https://codecov.io/gh/apache/incubator-celeborn/pull/1102?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (43befe1) into [main](https://codecov.io/gh/apache/incubator-celeborn/commit/13769f0f0a2401aacc66162a2f6006816a175ca6?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (13769f0) will **increase** coverage by `0.03%`.
   > The diff coverage is `32.89%`.
   
   ```diff
   @@             Coverage Diff              @@
   ##               main    #1102      +/-   ##
   ============================================
   + Coverage     25.08%   25.11%   +0.03%     
   - Complexity      770      784      +14     
   ============================================
     Files           215      217       +2     
     Lines         18208    18299      +91     
     Branches       2024     2038      +14     
   ============================================
   + Hits           4566     4594      +28     
   - Misses        13331    13392      +61     
   - Partials        311      313       +2     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-celeborn/pull/1102?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...java/org/apache/celeborn/client/ShuffleClient.java](https://codecov.io/gh/apache/incubator-celeborn/pull/1102/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9jZWxlYm9ybi9jbGllbnQvU2h1ZmZsZUNsaWVudC5qYXZh) | `6.53% <ø> (ø)` | |
   | [...rg/apache/celeborn/client/write/DataPushQueue.java](https://codecov.io/gh/apache/incubator-celeborn/pull/1102/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9jZWxlYm9ybi9jbGllbnQvd3JpdGUvRGF0YVB1c2hRdWV1ZS5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...a/org/apache/celeborn/client/write/DataPusher.java](https://codecov.io/gh/apache/incubator-celeborn/pull/1102/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9jZWxlYm9ybi9jbGllbnQvd3JpdGUvRGF0YVB1c2hlci5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [.../apache/celeborn/client/write/InFlightTracker.java](https://codecov.io/gh/apache/incubator-celeborn/pull/1102/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9jZWxlYm9ybi9jbGllbnQvd3JpdGUvSW5GbGlnaHRUcmFja2VyLmphdmE=) | `40.63% <40.63%> (ø)` | |
   | [.../org/apache/celeborn/client/ShuffleClientImpl.java](https://codecov.io/gh/apache/incubator-celeborn/pull/1102/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9jZWxlYm9ybi9jbGllbnQvU2h1ZmZsZUNsaWVudEltcGwuamF2YQ==) | `19.62% <43.25%> (-0.33%)` | :arrow_down: |
   | [...va/org/apache/celeborn/client/write/PushState.java](https://codecov.io/gh/apache/incubator-celeborn/pull/1102/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9jZWxlYm9ybi9jbGllbnQvd3JpdGUvUHVzaFN0YXRlLmphdmE=) | `62.50% <87.50%> (+4.81%)` | :arrow_up: |
   | [...ice/deploy/master/clustermeta/ha/HARaftServer.java](https://codecov.io/gh/apache/incubator-celeborn/pull/1102/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-bWFzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9jZWxlYm9ybi9zZXJ2aWNlL2RlcGxveS9tYXN0ZXIvY2x1c3Rlcm1ldGEvaGEvSEFSYWZ0U2VydmVyLmphdmE=) | `77.93% <0.00%> (+1.36%)` | :arrow_up: |
   
   




[GitHub] [incubator-celeborn] waitinfuture commented on a diff in pull request #1102: [CELEBORN-55][FEATURE] Split maxReqsInFlight limitation into level of target worker

Posted by GitBox <gi...@apache.org>.
waitinfuture commented on code in PR #1102:
URL: https://github.com/apache/incubator-celeborn/pull/1102#discussion_r1073073340


##########
tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/HugeDataTest.scala:
##########
@@ -38,6 +38,7 @@ class HugeDataTest extends AnyFunSuite
 
   test("celeborn spark integration test - huge data") {
     val sparkConf = new SparkConf().setAppName("rss-demo").setMaster("local[4]")
+      .set("spark.celeborn.shuffle.register.maxRetries", "10")

Review Comment:
   Is this related to this PR?





[GitHub] [incubator-celeborn] waitinfuture merged pull request #1102: [CELEBORN-55][FEATURE] Split maxReqsInFlight limitation into level of target worker

Posted by GitBox <gi...@apache.org>.
waitinfuture merged PR #1102:
URL: https://github.com/apache/incubator-celeborn/pull/1102




[GitHub] [incubator-celeborn] zy-jordan commented on a diff in pull request #1102: [CELEBORN-55][FEATURE] Split maxReqsInFlight limitation into level of target worker

Posted by GitBox <gi...@apache.org>.
zy-jordan commented on code in PR #1102:
URL: https://github.com/apache/incubator-celeborn/pull/1102#discussion_r1071716106


##########
tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/HugeDataTest.scala:
##########
@@ -38,6 +38,7 @@ class HugeDataTest extends AnyFunSuite
 
   test("celeborn spark integration test - huge data") {
     val sparkConf = new SparkConf().setAppName("rss-demo").setMaster("local[4]")
+      .set("spark.celeborn.shuffle.register.maxRetries", "10")

Review Comment:
   This UT fails because `registerShuffleInternal` returns null, so I increased this config value.





[GitHub] [incubator-celeborn] waitinfuture commented on a diff in pull request #1102: [CELEBORN-55][FEATURE] Split maxReqsInFlight limitation into level of target worker

Posted by GitBox <gi...@apache.org>.
waitinfuture commented on code in PR #1102:
URL: https://github.com/apache/incubator-celeborn/pull/1102#discussion_r1069294838


##########
client/src/main/java/org/apache/celeborn/client/write/DataPushQueue.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.client.write;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.ShuffleClient;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.protocol.PartitionLocation;
+import org.apache.celeborn.common.util.Utils;
+
+/*
+ * Queue for push data,
+ * it can take one special PushTask queue by partitionId,
+ * and it can take one random PushTask queue.
+ * workingQueuePerPartition: for PushTask queue per every partition.
+ *
+ * */
+public class DataPushQueue {
+  private static final Logger logger = LoggerFactory.getLogger(DataPushQueue.class);
+
+  private final List<LinkedBlockingQueue<PushTask>> workingQueuePerPartition;

Review Comment:
   We should not assume that partitionId values are sequential, starting from 0 with a step of 1. It would be better to use a map.
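
   A small sketch of the map-based alternative, assuming queues are created lazily on first use. `PartitionQueuesSketch` and its methods are hypothetical names, and the element type is left generic instead of using `PushTask`:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: queues are keyed by partitionId instead of by list position.
class PartitionQueuesSketch<T> {
  private final Map<Integer, LinkedBlockingQueue<T>> workingQueuePerPartition =
      new ConcurrentHashMap<>();
  private final int capacity;

  PartitionQueuesSketch(int capacity) {
    this.capacity = capacity;
  }

  // Creates the queue on first use, so ids need not be dense or start at 0.
  // Returns false when the partition's bounded queue is full.
  boolean offer(int partitionId, T task) {
    return workingQueuePerPartition
        .computeIfAbsent(partitionId, id -> new LinkedBlockingQueue<>(capacity))
        .offer(task);
  }

  // Returns null when nothing is queued for this partition.
  T poll(int partitionId) {
    LinkedBlockingQueue<T> queue = workingQueuePerPartition.get(partitionId);
    return queue == null ? null : queue.poll();
  }
}
```

   Keying by id also removes the need to catch IndexOutOfBoundsException on lookup, since an unknown partitionId simply has no entry.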



##########
client/src/main/java/org/apache/celeborn/client/write/DataPushQueue.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.client.write;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.ShuffleClient;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.protocol.PartitionLocation;
+import org.apache.celeborn.common.util.Utils;
+
+/*
+ * Queue for push data,
+ * it can take one special PushTask queue by partitionId,
+ * and it can take one random PushTask queue.
+ * workingQueuePerPartition: for PushTask queue per every partition.
+ *
+ * */
+public class DataPushQueue {
+  private static final Logger logger = LoggerFactory.getLogger(DataPushQueue.class);
+
+  private final List<LinkedBlockingQueue<PushTask>> workingQueuePerPartition;
+  private final PushState pushState;
+  private final DataPusher dataPusher;
+  private final int maxInFlight;
+
+  private final String appId;
+  private final int shuffleId;
+  private final int numMappers;
+  private final int numPartitions;
+  private final ShuffleClient client;
+  private int partitionIdIdx = 0;
+
+  public DataPushQueue(
+      CelebornConf conf,
+      DataPusher dataPusher,
+      ShuffleClient client,
+      String appId,
+      int shuffleId,
+      int mapId,
+      int attemptId,
+      int numMappers,
+      int numPartitions) {
+    this.appId = appId;
+    this.shuffleId = shuffleId;
+    this.numMappers = numMappers;
+    this.numPartitions = numPartitions;
+    this.client = client;
+    this.dataPusher = dataPusher;
+    final String mapKey = Utils.makeMapKey(shuffleId, mapId, attemptId);
+    this.pushState = client.getOrRegisterPushState(mapKey);
+    this.maxInFlight = conf.pushMaxReqsInFlight();
+    final int capacity = conf.pushQueueCapacity();
+    workingQueuePerPartition = new ArrayList<>(numPartitions);
+    for (int i = 0; i < numPartitions; i++) {
+      workingQueuePerPartition.add(new LinkedBlockingQueue<>(capacity));
+    }
+  }
+
+  public LinkedBlockingQueue<PushTask> takeAnyWorkingQueue() throws IOException {
+    while (!dataPusher.terminatedOrHasException()) {
+      int partitionId = nextPartitionId();
+      LinkedBlockingQueue<PushTask> pushTasks;
+      try {
+        pushTasks = workingQueuePerPartition.get(partitionId);
+      } catch (IndexOutOfBoundsException ex) {
+        if (dataPusher.terminatedOrHasException()) {
+          return null;
+        }
+        throw ex;
+      }
+      if (Objects.isNull(pushTasks) && dataPusher.terminatedOrHasException()) {

Review Comment:
   We don't need Objects.isNull(pushTasks), since the DataPusher thread will stop before we clear the DataPushQueue.



##########
client/src/main/java/org/apache/celeborn/client/write/InFlightRequestTracker.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.client.write;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import io.netty.channel.ChannelFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.network.client.RpcResponseCallback;
+import org.apache.celeborn.common.protocol.message.StatusCode;
+
+/*
+ * This class is for track in flight request and limit request.
+ * */
+public class InFlightRequestTracker {
+  private static final Logger logger = LoggerFactory.getLogger(InFlightRequestTracker.class);
+
+  private final long timeoutMs;
+  private final long pushTimeout;
+  private final long delta;
+  private final PushState pushState;
+
+  private final AtomicInteger batchId = new AtomicInteger();
+  private final ConcurrentHashMap<String, ConcurrentHashMap<Integer, BatchInfo>>
+      batchIdPerAddressPair = new ConcurrentHashMap<>();

Review Comment:
   Better to rename this to inflightBatchesPerAddress.



##########
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java:
##########
@@ -192,7 +192,7 @@ private void submitRetryPushData(
     } else if (mapperEnded(shuffleId, mapId, attemptId)) {
       logger.debug(
           "Retrying push data, but the mapper(map {} attempt {}) has ended.", mapId, attemptId);
-      pushState.removeBatch(batchId);
+      pushState.removeBatch(batchId, loc.hostAndPushPort());

Review Comment:
   We can make hostAndPushPort a transient member that is initialized in the constructor to speed up this method.
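
   One possible shape for that suggestion, as a sketch only. `LocationSketch` is a hypothetical, simplified class (the real `PartitionLocation` has more fields), and the "host:port" string format is an assumption:

```java
import java.io.Serializable;

// Hypothetical, simplified location class; not the real PartitionLocation.
class LocationSketch implements Serializable {
  private final String host;
  private final int pushPort;
  // Derived value: built once in the constructor and excluded from Java serialization.
  private transient String hostAndPushPort;

  LocationSketch(String host, int pushPort) {
    this.host = host;
    this.pushPort = pushPort;
    this.hostAndPushPort = host + ":" + pushPort; // assumed format
  }

  String hostAndPushPort() {
    // A transient field is null after Java deserialization, so rebuild it lazily.
    if (hostAndPushPort == null) {
      hostAndPushPort = host + ":" + pushPort;
    }
    return hostAndPushPort;
  }
}
```

   Repeated calls then return the cached String instead of concatenating on every `removeBatch`.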



##########
client/src/main/java/org/apache/celeborn/client/write/InFlightRequestTracker.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.client.write;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import io.netty.channel.ChannelFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.network.client.RpcResponseCallback;
+import org.apache.celeborn.common.protocol.message.StatusCode;
+
+/*
+ * This class is for track in flight request and limit request.
+ * */
+public class InFlightRequestTracker {
+  private static final Logger logger = LoggerFactory.getLogger(InFlightRequestTracker.class);
+
+  private final long timeoutMs;
+  private final long pushTimeout;
+  private final long delta;
+  private final PushState pushState;
+
+  private final AtomicInteger batchId = new AtomicInteger();
+  private final ConcurrentHashMap<String, ConcurrentHashMap<Integer, BatchInfo>>
+      batchIdPerAddressPair = new ConcurrentHashMap<>();
+
+  public InFlightRequestTracker(CelebornConf conf, PushState pushState) {
+    this.timeoutMs = conf.pushLimitInFlightTimeoutMs();
+    this.pushTimeout = conf.pushDataTimeoutMs();
+    this.delta = conf.pushLimitInFlightSleepDeltaMs();
+    this.pushState = pushState;
+  }
+
+  public void addFlightBatches(int batchId, String hostAndPushPort) {

Review Comment:
   addFlightBatch



##########
client/src/main/java/org/apache/celeborn/client/write/DataPushQueue.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.client.write;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.ShuffleClient;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.protocol.PartitionLocation;
+import org.apache.celeborn.common.util.Utils;
+
+/*
+ * Queue for push data,
+ * it can take one special PushTask queue by partitionId,
+ * and it can take one random PushTask queue.
+ * workingQueuePerPartition: for PushTask queue per every partition.
+ *
+ * */
+public class DataPushQueue {
+  private static final Logger logger = LoggerFactory.getLogger(DataPushQueue.class);
+
+  private final List<LinkedBlockingQueue<PushTask>> workingQueuePerPartition;
+  private final PushState pushState;
+  private final DataPusher dataPusher;
+  private final int maxInFlight;
+
+  private final String appId;
+  private final int shuffleId;
+  private final int numMappers;
+  private final int numPartitions;
+  private final ShuffleClient client;
+  private int partitionIdIdx = 0;
+
+  public DataPushQueue(
+      CelebornConf conf,
+      DataPusher dataPusher,
+      ShuffleClient client,
+      String appId,
+      int shuffleId,
+      int mapId,
+      int attemptId,
+      int numMappers,
+      int numPartitions) {
+    this.appId = appId;
+    this.shuffleId = shuffleId;
+    this.numMappers = numMappers;
+    this.numPartitions = numPartitions;
+    this.client = client;
+    this.dataPusher = dataPusher;
+    final String mapKey = Utils.makeMapKey(shuffleId, mapId, attemptId);
+    this.pushState = client.getOrRegisterPushState(mapKey);
+    this.maxInFlight = conf.pushMaxReqsInFlight();
+    final int capacity = conf.pushQueueCapacity();
+    workingQueuePerPartition = new ArrayList<>(numPartitions);
+    for (int i = 0; i < numPartitions; i++) {
+      workingQueuePerPartition.add(new LinkedBlockingQueue<>(capacity));
+    }
+  }
+
+  public LinkedBlockingQueue<PushTask> takeAnyWorkingQueue() throws IOException {
+    while (!dataPusher.terminatedOrHasException()) {
+      int partitionId = nextPartitionId();
+      LinkedBlockingQueue<PushTask> pushTasks;
+      try {
+        pushTasks = workingQueuePerPartition.get(partitionId);
+      } catch (IndexOutOfBoundsException ex) {
+        if (dataPusher.terminatedOrHasException()) {
+          return null;
+        }
+        throw ex;
+      }
+      if (Objects.isNull(pushTasks) && dataPusher.terminatedOrHasException()) {
+        return null;
+      }
+      if (!pushTasks.isEmpty()) {
+        Map<Integer, PartitionLocation> partitionLocationMap =
+            client.getOrRegisterShuffle(appId, shuffleId, numMappers, numPartitions);

Review Comment:
   IMO we should not expose this method to DataPushQueue. We can add a method in the client like
   ```
   PartitionLocation getPartitionLocation(partitionId);
   ```
   which can call registerShuffle if needed



##########
tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/HugeDataTest.scala:
##########
@@ -38,6 +38,7 @@ class HugeDataTest extends AnyFunSuite
 
   test("celeborn spark integration test - huge data") {
     val sparkConf = new SparkConf().setAppName("rss-demo").setMaster("local[4]")
+      .set("spark.celeborn.shuffle.register.maxRetries", "10")

Review Comment:
   Just curious, what's the purpose of setting this config?



##########
client/src/main/java/org/apache/celeborn/client/write/DataPushQueue.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.client.write;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.ShuffleClient;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.protocol.PartitionLocation;
+import org.apache.celeborn.common.util.Utils;
+
+/*
+ * Queue for push data,
+ * it can take one special PushTask queue by partitionId,
+ * and it can take one random PushTask queue.
+ * workingQueuePerPartition: for PushTask queue per every partition.
+ *
+ * */
+public class DataPushQueue {
+  private static final Logger logger = LoggerFactory.getLogger(DataPushQueue.class);
+
+  private final List<LinkedBlockingQueue<PushTask>> workingQueuePerPartition;
+  private final PushState pushState;
+  private final DataPusher dataPusher;
+  private final int maxInFlight;
+
+  private final String appId;
+  private final int shuffleId;
+  private final int numMappers;
+  private final int numPartitions;
+  private final ShuffleClient client;
+  private int partitionIdIdx = 0;
+
+  public DataPushQueue(
+      CelebornConf conf,
+      DataPusher dataPusher,
+      ShuffleClient client,
+      String appId,
+      int shuffleId,
+      int mapId,
+      int attemptId,
+      int numMappers,
+      int numPartitions) {
+    this.appId = appId;
+    this.shuffleId = shuffleId;
+    this.numMappers = numMappers;
+    this.numPartitions = numPartitions;
+    this.client = client;
+    this.dataPusher = dataPusher;
+    final String mapKey = Utils.makeMapKey(shuffleId, mapId, attemptId);
+    this.pushState = client.getOrRegisterPushState(mapKey);
+    this.maxInFlight = conf.pushMaxReqsInFlight();
+    final int capacity = conf.pushQueueCapacity();
+    workingQueuePerPartition = new ArrayList<>(numPartitions);
+    for (int i = 0; i < numPartitions; i++) {
+      workingQueuePerPartition.add(new LinkedBlockingQueue<>(capacity));
+    }
+  }
+
+  public LinkedBlockingQueue<PushTask> takeAnyWorkingQueue() throws IOException {
+    while (!dataPusher.terminatedOrHasException()) {
+      int partitionId = nextPartitionId();
+      LinkedBlockingQueue<PushTask> pushTasks;
+      try {
+        pushTasks = workingQueuePerPartition.get(partitionId);
+      } catch (IndexOutOfBoundsException ex) {
+        if (dataPusher.terminatedOrHasException()) {
+          return null;
+        }
+        throw ex;
+      }
+      if (Objects.isNull(pushTasks) && dataPusher.terminatedOrHasException()) {
+        return null;
+      }
+      if (!pushTasks.isEmpty()) {
+        Map<Integer, PartitionLocation> partitionLocationMap =
+            client.getOrRegisterShuffle(appId, shuffleId, numMappers, numPartitions);
+        PartitionLocation loc = partitionLocationMap.get(partitionId);
+        boolean reachLimit = pushState.limitMaxInFlight(loc.hostAndPushPort(), maxInFlight);

Review Comment:
   We can add a method in PushState such as
   ```
   boolean reachLimit(address)
   ```
   which indicates whether the location has reached the limit.



##########
client/src/main/java/org/apache/celeborn/client/ShuffleClient.java:
##########
@@ -235,4 +237,9 @@ public abstract void pushDataHandShake(
 
   public abstract PartitionLocation registerMapPartitionTask(
       String appId, int shuffleId, int numMappers, int mapId, int attemptId);
+
+  public abstract ConcurrentHashMap<Integer, PartitionLocation> getOrRegisterShuffle(

Review Comment:
   We should call this in addTask().



##########
client/src/main/java/org/apache/celeborn/client/write/DataPushQueue.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.client.write;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.ShuffleClient;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.protocol.PartitionLocation;
+import org.apache.celeborn.common.util.Utils;
+
+/*
+ * Queue for push data,
+ * it can take one special PushTask queue by partitionId,
+ * and it can take one random PushTask queue.
+ * workingQueuePerPartition: for PushTask queue per every partition.
+ *
+ * */
+public class DataPushQueue {
+  private static final Logger logger = LoggerFactory.getLogger(DataPushQueue.class);
+
+  private final List<LinkedBlockingQueue<PushTask>> workingQueuePerPartition;
+  private final PushState pushState;
+  private final DataPusher dataPusher;
+  private final int maxInFlight;
+
+  private final String appId;
+  private final int shuffleId;
+  private final int numMappers;
+  private final int numPartitions;
+  private final ShuffleClient client;
+  private int partitionIdIdx = 0;
+
+  public DataPushQueue(
+      CelebornConf conf,
+      DataPusher dataPusher,
+      ShuffleClient client,
+      String appId,
+      int shuffleId,
+      int mapId,
+      int attemptId,
+      int numMappers,
+      int numPartitions) {
+    this.appId = appId;
+    this.shuffleId = shuffleId;
+    this.numMappers = numMappers;
+    this.numPartitions = numPartitions;
+    this.client = client;
+    this.dataPusher = dataPusher;
+    final String mapKey = Utils.makeMapKey(shuffleId, mapId, attemptId);
+    this.pushState = client.getOrRegisterPushState(mapKey);
+    this.maxInFlight = conf.pushMaxReqsInFlight();
+    final int capacity = conf.pushQueueCapacity();

Review Comment:
   We should decrease the default value, since it is multiplied by the number of partitions. I think we should have a global capacity and a local capacity.



##########
client/src/main/java/org/apache/celeborn/client/write/DataPushQueue.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.client.write;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.ShuffleClient;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.protocol.PartitionLocation;
+import org.apache.celeborn.common.util.Utils;
+
+/*
+ * Queue for push data,
+ * it can take one special PushTask queue by partitionId,
+ * and it can take one random PushTask queue.
+ * workingQueuePerPartition: for PushTask queue per every partition.
+ *
+ * */
+public class DataPushQueue {
+  private static final Logger logger = LoggerFactory.getLogger(DataPushQueue.class);
+
+  private final List<LinkedBlockingQueue<PushTask>> workingQueuePerPartition;

Review Comment:
   In fact, I think we can still use one LinkedBlockingQueue. Each time we want to push data, we traverse the linked queue from the beginning until we find a task that can be pushed, then remove it from the queue and push it. PushState should expose a method to indicate whether the partitionId can be pushed, like:
   ```
   boolean canPush(partitionId)
   ```
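
A minimal sketch of the single-queue idea described above, under stated assumptions: `SimplePushTask` and `SingleQueueScanner` are hypothetical names, and the `IntPredicate` stands in for the proposed `PushState.canPush(partitionId)`. It is not the PR's implementation.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.IntPredicate;

// Hypothetical minimal task type; the real PushTask carries more state.
final class SimplePushTask {
  final int partitionId;

  SimplePushTask(int partitionId) {
    this.partitionId = partitionId;
  }
}

// Hypothetical wrapper around one shared queue.
class SingleQueueScanner {
  private final LinkedBlockingQueue<SimplePushTask> workingQueue = new LinkedBlockingQueue<>();
  // Assumption: stands in for the proposed PushState.canPush(partitionId).
  private final IntPredicate canPush;

  SingleQueueScanner(IntPredicate canPush) {
    this.canPush = canPush;
  }

  boolean addTask(SimplePushTask task) {
    return workingQueue.offer(task);
  }

  // Walks the queue from the head and removes the first task whose partition
  // may be pushed right now. Returns null if no task qualifies.
  SimplePushTask pollPushable() {
    for (SimplePushTask task : workingQueue) {
      // remove(Object) returns false if another consumer already took the task,
      // in which case the scan simply continues.
      if (canPush.test(task.partitionId) && workingQueue.remove(task)) {
        return task;
      }
    }
    return null;
  }

  int size() {
    return workingQueue.size();
  }
}
```

The scan is linear in the queue length, which is the trade-off for keeping one queue instead of one per partition. Tasks that cannot be pushed yet stay in place and keep their relative order.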



##########
client/src/main/java/org/apache/celeborn/client/write/InFlightRequestTracker.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.client.write;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import io.netty.channel.ChannelFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.network.client.RpcResponseCallback;
+import org.apache.celeborn.common.protocol.message.StatusCode;
+
+/*
+ * This class is for track in flight request and limit request.
+ * */
+public class InFlightRequestTracker {
+  private static final Logger logger = LoggerFactory.getLogger(InFlightRequestTracker.class);
+
+  private final long timeoutMs;
+  private final long pushTimeout;
+  private final long delta;
+  private final PushState pushState;
+
+  private final AtomicInteger batchId = new AtomicInteger();
+  private final ConcurrentHashMap<String, ConcurrentHashMap<Integer, BatchInfo>>
+      batchIdPerAddressPair = new ConcurrentHashMap<>();
+
+  public InFlightRequestTracker(CelebornConf conf, PushState pushState) {
+    this.timeoutMs = conf.pushLimitInFlightTimeoutMs();
+    this.pushTimeout = conf.pushDataTimeoutMs();
+    this.delta = conf.pushLimitInFlightSleepDeltaMs();
+    this.pushState = pushState;
+  }
+
+  public void addFlightBatches(int batchId, String hostAndPushPort) {
+    ConcurrentHashMap<Integer, BatchInfo> batchIdSetPerPair =
+        batchIdPerAddressPair.computeIfAbsent(hostAndPushPort, id -> new ConcurrentHashMap<>());
+    batchIdSetPerPair.computeIfAbsent(batchId, id -> new BatchInfo());
+  }
+
+  public void removeFlightBatches(int batchId, String hostAndPushPort) {

Review Comment:
   Rename to `removeFlightBatch`, since it removes a single batch.



##########
client/src/main/java/org/apache/celeborn/client/write/DataPushQueue.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.client.write;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.ShuffleClient;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.protocol.PartitionLocation;
+import org.apache.celeborn.common.util.Utils;
+
+/*
+ * Queue for push data,
+ * it can take one special PushTask queue by partitionId,
+ * and it can take one random PushTask queue.
+ * workingQueuePerPartition: for PushTask queue per every partition.
+ *
+ * */
+public class DataPushQueue {
+  private static final Logger logger = LoggerFactory.getLogger(DataPushQueue.class);
+
+  private final List<LinkedBlockingQueue<PushTask>> workingQueuePerPartition;
+  private final PushState pushState;
+  private final DataPusher dataPusher;
+  private final int maxInFlight;
+
+  private final String appId;
+  private final int shuffleId;
+  private final int numMappers;
+  private final int numPartitions;
+  private final ShuffleClient client;
+  private int partitionIdIdx = 0;
+
+  public DataPushQueue(
+      CelebornConf conf,
+      DataPusher dataPusher,
+      ShuffleClient client,
+      String appId,
+      int shuffleId,
+      int mapId,
+      int attemptId,
+      int numMappers,
+      int numPartitions) {
+    this.appId = appId;
+    this.shuffleId = shuffleId;
+    this.numMappers = numMappers;
+    this.numPartitions = numPartitions;
+    this.client = client;
+    this.dataPusher = dataPusher;
+    final String mapKey = Utils.makeMapKey(shuffleId, mapId, attemptId);
+    this.pushState = client.getOrRegisterPushState(mapKey);
+    this.maxInFlight = conf.pushMaxReqsInFlight();
+    final int capacity = conf.pushQueueCapacity();
+    workingQueuePerPartition = new ArrayList<>(numPartitions);
+    for (int i = 0; i < numPartitions; i++) {
+      workingQueuePerPartition.add(new LinkedBlockingQueue<>(capacity));
+    }
+  }
+
+  public LinkedBlockingQueue<PushTask> takeAnyWorkingQueue() throws IOException {
+    while (!dataPusher.terminatedOrHasException()) {
+      int partitionId = nextPartitionId();
+      LinkedBlockingQueue<PushTask> pushTasks;
+      try {
+        pushTasks = workingQueuePerPartition.get(partitionId);
+      } catch (IndexOutOfBoundsException ex) {
+        if (dataPusher.terminatedOrHasException()) {
+          return null;
+        }
+        throw ex;
+      }
+      if (Objects.isNull(pushTasks) && dataPusher.terminatedOrHasException()) {
+        return null;
+      }
+      if (!pushTasks.isEmpty()) {
+        Map<Integer, PartitionLocation> partitionLocationMap =
+            client.getOrRegisterShuffle(appId, shuffleId, numMappers, numPartitions);
+        PartitionLocation loc = partitionLocationMap.get(partitionId);
+        boolean reachLimit = pushState.limitMaxInFlight(loc.hostAndPushPort(), maxInFlight);

Review Comment:
   limitMaxInFlight will wait until the in-flight count drops below the threshold, which is not what we want here. Our purpose is a fast check: skip the current partition if it has reached the threshold.
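
To make the distinction concrete, here is a minimal sketch of a non-blocking check. `InFlightLimiter` and its methods are hypothetical names for illustration, not the PR's classes. The comparison mirrors the `size() <= maxInFlight` test in the quoted tracker code, so the limit counts as reached only when the in-flight count exceeds `maxInFlight`.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the tracker under review.
class InFlightLimiter {
  private final int maxInFlight;
  private final ConcurrentHashMap<String, Set<Integer>> inFlightPerAddress =
      new ConcurrentHashMap<>();

  InFlightLimiter(int maxInFlight) {
    this.maxInFlight = maxInFlight;
  }

  void addBatch(int batchId, String hostAndPushPort) {
    inFlightPerAddress
        .computeIfAbsent(hostAndPushPort, k -> ConcurrentHashMap.newKeySet())
        .add(batchId);
  }

  void removeBatch(int batchId, String hostAndPushPort) {
    Set<Integer> batches = inFlightPerAddress.get(hostAndPushPort);
    if (batches != null) {
      batches.remove(batchId);
    }
  }

  // Fast check: returns immediately and never sleeps, so the caller can
  // skip this address and try another partition.
  boolean reachLimit(String hostAndPushPort) {
    Set<Integer> batches = inFlightPerAddress.get(hostAndPushPort);
    return batches != null && batches.size() > maxInFlight;
  }
}
```

A caller would test `reachLimit(address)` before taking a task and move on to the next candidate when it returns true, instead of sleeping in a loop.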



##########
client/src/main/java/org/apache/celeborn/client/write/DataPusher.java:
##########
@@ -158,7 +166,7 @@ public void waitOnTermination() throws IOException {
 
     terminated = true;
     idleQueue.clear();
-    workingQueue.clear();
+    dataPushQueue.clear();

Review Comment:
   IMO we should wait for the DataPusher thread to finish (join) before clearing the dataPushQueue, to avoid potential concurrency issues. The code may look like this:
   ```
   Thread runner = new Thread("DataPusher-" + taskId) {...};
   runner.start();

   public void waitOnTermination() throws IOException {
     ....
     terminated = true;
     runner.join();

     idleQueue.clear();
     dataPushQueue.clear();
   }
   ```
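
A self-contained sketch of the join-before-clear ordering suggested above. `JoiningPusher` is a hypothetical class, the queue holds plain integers instead of PushTask objects, and interruption is reported as an unchecked exception purely to keep the example short.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified pusher that only demonstrates shutdown ordering.
class JoiningPusher {
  private final LinkedBlockingQueue<Integer> workingQueue = new LinkedBlockingQueue<>();
  private volatile boolean terminated = false;
  private final Thread runner;

  JoiningPusher(String taskId) {
    runner =
        new Thread("DataPusher-" + taskId) {
          @Override
          public void run() {
            while (!terminated) {
              try {
                // Poll with a timeout so the loop re-checks `terminated` regularly.
                workingQueue.poll(10, TimeUnit.MILLISECONDS);
              } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
              }
            }
          }
        };
    runner.setDaemon(true);
    runner.start();
  }

  boolean addTask(int task) {
    return workingQueue.offer(task);
  }

  void waitOnTermination() {
    terminated = true;
    try {
      // Wait for the consumer thread to exit before touching the queue.
      runner.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for pusher thread", e);
    }
    // Only now is it certain that nobody else is reading the queue.
    workingQueue.clear();
  }

  boolean isRunnerAlive() {
    return runner.isAlive();
  }

  int pendingTasks() {
    return workingQueue.size();
  }
}
```

Because `clear()` runs only after `join()` returns, the consumer thread can never observe the queue being emptied underneath it.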



##########
client/src/main/java/org/apache/celeborn/client/write/DataPushQueue.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.client.write;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.ShuffleClient;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.protocol.PartitionLocation;
+import org.apache.celeborn.common.util.Utils;
+
+/*
+ * Queue for push data,
+ * it can take one special PushTask queue by partitionId,
+ * and it can take one random PushTask queue.
+ * workingQueuePerPartition: for PushTask queue per every partition.
+ *
+ * */
+public class DataPushQueue {
+  private static final Logger logger = LoggerFactory.getLogger(DataPushQueue.class);
+
+  private final List<LinkedBlockingQueue<PushTask>> workingQueuePerPartition;

Review Comment:
   I think we should maintain a queue per address instead of per partition, since there can be tens of thousands of partitions, which will be too much overhead.
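
As a rough illustration of the per-address alternative, the sketch below keys queues by worker address, so the number of queues is bounded by the number of workers and not by the number of partitions. `PerAddressQueues` and the address strings are hypothetical and do not come from the PR.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical container: one bounded queue per worker address,
// created lazily the first time a task for that address arrives.
class PerAddressQueues<T> {
  private final int capacityPerAddress;
  private final ConcurrentHashMap<String, LinkedBlockingQueue<T>> queues =
      new ConcurrentHashMap<>();

  PerAddressQueues(int capacityPerAddress) {
    this.capacityPerAddress = capacityPerAddress;
  }

  // Returns false when the queue for this address is full.
  boolean offer(String hostAndPushPort, T task) {
    return queues
        .computeIfAbsent(hostAndPushPort, k -> new LinkedBlockingQueue<>(capacityPerAddress))
        .offer(task);
  }

  // Returns the next task for the address, or null if there is none.
  T poll(String hostAndPushPort) {
    LinkedBlockingQueue<T> queue = queues.get(hostAndPushPort);
    return queue == null ? null : queue.poll();
  }

  int queueCount() {
    return queues.size();
  }
}
```

With, say, a thousand partitions spread over three workers, this structure allocates three queues instead of a thousand.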



##########
client/src/main/java/org/apache/celeborn/client/ShuffleClient.java:
##########
@@ -235,4 +237,9 @@ public abstract void pushDataHandShake(
 
   public abstract PartitionLocation registerMapPartitionTask(
       String appId, int shuffleId, int numMappers, int mapId, int attemptId);
+
+  public abstract ConcurrentHashMap<Integer, PartitionLocation> getOrRegisterShuffle(
+      String applicationId, int shuffleId, int numMappers, int numPartitions);
+
+  public abstract PushState getOrRegisterPushState(String mapKey);

Review Comment:
   Better to rename this to getPushState.





[GitHub] [incubator-celeborn] zy-jordan commented on a diff in pull request #1102: [CELEBORN-55][FEATURE] Split maxReqsInFlight limitation into level of target worker

Posted by GitBox <gi...@apache.org>.
zy-jordan commented on code in PR #1102:
URL: https://github.com/apache/incubator-celeborn/pull/1102#discussion_r1065419013


##########
client/src/main/java/org/apache/celeborn/client/write/InFlightTracker.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.client.write;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.CelebornConf;
+
+public class InFlightTracker {
+  private static final Logger logger = LoggerFactory.getLogger(InFlightTracker.class);
+
+  private final long timeoutMs;
+  private final long delta;
+  private final int maxInFlight;
+
+  private final PushState pushState;
+
+  public final AtomicInteger batchId = new AtomicInteger();
+  private final ConcurrentHashMap<String, Set<Integer>> batchIdPerAddressPair =
+      new ConcurrentHashMap<>();
+
+  public InFlightTracker(CelebornConf conf, PushState pushState) {
+    this.timeoutMs = conf.pushLimitInFlightTimeoutMs();
+    this.delta = conf.pushLimitInFlightSleepDeltaMs();
+    this.maxInFlight = conf.pushMaxReqsInFlight();
+    this.pushState = pushState;
+  }
+
+  public void addFlightBatches(int batchId, String hostAndPushPort) {
+    Set<Integer> batchIdSetPerPair =
+        batchIdPerAddressPair.computeIfAbsent(hostAndPushPort, id -> ConcurrentHashMap.newKeySet());
+    batchIdSetPerPair.add(batchId);
+  }
+
+  public void removeFlightBatches(int batchId, String hostAndPushPort) {
+    Set<Integer> batchIdSetPerPair = batchIdPerAddressPair.get(hostAndPushPort);
+    batchIdSetPerPair.remove(batchId);
+    if (batchIdSetPerPair.size() == 0) {
+      batchIdPerAddressPair.remove(hostAndPushPort);
+    }
+  }
+
+  public ConcurrentHashMap<String, Set<Integer>> getBatchIdPerAddressPair() {
+    return batchIdPerAddressPair;
+  }
+
+  public Set<Integer> getBatchIdSetByAddressPair(String addressPair) {
+    return batchIdPerAddressPair.computeIfAbsent(
+        addressPair, pair -> ConcurrentHashMap.newKeySet());
+  }
+
+  public boolean limitMaxInFlight(String hostAndPushPort) throws IOException {
+    if (pushState.exception.get() != null) {
+      throw pushState.exception.get();
+    }
+
+    Set<Integer> batchIdSet = getBatchIdSetByAddressPair(hostAndPushPort);
+    long times = timeoutMs / delta;
+    try {
+      while (times > 0) {
+        if (batchIdSet.size() <= maxInFlight) {
+          break;
+        }
+        if (pushState.exception.get() != null) {
+          throw pushState.exception.get();
+        }
+        Thread.sleep(delta);
+        times--;
+      }
+    } catch (InterruptedException e) {
+      pushState.exception.set(new IOException(e));
+    }
+
+    if (times <= 0) {
+      logger.warn(
+          "After waiting for {} ms, "
+              + "there are still {} batches in flight "
+              + "for hostAndPushPort {}, "
+              + "which exceeds the limit {}.",
+          timeoutMs,
+          batchIdSet.size(),
+          hostAndPushPort,
+          maxInFlight);
+    }
+
+    if (pushState.exception.get() != null) {
+      throw pushState.exception.get();
+    }
+
+    return times <= 0;
+  }
+
+  public boolean limitZeroInFlight() throws IOException {
+    if (pushState.exception.get() != null) {
+      throw pushState.exception.get();
+    }
+    long times = timeoutMs / delta;
+
+    try {
+      while (times > 0) {
+        if (batchIdPerAddressPair.size() == 0) {
+          break;
+        }
+        if (pushState.exception.get() != null) {
+          throw pushState.exception.get();
+        }
+        Thread.sleep(delta);
+        times--;
+      }
+    } catch (InterruptedException e) {
+      pushState.exception.set(new IOException(e));
+    }
+
+    if (times <= 0) {
+      logger.error(
+          "After waiting for {} ms, there are still {} batches in flight, expect 0 batches",
+          timeoutMs,
+          batchIdPerAddressPair.values().stream().map(Set::size).reduce(Integer::sum));
+    }
+
+    if (pushState.exception.get() != null) {
+      throw pushState.exception.get();
+    }
+
+    return times <= 0;
+  }
+
+  protected int nextBatchId() {
+    return batchId.getAndIncrement();

Review Comment:
   I will update it.





[GitHub] [incubator-celeborn] waitinfuture commented on a diff in pull request #1102: [CELEBORN-55][FEATURE] Split maxReqsInFlight limitation into level of target worker

Posted by GitBox <gi...@apache.org>.
waitinfuture commented on code in PR #1102:
URL: https://github.com/apache/incubator-celeborn/pull/1102#discussion_r1073158162


##########
tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/HugeDataTest.scala:
##########
@@ -38,6 +38,7 @@ class HugeDataTest extends AnyFunSuite
 
   test("celeborn spark integration test - huge data") {
     val sparkConf = new SparkConf().setAppName("rss-demo").setMaster("local[4]")
+      .set("spark.celeborn.shuffle.register.maxRetries", "10")

Review Comment:
   I can't reproduce it, so we had better revert this.





[GitHub] [incubator-celeborn] waitinfuture commented on pull request #1102: [CELEBORN-55][FEATURE] Split maxReqsInFlight limitation into level of target worker

Posted by GitBox <gi...@apache.org>.
waitinfuture commented on PR #1102:
URL: https://github.com/apache/incubator-celeborn/pull/1102#issuecomment-1386457404

   In addition, we'd better add unit tests for DataPushQueue.




[GitHub] [incubator-celeborn] zy-jordan commented on a diff in pull request #1102: [CELEBORN-55][FEATURE] Split maxReqsInFlight limitation into level of target worker

Posted by GitBox <gi...@apache.org>.
zy-jordan commented on code in PR #1102:
URL: https://github.com/apache/incubator-celeborn/pull/1102#discussion_r1071714591


##########
client/src/main/java/org/apache/celeborn/client/ShuffleClient.java:
##########
@@ -235,4 +237,9 @@ public abstract void pushDataHandShake(
 
   public abstract PartitionLocation registerMapPartitionTask(
       String appId, int shuffleId, int numMappers, int mapId, int attemptId);
+
+  public abstract ConcurrentHashMap<Integer, PartitionLocation> getOrRegisterShuffle(

Review Comment:
   The PushTask working queue is now a single LinkedBlockingQueue, so we don't need worker info in addTask.


