Posted to issues@celeborn.apache.org by "RexXiong (via GitHub)" <gi...@apache.org> on 2023/04/03 08:35:04 UTC

[GitHub] [incubator-celeborn] RexXiong opened a new pull request, #1405: [CELEBORN-479][FLINK] support stopTrackingAndReleasePartitions

RexXiong opened a new pull request, #1405:
URL: https://github.com/apache/incubator-celeborn/pull/1405

   
   ### What changes were proposed in this pull request?
   1. Support stopTrackingAndReleasePartitions when Celeborn workers are unavailable.
   
   ### Why are the changes needed?
   When a Celeborn worker shuts down, the data it held is lost. Flink retries the affected tasks, but they still try to read the lost data, so the job can never succeed. We therefore need to let the Flink framework know which partitions can no longer be read, so that Flink can schedule those partitions to be regenerated.
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   TPC-DS with workers killed at random.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-celeborn] RexXiong commented on pull request #1405: [CELEBORN-470][FLINK] support stopTrackingAndReleasePartitions

Posted by "RexXiong (via GitHub)" <gi...@apache.org>.
RexXiong commented on PR #1405:
URL: https://github.com/apache/incubator-celeborn/pull/1405#issuecomment-1508201233

   @waitinfuture I merged ShuffleWorkerStatusListener into ShuffleResourceTracker, and changed partition untracking from per-shuffle to per-job so that we can notify the Flink framework of all untracked partitions at one time.
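
A minimal sketch of the per-job aggregation described above, not the merged code. Plain strings stand in for Flink's ResultPartitionID, and the class name, method name, and `lostPartitionsByShuffle` parameter are hypothetical. It only shows how lost partitions from several shuffles of one job could be gathered into a single set, so that one notification covers them all.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: gathers every lost partition of one job into one set.
final class PerJobUntrackSketch {

  /**
   * @param resultPartitionMap shuffleId -> (Celeborn partitionId -> Flink partition id, as a String)
   * @param lostPartitionsByShuffle shuffleId -> Celeborn partition ids that became unreadable
   * @return the Flink partition ids to report in a single call for this job
   */
  static Set<String> collectForJob(
      Map<Integer, Map<Integer, String>> resultPartitionMap,
      Map<Integer, Set<Integer>> lostPartitionsByShuffle) {
    Set<String> toUntrack = new HashSet<>();
    for (Map.Entry<Integer, Set<Integer>> entry : lostPartitionsByShuffle.entrySet()) {
      Map<Integer, String> partitions = resultPartitionMap.get(entry.getKey());
      if (partitions == null) {
        continue; // shuffle already released, nothing to report
      }
      for (Integer partitionId : entry.getValue()) {
        String resultPartitionId = partitions.get(partitionId);
        if (resultPartitionId != null) {
          toUntrack.add(resultPartitionId);
        }
      }
    }
    return toUntrack;
  }
}
```

With this shape, the caller issues one notification per job with the returned set instead of one notification per shuffle.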




[GitHub] [incubator-celeborn] waitinfuture merged pull request #1405: [CELEBORN-470][FLINK] support stopTrackingAndReleasePartitions

Posted by "waitinfuture (via GitHub)" <gi...@apache.org>.
waitinfuture merged PR #1405:
URL: https://github.com/apache/incubator-celeborn/pull/1405




[GitHub] [incubator-celeborn] RexXiong commented on a diff in pull request #1405: [CELEBORN-470][FLINK] support stopTrackingAndReleasePartitions

Posted by "RexXiong (via GitHub)" <gi...@apache.org>.
RexXiong commented on code in PR #1405:
URL: https://github.com/apache/incubator-celeborn/pull/1405#discussion_r1166468137


##########
client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/ShuffleWorkerStatusListener.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.LifecycleManager;
+import org.apache.celeborn.client.listener.WorkerStatusListener;
+import org.apache.celeborn.client.listener.WorkersStatus;
+import org.apache.celeborn.common.meta.ShufflePartitionLocationInfo;
+import org.apache.celeborn.common.meta.WorkerInfo;
+import org.apache.celeborn.plugin.flink.ShuffleResourceTracker.ShuffleResourceListener;
+
+public class ShuffleWorkerStatusListener implements WorkerStatusListener {

Review Comment:
   Good point!





[GitHub] [incubator-celeborn] waitinfuture commented on a diff in pull request #1405: [CELEBORN-470][FLINK] support stopTrackingAndReleasePartitions

Posted by "waitinfuture (via GitHub)" <gi...@apache.org>.
waitinfuture commented on code in PR #1405:
URL: https://github.com/apache/incubator-celeborn/pull/1405#discussion_r1167554590


##########
client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/ShuffleResourceTracker.java:
##########
@@ -0,0 +1,210 @@
+package org.apache.celeborn.plugin.flink;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.shuffle.JobShuffleContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.LifecycleManager;
+import org.apache.celeborn.client.listener.WorkerStatusListener;
+import org.apache.celeborn.client.listener.WorkersStatus;
+import org.apache.celeborn.common.meta.ShufflePartitionLocationInfo;
+import org.apache.celeborn.common.meta.WorkerInfo;
+
+public class ShuffleResourceTracker implements WorkerStatusListener {
+  private static final Logger LOG = LoggerFactory.getLogger(ShuffleResourceTracker.class);
+  private final ExecutorService executorService;
+  private final LifecycleManager lifecycleManager;
+  // JobID -> ShuffleResourceListener
+  private final Map<JobID, JobShuffleResourceListener> shuffleResourceListeners =
+      new ConcurrentHashMap<>();
+  private static final int MAX_RETRY_TIMES = 3;
+
+  public ShuffleResourceTracker(
+      ExecutorService executorService, LifecycleManager lifecycleManager) {
+    this.executorService = executorService;
+    this.lifecycleManager = lifecycleManager;
+    lifecycleManager.registerWorkerStatusListener(this);
+  }
+
+  public void registerJob(JobShuffleContext jobShuffleContext) {
+    shuffleResourceListeners.put(
+        jobShuffleContext.getJobId(),
+        new JobShuffleResourceListener(jobShuffleContext, executorService));
+  }
+
+  public void addPartitionResource(
+      JobID jobId, int shuffleId, int partitionId, ResultPartitionID partitionID) {
+    JobShuffleResourceListener shuffleResourceListener = shuffleResourceListeners.get(jobId);
+    shuffleResourceListener.addPartitionResource(shuffleId, partitionId, partitionID);
+  }
+
+  public void removePartitionResource(JobID jobID, int shuffleId, int partitionId) {
+    JobShuffleResourceListener shuffleResourceListener = shuffleResourceListeners.get(jobID);
+    if (shuffleResourceListener != null) {
+      shuffleResourceListener.removePartitionResource(shuffleId, partitionId);
+    }
+  }
+
+  public JobShuffleResourceListener getJobResourceListener(JobID jobID) {
+    return shuffleResourceListeners.get(jobID);
+  }
+
+  public void unRegisterJob(JobID jobID) {
+    shuffleResourceListeners.remove(jobID);
+  }
+
+  @Override
+  public void notifyChangedWorkersStatus(WorkersStatus workersStatus) {
+    try {
+      List<WorkerInfo> unknownWorkers = workersStatus.unknownWorkers;
+      if (unknownWorkers != null && !unknownWorkers.isEmpty()) {
+        // untrack by job
+        for (Map.Entry<JobID, JobShuffleResourceListener> entry :
+            shuffleResourceListeners.entrySet()) {
+          Set<ResultPartitionID> partitionIds = new HashSet<>();
+          JobShuffleResourceListener shuffleResourceListener = entry.getValue();
+          for (Map.Entry<Integer, Map<Integer, ResultPartitionID>> mapEntry :
+              shuffleResourceListener.getResultPartitionMap().entrySet()) {
+            int shuffleId = mapEntry.getKey();
+            if (!mapEntry.getValue().isEmpty()) {
+              for (WorkerInfo unknownWorker : unknownWorkers) {
+                Map<WorkerInfo, ShufflePartitionLocationInfo> shuffleAllocateInfo =
+                    lifecycleManager.workerSnapshots(shuffleId);
+                // shuffleResourceListener may release when the shuffle is ended
+                if (shuffleAllocateInfo != null) {
+                  ShufflePartitionLocationInfo shufflePartitionLocationInfo =
+                      shuffleAllocateInfo.remove(unknownWorker);

Review Comment:
   In fact we can't call remove here: later iterations of the loop would then find shufflePartitionLocationInfo to be null, which is not correct. Maybe just use get here.
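
A small sketch of the concern, under simplifying assumptions: a plain map from worker name to partition ids stands in for the snapshot returned by lifecycleManager.workerSnapshots(shuffleId), and the class, method, and `destructive` flag are hypothetical. It shows that a lookup with remove empties the shared snapshot, so a second pass over the same data finds nothing, while get leaves it intact.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in: worker name -> partition ids held by that worker.
final class SnapshotLookupSketch {

  /** Collects partitions on unknown workers; `destructive` switches between remove() and get(). */
  static List<Integer> lostPartitions(
      Map<String, List<Integer>> snapshot, List<String> unknownWorkers, boolean destructive) {
    List<Integer> lost = new ArrayList<>();
    for (String worker : unknownWorkers) {
      // remove() mutates the shared snapshot; get() only reads it.
      List<Integer> partitions = destructive ? snapshot.remove(worker) : snapshot.get(worker);
      if (partitions != null) {
        lost.addAll(partitions);
      }
    }
    return lost;
  }
}
```

Calling the method twice on the same snapshot returns the same partitions both times with get, but an empty list on the second call with remove.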





[GitHub] [incubator-celeborn] waitinfuture commented on a diff in pull request #1405: [CELEBORN-470][FLINK] support stopTrackingAndReleasePartitions

Posted by "waitinfuture (via GitHub)" <gi...@apache.org>.
waitinfuture commented on code in PR #1405:
URL: https://github.com/apache/incubator-celeborn/pull/1405#discussion_r1166227716


##########
client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/ShuffleResourceTracker.java:
##########
@@ -0,0 +1,167 @@
+package org.apache.celeborn.plugin.flink;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.shuffle.JobShuffleContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.LifecycleManager;
+
+public class ShuffleResourceTracker {
+  private static final Logger LOG = LoggerFactory.getLogger(ShuffleResourceTracker.class);
+  private final ExecutorService executorService;
+  private final Map<JobID, JobShuffleContext> jobShuffleContextMap = new ConcurrentHashMap<>();
+  // shuffleId -> ShuffleResourceListener
+  private final Map<Integer, ShuffleResourceListener> shuffleResourceListeners =
+      new ConcurrentHashMap<>();
+  private static final int MAX_RETRY_TIMES = 3;
+
+  public ShuffleResourceTracker(
+      ExecutorService executorService, LifecycleManager lifecycleManager) {
+    this.executorService = executorService;
+    ShuffleWorkerStatusListener shuffleWorkerStatusListener =
+        new ShuffleWorkerStatusListener(lifecycleManager, shuffleResourceListeners);
+    lifecycleManager.registerWorkerStatusListener(shuffleWorkerStatusListener);
+  }
+
+  public void registerJobContext(JobShuffleContext jobShuffleContext) {
+    jobShuffleContextMap.put(jobShuffleContext.getJobId(), jobShuffleContext);
+  }
+
+  public void addPartitionResource(
+      JobID jobId, int shuffleId, int partitionId, ResultPartitionID partitionID) {
+    JobShuffleContext jobShuffleContext = jobShuffleContextMap.get(jobId);
+    ShuffleResourceListener shuffleResourceListener =
+        shuffleResourceListeners.computeIfAbsent(
+            shuffleId,
+            (s) -> new ShuffleResourceListener(jobShuffleContext, shuffleId, executorService));
+    shuffleResourceListener.addPartitionResource(partitionId, partitionID);
+  }
+
+  public void removePartitionResource(int shuffleId, int partitionId) {
+    ShuffleResourceListener shuffleResourceListener = shuffleResourceListeners.get(shuffleId);
+    if (shuffleResourceListener != null) {
+      shuffleResourceListener.removePartitionResource(partitionId);
+    }
+  }
+
+  public void removeShuffleListener(int shuffleId) {
+    shuffleResourceListeners.remove(shuffleId);
+  }
+
+  public void unRegisterJob(JobID jobID) {
+    jobShuffleContextMap.remove(jobID);
+  }
+
+  public static class ShuffleResourceListener {
+
+    private final JobShuffleContext context;
+    private final int shuffleId;
+    private final ExecutorService executorService;
+
+    // celeborn partitionId -> Flink ResultPartitionID
+    private Map<Integer, ResultPartitionID> resultPartitionMap = new ConcurrentHashMap<>();
+
+    public ShuffleResourceListener(
+        JobShuffleContext jobShuffleContext, int shuffleId, ExecutorService executorService) {
+      this.context = jobShuffleContext;
+      this.shuffleId = shuffleId;
+      this.executorService = executorService;
+    }
+
+    public void addPartitionResource(int partitionId, ResultPartitionID partitionID) {
+      resultPartitionMap.put(partitionId, partitionID);
+    }
+
+    public void notifyStopTrackingPartitions(Set<Integer> partitionIds) {
+      Set<ResultPartitionID> resultPartitionIDS = new HashSet<>();
+
+      for (Integer partitionId : partitionIds) {
+        ResultPartitionID resultPartitionID = resultPartitionMap.remove(partitionId);
+        if (resultPartitionID != null) {
+          resultPartitionIDS.add(resultPartitionID);
+        }
+      }
+
+      notifyStopTrackingPartitions(resultPartitionIDS, new AtomicInteger(MAX_RETRY_TIMES));

Review Comment:
   nit: IMO it's better to check null/empty here instead of in notifyStopTrackingPartitions to reduce function calls.
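
One way to read this suggestion, as a sketch only: strings stand in for ResultPartitionID, and `notifyFramework` and `notifyCalls` are hypothetical names added so the effect is observable. The empty check sits in the public method, so the private notification method is not entered when nothing was resolved.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified listener: String replaces ResultPartitionID.
final class EarlyEmptyCheckSketch {
  // Celeborn partitionId -> Flink partition id
  private final Map<Integer, String> resultPartitionMap = new ConcurrentHashMap<>();
  int notifyCalls = 0; // counts how often the notification path is entered

  void addPartitionResource(int partitionId, String resultPartitionId) {
    resultPartitionMap.put(partitionId, resultPartitionId);
  }

  void notifyStopTrackingPartitions(Set<Integer> partitionIds) {
    Set<String> resolved = new HashSet<>();
    for (Integer partitionId : partitionIds) {
      String resultPartitionId = resultPartitionMap.remove(partitionId);
      if (resultPartitionId != null) {
        resolved.add(resultPartitionId);
      }
    }
    if (resolved.isEmpty()) {
      return; // checked at the call site, so no call is made for an empty set
    }
    notifyFramework(resolved);
  }

  private void notifyFramework(Set<String> resultPartitionIds) {
    notifyCalls++; // placeholder for the real notification
  }
}
```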



##########
client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/ShuffleResourceTracker.java:
##########
@@ -0,0 +1,167 @@
+    public void notifyStopTrackingPartitions(Set<Integer> partitionIds) {
+      Set<ResultPartitionID> resultPartitionIDS = new HashSet<>();
+
+      for (Integer partitionId : partitionIds) {
+        ResultPartitionID resultPartitionID = resultPartitionMap.remove(partitionId);
+        if (resultPartitionID != null) {
+          resultPartitionIDS.add(resultPartitionID);
+        }
+      }
+
+      notifyStopTrackingPartitions(resultPartitionIDS, new AtomicInteger(MAX_RETRY_TIMES));
+    }
+
+    private void notifyStopTrackingPartitions(
+        Set<ResultPartitionID> partitionIDS, AtomicInteger remainingRetries) {
+      if (partitionIDS == null || partitionIDS.isEmpty()) {
+        return;
+      }
+
+      LOG.debug(
+          "shuffleId: {}, stop tracking partitions {}.",
+          shuffleId,
+          Arrays.toString(partitionIDS.toArray()));
+
+      int count = remainingRetries.decrementAndGet();
+      try {
+        CompletableFuture<?> future = context.stopTrackingAndReleasePartitions(partitionIDS);
+        future.whenCompleteAsync(
+            (ignored, throwable) -> {
+              if (throwable == null) {
+                return;
+              }
+
+              if (count == 0) {
+                LOG.error(
+                    "shuffleId: {}, Failed to stop tracking partitions {}.",
+                    shuffleId,
+                    Arrays.toString(partitionIDS.toArray()));
+                return;
+              }
+              notifyStopTrackingPartitions(partitionIDS, remainingRetries);
+            },
+            executorService);
+      } catch (Throwable throwable) {

Review Comment:
   Seems the try-catch is unnecessary



##########
client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/ShuffleResourceTracker.java:
##########
@@ -0,0 +1,167 @@
+    public void notifyStopTrackingPartitions(Set<Integer> partitionIds) {
+      Set<ResultPartitionID> resultPartitionIDS = new HashSet<>();
+
+      for (Integer partitionId : partitionIds) {
+        ResultPartitionID resultPartitionID = resultPartitionMap.remove(partitionId);
+        if (resultPartitionID != null) {
+          resultPartitionIDS.add(resultPartitionID);
+        }
+      }
+
+      notifyStopTrackingPartitions(resultPartitionIDS, new AtomicInteger(MAX_RETRY_TIMES));
+    }
+
+    private void notifyStopTrackingPartitions(
+        Set<ResultPartitionID> partitionIDS, AtomicInteger remainingRetries) {

Review Comment:
   It seems unnecessary to use AtomicInteger, because another attempt is only triggered after the previous one fails.
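
A sketch of the alternative hinted at here, assuming the retries really are strictly sequential. The class and method names are hypothetical and a generic Supplier replaces the call to context.stopTrackingAndReleasePartitions. Because the next attempt starts only inside the completion callback of the previous one, the remaining-attempt count can be passed along as a plain int.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

// Hypothetical helper: sequential retries with a plain int counter.
final class PlainIntRetrySketch {

  /** Runs `call` up to `attemptsLeft` times; the returned future reflects the final outcome. */
  static CompletableFuture<Void> callWithRetries(
      Supplier<CompletableFuture<?>> call, int attemptsLeft, Executor executor) {
    CompletableFuture<Void> result = new CompletableFuture<>();
    attempt(call, attemptsLeft, executor, result);
    return result;
  }

  private static void attempt(
      Supplier<CompletableFuture<?>> call,
      int attemptsLeft,
      Executor executor,
      CompletableFuture<Void> result) {
    call.get()
        .whenCompleteAsync(
            (ignored, throwable) -> {
              if (throwable == null) {
                result.complete(null);
              } else if (attemptsLeft <= 1) {
                result.completeExceptionally(throwable); // out of attempts
              } else {
                // Only reached after the previous attempt failed, so no shared counter is needed.
                attempt(call, attemptsLeft - 1, executor, result);
              }
            },
            executor);
  }
}
```

Passing 3 as `attemptsLeft` gives three attempts in total, which matches MAX_RETRY_TIMES in the hunk above.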



##########
client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala:
##########
@@ -31,14 +33,26 @@ import org.apache.celeborn.common.protocol.message.StatusCode
 
 class WorkerStatusTracker(
     conf: CelebornConf,
-    lifecycleManager: LifecycleManager,
-    commitManager: CommitManager) extends Logging {
+    lifecycleManager: LifecycleManager) extends Logging {
   private val workerExcludedExpireTimeout = conf.workerExcludedExpireTimeout
+  private val workerStatusListeners = ConcurrentHashMap.newKeySet[WorkerStatusListener]()
 
   // blacklist
   val blacklist = new ShuffleFailedWorkers()
   private val shuttingWorkers: JSet[WorkerInfo] = new JHashSet[WorkerInfo]()
 
+  def registerWorkerStatusListener(workerStatusListener: WorkerStatusListener): Unit = {
+    workerStatusListeners.add(workerStatusListener)
+  }
+
+  def getCandidateBlackListWorkers(): Set[WorkerInfo] = {
+    if (conf.workerExcludedUseAllocatedWorkers) {

Review Comment:
   I don't quite understand why all allocated workers are returned here as the blacklist.



##########
client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/ShuffleWorkerStatusListener.java:
##########
@@ -0,0 +1,88 @@
+package org.apache.celeborn.plugin.flink;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.LifecycleManager;
+import org.apache.celeborn.client.listener.WorkerStatusListener;
+import org.apache.celeborn.client.listener.WorkersStatus;
+import org.apache.celeborn.common.meta.ShufflePartitionLocationInfo;
+import org.apache.celeborn.common.meta.WorkerInfo;
+import org.apache.celeborn.plugin.flink.ShuffleResourceTracker.ShuffleResourceListener;
+
+public class ShuffleWorkerStatusListener implements WorkerStatusListener {

Review Comment:
   IMO we should merge ShuffleWorkerStatusListener with ShuffleResourceTracker and make ShuffleResourceTracker a WorkerStatusListener. ShuffleWorkerStatusListener only has a reference to ShuffleResourceTracker's shuffleResourceListeners.





[GitHub] [incubator-celeborn] RexXiong commented on a diff in pull request #1405: [CELEBORN-470][FLINK] support stopTrackingAndReleasePartitions

Posted by "RexXiong (via GitHub)" <gi...@apache.org>.
RexXiong commented on code in PR #1405:
URL: https://github.com/apache/incubator-celeborn/pull/1405#discussion_r1166476930


##########
client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala:
##########
@@ -31,14 +33,26 @@ import org.apache.celeborn.common.protocol.message.StatusCode
 
 class WorkerStatusTracker(
     conf: CelebornConf,
-    lifecycleManager: LifecycleManager,
-    commitManager: CommitManager) extends Logging {
+    lifecycleManager: LifecycleManager) extends Logging {
   private val workerExcludedExpireTimeout = conf.workerExcludedExpireTimeout
+  private val workerStatusListeners = ConcurrentHashMap.newKeySet[WorkerStatusListener]()
 
   // blacklist
   val blacklist = new ShuffleFailedWorkers()
   private val shuttingWorkers: JSet[WorkerInfo] = new JHashSet[WorkerInfo]()
 
+  def registerWorkerStatusListener(workerStatusListener: WorkerStatusListener): Unit = {
+    workerStatusListeners.add(workerStatusListener)
+  }
+
+  def getCandidateBlackListWorkers(): Set[WorkerInfo] = {
+    if (conf.workerExcludedUseAllocatedWorkers) {

Review Comment:
   In the master, the blacklist means workers on which resources cannot be allocated; in LifecycleManager, it means workers that cannot be read from or written to (though this may be incomplete, since LifecycleManager does not know whether a partition's data is still available for reading after commit). Because we need to know promptly which workers are unavailable for reading, the approach is to compare the allocated workers against the master's alive workers. @waitinfuture
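
   The comparison described above can be illustrated with a minimal sketch in plain Java. The names here (`UnknownWorkers`, `findUnknownWorkers`, string worker ids) are hypothetical stand-ins; this is not the Celeborn implementation, which works on `WorkerInfo` objects.

```java
import java.util.HashSet;
import java.util.Set;

public class UnknownWorkers {
  // Hypothetical helper: workers that hold allocated partitions but are no
  // longer reported alive by the master are treated as unavailable for reading.
  static Set<String> findUnknownWorkers(Set<String> allocated, Set<String> alive) {
    Set<String> unknown = new HashSet<>(allocated);
    unknown.removeAll(alive); // set difference: allocated minus alive
    return unknown;
  }

  public static void main(String[] args) {
    Set<String> allocated = Set.of("worker-1", "worker-2", "worker-3");
    Set<String> alive = Set.of("worker-1", "worker-3", "worker-4");
    System.out.println(findUnknownWorkers(allocated, alive)); // prints [worker-2]
  }
}
```

   Workers that are alive but hold no allocation (`worker-4` here) are ignored, since only allocated workers can hold data that a reader would need.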





[GitHub] [incubator-celeborn] codecov[bot] commented on pull request #1405: [CELEBORN-479][FLINK] support stopTrackingAndReleasePartitions

Posted by "codecov[bot] (via GitHub)" <gi...@apache.org>.
codecov[bot] commented on PR #1405:
URL: https://github.com/apache/incubator-celeborn/pull/1405#issuecomment-1493928913

   ## [Codecov](https://codecov.io/gh/apache/incubator-celeborn/pull/1405?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#1405](https://codecov.io/gh/apache/incubator-celeborn/pull/1405?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (9829ebb) into [main](https://codecov.io/gh/apache/incubator-celeborn/commit/bf46336d548fa26f38b4a52c79e1178b2c19f888?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (bf46336) will **decrease** coverage by `0.14%`.
   > The diff coverage is `35.30%`.
   
   ```diff
   @@            Coverage Diff             @@
   ##             main    #1405      +/-   ##
   ==========================================
   - Coverage   45.05%   44.90%   -0.14%     
   ==========================================
     Files         164      164              
     Lines       10404    10416      +12     
     Branches     1057     1058       +1     
   ==========================================
   - Hits         4686     4676      -10     
   - Misses       5378     5398      +20     
   - Partials      340      342       +2     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-celeborn/pull/1405?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...orn/common/meta/ShufflePartitionLocationInfo.scala](https://codecov.io/gh/apache/incubator-celeborn/pull/1405?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvY2VsZWJvcm4vY29tbW9uL21ldGEvU2h1ZmZsZVBhcnRpdGlvbkxvY2F0aW9uSW5mby5zY2FsYQ==) | `0.00% <0.00%> (ø)` | |
   | [...che/celeborn/common/metrics/source/RPCSource.scala](https://codecov.io/gh/apache/incubator-celeborn/pull/1405?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvY2VsZWJvcm4vY29tbW9uL21ldHJpY3Mvc291cmNlL1JQQ1NvdXJjZS5zY2FsYQ==) | `59.35% <0.00%> (ø)` | |
   | [...born/common/protocol/message/ControlMessages.scala](https://codecov.io/gh/apache/incubator-celeborn/pull/1405?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvY2VsZWJvcm4vY29tbW9uL3Byb3RvY29sL21lc3NhZ2UvQ29udHJvbE1lc3NhZ2VzLnNjYWxh) | `1.57% <0.00%> (-<0.01%)` | :arrow_down: |
   | [...org/apache/celeborn/common/util/PbSerDeUtils.scala](https://codecov.io/gh/apache/incubator-celeborn/pull/1405?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvY2VsZWJvcm4vY29tbW9uL3V0aWwvUGJTZXJEZVV0aWxzLnNjYWxh) | `63.21% <ø> (ø)` | |
   | [...cala/org/apache/celeborn/common/CelebornConf.scala](https://codecov.io/gh/apache/incubator-celeborn/pull/1405?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvY2VsZWJvcm4vY29tbW9uL0NlbGVib3JuQ29uZi5zY2FsYQ==) | `86.29% <85.72%> (-0.05%)` | :arrow_down: |
   
   ... and [3 files with indirect coverage changes](https://codecov.io/gh/apache/incubator-celeborn/pull/1405/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   




[GitHub] [incubator-celeborn] waitinfuture commented on a diff in pull request #1405: [CELEBORN-470][FLINK] support stopTrackingAndReleasePartitions

Posted by "waitinfuture (via GitHub)" <gi...@apache.org>.
waitinfuture commented on code in PR #1405:
URL: https://github.com/apache/incubator-celeborn/pull/1405#discussion_r1167545735


##########
client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/ShuffleResourceTracker.java:
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.shuffle.JobShuffleContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.LifecycleManager;
+import org.apache.celeborn.client.listener.WorkerStatusListener;
+import org.apache.celeborn.client.listener.WorkersStatus;
+import org.apache.celeborn.common.meta.ShufflePartitionLocationInfo;
+import org.apache.celeborn.common.meta.WorkerInfo;
+
+public class ShuffleResourceTracker implements WorkerStatusListener {
+  private static final Logger LOG = LoggerFactory.getLogger(ShuffleResourceTracker.class);
+  private final ExecutorService executorService;
+  private final LifecycleManager lifecycleManager;
+  // JobID -> ShuffleResourceListener
+  private final Map<JobID, JobShuffleResourceListener> shuffleResourceListeners =
+      new ConcurrentHashMap<>();
+  private static final int MAX_RETRY_TIMES = 3;
+
+  public ShuffleResourceTracker(
+      ExecutorService executorService, LifecycleManager lifecycleManager) {
+    this.executorService = executorService;
+    this.lifecycleManager = lifecycleManager;
+    lifecycleManager.registerWorkerStatusListener(this);
+  }
+
+  public void registerJob(JobShuffleContext jobShuffleContext) {
+    shuffleResourceListeners.put(
+        jobShuffleContext.getJobId(),
+        new JobShuffleResourceListener(jobShuffleContext, executorService));
+  }
+
+  public void addPartitionResource(
+      JobID jobId, int shuffleId, int partitionId, ResultPartitionID partitionID) {
+    JobShuffleResourceListener shuffleResourceListener = shuffleResourceListeners.get(jobId);
+    shuffleResourceListener.addPartitionResource(shuffleId, partitionId, partitionID);
+  }
+
+  public void removePartitionResource(JobID jobID, int shuffleId, int partitionId) {
+    JobShuffleResourceListener shuffleResourceListener = shuffleResourceListeners.get(jobID);
+    if (shuffleResourceListener != null) {
+      shuffleResourceListener.removePartitionResource(shuffleId, partitionId);
+    }
+  }
+
+  public JobShuffleResourceListener getJobResourceListener(JobID jobID) {
+    return shuffleResourceListeners.get(jobID);
+  }
+
+  public void unRegisterJob(JobID jobID) {
+    shuffleResourceListeners.remove(jobID);
+  }
+
+  @Override
+  public void notifyChangedWorkersStatus(WorkersStatus workersStatus) {
+    try {
+      List<WorkerInfo> unknownWorkers = workersStatus.unknownWorkers;
+      if (unknownWorkers != null && !unknownWorkers.isEmpty()) {
+        // untrack by job
+        for (Map.Entry<JobID, JobShuffleResourceListener> entry :
+            shuffleResourceListeners.entrySet()) {
+          Set<ResultPartitionID> partitionIds = new HashSet<>();
+          JobShuffleResourceListener shuffleResourceListener = entry.getValue();
+          for (Map.Entry<Integer, Map<Integer, ResultPartitionID>> mapEntry :
+              shuffleResourceListener.getResultPartitionMap().entrySet()) {
+            int shuffleId = mapEntry.getKey();
+            if (!mapEntry.getValue().isEmpty()) {
+              for (WorkerInfo unknownWorker : unknownWorkers) {
+                Map<WorkerInfo, ShufflePartitionLocationInfo> shuffleAllocateInfo =
+                    lifecycleManager.workerSnapshots(shuffleId);
+                // shuffleResourceListener may release when the shuffle is ended
+                if (shuffleAllocateInfo != null) {
+                  ShufflePartitionLocationInfo shufflePartitionLocationInfo =
+                      shuffleAllocateInfo.remove(unknownWorker);

Review Comment:
   Is it safe to remove from shuffleAllocateInfo? It seems OK for MapPartition, but we need to double-check. At least for ReducePartition we should never remove from it, or data could be lost.
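
   One way to sidestep this concern is to read from the snapshot with `get` rather than `remove`, so the shared map is never mutated. The sketch below uses plain Java with hypothetical stand-in types (strings for `WorkerInfo`, integer lists for `ShufflePartitionLocationInfo`); whether this fits the actual contract of `LifecycleManager.workerSnapshots` is an assumption.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SnapshotLookup {
  // Hypothetical helper: return the partitions held by a lost worker without
  // removing that worker's entry from the shared snapshot map.
  static List<Integer> partitionsOnWorker(Map<String, List<Integer>> snapshot, String worker) {
    List<Integer> partitions = snapshot.get(worker); // read-only lookup
    return partitions == null ? List.of() : List.copyOf(partitions);
  }

  public static void main(String[] args) {
    Map<String, List<Integer>> snapshot = new HashMap<>();
    snapshot.put("worker-1", List.of(0, 1));
    snapshot.put("worker-2", List.of(2));
    System.out.println(partitionsOnWorker(snapshot, "worker-2")); // prints [2]
    System.out.println(snapshot.containsKey("worker-2")); // prints true
  }
}
```

   The trade-off is that the same unknown worker may be reported again on a later notification, so the caller would need its own bookkeeping to avoid releasing the same partitions twice.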



##########
client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala:
##########
@@ -31,14 +33,26 @@ import org.apache.celeborn.common.protocol.message.StatusCode
 
 class WorkerStatusTracker(
     conf: CelebornConf,
-    lifecycleManager: LifecycleManager,
-    commitManager: CommitManager) extends Logging {
+    lifecycleManager: LifecycleManager) extends Logging {
   private val workerExcludedExpireTimeout = conf.workerExcludedExpireTimeout
+  private val workerStatusListeners = ConcurrentHashMap.newKeySet[WorkerStatusListener]()
 
   // blacklist
   val blacklist = new ShuffleFailedWorkers()
   private val shuttingWorkers: JSet[WorkerInfo] = new JHashSet[WorkerInfo]()
 
+  def registerWorkerStatusListener(workerStatusListener: WorkerStatusListener): Unit = {
+    workerStatusListeners.add(workerStatusListener)
+  }
+
+  def getCandidateBlackListWorkers(): Set[WorkerInfo] = {
+    if (conf.workerExcludedUseAllocatedWorkers) {

Review Comment:
   Did you encounter issues during testing without this change? The semantics will change and the name ```localBlacklist``` will be misleading. I understand the benefit is that LifecycleManager can be notified of unknown workers in a more timely manner, but the message body will grow. I'm OK with the change, but the name and config description should be refined.


