You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@celeborn.apache.org by "RexXiong (via GitHub)" <gi...@apache.org> on 2023/02/15 04:25:18 UTC

[GitHub] [incubator-celeborn] RexXiong commented on a diff in pull request #1236: [CELEBORN-301] Refactor PartitionLocationInfo to use ConcurrentHashMap

RexXiong commented on code in PR #1236:
URL: https://github.com/apache/incubator-celeborn/pull/1236#discussion_r1106632990


##########
common/src/main/scala/org/apache/celeborn/common/meta/PartitionLocationInfo.scala:
##########
@@ -210,38 +159,31 @@ class PartitionLocationInfo extends Logging {
   private def removePartitions(
       shuffleKey: String,
       uniqueIds: util.Collection[String],
-      partitionInfo: PartitionInfo): (util.Map[String, Integer], Integer) = this.synchronized {
-    if (!partitionInfo.containsKey(shuffleKey)) {
+      partitionInfo: PartitionInfo): (util.Map[String, Integer], Integer) = {
+    val partitionMap = partitionInfo.get(shuffleKey)
+    if (partitionMap == null) {
       return (Map.empty[String, Integer].asJava, 0)
     }
     val locMap = new util.HashMap[String, Integer]()
     var numSlotsReleased: Int = 0
-    val reduceLocMap = partitionInfo.get(shuffleKey)
     uniqueIds.asScala.foreach { id =>
       val tokens = id.split("-", 2)
       val partitionId = tokens(0).toInt
       val epoch = tokens(1).toInt
-      val locations = reduceLocMap.get(partitionId)
-      if (locations != null) {
-        val targetLocation = locations.asScala.find(_.getEpoch == epoch).orNull
-        if (targetLocation != null) {
-          locations.remove(targetLocation)
-          numSlotsReleased += 1
-          locMap.compute(
-            targetLocation.getStorageInfo.getMountPoint,
-            new BiFunction[String, Integer, Integer] {
-              override def apply(t: String, u: Integer): Integer = {
-                if (u == null) 1 else u + 1
-              }
-            })
-        }
-      }
-      if (locations == null || locations.size() == 0) {
-        reduceLocMap.remove(partitionId)
+      val loc = partitionMap.remove(encode(partitionId, epoch))
+      if (loc != null) {
+        numSlotsReleased += 1
+        locMap.compute(
+          loc.getStorageInfo.getMountPoint,
+          new BiFunction[String, Integer, Integer] {
+            override def apply(t: String, u: Integer): Integer = {
+              if (u == null) 1 else u + 1
+            }
+          })
       }
     }
 
-    if (reduceLocMap.size() == 0) {
+    if (partitionMap.size() == 0) {

Review Comment:
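   When preCommit runs for hard-split partitions while someone else concurrently revives or splits a new partition for the same shuffle, there is probably a concurrency problem. Now that `this.synchronized` has been removed, the `partitionMap.size() == 0` check is no longer atomic with whatever action is taken on it, so a location added in between could be lost.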



##########
common/src/main/scala/org/apache/celeborn/common/meta/PartitionLocationInfo.scala:
##########
@@ -252,42 +194,29 @@ class PartitionLocationInfo extends Logging {
       shuffleKey: String,
       uniqueId: String,
       mode: PartitionLocation.Mode): PartitionLocation = {
-    val tokens = uniqueId.split("-", 2)
-    val partitionId = tokens(0).toInt
-    val epoch = tokens(1).toInt
     val partitionInfo =
       if (mode == PartitionLocation.Mode.MASTER) {
         masterPartitionLocations
       } else {
         slavePartitionLocations
       }
 
-    this.synchronized {
-      if (!partitionInfo.containsKey(shuffleKey)
-        || !partitionInfo.get(shuffleKey).containsKey(partitionId)) {
-        return null
-      }
-      partitionInfo.get(shuffleKey)
-        .get(partitionId)
-        .asScala
-        .find(loc => loc.getEpoch == epoch)
-        .orNull
-    }
+    val partitionMap = partitionInfo.get(shuffleKey)
+    if (partitionMap != null) {
+      val tokens = uniqueId.split("-", 2)

Review Comment:
   We could extract a helper method, `encodeFromUniqueId`, to remove duplicated code like this (splitting the unique ID into partition ID and epoch, then encoding them).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org