Posted to issues@celeborn.apache.org by "waitinfuture (via GitHub)" <gi...@apache.org> on 2023/02/14 14:26:44 UTC

[GitHub] [incubator-celeborn] waitinfuture opened a new pull request, #1236: [CELEBORN-301] Refactor PartitionLocationInfo to use ConcurrentHashMap

waitinfuture opened a new pull request, #1236:
URL: https://github.com/apache/incubator-celeborn/pull/1236

   
   ### What changes were proposed in this pull request?
   Refactor PartitionLocationInfo: replace HashMap with ConcurrentHashMap and remove the synchronized blocks. Also change the core data structure from `(shuffleKey -> (partitionId -> List[PartitionLocation]))` to `(shuffleKey -> (encodedUniqueId -> PartitionLocation))`, where `encodedUniqueId` is `partitionId.toLong << 32 | epoch`.
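A minimal Scala sketch of the `encodedUniqueId` key described above, assuming both `partitionId` and `epoch` are non-negative `Int`s. The object and method names are hypothetical, not taken from the PR. The low half is masked so that a negative epoch could not sign-extend into the partition bits, something the plain `partitionId.toLong << 32 | epoch` form does not guard against.

```scala
// Hypothetical sketch: partitionId in the high 32 bits, epoch in the low 32 bits.
object EncodedUniqueId {
  def encode(partitionId: Int, epoch: Int): Long =
    // Masking the epoch keeps a negative Int from overwriting the high half.
    (partitionId.toLong << 32) | (epoch.toLong & 0xFFFFFFFFL)

  // Recover the partition id from the high 32 bits.
  def partitionIdOf(encoded: Long): Int = (encoded >>> 32).toInt

  // Recover the epoch from the low 32 bits (toInt truncates).
  def epochOf(encoded: Long): Int = encoded.toInt
}
```

For example, `encode(5, 2)` is `5 * 2^32 + 2 = 21474836482L`, and both halves round-trip through `partitionIdOf` and `epochOf`.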
   
   
   ### Why are the changes needed?
   For very large workloads, e.g. a 300 TB TeraSort with 300,000 × 150,000 parallelism using the sort pusher, the synchronization lock becomes the bottleneck. Replacing it with a ReentrantReadWriteLock reduced task duration from 7 min to 5.6 min; this PR goes further and uses ConcurrentHashMap to avoid locking altogether.
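The lock-free layout can be sketched as below. This is an illustration under assumptions, not the code in this PR: `Loc` is a hypothetical stand-in for `PartitionLocation`, and `LockFreeLocationIndex` is a hypothetical name. Each operation relies only on `ConcurrentHashMap`'s own thread safety (including the atomic `computeIfAbsent`), so readers and writers no longer serialize on a single monitor.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.function.{Function => JFunction}

// Hypothetical, trimmed-down stand-in for PartitionLocation.
final case class Loc(partitionId: Int, epoch: Int, mountPoint: String)

// shuffleKey -> (encodedUniqueId -> location); no synchronized blocks anywhere.
class LockFreeLocationIndex {
  private val locations =
    new ConcurrentHashMap[String, ConcurrentHashMap[Long, Loc]]()

  // Factory for the per-shuffle inner map, used by computeIfAbsent.
  private val newInnerMap = new JFunction[String, ConcurrentHashMap[Long, Loc]] {
    override def apply(shuffleKey: String): ConcurrentHashMap[Long, Loc] =
      new ConcurrentHashMap[Long, Loc]()
  }

  private def encode(partitionId: Int, epoch: Int): Long =
    (partitionId.toLong << 32) | (epoch.toLong & 0xFFFFFFFFL)

  def add(shuffleKey: String, loc: Loc): Unit = {
    // computeIfAbsent creates the inner map atomically, even under contention.
    val inner = locations.computeIfAbsent(shuffleKey, newInnerMap)
    inner.put(encode(loc.partitionId, loc.epoch), loc)
  }

  // A single hash lookup instead of scanning a per-partition list for the epoch.
  def get(shuffleKey: String, partitionId: Int, epoch: Int): Loc = {
    val inner = locations.get(shuffleKey)
    if (inner == null) null else inner.get(encode(partitionId, epoch))
  }

  // Returns the removed location, or null if it was not present.
  def remove(shuffleKey: String, partitionId: Int, epoch: Int): Loc = {
    val inner = locations.get(shuffleKey)
    if (inner == null) null else inner.remove(encode(partitionId, epoch))
  }
}
```

Looking up a `(partitionId, epoch)` pair becomes one hash lookup, which is the other half of the data-structure change besides dropping the lock.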
   
   
   ### Does this PR introduce _any_ user-facing change?
   No.
   
   
   ### How was this patch tested?
   Integration test.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-celeborn] waitinfuture merged pull request #1236: [CELEBORN-301] Refactor PartitionLocationInfo to use ConcurrentHashMap

Posted by "waitinfuture (via GitHub)" <gi...@apache.org>.
waitinfuture merged PR #1236:
URL: https://github.com/apache/incubator-celeborn/pull/1236




[GitHub] [incubator-celeborn] waitinfuture commented on pull request #1236: [CELEBORN-301] Refactor PartitionLocationInfo to use ConcurrentHashMap

Posted by "waitinfuture (via GitHub)" <gi...@apache.org>.
waitinfuture commented on PR #1236:
URL: https://github.com/apache/incubator-celeborn/pull/1236#issuecomment-1445134185

   I have run the regression tests and the results are good.




[GitHub] [incubator-celeborn] waitinfuture commented on pull request #1236: [CELEBORN-301] Refactor PartitionLocationInfo to use ConcurrentHashMap

Posted by "waitinfuture (via GitHub)" <gi...@apache.org>.
waitinfuture commented on PR #1236:
URL: https://github.com/apache/incubator-celeborn/pull/1236#issuecomment-1429834221

   cc @FMX @RexXiong, please take a look.




[GitHub] [incubator-celeborn] RexXiong commented on a diff in pull request #1236: [CELEBORN-301] Refactor PartitionLocationInfo to use ConcurrentHashMap

Posted by "RexXiong (via GitHub)" <gi...@apache.org>.
RexXiong commented on code in PR #1236:
URL: https://github.com/apache/incubator-celeborn/pull/1236#discussion_r1106632990


##########
common/src/main/scala/org/apache/celeborn/common/meta/PartitionLocationInfo.scala:
##########
@@ -210,38 +159,31 @@ class PartitionLocationInfo extends Logging {
   private def removePartitions(
       shuffleKey: String,
       uniqueIds: util.Collection[String],
-      partitionInfo: PartitionInfo): (util.Map[String, Integer], Integer) = this.synchronized {
-    if (!partitionInfo.containsKey(shuffleKey)) {
+      partitionInfo: PartitionInfo): (util.Map[String, Integer], Integer) = {
+    val partitionMap = partitionInfo.get(shuffleKey)
+    if (partitionMap == null) {
       return (Map.empty[String, Integer].asJava, 0)
     }
     val locMap = new util.HashMap[String, Integer]()
     var numSlotsReleased: Int = 0
-    val reduceLocMap = partitionInfo.get(shuffleKey)
     uniqueIds.asScala.foreach { id =>
       val tokens = id.split("-", 2)
       val partitionId = tokens(0).toInt
       val epoch = tokens(1).toInt
-      val locations = reduceLocMap.get(partitionId)
-      if (locations != null) {
-        val targetLocation = locations.asScala.find(_.getEpoch == epoch).orNull
-        if (targetLocation != null) {
-          locations.remove(targetLocation)
-          numSlotsReleased += 1
-          locMap.compute(
-            targetLocation.getStorageInfo.getMountPoint,
-            new BiFunction[String, Integer, Integer] {
-              override def apply(t: String, u: Integer): Integer = {
-                if (u == null) 1 else u + 1
-              }
-            })
-        }
-      }
-      if (locations == null || locations.size() == 0) {
-        reduceLocMap.remove(partitionId)
+      val loc = partitionMap.remove(encode(partitionId, epoch))
+      if (loc != null) {
+        numSlotsReleased += 1
+        locMap.compute(
+          loc.getStorageInfo.getMountPoint,
+          new BiFunction[String, Integer, Integer] {
+            override def apply(t: String, u: Integer): Integer = {
+              if (u == null) 1 else u + 1
+            }
+          })
       }
     }
 
-    if (reduceLocMap.size() == 0) {
+    if (partitionMap.size() == 0) {

Review Comment:
   When preCommit runs for hard-split partitions while someone concurrently revives or splits a new partition, there is probably a concurrency problem here.
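To make the concern concrete, the sketch below replays one possible interleaving step by step on a single thread (hypothetical names; strings stand in for `PartitionLocation`). Each individual map call is atomic, but the sequence "observe that the inner map is empty, then drop the shuffle entry" is not, so a location added in between becomes unreachable. Under the old `this.synchronized` version these two steps could not be interleaved this way.

```scala
import java.util.concurrent.ConcurrentHashMap

// Deterministic replay of a check-then-act race; keys 0L and 1L are the
// encoded ids of (partition 0, epoch 0) and (partition 0, epoch 1).
object PreCommitRaceDemo {
  def run(): Boolean = {
    val byShuffle = new ConcurrentHashMap[String, ConcurrentHashMap[Long, String]]()
    val inner = new ConcurrentHashMap[Long, String]()
    inner.put(0L, "partition 0, epoch 0")
    byShuffle.put("shuffle-1", inner)

    // "Thread A" (preCommit of a hard-split partition) removes the last location...
    inner.remove(0L)
    val looksEmpty = inner.size() == 0

    // ..."thread B" (revive/split) runs now and adds a new epoch to the same inner map.
    byShuffle.get("shuffle-1").put(1L, "partition 0, epoch 1")

    // "Thread A" resumes and acts on its stale observation.
    if (looksEmpty) byShuffle.remove("shuffle-1")

    // True when the new location survives only in a detached map, i.e. it was lost.
    byShuffle.get("shuffle-1") == null && inner.containsKey(1L)
  }
}
```

`PreCommitRaceDemo.run()` returns `true`: the epoch-1 location exists but can no longer be found through the shuffle key.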



##########
common/src/main/scala/org/apache/celeborn/common/meta/PartitionLocationInfo.scala:
##########
@@ -252,42 +194,29 @@ class PartitionLocationInfo extends Logging {
       shuffleKey: String,
       uniqueId: String,
       mode: PartitionLocation.Mode): PartitionLocation = {
-    val tokens = uniqueId.split("-", 2)
-    val partitionId = tokens(0).toInt
-    val epoch = tokens(1).toInt
     val partitionInfo =
       if (mode == PartitionLocation.Mode.MASTER) {
         masterPartitionLocations
       } else {
         slavePartitionLocations
       }
 
-    this.synchronized {
-      if (!partitionInfo.containsKey(shuffleKey)
-        || !partitionInfo.get(shuffleKey).containsKey(partitionId)) {
-        return null
-      }
-      partitionInfo.get(shuffleKey)
-        .get(partitionId)
-        .asScala
-        .find(loc => loc.getEpoch == epoch)
-        .orNull
-    }
+    val partitionMap = partitionInfo.get(shuffleKey)
+    if (partitionMap != null) {
+      val tokens = uniqueId.split("-", 2)

Review Comment:
   We could extract an encodeFromUniqueId method to remove duplicated code like this.
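One possible shape for such a helper, as a sketch only: `UniqueIdKeys` is a hypothetical name, and it assumes unique ids always have the `partitionId-epoch` form that `PartitionLocation.getUniqueId` builds as `id + "-" + epoch`.

```scala
// Hypothetical helper: parse "partitionId-epoch" and pack it into the Long key.
object UniqueIdKeys {
  def encode(partitionId: Int, epoch: Int): Long =
    (partitionId.toLong << 32) | (epoch.toLong & 0xFFFFFFFFL)

  def encodeFromUniqueId(uniqueId: String): Long = {
    // split with limit 2 mirrors the existing call sites in this file.
    val tokens = uniqueId.split("-", 2)
    encode(tokens(0).toInt, tokens(1).toInt)
  }
}
```

Each call site then shrinks to `partitionMap.get(UniqueIdKeys.encodeFromUniqueId(uniqueId))` or the matching `remove`.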





[GitHub] [incubator-celeborn] waitinfuture commented on a diff in pull request #1236: [CELEBORN-301] Refactor PartitionLocationInfo to use ConcurrentHashMap

Posted by "waitinfuture (via GitHub)" <gi...@apache.org>.
waitinfuture commented on code in PR #1236:
URL: https://github.com/apache/incubator-celeborn/pull/1236#discussion_r1107094930


##########
common/src/main/scala/org/apache/celeborn/common/meta/PartitionLocationInfo.scala:
##########
@@ -210,38 +159,31 @@ class PartitionLocationInfo extends Logging {
   private def removePartitions(
       shuffleKey: String,
       uniqueIds: util.Collection[String],
-      partitionInfo: PartitionInfo): (util.Map[String, Integer], Integer) = this.synchronized {
-    if (!partitionInfo.containsKey(shuffleKey)) {
+      partitionInfo: PartitionInfo): (util.Map[String, Integer], Integer) = {
+    val partitionMap = partitionInfo.get(shuffleKey)
+    if (partitionMap == null) {
       return (Map.empty[String, Integer].asJava, 0)
     }
     val locMap = new util.HashMap[String, Integer]()
     var numSlotsReleased: Int = 0
-    val reduceLocMap = partitionInfo.get(shuffleKey)
     uniqueIds.asScala.foreach { id =>
       val tokens = id.split("-", 2)
       val partitionId = tokens(0).toInt
       val epoch = tokens(1).toInt
-      val locations = reduceLocMap.get(partitionId)
-      if (locations != null) {
-        val targetLocation = locations.asScala.find(_.getEpoch == epoch).orNull
-        if (targetLocation != null) {
-          locations.remove(targetLocation)
-          numSlotsReleased += 1
-          locMap.compute(
-            targetLocation.getStorageInfo.getMountPoint,
-            new BiFunction[String, Integer, Integer] {
-              override def apply(t: String, u: Integer): Integer = {
-                if (u == null) 1 else u + 1
-              }
-            })
-        }
-      }
-      if (locations == null || locations.size() == 0) {
-        reduceLocMap.remove(partitionId)
+      val loc = partitionMap.remove(encode(partitionId, epoch))
+      if (loc != null) {
+        numSlotsReleased += 1
+        locMap.compute(
+          loc.getStorageInfo.getMountPoint,
+          new BiFunction[String, Integer, Integer] {
+            override def apply(t: String, u: Integer): Integer = {
+              if (u == null) 1 else u + 1
+            }
+          })
       }
     }
 
-    if (reduceLocMap.size() == 0) {
+    if (partitionMap.size() == 0) {

Review Comment:
   Good catch! I added a cleanShuffle method to PartitionLocationInfo, which is called during the Worker's cleanup.
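A hedged sketch of the approach described in this reply, assuming that removing locations simply stops dropping the per-shuffle map when it becomes empty, and that the whole entry is removed only when the Worker cleans up an expired shuffle. `ShuffleScopedLocations` and all of its members other than the name `cleanShuffle` are hypothetical, and the real signature of `cleanShuffle` may differ.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.function.{Function => JFunction}

class ShuffleScopedLocations {
  private val byShuffle =
    new ConcurrentHashMap[String, ConcurrentHashMap[Long, String]]()

  private val newInner = new JFunction[String, ConcurrentHashMap[Long, String]] {
    override def apply(shuffleKey: String): ConcurrentHashMap[Long, String] =
      new ConcurrentHashMap[Long, String]()
  }

  def add(shuffleKey: String, key: Long, loc: String): Unit = {
    byShuffle.computeIfAbsent(shuffleKey, newInner).put(key, loc)
  }

  // Removing a location never detaches the per-shuffle map, even when it becomes empty.
  def remove(shuffleKey: String, key: Long): String = {
    val inner = byShuffle.get(shuffleKey)
    if (inner == null) null else inner.remove(key)
  }

  // Drops everything recorded for the shuffle; assumed to run only after it has expired.
  def cleanShuffle(shuffleKey: String): Unit = {
    byShuffle.remove(shuffleKey)
  }

  def hasShuffle(shuffleKey: String): Boolean = byShuffle.containsKey(shuffleKey)
}
```

Keeping an empty inner map costs a little memory until cleanup, but it removes the check-then-act step, so a concurrent revive cannot write into a detached map as long as `cleanShuffle` only runs once the shuffle is finished.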





[GitHub] [incubator-celeborn] waitinfuture commented on a diff in pull request #1236: [CELEBORN-301] Refactor PartitionLocationInfo to use ConcurrentHashMap

Posted by "waitinfuture (via GitHub)" <gi...@apache.org>.
waitinfuture commented on code in PR #1236:
URL: https://github.com/apache/incubator-celeborn/pull/1236#discussion_r1107097917


##########
common/src/main/scala/org/apache/celeborn/common/meta/PartitionLocationInfo.scala:
##########
@@ -252,42 +194,29 @@ class PartitionLocationInfo extends Logging {
       shuffleKey: String,
       uniqueId: String,
       mode: PartitionLocation.Mode): PartitionLocation = {
-    val tokens = uniqueId.split("-", 2)
-    val partitionId = tokens(0).toInt
-    val epoch = tokens(1).toInt
     val partitionInfo =
       if (mode == PartitionLocation.Mode.MASTER) {
         masterPartitionLocations
       } else {
         slavePartitionLocations
       }
 
-    this.synchronized {
-      if (!partitionInfo.containsKey(shuffleKey)
-        || !partitionInfo.get(shuffleKey).containsKey(partitionId)) {
-        return null
-      }
-      partitionInfo.get(shuffleKey)
-        .get(partitionId)
-        .asScala
-        .find(loc => loc.getEpoch == epoch)
-        .orNull
-    }
+    val partitionMap = partitionInfo.get(shuffleKey)
+    if (partitionMap != null) {
+      val tokens = uniqueId.split("-", 2)

Review Comment:
   done



##########
common/src/main/java/org/apache/celeborn/common/protocol/PartitionLocation.java:
##########
@@ -224,6 +224,10 @@ public String getUniqueId() {
     return id + "-" + epoch;
   }
 
+  public long encodeUniqueId() {

Review Comment:
   done





[GitHub] [incubator-celeborn] waitinfuture commented on a diff in pull request #1236: [CELEBORN-301] Refactor PartitionLocationInfo to use ConcurrentHashMap

Posted by "waitinfuture (via GitHub)" <gi...@apache.org>.
waitinfuture commented on code in PR #1236:
URL: https://github.com/apache/incubator-celeborn/pull/1236#discussion_r1107077168


##########
common/src/main/scala/org/apache/celeborn/common/meta/PartitionLocationInfo.scala:
##########
@@ -252,42 +194,29 @@ class PartitionLocationInfo extends Logging {
       shuffleKey: String,
       uniqueId: String,
       mode: PartitionLocation.Mode): PartitionLocation = {
-    val tokens = uniqueId.split("-", 2)
-    val partitionId = tokens(0).toInt
-    val epoch = tokens(1).toInt
     val partitionInfo =
       if (mode == PartitionLocation.Mode.MASTER) {
         masterPartitionLocations
       } else {
         slavePartitionLocations
       }
 
-    this.synchronized {
-      if (!partitionInfo.containsKey(shuffleKey)
-        || !partitionInfo.get(shuffleKey).containsKey(partitionId)) {
-        return null
-      }
-      partitionInfo.get(shuffleKey)
-        .get(partitionId)
-        .asScala
-        .find(loc => loc.getEpoch == epoch)
-        .orNull
-    }
+    val partitionMap = partitionInfo.get(shuffleKey)
+    if (partitionMap != null) {
+      val tokens = uniqueId.split("-", 2)
+      val partitionId = tokens(0).toInt
+      val epoch = tokens(1).toInt
+      partitionMap.get(encode(partitionId, epoch))
+    } else null
   }
 
   private def getAllIds(
       shuffleKey: String,
-      partitionInfo: PartitionInfo): util.List[String] = this.synchronized {
-    if (!partitionInfo.containsKey(shuffleKey)) {
-      return null
-    }
-    partitionInfo.get(shuffleKey)
-      .values()
-      .asScala
-      .flatMap(_.asScala)
-      .map(_.getUniqueId)
-      .toList
-      .asJava
+      partitionInfo: PartitionInfo): util.List[String] = {
+    val partitionMap = partitionInfo.get(shuffleKey)
+    if (partitionMap != null) {
+      partitionMap.values().asScala.map(_.getUniqueId).toList.asJava
+    } else null

Review Comment:
   I think it's safe here since partitionMap is a ConcurrentHashMap.
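The safety argument rests on `ConcurrentHashMap` iterators being weakly consistent: they never throw `ConcurrentModificationException`, and they may or may not reflect updates made after the iterator was created. The single-threaded Scala sketch below (hypothetical names) contrasts this with the fail-fast iterator of `java.util.HashMap` by removing an entry while iterating over `values()`.

```scala
import java.util.{ConcurrentModificationException, Map => JMap}

object IterationDemo {
  // Returns true if removing an entry while iterating values() threw a CME.
  def throwsWhileModifying(map: JMap[Long, String]): Boolean = {
    (0L until 4L).foreach(k => map.put(k, s"loc-$k"))
    try {
      val it = map.values().iterator()
      while (it.hasNext()) {
        it.next()
        // Structural modification behind the iterator's back.
        map.remove(3L)
      }
      false
    } catch {
      case _: ConcurrentModificationException => true
    }
  }
}
```

Called with a `java.util.HashMap` it returns `true`; with a `ConcurrentHashMap` it returns `false`. "Safe" here means no exception: the resulting list is not an atomic snapshot of the map.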





[GitHub] [incubator-celeborn] waitinfuture commented on a diff in pull request #1236: [CELEBORN-301] Refactor PartitionLocationInfo to use ConcurrentHashMap

Posted by "waitinfuture (via GitHub)" <gi...@apache.org>.
waitinfuture commented on code in PR #1236:
URL: https://github.com/apache/incubator-celeborn/pull/1236#discussion_r1107075431


##########
common/src/main/scala/org/apache/celeborn/common/meta/PartitionLocationInfo.scala:
##########
@@ -122,81 +110,42 @@ class PartitionLocationInfo extends Logging {
     removePartitions(shuffleKey, uniqueIds, slavePartitionLocations)
   }
 
-  def getAllMasterLocationsWithMinEpoch(shuffleKey: String): util.List[PartitionLocation] =
-    this.synchronized {
-      getAllMasterLocationsWithExtremeEpoch(shuffleKey, (a, b) => a < b)
-    }
+  def getAllMasterLocationsWithMinEpoch(shuffleKey: String): util.List[PartitionLocation] = {
+    getAllMasterLocationsWithExtremeEpoch(shuffleKey, (a, b) => a < b)
+  }
 
+  // TODO: UT
   def getAllMasterLocationsWithExtremeEpoch(
       shuffleKey: String,
-      order: (Int, Int) => Boolean): util.List[PartitionLocation] = this.synchronized {
-    if (masterPartitionLocations.containsKey(shuffleKey)) {
-      masterPartitionLocations.get(shuffleKey)
-        .values()
-        .asScala
-        .map { list =>
-          var loc = list.get(0)
-          1 until list.size() foreach (ind => {
-            if (order(list.get(ind).getEpoch, loc.getEpoch)) {
-              loc = list.get(ind)
-            }
-          })
-          loc
-        }.toList.asJava
+      order: (Int, Int) => Boolean): util.List[PartitionLocation] = {
+    val partitionMap = masterPartitionLocations.get(shuffleKey)
+    if (partitionMap != null) {
+      val resMap = new util.HashMap[Int, PartitionLocation]()
+      partitionMap.values().asScala.foreach(loc => {

Review Comment:
   Since partitionMap is a ConcurrentHashMap, I think it's safe here.
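The diff only shows the start of the new loop, so the following is a sketch, not the PR's code, of how a per-`partitionId` reduction over the flat map could select the extreme epoch. `EpochLoc` and `ExtremeEpoch` are hypothetical names, and the loop uses the Java iterator directly to stay independent of the Scala collection converters.

```scala
import java.util.{ArrayList => JArrayList, HashMap => JHashMap, List => JList}
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for PartitionLocation.
final case class EpochLoc(partitionId: Int, epoch: Int)

object ExtremeEpoch {
  // For each partitionId, keep the location whose epoch wins under `order`.
  def select(
      partitionMap: ConcurrentHashMap[Long, EpochLoc],
      order: (Int, Int) => Boolean): JList[EpochLoc] = {
    val resMap = new JHashMap[Int, EpochLoc]()
    val it = partitionMap.values().iterator()
    while (it.hasNext()) {
      val loc = it.next()
      val current = resMap.get(loc.partitionId)
      // Replace the current pick when this location's epoch wins under `order`.
      if (current == null || order(loc.epoch, current.epoch)) {
        resMap.put(loc.partitionId, loc)
      }
    }
    new JArrayList[EpochLoc](resMap.values())
  }
}
```

With `(a, b) => a < b` this keeps the minimum epoch per partition, matching `getAllMasterLocationsWithMinEpoch`; with `(a, b) => a > b` it keeps the maximum.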





[GitHub] [incubator-celeborn] FMX commented on a diff in pull request #1236: [CELEBORN-301] Refactor PartitionLocationInfo to use ConcurrentHashMap

Posted by "FMX (via GitHub)" <gi...@apache.org>.
FMX commented on code in PR #1236:
URL: https://github.com/apache/incubator-celeborn/pull/1236#discussion_r1106550078


##########
common/src/main/java/org/apache/celeborn/common/protocol/PartitionLocation.java:
##########
@@ -224,6 +224,10 @@ public String getUniqueId() {
     return id + "-" + epoch;
   }
 
+  public long encodeUniqueId() {

Review Comment:
   This method is not used. It can be removed.



##########
common/src/main/scala/org/apache/celeborn/common/meta/PartitionLocationInfo.scala:
##########
@@ -122,81 +110,42 @@ class PartitionLocationInfo extends Logging {
     removePartitions(shuffleKey, uniqueIds, slavePartitionLocations)
   }
 
-  def getAllMasterLocationsWithMinEpoch(shuffleKey: String): util.List[PartitionLocation] =
-    this.synchronized {
-      getAllMasterLocationsWithExtremeEpoch(shuffleKey, (a, b) => a < b)
-    }
+  def getAllMasterLocationsWithMinEpoch(shuffleKey: String): util.List[PartitionLocation] = {
+    getAllMasterLocationsWithExtremeEpoch(shuffleKey, (a, b) => a < b)
+  }
 
+  // TODO: UT
   def getAllMasterLocationsWithExtremeEpoch(
       shuffleKey: String,
-      order: (Int, Int) => Boolean): util.List[PartitionLocation] = this.synchronized {
-    if (masterPartitionLocations.containsKey(shuffleKey)) {
-      masterPartitionLocations.get(shuffleKey)
-        .values()
-        .asScala
-        .map { list =>
-          var loc = list.get(0)
-          1 until list.size() foreach (ind => {
-            if (order(list.get(ind).getEpoch, loc.getEpoch)) {
-              loc = list.get(ind)
-            }
-          })
-          loc
-        }.toList.asJava
+      order: (Int, Int) => Boolean): util.List[PartitionLocation] = {
+    val partitionMap = masterPartitionLocations.get(shuffleKey)
+    if (partitionMap != null) {
+      val resMap = new util.HashMap[Int, PartitionLocation]()
+      partitionMap.values().asScala.foreach(loc => {

Review Comment:
   There might be concurrency issues here: another thread might modify the map while the Scala-wrapped iterator is traversing it.



##########
common/src/main/scala/org/apache/celeborn/common/meta/PartitionLocationInfo.scala:
##########
@@ -252,42 +194,29 @@ class PartitionLocationInfo extends Logging {
       shuffleKey: String,
       uniqueId: String,
       mode: PartitionLocation.Mode): PartitionLocation = {
-    val tokens = uniqueId.split("-", 2)
-    val partitionId = tokens(0).toInt
-    val epoch = tokens(1).toInt
     val partitionInfo =
       if (mode == PartitionLocation.Mode.MASTER) {
         masterPartitionLocations
       } else {
         slavePartitionLocations
       }
 
-    this.synchronized {
-      if (!partitionInfo.containsKey(shuffleKey)
-        || !partitionInfo.get(shuffleKey).containsKey(partitionId)) {
-        return null
-      }
-      partitionInfo.get(shuffleKey)
-        .get(partitionId)
-        .asScala
-        .find(loc => loc.getEpoch == epoch)
-        .orNull
-    }
+    val partitionMap = partitionInfo.get(shuffleKey)
+    if (partitionMap != null) {
+      val tokens = uniqueId.split("-", 2)
+      val partitionId = tokens(0).toInt
+      val epoch = tokens(1).toInt
+      partitionMap.get(encode(partitionId, epoch))
+    } else null
   }
 
   private def getAllIds(
       shuffleKey: String,
-      partitionInfo: PartitionInfo): util.List[String] = this.synchronized {
-    if (!partitionInfo.containsKey(shuffleKey)) {
-      return null
-    }
-    partitionInfo.get(shuffleKey)
-      .values()
-      .asScala
-      .flatMap(_.asScala)
-      .map(_.getUniqueId)
-      .toList
-      .asJava
+      partitionInfo: PartitionInfo): util.List[String] = {
+    val partitionMap = partitionInfo.get(shuffleKey)
+    if (partitionMap != null) {
+      partitionMap.values().asScala.map(_.getUniqueId).toList.asJava
+    } else null

Review Comment:
   This will cause an NPE during removePartitionLocation.


