You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2014/10/30 23:32:55 UTC
git commit: [SPARK-4155] Consolidate usages of
Repository: spark
Updated Branches:
refs/heads/master 5231a3f22 -> 9334d6996
[SPARK-4155] Consolidate usages of <driver>
We use "\<driver\>" everywhere. Let's not do that.
Author: Andrew Or <an...@databricks.com>
Closes #3020 from andrewor14/consolidate-driver and squashes the following commits:
c1c2204 [Andrew Or] Just use "<driver>" for local executor ID
3d751e9 [Andrew Or] Consolidate usages of <driver>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9334d699
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9334d699
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9334d699
Branch: refs/heads/master
Commit: 9334d699671edd8f18370255017ad40c1d0340ee
Parents: 5231a3f
Author: Andrew Or <an...@databricks.com>
Authored: Thu Oct 30 15:32:46 2014 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Thu Oct 30 15:32:46 2014 -0700
----------------------------------------------------------------------
.../org/apache/spark/ExecutorAllocationManager.scala | 2 +-
.../src/main/scala/org/apache/spark/SparkContext.scala | 2 ++
core/src/main/scala/org/apache/spark/SparkEnv.scala | 2 +-
.../apache/spark/scheduler/local/LocalBackend.scala | 4 ++--
.../org/apache/spark/storage/BlockManagerId.scala | 3 ++-
.../apache/spark/storage/StorageStatusListener.scala | 13 ++-----------
.../scala/org/apache/spark/ui/exec/ExecutorsTab.scala | 6 ++----
.../spark/storage/BlockManagerReplicationSuite.scala | 8 +++++---
.../org/apache/spark/storage/BlockManagerSuite.scala | 10 ++++++----
9 files changed, 23 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/9334d699/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index b2cf022..c11f1db 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -419,7 +419,7 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = {
val executorId = blockManagerAdded.blockManagerId.executorId
- if (executorId != "<driver>") {
+ if (executorId != SparkContext.DRIVER_IDENTIFIER) {
allocationManager.onExecutorAdded(executorId)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/9334d699/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 73668e8..6bfcd8c 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1333,6 +1333,8 @@ object SparkContext extends Logging {
private[spark] val SPARK_UNKNOWN_USER = "<unknown>"
+ private[spark] val DRIVER_IDENTIFIER = "<driver>"
+
implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
def addInPlace(t1: Double, t2: Double): Double = t1 + t2
def zero(initialValue: Double) = 0.0
http://git-wip-us.apache.org/repos/asf/spark/blob/9334d699/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 6a6dfda..557d2f5 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -156,7 +156,7 @@ object SparkEnv extends Logging {
assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
val hostname = conf.get("spark.driver.host")
val port = conf.get("spark.driver.port").toInt
- create(conf, "<driver>", hostname, port, true, isLocal, listenerBus)
+ create(conf, SparkContext.DRIVER_IDENTIFIER, hostname, port, true, isLocal, listenerBus)
}
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/9334d699/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index 58b78f0..c026483 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer
import akka.actor.{Actor, ActorRef, Props}
-import org.apache.spark.{Logging, SparkEnv, TaskState}
+import org.apache.spark.{Logging, SparkContext, SparkEnv, TaskState}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.executor.{Executor, ExecutorBackend}
import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl, WorkerOffer}
@@ -47,7 +47,7 @@ private[spark] class LocalActor(
private var freeCores = totalCores
- private val localExecutorId = "localhost"
+ private val localExecutorId = SparkContext.DRIVER_IDENTIFIER
private val localExecutorHostname = "localhost"
val executor = new Executor(
http://git-wip-us.apache.org/repos/asf/spark/blob/9334d699/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
index 259f423..b177a59 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
@@ -20,6 +20,7 @@ package org.apache.spark.storage
import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
import java.util.concurrent.ConcurrentHashMap
+import org.apache.spark.SparkContext
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.Utils
@@ -59,7 +60,7 @@ class BlockManagerId private (
def port: Int = port_
- def isDriver: Boolean = (executorId == "<driver>")
+ def isDriver: Boolean = { executorId == SparkContext.DRIVER_IDENTIFIER }
override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
out.writeUTF(executorId_)
http://git-wip-us.apache.org/repos/asf/spark/blob/9334d699/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
index d9066f7..def49e8 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
@@ -19,6 +19,7 @@ package org.apache.spark.storage
import scala.collection.mutable
+import org.apache.spark.SparkContext
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.scheduler._
@@ -59,10 +60,9 @@ class StorageStatusListener extends SparkListener {
val info = taskEnd.taskInfo
val metrics = taskEnd.taskMetrics
if (info != null && metrics != null) {
- val execId = formatExecutorId(info.executorId)
val updatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
if (updatedBlocks.length > 0) {
- updateStorageStatus(execId, updatedBlocks)
+ updateStorageStatus(info.executorId, updatedBlocks)
}
}
}
@@ -88,13 +88,4 @@ class StorageStatusListener extends SparkListener {
}
}
- /**
- * In the local mode, there is a discrepancy between the executor ID according to the
- * task ("localhost") and that according to SparkEnv ("<driver>"). In the UI, this
- * results in duplicate rows for the same executor. Thus, in this mode, we aggregate
- * these two rows and use the executor ID of "<driver>" to be consistent.
- */
- def formatExecutorId(execId: String): String = {
- if (execId == "localhost") "<driver>" else execId
- }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/9334d699/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
index 689cf02..9e0e71a 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -48,14 +48,14 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp
def storageStatusList = storageStatusListener.storageStatusList
override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized {
- val eid = formatExecutorId(taskStart.taskInfo.executorId)
+ val eid = taskStart.taskInfo.executorId
executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) + 1
}
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
val info = taskEnd.taskInfo
if (info != null) {
- val eid = formatExecutorId(info.executorId)
+ val eid = info.executorId
executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 1) - 1
executorToDuration(eid) = executorToDuration.getOrElse(eid, 0L) + info.duration
taskEnd.reason match {
@@ -84,6 +84,4 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp
}
}
- // This addresses executor ID inconsistencies in the local mode
- private def formatExecutorId(execId: String) = storageStatusListener.formatExecutorId(execId)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/9334d699/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index 1f1d53a..c6d7105 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -27,7 +27,7 @@ import org.mockito.Mockito.{mock, when}
import org.scalatest.{BeforeAndAfter, FunSuite, Matchers, PrivateMethodTester}
import org.scalatest.concurrent.Eventually._
-import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf}
+import org.apache.spark.{MapOutputTrackerMaster, SparkConf, SparkContext, SecurityManager}
import org.apache.spark.network.BlockTransferService
import org.apache.spark.network.nio.NioBlockTransferService
import org.apache.spark.scheduler.LiveListenerBus
@@ -57,7 +57,9 @@ class BlockManagerReplicationSuite extends FunSuite with Matchers with BeforeAnd
// Implicitly convert strings to BlockIds for test clarity.
implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
- private def makeBlockManager(maxMem: Long, name: String = "<driver>"): BlockManager = {
+ private def makeBlockManager(
+ maxMem: Long,
+ name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
val transfer = new NioBlockTransferService(conf, securityMgr)
val store = new BlockManager(name, actorSystem, master, serializer, maxMem, conf,
mapOutputTracker, shuffleManager, transfer)
@@ -108,7 +110,7 @@ class BlockManagerReplicationSuite extends FunSuite with Matchers with BeforeAnd
storeIds.filterNot { _ == stores(2).blockManagerId })
// Add driver store and test whether it is filtered out
- val driverStore = makeBlockManager(1000, "<driver>")
+ val driverStore = makeBlockManager(1000, SparkContext.DRIVER_IDENTIFIER)
assert(master.getPeers(stores(0).blockManagerId).forall(!_.isDriver))
assert(master.getPeers(stores(1).blockManagerId).forall(!_.isDriver))
assert(master.getPeers(stores(2).blockManagerId).forall(!_.isDriver))
http://git-wip-us.apache.org/repos/asf/spark/blob/9334d699/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 9d96202..715b740 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -37,7 +37,7 @@ import org.scalatest.{BeforeAndAfter, FunSuite, Matchers, PrivateMethodTester}
import org.scalatest.concurrent.Eventually._
import org.scalatest.concurrent.Timeouts._
-import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf}
+import org.apache.spark.{MapOutputTrackerMaster, SparkConf, SparkContext, SecurityManager}
import org.apache.spark.executor.DataReadMethod
import org.apache.spark.network.nio.NioBlockTransferService
import org.apache.spark.scheduler.LiveListenerBus
@@ -69,7 +69,9 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
def rdd(rddId: Int, splitId: Int) = RDDBlockId(rddId, splitId)
- private def makeBlockManager(maxMem: Long, name: String = "<driver>"): BlockManager = {
+ private def makeBlockManager(
+ maxMem: Long,
+ name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
val transfer = new NioBlockTransferService(conf, securityMgr)
new BlockManager(name, actorSystem, master, serializer, maxMem, conf,
mapOutputTracker, shuffleManager, transfer)
@@ -790,8 +792,8 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
test("block store put failure") {
// Use Java serializer so we can create an unserializable error.
val transfer = new NioBlockTransferService(conf, securityMgr)
- store = new BlockManager("<driver>", actorSystem, master, new JavaSerializer(conf), 1200, conf,
- mapOutputTracker, shuffleManager, transfer)
+ store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, actorSystem, master,
+ new JavaSerializer(conf), 1200, conf, mapOutputTracker, shuffleManager, transfer)
// The put should fail since a1 is not serializable.
class UnserializableClass
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org