You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by zh...@apache.org on 2023/01/06 10:32:21 UTC
[incubator-celeborn] branch main updated: [CELEBORN-203] fix NPE when removeExpiredShuffle in LifecycleManager. (#1151)
This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 2ec06472 [CELEBORN-203] fix NPE when removeExpiredShuffle in LifecycleManager. (#1151)
2ec06472 is described below
commit 2ec06472fe3da244bd81d6d950d201fd3ad515e0
Author: Shuang <lv...@gmail.com>
AuthorDate: Fri Jan 6 18:32:17 2023 +0800
[CELEBORN-203] fix NPE when removeExpiredShuffle in LifecycleManager. (#1151)
---
.../scala/org/apache/celeborn/client/LifecycleManager.scala | 2 +-
.../scala/org/apache/celeborn/client/ShuffleTaskInfo.scala | 12 +++++++-----
.../org/apache/celeborn/client/ShuffleTaskInfoSuite.scala | 8 +++++++-
3 files changed, 15 insertions(+), 7 deletions(-)
diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index c22d3cb6..073b15bb 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -1053,7 +1053,7 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
latestPartitionLocation.remove(shuffleId)
commitManager.removeExpiredShuffle(shuffleId)
changePartitionManager.removeExpiredShuffle(shuffleId)
- shuffleTaskInfo.remove(shuffleId)
+ shuffleTaskInfo.removeExpiredShuffle(shuffleId)
requestUnregisterShuffle(
rssHARetryClient,
UnregisterShuffle(appId, shuffleId, RssHARetryClient.genRequestId()))
diff --git a/client/src/main/scala/org/apache/celeborn/client/ShuffleTaskInfo.scala b/client/src/main/scala/org/apache/celeborn/client/ShuffleTaskInfo.scala
index 1742ab86..f49d5740 100644
--- a/client/src/main/scala/org/apache/celeborn/client/ShuffleTaskInfo.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/ShuffleTaskInfo.scala
@@ -86,10 +86,12 @@ class ShuffleTaskInfo {
attemptIdMap.get(mapAttemptId)
}
- def remove(shuffleId: Int): Unit = {
- val taskShuffleId = shuffleIdToTaskShuffleId.remove(shuffleId)
- taskShuffleIdToShuffleId.remove(taskShuffleId)
- taskShuffleAttemptIdIndex.remove(shuffleId)
- taskShuffleAttemptIdToAttemptId.remove(taskShuffleId)
+ def removeExpiredShuffle(shuffleId: Int): Unit = {
+ if (shuffleIdToTaskShuffleId.containsKey(shuffleId)) {
+ val taskShuffleId = shuffleIdToTaskShuffleId.remove(shuffleId)
+ taskShuffleIdToShuffleId.remove(taskShuffleId)
+ taskShuffleAttemptIdIndex.remove(shuffleId)
+ taskShuffleAttemptIdToAttemptId.remove(taskShuffleId)
+ }
}
}
diff --git a/client/src/test/scala/org/apache/celeborn/client/ShuffleTaskInfoSuite.scala b/client/src/test/scala/org/apache/celeborn/client/ShuffleTaskInfoSuite.scala
index abe2e5f5..5196c56a 100644
--- a/client/src/test/scala/org/apache/celeborn/client/ShuffleTaskInfoSuite.scala
+++ b/client/src/test/scala/org/apache/celeborn/client/ShuffleTaskInfoSuite.scala
@@ -43,8 +43,14 @@ class ShuffleTaskInfoSuite extends AnyFunSuite {
assert(encodeAttemptId012 == 1)
// remove shuffleId and reEncode
- shuffleTaskInfo.remove(encodeShuffleId)
+ shuffleTaskInfo.removeExpiredShuffle(encodeShuffleId)
val encodeShuffleIdNew = shuffleTaskInfo.getShuffleId("shuffleId")
assert(encodeShuffleIdNew == 2)
}
+
+ test("remove none exist shuffle") {
+ val shuffleTaskInfo = new ShuffleTaskInfo
+ // removing a non-existent shuffle must be a no-op and must not throw an NPE
+ shuffleTaskInfo.removeExpiredShuffle(0)
+ }
}