You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by zh...@apache.org on 2023/01/06 10:32:21 UTC

[incubator-celeborn] branch main updated: [CELEBORN-203] fix NPE when removeExpiredShuffle in LifecycleManager. (#1151)

This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 2ec06472 [CELEBORN-203] fix NPE when removeExpiredShuffle in LifecycleManager. (#1151)
2ec06472 is described below

commit 2ec06472fe3da244bd81d6d950d201fd3ad515e0
Author: Shuang <lv...@gmail.com>
AuthorDate: Fri Jan 6 18:32:17 2023 +0800

    [CELEBORN-203] fix NPE when removeExpiredShuffle in LifecycleManager. (#1151)
---
 .../scala/org/apache/celeborn/client/LifecycleManager.scala  |  2 +-
 .../scala/org/apache/celeborn/client/ShuffleTaskInfo.scala   | 12 +++++++-----
 .../org/apache/celeborn/client/ShuffleTaskInfoSuite.scala    |  8 +++++++-
 3 files changed, 15 insertions(+), 7 deletions(-)

diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index c22d3cb6..073b15bb 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -1053,7 +1053,7 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
         latestPartitionLocation.remove(shuffleId)
         commitManager.removeExpiredShuffle(shuffleId)
         changePartitionManager.removeExpiredShuffle(shuffleId)
-        shuffleTaskInfo.remove(shuffleId)
+        shuffleTaskInfo.removeExpiredShuffle(shuffleId)
         requestUnregisterShuffle(
           rssHARetryClient,
           UnregisterShuffle(appId, shuffleId, RssHARetryClient.genRequestId()))
diff --git a/client/src/main/scala/org/apache/celeborn/client/ShuffleTaskInfo.scala b/client/src/main/scala/org/apache/celeborn/client/ShuffleTaskInfo.scala
index 1742ab86..f49d5740 100644
--- a/client/src/main/scala/org/apache/celeborn/client/ShuffleTaskInfo.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/ShuffleTaskInfo.scala
@@ -86,10 +86,12 @@ class ShuffleTaskInfo {
     attemptIdMap.get(mapAttemptId)
   }
 
-  def remove(shuffleId: Int): Unit = {
-    val taskShuffleId = shuffleIdToTaskShuffleId.remove(shuffleId)
-    taskShuffleIdToShuffleId.remove(taskShuffleId)
-    taskShuffleAttemptIdIndex.remove(shuffleId)
-    taskShuffleAttemptIdToAttemptId.remove(taskShuffleId)
+  def removeExpiredShuffle(shuffleId: Int): Unit = {
+    if (shuffleIdToTaskShuffleId.containsKey(shuffleId)) {
+      val taskShuffleId = shuffleIdToTaskShuffleId.remove(shuffleId)
+      taskShuffleIdToShuffleId.remove(taskShuffleId)
+      taskShuffleAttemptIdIndex.remove(shuffleId)
+      taskShuffleAttemptIdToAttemptId.remove(taskShuffleId)
+    }
   }
 }
diff --git a/client/src/test/scala/org/apache/celeborn/client/ShuffleTaskInfoSuite.scala b/client/src/test/scala/org/apache/celeborn/client/ShuffleTaskInfoSuite.scala
index abe2e5f5..5196c56a 100644
--- a/client/src/test/scala/org/apache/celeborn/client/ShuffleTaskInfoSuite.scala
+++ b/client/src/test/scala/org/apache/celeborn/client/ShuffleTaskInfoSuite.scala
@@ -43,8 +43,14 @@ class ShuffleTaskInfoSuite extends AnyFunSuite {
     assert(encodeAttemptId012 == 1)
 
     // remove shuffleId and reEncode
-    shuffleTaskInfo.remove(encodeShuffleId)
+    shuffleTaskInfo.removeExpiredShuffle(encodeShuffleId)
     val encodeShuffleIdNew = shuffleTaskInfo.getShuffleId("shuffleId")
     assert(encodeShuffleIdNew == 2)
   }
+
+  test("remove none exist shuffle") {
+    val shuffleTaskInfo = new ShuffleTaskInfo
+    // removing a non-existent shuffle must be a no-op and must not throw
+    shuffleTaskInfo.removeExpiredShuffle(0)
+  }
 }