Posted to commits@spark.apache.org by an...@apache.org on 2014/10/30 23:32:55 UTC

git commit: [SPARK-4155] Consolidate usages of <driver>

Repository: spark
Updated Branches:
  refs/heads/master 5231a3f22 -> 9334d6996


[SPARK-4155] Consolidate usages of <driver>

We use "\<driver\>" everywhere. Let's not do that.

Author: Andrew Or <an...@databricks.com>

Closes #3020 from andrewor14/consolidate-driver and squashes the following commits:

c1c2204 [Andrew Or] Just use "<driver>" for local executor ID
3d751e9 [Andrew Or] Consolidate usages of <driver>
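
For context, here is a minimal, self-contained sketch of the pattern this commit applies; DriverIdSketch and isDriverId are hypothetical stand-ins for SparkContext and the call sites shown in the diff below:

    object DriverIdSketch {
      // One definition of the driver's executor ID replaces the "<driver>"
      // string literals previously scattered across call sites (this mirrors
      // SparkContext.DRIVER_IDENTIFIER in the diff).
      val DRIVER_IDENTIFIER = "<driver>"

      // Call sites compare against the constant rather than a raw literal,
      // as BlockManagerId.isDriver now does.
      def isDriverId(executorId: String): Boolean =
        executorId == DRIVER_IDENTIFIER

      def main(args: Array[String]): Unit = {
        println(isDriverId("<driver>"))   // true
        println(isDriverId("executor-1")) // false
      }
    }

As the diff shows, this also lets local mode reuse the same ID for its single executor (LocalBackend), which removes the need for the formatExecutorId workaround in StorageStatusListener and ExecutorsTab.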


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9334d699
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9334d699
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9334d699

Branch: refs/heads/master
Commit: 9334d699671edd8f18370255017ad40c1d0340ee
Parents: 5231a3f
Author: Andrew Or <an...@databricks.com>
Authored: Thu Oct 30 15:32:46 2014 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Thu Oct 30 15:32:46 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/ExecutorAllocationManager.scala   |  2 +-
 .../src/main/scala/org/apache/spark/SparkContext.scala |  2 ++
 core/src/main/scala/org/apache/spark/SparkEnv.scala    |  2 +-
 .../apache/spark/scheduler/local/LocalBackend.scala    |  4 ++--
 .../org/apache/spark/storage/BlockManagerId.scala      |  3 ++-
 .../apache/spark/storage/StorageStatusListener.scala   | 13 ++-----------
 .../scala/org/apache/spark/ui/exec/ExecutorsTab.scala  |  6 ++----
 .../spark/storage/BlockManagerReplicationSuite.scala   |  8 +++++---
 .../org/apache/spark/storage/BlockManagerSuite.scala   | 10 ++++++----
 9 files changed, 23 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9334d699/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index b2cf022..c11f1db 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -419,7 +419,7 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
 
     override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = {
       val executorId = blockManagerAdded.blockManagerId.executorId
-      if (executorId != "<driver>") {
+      if (executorId != SparkContext.DRIVER_IDENTIFIER) {
         allocationManager.onExecutorAdded(executorId)
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/9334d699/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 73668e8..6bfcd8c 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1333,6 +1333,8 @@ object SparkContext extends Logging {
 
   private[spark] val SPARK_UNKNOWN_USER = "<unknown>"
 
+  private[spark] val DRIVER_IDENTIFIER = "<driver>"
+
   implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
     def addInPlace(t1: Double, t2: Double): Double = t1 + t2
     def zero(initialValue: Double) = 0.0

http://git-wip-us.apache.org/repos/asf/spark/blob/9334d699/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 6a6dfda..557d2f5 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -156,7 +156,7 @@ object SparkEnv extends Logging {
     assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
     val hostname = conf.get("spark.driver.host")
     val port = conf.get("spark.driver.port").toInt
-    create(conf, "<driver>", hostname, port, true, isLocal, listenerBus)
+    create(conf, SparkContext.DRIVER_IDENTIFIER, hostname, port, true, isLocal, listenerBus)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/9334d699/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index 58b78f0..c026483 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer
 
 import akka.actor.{Actor, ActorRef, Props}
 
-import org.apache.spark.{Logging, SparkEnv, TaskState}
+import org.apache.spark.{Logging, SparkContext, SparkEnv, TaskState}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.executor.{Executor, ExecutorBackend}
 import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl, WorkerOffer}
@@ -47,7 +47,7 @@ private[spark] class LocalActor(
 
   private var freeCores = totalCores
 
-  private val localExecutorId = "localhost"
+  private val localExecutorId = SparkContext.DRIVER_IDENTIFIER
   private val localExecutorHostname = "localhost"
 
   val executor = new Executor(

http://git-wip-us.apache.org/repos/asf/spark/blob/9334d699/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
index 259f423..b177a59 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
@@ -20,6 +20,7 @@ package org.apache.spark.storage
 import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
 import java.util.concurrent.ConcurrentHashMap
 
+import org.apache.spark.SparkContext
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.util.Utils
 
@@ -59,7 +60,7 @@ class BlockManagerId private (
 
   def port: Int = port_
 
-  def isDriver: Boolean = (executorId == "<driver>")
+  def isDriver: Boolean = { executorId == SparkContext.DRIVER_IDENTIFIER }
 
   override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
     out.writeUTF(executorId_)

http://git-wip-us.apache.org/repos/asf/spark/blob/9334d699/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
index d9066f7..def49e8 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
@@ -19,6 +19,7 @@ package org.apache.spark.storage
 
 import scala.collection.mutable
 
+import org.apache.spark.SparkContext
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.scheduler._
 
@@ -59,10 +60,9 @@ class StorageStatusListener extends SparkListener {
     val info = taskEnd.taskInfo
     val metrics = taskEnd.taskMetrics
     if (info != null && metrics != null) {
-      val execId = formatExecutorId(info.executorId)
       val updatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
       if (updatedBlocks.length > 0) {
-        updateStorageStatus(execId, updatedBlocks)
+        updateStorageStatus(info.executorId, updatedBlocks)
       }
     }
   }
@@ -88,13 +88,4 @@ class StorageStatusListener extends SparkListener {
     }
   }
 
-  /**
-   * In the local mode, there is a discrepancy between the executor ID according to the
-   * task ("localhost") and that according to SparkEnv ("<driver>"). In the UI, this
-   * results in duplicate rows for the same executor. Thus, in this mode, we aggregate
-   * these two rows and use the executor ID of "<driver>" to be consistent.
-   */
-  def formatExecutorId(execId: String): String = {
-    if (execId == "localhost") "<driver>" else execId
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9334d699/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
index 689cf02..9e0e71a 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -48,14 +48,14 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp
   def storageStatusList = storageStatusListener.storageStatusList
 
   override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized {
-    val eid = formatExecutorId(taskStart.taskInfo.executorId)
+    val eid = taskStart.taskInfo.executorId
     executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) + 1
   }
 
   override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
     val info = taskEnd.taskInfo
     if (info != null) {
-      val eid = formatExecutorId(info.executorId)
+      val eid = info.executorId
       executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 1) - 1
       executorToDuration(eid) = executorToDuration.getOrElse(eid, 0L) + info.duration
       taskEnd.reason match {
@@ -84,6 +84,4 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp
     }
   }
 
-  // This addresses executor ID inconsistencies in the local mode
-  private def formatExecutorId(execId: String) = storageStatusListener.formatExecutorId(execId)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9334d699/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index 1f1d53a..c6d7105 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -27,7 +27,7 @@ import org.mockito.Mockito.{mock, when}
 import org.scalatest.{BeforeAndAfter, FunSuite, Matchers, PrivateMethodTester}
 import org.scalatest.concurrent.Eventually._
 
-import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf}
+import org.apache.spark.{MapOutputTrackerMaster, SparkConf, SparkContext, SecurityManager}
 import org.apache.spark.network.BlockTransferService
 import org.apache.spark.network.nio.NioBlockTransferService
 import org.apache.spark.scheduler.LiveListenerBus
@@ -57,7 +57,9 @@ class BlockManagerReplicationSuite extends FunSuite with Matchers with BeforeAnd
   // Implicitly convert strings to BlockIds for test clarity.
   implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
 
-  private def makeBlockManager(maxMem: Long, name: String = "<driver>"): BlockManager = {
+  private def makeBlockManager(
+      maxMem: Long,
+      name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
     val transfer = new NioBlockTransferService(conf, securityMgr)
     val store = new BlockManager(name, actorSystem, master, serializer, maxMem, conf,
       mapOutputTracker, shuffleManager, transfer)
@@ -108,7 +110,7 @@ class BlockManagerReplicationSuite extends FunSuite with Matchers with BeforeAnd
       storeIds.filterNot { _ == stores(2).blockManagerId })
 
     // Add driver store and test whether it is filtered out
-    val driverStore = makeBlockManager(1000, "<driver>")
+    val driverStore = makeBlockManager(1000, SparkContext.DRIVER_IDENTIFIER)
     assert(master.getPeers(stores(0).blockManagerId).forall(!_.isDriver))
     assert(master.getPeers(stores(1).blockManagerId).forall(!_.isDriver))
     assert(master.getPeers(stores(2).blockManagerId).forall(!_.isDriver))

http://git-wip-us.apache.org/repos/asf/spark/blob/9334d699/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 9d96202..715b740 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -37,7 +37,7 @@ import org.scalatest.{BeforeAndAfter, FunSuite, Matchers, PrivateMethodTester}
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.concurrent.Timeouts._
 
-import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf}
+import org.apache.spark.{MapOutputTrackerMaster, SparkConf, SparkContext, SecurityManager}
 import org.apache.spark.executor.DataReadMethod
 import org.apache.spark.network.nio.NioBlockTransferService
 import org.apache.spark.scheduler.LiveListenerBus
@@ -69,7 +69,9 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
   def rdd(rddId: Int, splitId: Int) = RDDBlockId(rddId, splitId)
 
-  private def makeBlockManager(maxMem: Long, name: String = "<driver>"): BlockManager = {
+  private def makeBlockManager(
+      maxMem: Long,
+      name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
     val transfer = new NioBlockTransferService(conf, securityMgr)
     new BlockManager(name, actorSystem, master, serializer, maxMem, conf,
       mapOutputTracker, shuffleManager, transfer)
@@ -790,8 +792,8 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   test("block store put failure") {
     // Use Java serializer so we can create an unserializable error.
     val transfer = new NioBlockTransferService(conf, securityMgr)
-    store = new BlockManager("<driver>", actorSystem, master, new JavaSerializer(conf), 1200, conf,
-      mapOutputTracker, shuffleManager, transfer)
+    store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, actorSystem, master,
+      new JavaSerializer(conf), 1200, conf, mapOutputTracker, shuffleManager, transfer)
 
     // The put should fail since a1 is not serializable.
     class UnserializableClass

