You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2017/06/29 23:03:18 UTC
spark git commit: [SPARK-21188][CORE] releaseAllLocksForTask should
synchronize the whole method
Repository: spark
Updated Branches:
refs/heads/master 18066f2e6 -> f9151bebc
[SPARK-21188][CORE] releaseAllLocksForTask should synchronize the whole method
## What changes were proposed in this pull request?
Since the objects `readLocksByTask`, `writeLocksByTask` and `info`s are coupled and may be modified concurrently by other threads, all reads and writes of them in the method `releaseAllLocksForTask` should be protected by a single synchronized block, like other similar methods.
## How was this patch tested?
existing tests
Author: Feng Liu <fe...@databricks.com>
Closes #18400 from liufengdb/synchronize.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f9151beb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f9151beb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f9151beb
Branch: refs/heads/master
Commit: f9151bebca986d44cdab7699959fec2bc050773a
Parents: 18066f2
Author: Feng Liu <fe...@databricks.com>
Authored: Thu Jun 29 16:03:15 2017 -0700
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Thu Jun 29 16:03:15 2017 -0700
----------------------------------------------------------------------
.../apache/spark/storage/BlockInfoManager.scala | 24 ++++++++------------
1 file changed, 9 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/f9151beb/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
index 7064872..219a0e7 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
@@ -341,15 +341,11 @@ private[storage] class BlockInfoManager extends Logging {
*
* @return the ids of blocks whose pins were released
*/
- def releaseAllLocksForTask(taskAttemptId: TaskAttemptId): Seq[BlockId] = {
+ def releaseAllLocksForTask(taskAttemptId: TaskAttemptId): Seq[BlockId] = synchronized {
val blocksWithReleasedLocks = mutable.ArrayBuffer[BlockId]()
- val readLocks = synchronized {
- readLocksByTask.remove(taskAttemptId).getOrElse(ImmutableMultiset.of[BlockId]())
- }
- val writeLocks = synchronized {
- writeLocksByTask.remove(taskAttemptId).getOrElse(Seq.empty)
- }
+ val readLocks = readLocksByTask.remove(taskAttemptId).getOrElse(ImmutableMultiset.of[BlockId]())
+ val writeLocks = writeLocksByTask.remove(taskAttemptId).getOrElse(Seq.empty)
for (blockId <- writeLocks) {
infos.get(blockId).foreach { info =>
@@ -358,21 +354,19 @@ private[storage] class BlockInfoManager extends Logging {
}
blocksWithReleasedLocks += blockId
}
+
readLocks.entrySet().iterator().asScala.foreach { entry =>
val blockId = entry.getElement
val lockCount = entry.getCount
blocksWithReleasedLocks += blockId
- synchronized {
- get(blockId).foreach { info =>
- info.readerCount -= lockCount
- assert(info.readerCount >= 0)
- }
+ get(blockId).foreach { info =>
+ info.readerCount -= lockCount
+ assert(info.readerCount >= 0)
}
}
- synchronized {
- notifyAll()
- }
+ notifyAll()
+
blocksWithReleasedLocks
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org