You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2017/10/08 11:58:42 UTC

spark git commit: [SPARK-22147][CORE] Removed redundant allocations from BlockId

Repository: spark
Updated Branches:
  refs/heads/master 5eacc3bfa -> c998a2ae0


[SPARK-22147][CORE] Removed redundant allocations from BlockId

## What changes were proposed in this pull request?

Prior to this commit BlockId.hashCode and BlockId.equals were defined
in terms of BlockId.name. This allowed the subclasses to be concise and
enforced BlockId.name as a single unique identifier for a block. All
subclasses override BlockId.name with an expression involving an
allocation of StringBuilder and ultimatelly String. This is suboptimal
since it induced unnecessary GC pressure on the dirver, see
BlockManagerMasterEndpoint.

The commit removes the definition of hashCode and equals from the base
class. No other change is necessary since all subclasses are in fact
case classes and therefore have auto-generated hashCode and equals. No
change of behaviour is expected.

Sidenote: you might be wondering, why did the subclasses use the base
implementation and the auto-generated one? Apparently, this behaviour
is documented in the spec. See this SO answer for details
https://stackoverflow.com/a/44990210/262432.

## How was this patch tested?

BlockIdSuite

Author: Sergei Lebedev <s....@criteo.com>

Closes #19369 from superbobry/blockid-equals-hashcode.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c998a2ae
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c998a2ae
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c998a2ae

Branch: refs/heads/master
Commit: c998a2ae0ea019dfb9b39cef6ddfac07c496e083
Parents: 5eacc3b
Author: Sergei Lebedev <s....@criteo.com>
Authored: Sun Oct 8 12:58:39 2017 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Sun Oct 8 12:58:39 2017 +0100

----------------------------------------------------------------------
 .../netty/NettyBlockTransferService.scala       |  2 +-
 .../org/apache/spark/storage/BlockId.scala      |  5 --
 .../org/apache/spark/storage/DiskStore.scala    |  8 ++--
 .../storage/BlockManagerReplicationSuite.scala  | 49 --------------------
 4 files changed, 5 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c998a2ae/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index ac4d850..6a29e18 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -151,7 +151,7 @@ private[spark] class NettyBlockTransferService(
     // Convert or copy nio buffer into array in order to serialize it.
     val array = JavaUtils.bufferToArray(blockData.nioByteBuffer())
 
-    client.sendRpc(new UploadBlock(appId, execId, blockId.toString, metadata, array).toByteBuffer,
+    client.sendRpc(new UploadBlock(appId, execId, blockId.name, metadata, array).toByteBuffer,
       new RpcResponseCallback {
         override def onSuccess(response: ByteBuffer): Unit = {
           logTrace(s"Successfully uploaded block $blockId")

http://git-wip-us.apache.org/repos/asf/spark/blob/c998a2ae/core/src/main/scala/org/apache/spark/storage/BlockId.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
index 524f697..a441bae 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
@@ -41,11 +41,6 @@ sealed abstract class BlockId {
   def isBroadcast: Boolean = isInstanceOf[BroadcastBlockId]
 
   override def toString: String = name
-  override def hashCode: Int = name.hashCode
-  override def equals(other: Any): Boolean = other match {
-    case o: BlockId => getClass == o.getClass && name.equals(o.name)
-    case _ => false
-  }
 }
 
 @DeveloperApi

http://git-wip-us.apache.org/repos/asf/spark/blob/c998a2ae/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index 3579acf..97abd92 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -47,9 +47,9 @@ private[spark] class DiskStore(
   private val minMemoryMapBytes = conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m")
   private val maxMemoryMapBytes = conf.getSizeAsBytes("spark.storage.memoryMapLimitForTests",
     Int.MaxValue.toString)
-  private val blockSizes = new ConcurrentHashMap[String, Long]()
+  private val blockSizes = new ConcurrentHashMap[BlockId, Long]()
 
-  def getSize(blockId: BlockId): Long = blockSizes.get(blockId.name)
+  def getSize(blockId: BlockId): Long = blockSizes.get(blockId)
 
   /**
    * Invokes the provided callback function to write the specific block.
@@ -67,7 +67,7 @@ private[spark] class DiskStore(
     var threwException: Boolean = true
     try {
       writeFunc(out)
-      blockSizes.put(blockId.name, out.getCount)
+      blockSizes.put(blockId, out.getCount)
       threwException = false
     } finally {
       try {
@@ -113,7 +113,7 @@ private[spark] class DiskStore(
   }
 
   def remove(blockId: BlockId): Boolean = {
-    blockSizes.remove(blockId.name)
+    blockSizes.remove(blockId)
     val file = diskManager.getFile(blockId.name)
     if (file.exists()) {
       val ret = file.delete()

http://git-wip-us.apache.org/repos/asf/spark/blob/c998a2ae/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index dd61dcd..c2101ba 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -198,55 +198,6 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
     }
   }
 
-  test("block replication - deterministic node selection") {
-    val blockSize = 1000
-    val storeSize = 10000
-    val stores = (1 to 5).map {
-      i => makeBlockManager(storeSize, s"store$i")
-    }
-    val storageLevel2x = StorageLevel.MEMORY_AND_DISK_2
-    val storageLevel3x = StorageLevel(true, true, false, true, 3)
-    val storageLevel4x = StorageLevel(true, true, false, true, 4)
-
-    def putBlockAndGetLocations(blockId: String, level: StorageLevel): Set[BlockManagerId] = {
-      stores.head.putSingle(blockId, new Array[Byte](blockSize), level)
-      val locations = master.getLocations(blockId).sortBy { _.executorId }.toSet
-      stores.foreach { _.removeBlock(blockId) }
-      master.removeBlock(blockId)
-      locations
-    }
-
-    // Test if two attempts to 2x replication returns same set of locations
-    val a1Locs = putBlockAndGetLocations("a1", storageLevel2x)
-    assert(putBlockAndGetLocations("a1", storageLevel2x) === a1Locs,
-      "Inserting a 2x replicated block second time gave different locations from the first")
-
-    // Test if two attempts to 3x replication returns same set of locations
-    val a2Locs3x = putBlockAndGetLocations("a2", storageLevel3x)
-    assert(putBlockAndGetLocations("a2", storageLevel3x) === a2Locs3x,
-      "Inserting a 3x replicated block second time gave different locations from the first")
-
-    // Test if 2x replication of a2 returns a strict subset of the locations of 3x replication
-    val a2Locs2x = putBlockAndGetLocations("a2", storageLevel2x)
-    assert(
-      a2Locs2x.subsetOf(a2Locs3x),
-      "Inserting a with 2x replication gave locations that are not a subset of locations" +
-        s" with 3x replication [3x: ${a2Locs3x.mkString(",")}; 2x: ${a2Locs2x.mkString(",")}"
-    )
-
-    // Test if 4x replication of a2 returns a strict superset of the locations of 3x replication
-    val a2Locs4x = putBlockAndGetLocations("a2", storageLevel4x)
-    assert(
-      a2Locs3x.subsetOf(a2Locs4x),
-      "Inserting a with 4x replication gave locations that are not a superset of locations " +
-        s"with 3x replication [3x: ${a2Locs3x.mkString(",")}; 4x: ${a2Locs4x.mkString(",")}"
-    )
-
-    // Test if 3x replication of two different blocks gives two different sets of locations
-    val a3Locs3x = putBlockAndGetLocations("a3", storageLevel3x)
-    assert(a3Locs3x !== a2Locs3x, "Two blocks gave same locations with 3x replication")
-  }
-
   test("block replication - replication failures") {
     /*
       Create a system of three block managers / stores. One of them (say, failableStore)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org