Posted to commits@spark.apache.org by pw...@apache.org on 2014/05/06 03:32:18 UTC

git commit: Expose SparkListeners and relevant classes as DeveloperApi

Repository: spark
Updated Branches:
  refs/heads/master 8e724dcba -> ea10b3126


Expose SparkListeners and relevant classes as DeveloperApi

Hopefully this can go into 1.0, as a few people on the user list have asked for this.
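
For context, this is the kind of user code the change unblocks — a minimal sketch against the SparkListener API, assuming an existing SparkContext `sc` (the listener itself is illustrative, not part of this commit):

    import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

    // A custom listener that counts finished tasks, registered through
    // SparkContext.addSparkListener.
    class TaskCounter extends SparkListener {
      var tasksEnded = 0L
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
        tasksEnded += 1
      }
    }

    sc.addSparkListener(new TaskCounter)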

Author: Andrew Or <an...@gmail.com>

Closes #648 from andrewor14/expose-listeners and squashes the following commits:

e45e1ef [Andrew Or] Add missing colons (minor)
350d643 [Andrew Or] Expose SparkListeners and relevant classes as DeveloperApi


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ea10b312
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ea10b312
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ea10b312

Branch: refs/heads/master
Commit: ea10b3126167af3f50f7c2a70e1d942e839fcb66
Parents: 8e724dc
Author: Andrew Or <an...@gmail.com>
Authored: Mon May 5 18:32:14 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Mon May 5 18:32:14 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/storage/BlockId.scala      | 24 ++++++++++++++------
 .../apache/spark/storage/BlockManagerId.scala   | 15 +++++++-----
 .../spark/storage/BlockManagerMasterActor.scala |  4 +++-
 .../org/apache/spark/storage/StorageLevel.scala | 17 ++++++++------
 .../spark/storage/StorageStatusListener.scala   |  7 ++++--
 .../org/apache/spark/storage/StorageUtils.scala |  9 ++++++--
 .../apache/spark/ui/env/EnvironmentTab.scala    |  5 +++-
 .../org/apache/spark/ui/exec/ExecutorsTab.scala |  5 +++-
 .../apache/spark/ui/jobs/ExecutorSummary.scala  | 10 ++++++--
 .../spark/ui/jobs/JobProgressListener.scala     |  8 +++++--
 .../apache/spark/ui/storage/StorageTab.scala    |  7 ++++--
 11 files changed, 78 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ea10b312/core/src/main/scala/org/apache/spark/storage/BlockId.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
index cffea28..42ec181 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
@@ -19,14 +19,18 @@ package org.apache.spark.storage
 
 import java.util.UUID
 
+import org.apache.spark.annotation.DeveloperApi
+
 /**
+ * :: DeveloperApi ::
  * Identifies a particular Block of data, usually associated with a single file.
  * A Block can be uniquely identified by its filename, but each type of Block has a different
  * set of keys which produce its unique name.
  *
  * If your BlockId should be serializable, be sure to add it to the BlockId.apply() method.
  */
-private[spark] sealed abstract class BlockId {
+@DeveloperApi
+sealed abstract class BlockId {
   /** A globally unique identifier for this Block. Can be used for ser/de. */
   def name: String
 
@@ -44,24 +48,29 @@ private[spark] sealed abstract class BlockId {
   }
 }
 
-private[spark] case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
+@DeveloperApi
+case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
   def name = "rdd_" + rddId + "_" + splitIndex
 }
 
-private[spark] case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int)
+@DeveloperApi
+case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int)
   extends BlockId {
   def name = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
 }
 
-private[spark] case class BroadcastBlockId(broadcastId: Long, field: String = "") extends BlockId {
+@DeveloperApi
+case class BroadcastBlockId(broadcastId: Long, field: String = "") extends BlockId {
   def name = "broadcast_" + broadcastId + (if (field == "") "" else "_" + field)
 }
 
-private[spark] case class TaskResultBlockId(taskId: Long) extends BlockId {
+@DeveloperApi
+case class TaskResultBlockId(taskId: Long) extends BlockId {
   def name = "taskresult_" + taskId
 }
 
-private[spark] case class StreamBlockId(streamId: Int, uniqueId: Long) extends BlockId {
+@DeveloperApi
+case class StreamBlockId(streamId: Int, uniqueId: Long) extends BlockId {
   def name = "input-" + streamId + "-" + uniqueId
 }
 
@@ -75,7 +84,8 @@ private[spark] case class TestBlockId(id: String) extends BlockId {
   def name = "test_" + id
 }
 
-private[spark] object BlockId {
+@DeveloperApi
+object BlockId {
   val RDD = "rdd_([0-9]+)_([0-9]+)".r
   val SHUFFLE = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)".r
   val BROADCAST = "broadcast_([0-9]+)([_A-Za-z0-9]*)".r
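
With these types public, user code can construct and parse block IDs directly. A small sketch, assuming BlockId.apply() is the parsing method the scaladoc above mentions:

    import org.apache.spark.storage.{BlockId, RDDBlockId}

    // Each BlockId has a globally unique name; BlockId.apply() maps a
    // name back to the matching subclass via the regexes above.
    val id = RDDBlockId(rddId = 2, splitIndex = 3)
    assert(id.name == "rdd_2_3")
    assert(BlockId("rdd_2_3") == id)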

http://git-wip-us.apache.org/repos/asf/spark/blob/ea10b312/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
index be537d7..b1585bd 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
@@ -20,17 +20,20 @@ package org.apache.spark.storage
 import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
 import java.util.concurrent.ConcurrentHashMap
 
+import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.util.Utils
 
 /**
+ * :: DeveloperApi ::
 * This class represents a unique identifier for a BlockManager.
- * The first 2 constructors of this class is made private to ensure that
- * BlockManagerId objects can be created only using the apply method in
- * the companion object. This allows de-duplication of ID objects.
- * Also, constructor parameters are private to ensure that parameters cannot
- * be modified from outside this class.
+ *
+ * The first two constructors of this class are made private to ensure that BlockManagerId objects
+ * can be created only using the apply method in the companion object. This allows de-duplication
+ * of ID objects. Also, constructor parameters are private to ensure that parameters cannot be
+ * modified from outside this class.
  */
-private[spark] class BlockManagerId private (
+@DeveloperApi
+class BlockManagerId private (
     private var executorId_ : String,
     private var host_ : String,
     private var port_ : Int,
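
The de-duplication the comment describes is the standard intern-pool pattern: a private constructor plus a caching factory in the companion object. A generic sketch of that pattern, not Spark's exact code:

    import java.util.concurrent.ConcurrentHashMap

    class Id private (val key: String)

    object Id {
      private val cache = new ConcurrentHashMap[String, Id]()
      // Return the cached instance if one exists, otherwise the new one.
      def apply(key: String): Id = {
        val candidate = new Id(key)
        val previous = cache.putIfAbsent(key, candidate)
        if (previous != null) previous else candidate
      }
    }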

http://git-wip-us.apache.org/repos/asf/spark/blob/ea10b312/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index 63fa5d3..98fa0df 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -28,6 +28,7 @@ import akka.actor.{Actor, ActorRef, Cancellable}
 import akka.pattern.ask
 
 import org.apache.spark.{Logging, SparkConf, SparkException}
+import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.scheduler._
 import org.apache.spark.storage.BlockManagerMessages._
 import org.apache.spark.util.{AkkaUtils, Utils}
@@ -411,7 +412,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
   }
 }
 
-private[spark] case class BlockStatus(
+@DeveloperApi
+case class BlockStatus(
     storageLevel: StorageLevel,
     memSize: Long,
     diskSize: Long,

http://git-wip-us.apache.org/repos/asf/spark/blob/ea10b312/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
index c9a52e0..363de93 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
@@ -22,14 +22,17 @@ import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
 import org.apache.spark.annotation.DeveloperApi
 
 /**
+ * :: DeveloperApi ::
 * Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory
 * or Tachyon, whether to drop the RDD to disk if it falls out of memory or Tachyon, whether to
  * keep the data in memory in a serialized format, and whether to replicate the RDD partitions on
  * multiple nodes.
+ *
  * The [[org.apache.spark.storage.StorageLevel$]] singleton object contains some static constants
  * for commonly useful storage levels. To create your own storage level object, use the
  * factory method of the singleton object (`StorageLevel(...)`).
  */
+@DeveloperApi
 class StorageLevel private(
     private var useDisk_ : Boolean,
     private var useMemory_ : Boolean,
@@ -54,9 +57,9 @@ class StorageLevel private(
   assert(replication < 40, "Replication restricted to be less than 40 for calculating hashcodes")
 
   if (useOffHeap) {
-    require(useDisk == false, "Off-heap storage level does not support using disk")
-    require(useMemory == false, "Off-heap storage level does not support using heap memory")
-    require(deserialized == false, "Off-heap storage level does not support deserialized storage")
+    require(!useDisk, "Off-heap storage level does not support using disk")
+    require(!useMemory, "Off-heap storage level does not support using heap memory")
+    require(!deserialized, "Off-heap storage level does not support deserialized storage")
     require(replication == 1, "Off-heap storage level does not support multiple replication")
   }
 
@@ -146,7 +149,7 @@ object StorageLevel {
 
   /**
    * :: DeveloperApi ::
-   * Create a new StorageLevel object without setting useOffHeap
+   * Create a new StorageLevel object without setting useOffHeap.
    */
   @DeveloperApi
   def apply(useDisk: Boolean, useMemory: Boolean, useOffHeap: Boolean,
@@ -155,7 +158,7 @@ object StorageLevel {
 
   /**
    * :: DeveloperApi ::
-   * Create a new StorageLevel object
+   * Create a new StorageLevel object.
    */
   @DeveloperApi
   def apply(useDisk: Boolean, useMemory: Boolean,
@@ -164,7 +167,7 @@ object StorageLevel {
 
   /**
    * :: DeveloperApi ::
-   * Create a new StorageLevel object from its integer representation
+   * Create a new StorageLevel object from its integer representation.
    */
   @DeveloperApi
   def apply(flags: Int, replication: Int): StorageLevel =
@@ -172,7 +175,7 @@ object StorageLevel {
 
   /**
    * :: DeveloperApi ::
-   * Read StorageLevel object from ObjectInput stream
+   * Read StorageLevel object from ObjectInput stream.
    */
   @DeveloperApi
   def apply(in: ObjectInput): StorageLevel = {
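
Typical use of the factory method named in the class comment looks like the sketch below; the trailing parameters of the overloads are cut off in the diff, so the exact argument list here is an assumption:

    import org.apache.spark.storage.StorageLevel

    // A custom level: memory plus disk, serialized, two replicas,
    // assuming the overload (useDisk, useMemory, deserialized, replication).
    val level = StorageLevel(useDisk = true, useMemory = true,
      deserialized = false, replication = 2)
    rdd.persist(level)  // rdd is a hypothetical, already-defined RDD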

http://git-wip-us.apache.org/repos/asf/spark/blob/ea10b312/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
index 7a17495..a6e6627 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
@@ -19,12 +19,15 @@ package org.apache.spark.storage
 
 import scala.collection.mutable
 
+import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.scheduler._
 
 /**
- * A SparkListener that maintains executor storage status
+ * :: DeveloperApi ::
+ * A SparkListener that maintains executor storage status.
  */
-private[spark] class StorageStatusListener extends SparkListener {
+@DeveloperApi
+class StorageStatusListener extends SparkListener {
   private val executorIdToStorageStatus = mutable.Map[String, StorageStatus]()
 
   def storageStatusList = executorIdToStorageStatus.values.toSeq
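
Once public, the listener can be driven from application code — a sketch, again assuming an existing SparkContext `sc`:

    import org.apache.spark.storage.StorageStatusListener

    val storageStatus = new StorageStatusListener
    sc.addSparkListener(storageStatus)
    // Later: one StorageStatus per live executor.
    storageStatus.storageStatusList.foreach { s =>
      println(s"executor ${s.blockManagerId.executorId}: maxMem=${s.maxMem}")
    }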

http://git-wip-us.apache.org/repos/asf/spark/blob/ea10b312/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
index 1eddd1c..6f3252a 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -21,9 +21,14 @@ import scala.collection.Map
 import scala.collection.mutable
 
 import org.apache.spark.SparkContext
+import org.apache.spark.annotation.DeveloperApi
 
-/** Storage information for each BlockManager. */
-private[spark] class StorageStatus(
+/**
+ * :: DeveloperApi ::
+ * Storage information for each BlockManager.
+ */
+@DeveloperApi
+class StorageStatus(
     val blockManagerId: BlockManagerId,
     val maxMem: Long,
     val blocks: mutable.Map[BlockId, BlockStatus] = mutable.Map.empty) {

http://git-wip-us.apache.org/repos/asf/spark/blob/ea10b312/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala
index 03b46e1..bbbe55e 100644
--- a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.ui.env
 
+import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.scheduler._
 import org.apache.spark.ui._
 
@@ -30,9 +31,11 @@ private[ui] class EnvironmentTab(parent: SparkUI) extends WebUITab(parent, "envi
 }
 
 /**
+ * :: DeveloperApi ::
  * A SparkListener that prepares information to be displayed on the EnvironmentTab
  */
-private[ui] class EnvironmentListener extends SparkListener {
+@DeveloperApi
+class EnvironmentListener extends SparkListener {
   var jvmInformation = Seq[(String, String)]()
   var sparkProperties = Seq[(String, String)]()
   var systemProperties = Seq[(String, String)]()
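
Usage follows the same shape as the other listeners — a sketch, with `sc` an existing SparkContext; the sequences fill in once the corresponding environment event is posted:

    import org.apache.spark.ui.env.EnvironmentListener

    val env = new EnvironmentListener
    sc.addSparkListener(env)
    env.sparkProperties.foreach { case (k, v) => println(k + " = " + v) }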

http://git-wip-us.apache.org/repos/asf/spark/blob/ea10b312/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
index 5678bf3..91d37b8 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -20,6 +20,7 @@ package org.apache.spark.ui.exec
 import scala.collection.mutable.HashMap
 
 import org.apache.spark.ExceptionFailure
+import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.scheduler._
 import org.apache.spark.storage.StorageStatusListener
 import org.apache.spark.ui.{SparkUI, WebUITab}
@@ -34,9 +35,11 @@ private[ui] class ExecutorsTab(parent: SparkUI) extends WebUITab(parent, "execut
 }
 
 /**
+ * :: DeveloperApi ::
  * A SparkListener that prepares information to be displayed on the ExecutorsTab
  */
-private[ui] class ExecutorsListener(storageStatusListener: StorageStatusListener)
+@DeveloperApi
+class ExecutorsListener(storageStatusListener: StorageStatusListener)
   extends SparkListener {
 
   val executorToTasksActive = HashMap[String, Int]()
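
Because ExecutorsListener reads through a StorageStatusListener, standalone use presumably means registering both, mirroring what the tab does internally — a sketch:

    import org.apache.spark.storage.StorageStatusListener
    import org.apache.spark.ui.exec.ExecutorsListener

    val storage = new StorageStatusListener
    val executors = new ExecutorsListener(storage)
    sc.addSparkListener(storage)
    sc.addSparkListener(executors)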

http://git-wip-us.apache.org/repos/asf/spark/blob/ea10b312/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
index 1dfe1d4..2aaf632 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
@@ -17,8 +17,14 @@
 
 package org.apache.spark.ui.jobs
 
-/** class for reporting aggregated metrics for each executors in stageUI */
-private[ui] class ExecutorSummary {
+import org.apache.spark.annotation.DeveloperApi
+
+/**
+ * :: DeveloperApi ::
+ * Class for reporting aggregated metrics for each executor in stage UI.
+ */
+@DeveloperApi
+class ExecutorSummary {
   var taskTime : Long = 0
   var failedTasks : Int = 0
   var succeededTasks : Int = 0

http://git-wip-us.apache.org/repos/asf/spark/blob/ea10b312/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 0db4afa..396cbcb 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -20,19 +20,22 @@ package org.apache.spark.ui.jobs
 import scala.collection.mutable.{HashMap, ListBuffer}
 
 import org.apache.spark.{ExceptionFailure, SparkConf, SparkContext, Success}
+import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.storage.BlockManagerId
 
 /**
+ * :: DeveloperApi ::
  * Tracks task-level information to be displayed in the UI.
  *
  * All access to the data structures in this class must be synchronized on the
  * class, since the UI thread and the EventBus loop may otherwise be reading and
  * updating the internal data structures concurrently.
  */
-private[ui] class JobProgressListener(conf: SparkConf) extends SparkListener {
+@DeveloperApi
+class JobProgressListener(conf: SparkConf) extends SparkListener {
 
   import JobProgressListener._
 
@@ -246,7 +249,8 @@ private[ui] class JobProgressListener(conf: SparkConf) extends SparkListener {
 
 }
 
-private[ui] case class TaskUIData(
+@DeveloperApi
+case class TaskUIData(
     taskInfo: TaskInfo,
     taskMetrics: Option[TaskMetrics] = None,
     exception: Option[ExceptionFailure] = None)
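
A sketch of attaching one's own instance (again assuming `sc`); note the synchronization requirement in the scaladoc above:

    import org.apache.spark.SparkConf
    import org.apache.spark.ui.jobs.JobProgressListener

    // Attach a private JobProgressListener to read task-level progress.
    // Per the scaladoc, synchronize on the listener before reading its
    // internal data structures.
    val progress = new JobProgressListener(new SparkConf)
    sc.addSparkListener(progress)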

http://git-wip-us.apache.org/repos/asf/spark/blob/ea10b312/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
index 07ec297..c4bb7aa 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
@@ -19,6 +19,7 @@ package org.apache.spark.ui.storage
 
 import scala.collection.mutable
 
+import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.ui._
 import org.apache.spark.scheduler._
 import org.apache.spark.storage.{RDDInfo, StorageStatusListener, StorageUtils}
@@ -35,9 +36,11 @@ private[ui] class StorageTab(parent: SparkUI) extends WebUITab(parent, "storage"
 }
 
 /**
- * A SparkListener that prepares information to be displayed on the BlockManagerUI
+ * :: DeveloperApi ::
+ * A SparkListener that prepares information to be displayed on the BlockManagerUI.
  */
-private[ui] class StorageListener(storageStatusListener: StorageStatusListener)
+@DeveloperApi
+class StorageListener(storageStatusListener: StorageStatusListener)
   extends SparkListener {
 
   private val _rddInfoMap = mutable.Map[Int, RDDInfo]()