You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2013/09/27 00:01:56 UTC

[06/11] git commit: Merge pull request #7 from wannabeast/memorystore-fixes

Merge pull request #7 from wannabeast/memorystore-fixes

some minor fixes to MemoryStore

This is a repeat of #5, moved to its own branch in my repo.

This makes all updates to `currentMemory` synchronize on `entries`; it skips synchronizing the reads where it can get away with it.

(cherry picked from commit 560ee5c9bba3f9fde380c831d0c6701343b2fecf)
Signed-off-by: Reynold Xin <rx...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/8cbc96bd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/8cbc96bd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/8cbc96bd

Branch: refs/heads/branch-0.8
Commit: 8cbc96bd49af07953ab921546e56b4e04b8b760e
Parents: 240ca93
Author: Reynold Xin <re...@gmail.com>
Authored: Thu Sep 26 11:27:34 2013 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Thu Sep 26 13:16:05 2013 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/storage/MemoryStore.scala  | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/8cbc96bd/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 3b3b234..77a39c7 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -30,10 +30,10 @@ import org.apache.spark.util.{SizeEstimator, Utils}
 private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
   extends BlockStore(blockManager) {
 
-  case class Entry(value: Any, size: Long, deserialized: Boolean, var dropPending: Boolean = false)
+  case class Entry(value: Any, size: Long, deserialized: Boolean)
 
   private val entries = new LinkedHashMap[String, Entry](32, 0.75f, true)
-  private var currentMemory = 0L
+  @volatile private var currentMemory = 0L
   // Object used to ensure that only one thread is putting blocks and if necessary, dropping
   // blocks from the memory store.
   private val putLock = new Object()
@@ -110,9 +110,8 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
 
   override def remove(blockId: String): Boolean = {
     entries.synchronized {
-      val entry = entries.get(blockId)
+      val entry = entries.remove(blockId)
       if (entry != null) {
-        entries.remove(blockId)
         currentMemory -= entry.size
         logInfo("Block %s of size %d dropped from memory (free %d)".format(
           blockId, entry.size, freeMemory))
@@ -126,6 +125,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
   override def clear() {
     entries.synchronized {
       entries.clear()
+      currentMemory = 0
     }
     logInfo("MemoryStore cleared")
   }
@@ -160,8 +160,10 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     putLock.synchronized {
       if (ensureFreeSpace(blockId, size)) {
         val entry = new Entry(value, size, deserialized)
-        entries.synchronized { entries.put(blockId, entry) }
-        currentMemory += size
+        entries.synchronized {
+          entries.put(blockId, entry)
+          currentMemory += size
+        }
         if (deserialized) {
           logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format(
             blockId, Utils.bytesToString(size), Utils.bytesToString(freeMemory)))