Posted to commits@spark.apache.org by ma...@apache.org on 2013/12/24 06:09:18 UTC

[1/9] git commit: Refactored streaming scheduler and added listener interface.

Updated Branches:
  refs/heads/master 11107c9de -> 23a9ae6be


Refactored streaming scheduler and added listener interface.

- Refactored Scheduler + JobManager into JobGenerator + JobScheduler and
  added JobSet for cleaner code. Moved scheduler-related code to the
  streaming.scheduler package.
- Added StreamingListener trait (similar to SparkListener) to enable
  gathering of streaming stats like processing times and delays.
  Listeners are registered via StreamingContext.addListener() (see the
  usage sketch below).
- Deduped some code in streaming tests by modifying TestSuiteBase, and
  added StreamingListenerSuite.
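
For reference, a minimal usage sketch of the new listener API (not part of
this commit; it assumes a StreamingContext `ssc` has already been created,
and uses the StreamingListener, StreamingListenerBatchCompleted and
BatchInfo definitions added below):

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// A minimal listener that logs per-batch processing stats.
class BatchStatsListener extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
    val info = batchCompleted.batchInfo
    // schedulingDelay / processingDelay / totalDelay are Option[Long] in milliseconds
    println("Batch " + info.batchTime +
      ": scheduling delay = " + info.schedulingDelay.getOrElse(-1L) + " ms" +
      ", processing delay = " + info.processingDelay.getOrElse(-1L) + " ms" +
      ", total delay = " + info.totalDelay.getOrElse(-1L) + " ms")
  }
}

// Registration on an existing StreamingContext (assumed to exist):
// ssc.addListener(new BatchStatsListener)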


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/097e120c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/097e120c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/097e120c

Branch: refs/heads/master
Commit: 097e120c0c4132f007bfd0b0254b362ee9a02d8f
Parents: 6169fe1
Author: Tathagata Das <ta...@gmail.com>
Authored: Thu Dec 12 20:41:51 2013 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Thu Dec 12 20:48:02 2013 -0800

----------------------------------------------------------------------
 .../apache/spark/scheduler/SparkListener.scala  |   2 +-
 .../org/apache/spark/streaming/Checkpoint.scala |   2 +-
 .../org/apache/spark/streaming/DStream.scala    |  11 +-
 .../apache/spark/streaming/DStreamGraph.scala   |   1 +
 .../scala/org/apache/spark/streaming/Job.scala  |  41 -----
 .../org/apache/spark/streaming/JobManager.scala |  88 ----------
 .../spark/streaming/NetworkInputTracker.scala   | 174 ------------------
 .../org/apache/spark/streaming/Scheduler.scala  | 131 --------------
 .../spark/streaming/StreamingContext.scala      |  17 +-
 .../streaming/dstream/ForEachDStream.scala      |   3 +-
 .../streaming/dstream/NetworkInputDStream.scala |   1 +
 .../spark/streaming/scheduler/BatchInfo.scala   |  38 ++++
 .../apache/spark/streaming/scheduler/Job.scala  |  47 +++++
 .../streaming/scheduler/JobGenerator.scala      | 127 ++++++++++++++
 .../streaming/scheduler/JobScheduler.scala      | 104 +++++++++++
 .../spark/streaming/scheduler/JobSet.scala      |  61 +++++++
 .../scheduler/NetworkInputTracker.scala         | 175 +++++++++++++++++++
 .../streaming/scheduler/StreamingListener.scala |  37 ++++
 .../scheduler/StreamingListenerBus.scala        |  81 +++++++++
 .../spark/streaming/BasicOperationsSuite.scala  |  12 --
 .../spark/streaming/CheckpointSuite.scala       |  26 ++-
 .../apache/spark/streaming/FailureSuite.scala   |  13 +-
 .../spark/streaming/InputStreamsSuite.scala     |  12 --
 .../streaming/StreamingListenerSuite.scala      |  71 ++++++++
 .../apache/spark/streaming/TestSuiteBase.scala  |  32 +++-
 .../spark/streaming/WindowOperationsSuite.scala |  14 +-
 26 files changed, 811 insertions(+), 510 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/097e120c/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 3841b56..2c5d874 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -63,7 +63,7 @@ trait SparkListener {
    * Called when a task begins remotely fetching its result (will not be called for tasks that do
    * not need to fetch the result remotely).
    */
- def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult) { }
+  def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult) { }
 
   /**
    * Called when a task ends

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/097e120c/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 9271914..7b343d2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -40,7 +40,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
   val graph = ssc.graph
   val checkpointDir = ssc.checkpointDir
   val checkpointDuration = ssc.checkpointDuration
-  val pendingTimes = ssc.scheduler.jobManager.getPendingTimes()
+  val pendingTimes = ssc.scheduler.getPendingTimes()
   val delaySeconds = MetadataCleaner.getDelaySeconds
 
   def validate() {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/097e120c/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
index 9ceff75..8001c49 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
@@ -17,23 +17,18 @@
 
 package org.apache.spark.streaming
 
-import org.apache.spark.streaming.dstream._
 import StreamingContext._
-import org.apache.spark.util.MetadataCleaner
-
-//import Time._
-
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.scheduler.Job
 import org.apache.spark.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.MetadataCleaner
 
-import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap
 
 import java.io.{ObjectInputStream, IOException, ObjectOutputStream}
 
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.conf.Configuration
 
 /**
  * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/097e120c/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index b9a58fd..daed7ff 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -21,6 +21,7 @@ import dstream.InputDStream
 import java.io.{ObjectInputStream, IOException, ObjectOutputStream}
 import collection.mutable.ArrayBuffer
 import org.apache.spark.Logging
+import org.apache.spark.streaming.scheduler.Job
 
 final private[streaming] class DStreamGraph extends Serializable with Logging {
   initLogging()

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/097e120c/streaming/src/main/scala/org/apache/spark/streaming/Job.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Job.scala b/streaming/src/main/scala/org/apache/spark/streaming/Job.scala
deleted file mode 100644
index 2128b7c..0000000
--- a/streaming/src/main/scala/org/apache/spark/streaming/Job.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming
-
-import java.util.concurrent.atomic.AtomicLong
-
-private[streaming]
-class Job(val time: Time, func: () => _) {
-  val id = Job.getNewId()
-  def run(): Long = {
-    val startTime = System.currentTimeMillis 
-    func() 
-    val stopTime = System.currentTimeMillis
-    (stopTime - startTime)
-  }
-
-  override def toString = "streaming job " + id + " @ " + time 
-}
-
-private[streaming]
-object Job {
-  val id = new AtomicLong(0)
-
-  def getNewId() = id.getAndIncrement()
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/097e120c/streaming/src/main/scala/org/apache/spark/streaming/JobManager.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/JobManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/JobManager.scala
deleted file mode 100644
index 5233129..0000000
--- a/streaming/src/main/scala/org/apache/spark/streaming/JobManager.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming
-
-import org.apache.spark.Logging
-import org.apache.spark.SparkEnv
-import java.util.concurrent.Executors
-import collection.mutable.HashMap
-import collection.mutable.ArrayBuffer
-
-
-private[streaming]
-class JobManager(ssc: StreamingContext, numThreads: Int = 1) extends Logging {
-  
-  class JobHandler(ssc: StreamingContext, job: Job) extends Runnable {
-    def run() {
-      SparkEnv.set(ssc.env)
-      try {
-        val timeTaken = job.run()
-        logInfo("Total delay: %.5f s for job %s of time %s (execution: %.5f s)".format(
-          (System.currentTimeMillis() - job.time.milliseconds) / 1000.0, job.id, job.time.milliseconds, timeTaken / 1000.0))
-      } catch {
-        case e: Exception =>
-          logError("Running " + job + " failed", e)
-      }
-      clearJob(job)
-    }
-  }
-
-  initLogging()
-
-  val jobExecutor = Executors.newFixedThreadPool(numThreads) 
-  val jobs = new HashMap[Time, ArrayBuffer[Job]]
-
-  def runJob(job: Job) {
-    jobs.synchronized {
-      jobs.getOrElseUpdate(job.time, new ArrayBuffer[Job]) += job
-    }
-    jobExecutor.execute(new JobHandler(ssc, job))
-    logInfo("Added " + job + " to queue")
-  }
-
-  def stop() {
-    jobExecutor.shutdown()
-  }
-
-  private def clearJob(job: Job) {
-    var timeCleared = false
-    val time = job.time
-    jobs.synchronized {
-      val jobsOfTime = jobs.get(time)
-      if (jobsOfTime.isDefined) {
-        jobsOfTime.get -= job
-        if (jobsOfTime.get.isEmpty) {
-          jobs -= time
-          timeCleared = true
-        }
-      } else {
-        throw new Exception("Job finished for time " + job.time +
-          " but time does not exist in jobs")
-      }
-    }
-    if (timeCleared) {
-      ssc.scheduler.clearOldMetadata(time)
-    }
-  }
-
-  def getPendingTimes(): Array[Time] = {
-    jobs.synchronized {
-      jobs.keySet.toArray
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/097e120c/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala
deleted file mode 100644
index b97fb7e..0000000
--- a/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming
-
-import org.apache.spark.streaming.dstream.{NetworkInputDStream, NetworkReceiver}
-import org.apache.spark.streaming.dstream.{StopReceiver, ReportBlock, ReportError}
-import org.apache.spark.Logging
-import org.apache.spark.SparkEnv
-import org.apache.spark.SparkContext._
-
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.Queue
-
-import akka.actor._
-import akka.pattern.ask
-import akka.util.duration._
-import akka.dispatch._
-import org.apache.spark.storage.BlockId
-
-private[streaming] sealed trait NetworkInputTrackerMessage
-private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef) extends NetworkInputTrackerMessage
-private[streaming] case class AddBlocks(streamId: Int, blockIds: Seq[BlockId], metadata: Any) extends NetworkInputTrackerMessage
-private[streaming] case class DeregisterReceiver(streamId: Int, msg: String) extends NetworkInputTrackerMessage
-
-/**
- * This class manages the execution of the receivers of NetworkInputDStreams.
- */
-private[streaming]
-class NetworkInputTracker(
-    @transient ssc: StreamingContext,
-    @transient networkInputStreams: Array[NetworkInputDStream[_]])
-  extends Logging {
-
-  val networkInputStreamMap = Map(networkInputStreams.map(x => (x.id, x)): _*)
-  val receiverExecutor = new ReceiverExecutor()
-  val receiverInfo = new HashMap[Int, ActorRef]
-  val receivedBlockIds = new HashMap[Int, Queue[BlockId]]
-  val timeout = 5000.milliseconds
-
-  var currentTime: Time = null
-
-  /** Start the actor and receiver execution thread. */
-  def start() {
-    ssc.env.actorSystem.actorOf(Props(new NetworkInputTrackerActor), "NetworkInputTracker")
-    receiverExecutor.start()
-  }
-
-  /** Stop the receiver execution thread. */
-  def stop() {
-    // TODO: stop the actor as well
-    receiverExecutor.interrupt()
-    receiverExecutor.stopReceivers()
-  }
-
-  /** Return all the blocks received from a receiver. */
-  def getBlockIds(receiverId: Int, time: Time): Array[BlockId] = synchronized {
-    val queue =  receivedBlockIds.synchronized {
-      receivedBlockIds.getOrElse(receiverId, new Queue[BlockId]())
-    }
-    val result = queue.synchronized {
-      queue.dequeueAll(x => true)
-    }
-    logInfo("Stream " + receiverId + " received " + result.size + " blocks")
-    result.toArray
-  }
-
-  /** Actor to receive messages from the receivers. */
-  private class NetworkInputTrackerActor extends Actor {
-    def receive = {
-      case RegisterReceiver(streamId, receiverActor) => {
-        if (!networkInputStreamMap.contains(streamId)) {
-          throw new Exception("Register received for unexpected id " + streamId)
-        }
-        receiverInfo += ((streamId, receiverActor))
-        logInfo("Registered receiver for network stream " + streamId + " from " + sender.path.address)
-        sender ! true
-      }
-      case AddBlocks(streamId, blockIds, metadata) => {
-        val tmp = receivedBlockIds.synchronized {
-          if (!receivedBlockIds.contains(streamId)) {
-            receivedBlockIds += ((streamId, new Queue[BlockId]))
-          }
-          receivedBlockIds(streamId)
-        }
-        tmp.synchronized {
-          tmp ++= blockIds
-        }
-        networkInputStreamMap(streamId).addMetadata(metadata)
-      }
-      case DeregisterReceiver(streamId, msg) => {
-        receiverInfo -= streamId
-        logError("De-registered receiver for network stream " + streamId
-          + " with message " + msg)
-        //TODO: Do something about the corresponding NetworkInputDStream
-      }
-    }
-  }
-
-  /** This thread class runs all the receivers on the cluster.  */
-  class ReceiverExecutor extends Thread {
-    val env = ssc.env
-
-    override def run() {
-      try {
-        SparkEnv.set(env)
-        startReceivers()
-      } catch {
-        case ie: InterruptedException => logInfo("ReceiverExecutor interrupted")
-      } finally {
-        stopReceivers()
-      }
-    }
-
-    /**
-     * Get the receivers from the NetworkInputDStreams, distributes them to the
-     * worker nodes as a parallel collection, and runs them.
-     */
-    def startReceivers() {
-      val receivers = networkInputStreams.map(nis => {
-        val rcvr = nis.getReceiver()
-        rcvr.setStreamId(nis.id)
-        rcvr
-      })
-
-      // Right now, we only honor preferences if all receivers have them
-      val hasLocationPreferences = receivers.map(_.getLocationPreference().isDefined).reduce(_ && _)
-
-      // Create the parallel collection of receivers to distributed them on the worker nodes
-      val tempRDD =
-        if (hasLocationPreferences) {
-          val receiversWithPreferences = receivers.map(r => (r, Seq(r.getLocationPreference().toString)))
-          ssc.sc.makeRDD[NetworkReceiver[_]](receiversWithPreferences)
-        }
-        else {
-          ssc.sc.makeRDD(receivers, receivers.size)
-        }
-
-      // Function to start the receiver on the worker node
-      val startReceiver = (iterator: Iterator[NetworkReceiver[_]]) => {
-        if (!iterator.hasNext) {
-          throw new Exception("Could not start receiver as details not found.")
-        }
-        iterator.next().start()
-      }
-      // Run the dummy Spark job to ensure that all slaves have registered.
-      // This avoids all the receivers to be scheduled on the same node.
-      ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
-
-      // Distribute the receivers and start them
-      ssc.sparkContext.runJob(tempRDD, startReceiver)
-    }
-
-    /** Stops the receivers. */
-    def stopReceivers() {
-      // Signal the receivers to stop
-      receiverInfo.values.foreach(_ ! StopReceiver)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/097e120c/streaming/src/main/scala/org/apache/spark/streaming/Scheduler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Scheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/Scheduler.scala
deleted file mode 100644
index ed892e3..0000000
--- a/streaming/src/main/scala/org/apache/spark/streaming/Scheduler.scala
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming
-
-import util.{ManualClock, RecurringTimer, Clock}
-import org.apache.spark.SparkEnv
-import org.apache.spark.Logging
-
-private[streaming]
-class Scheduler(ssc: StreamingContext) extends Logging {
-
-  initLogging()
-
-  val concurrentJobs = System.getProperty("spark.streaming.concurrentJobs", "1").toInt
-  val jobManager = new JobManager(ssc, concurrentJobs)
-  val checkpointWriter = if (ssc.checkpointDuration != null && ssc.checkpointDir != null) {
-    new CheckpointWriter(ssc.checkpointDir)
-  } else {
-    null
-  }
-
-  val clockClass = System.getProperty(
-    "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
-  val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock]
-  val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
-    longTime => generateJobs(new Time(longTime)))
-  val graph = ssc.graph
-  var latestTime: Time = null
-
-  def start() = synchronized {
-    if (ssc.isCheckpointPresent) {
-      restart()
-    } else {
-      startFirstTime()
-    }
-    logInfo("Scheduler started")
-  }
-  
-  def stop() = synchronized {
-    timer.stop()
-    jobManager.stop()
-    if (checkpointWriter != null) checkpointWriter.stop()
-    ssc.graph.stop()
-    logInfo("Scheduler stopped")    
-  }
-
-  private def startFirstTime() {
-    val startTime = new Time(timer.getStartTime())
-    graph.start(startTime - graph.batchDuration)
-    timer.start(startTime.milliseconds)
-    logInfo("Scheduler's timer started at " + startTime)
-  }
-
-  private def restart() {
-
-    // If manual clock is being used for testing, then
-    // either set the manual clock to the last checkpointed time,
-    // or if the property is defined set it to that time
-    if (clock.isInstanceOf[ManualClock]) {
-      val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds
-      val jumpTime = System.getProperty("spark.streaming.manualClock.jump", "0").toLong
-      clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime)
-    }
-
-    val batchDuration = ssc.graph.batchDuration
-
-    // Batches when the master was down, that is,
-    // between the checkpoint and current restart time
-    val checkpointTime = ssc.initialCheckpoint.checkpointTime
-    val restartTime = new Time(timer.getRestartTime(graph.zeroTime.milliseconds))
-    val downTimes = checkpointTime.until(restartTime, batchDuration)
-    logInfo("Batches during down time: " + downTimes.mkString(", "))
-
-    // Batches that were unprocessed before failure
-    val pendingTimes = ssc.initialCheckpoint.pendingTimes
-    logInfo("Batches pending processing: " + pendingTimes.mkString(", "))
-    // Reschedule jobs for these times
-    val timesToReschedule = (pendingTimes ++ downTimes).distinct.sorted(Time.ordering)
-    logInfo("Batches to reschedule: " + timesToReschedule.mkString(", "))
-    timesToReschedule.foreach(time =>
-      graph.generateJobs(time).foreach(jobManager.runJob)
-    )
-
-    // Restart the timer
-    timer.start(restartTime.milliseconds)
-    logInfo("Scheduler's timer restarted at " + restartTime)
-  }
-
-  /** Generate jobs and perform checkpoint for the given `time`.  */
-  def generateJobs(time: Time) {
-    SparkEnv.set(ssc.env)
-    logInfo("\n-----------------------------------------------------\n")
-    graph.generateJobs(time).foreach(jobManager.runJob)
-    latestTime = time
-    doCheckpoint(time)
-  }
-
-  /**
-   * Clear old metadata assuming jobs of `time` have finished processing.
-   * And also perform checkpoint.
-   */
-  def clearOldMetadata(time: Time) {
-    ssc.graph.clearOldMetadata(time)
-    doCheckpoint(time)
-  }
-
-  /** Perform checkpoint for the give `time`. */
-  def doCheckpoint(time: Time) = synchronized {
-    if (ssc.checkpointDuration != null && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
-      logInfo("Checkpointing graph for time " + time)
-      ssc.graph.updateCheckpointData(time)
-      checkpointWriter.write(new Checkpoint(ssc, time))
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/097e120c/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 70bf902..83f1cad 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -46,6 +46,7 @@ import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
 import org.apache.hadoop.fs.Path
 import twitter4j.Status
 import twitter4j.auth.Authorization
+import org.apache.spark.streaming.scheduler._
 
 
 /**
@@ -146,9 +147,10 @@ class StreamingContext private (
     }
   }
 
-  protected[streaming] var checkpointDuration: Duration = if (isCheckpointPresent) cp_.checkpointDuration else null
-  protected[streaming] var receiverJobThread: Thread = null
-  protected[streaming] var scheduler: Scheduler = null
+  protected[streaming] val checkpointDuration: Duration = {
+    if (isCheckpointPresent) cp_.checkpointDuration else graph.batchDuration
+  }
+  protected[streaming] val scheduler = new JobScheduler(this)
 
   /**
    * Return the associated Spark context
@@ -510,6 +512,10 @@ class StreamingContext private (
     graph.addOutputStream(outputStream)
   }
 
+  def addListener(streamingListener: StreamingListener) {
+    scheduler.listenerBus.addListener(streamingListener)
+  }
+
   protected def validate() {
     assert(graph != null, "Graph is null")
     graph.validate()
@@ -525,9 +531,6 @@ class StreamingContext private (
    * Start the execution of the streams.
    */
   def start() {
-    if (checkpointDir != null && checkpointDuration == null && graph != null) {
-      checkpointDuration = graph.batchDuration
-    }
 
     validate()
 
@@ -545,7 +548,6 @@ class StreamingContext private (
     Thread.sleep(1000)
 
     // Start the scheduler
-    scheduler = new Scheduler(this)
     scheduler.start()
   }
 
@@ -556,7 +558,6 @@ class StreamingContext private (
     try {
       if (scheduler != null) scheduler.stop()
       if (networkInputTracker != null) networkInputTracker.stop()
-      if (receiverJobThread != null) receiverJobThread.interrupt()
       sc.stop()
       logInfo("StreamingContext stopped successfully")
     } catch {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/097e120c/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
index e21bac4..0072248 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
@@ -18,7 +18,8 @@
 package org.apache.spark.streaming.dstream
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.streaming.{Duration, DStream, Job, Time}
+import org.apache.spark.streaming.{Duration, DStream, Time}
+import org.apache.spark.streaming.scheduler.Job
 
 private[streaming]
 class ForEachDStream[T: ClassManifest] (

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/097e120c/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
index a82862c..1df7f54 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
@@ -32,6 +32,7 @@ import org.apache.spark.streaming._
 import org.apache.spark.{Logging, SparkEnv}
 import org.apache.spark.rdd.{RDD, BlockRDD}
 import org.apache.spark.storage.{BlockId, StorageLevel, StreamBlockId}
+import org.apache.spark.streaming.scheduler.{DeregisterReceiver, AddBlocks, RegisterReceiver}
 
 /**
  * Abstract class for defining any InputDStream that has to start a receiver on worker

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/097e120c/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
new file mode 100644
index 0000000..798598a
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.scheduler
+
+import org.apache.spark.streaming.Time
+
+case class BatchInfo(
+    batchTime: Time,
+    submissionTime: Long,
+    processingStartTime: Option[Long],
+    processingEndTime: Option[Long]
+  ) {
+
+  def schedulingDelay = processingStartTime.map(_ - submissionTime)
+
+  def processingDelay = processingEndTime.zip(processingStartTime).map(x => x._1 - x._2).headOption
+
+  def totalDelay = schedulingDelay.zip(processingDelay).map(x => x._1 + x._2).headOption
+}
+
+
+
+

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/097e120c/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
new file mode 100644
index 0000000..bca5e1f
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.scheduler
+
+import java.util.concurrent.atomic.AtomicLong
+import org.apache.spark.streaming.Time
+
+private[streaming]
+class Job(val time: Time, func: () => _) {
+  var id: String = _
+
+  def run(): Long = {
+    val startTime = System.currentTimeMillis 
+    func() 
+    val stopTime = System.currentTimeMillis
+    (stopTime - startTime)
+  }
+
+  def setId(number: Int) {
+    id = "streaming job " + time + "." + number
+  }
+
+  override def toString = id
+}
+/*
+private[streaming]
+object Job {
+  val id = new AtomicLong(0)
+
+  def getNewId() = id.getAndIncrement()
+}
+*/

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/097e120c/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
new file mode 100644
index 0000000..5d3ce9c
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.scheduler
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.Logging
+import org.apache.spark.streaming.{Checkpoint, Time, CheckpointWriter}
+import org.apache.spark.streaming.util.{ManualClock, RecurringTimer, Clock}
+
+private[streaming]
+class JobGenerator(jobScheduler: JobScheduler) extends Logging {
+
+  initLogging()
+  val ssc = jobScheduler.ssc
+  val clockClass = System.getProperty(
+    "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
+  val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock]
+  val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
+    longTime => generateJobs(new Time(longTime)))
+  val graph = ssc.graph
+  lazy val checkpointWriter = if (ssc.checkpointDuration != null && ssc.checkpointDir != null) {
+    new CheckpointWriter(ssc.checkpointDir)
+  } else {
+    null
+  }
+
+  var latestTime: Time = null
+
+  def start() = synchronized {
+    if (ssc.isCheckpointPresent) {
+      restart()
+    } else {
+      startFirstTime()
+    }
+    logInfo("JobGenerator started")
+  }
+  
+  def stop() = synchronized {
+    timer.stop()
+    if (checkpointWriter != null) checkpointWriter.stop()
+    ssc.graph.stop()
+    logInfo("JobGenerator stopped")
+  }
+
+  private def startFirstTime() {
+    val startTime = new Time(timer.getStartTime())
+    graph.start(startTime - graph.batchDuration)
+    timer.start(startTime.milliseconds)
+    logInfo("JobGenerator's timer started at " + startTime)
+  }
+
+  private def restart() {
+    // If manual clock is being used for testing, then
+    // either set the manual clock to the last checkpointed time,
+    // or if the property is defined set it to that time
+    if (clock.isInstanceOf[ManualClock]) {
+      val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds
+      val jumpTime = System.getProperty("spark.streaming.manualClock.jump", "0").toLong
+      clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime)
+    }
+
+    val batchDuration = ssc.graph.batchDuration
+
+    // Batches when the master was down, that is,
+    // between the checkpoint and current restart time
+    val checkpointTime = ssc.initialCheckpoint.checkpointTime
+    val restartTime = new Time(timer.getRestartTime(graph.zeroTime.milliseconds))
+    val downTimes = checkpointTime.until(restartTime, batchDuration)
+    logInfo("Batches during down time: " + downTimes.mkString(", "))
+
+    // Batches that were unprocessed before failure
+    val pendingTimes = ssc.initialCheckpoint.pendingTimes
+    logInfo("Batches pending processing: " + pendingTimes.mkString(", "))
+    // Reschedule jobs for these times
+    val timesToReschedule = (pendingTimes ++ downTimes).distinct.sorted(Time.ordering)
+    logInfo("Batches to reschedule: " + timesToReschedule.mkString(", "))
+    timesToReschedule.foreach(time =>
+      jobScheduler.runJobs(time, graph.generateJobs(time))
+    )
+
+    // Restart the timer
+    timer.start(restartTime.milliseconds)
+    logInfo("JobGenerator's timer restarted at " + restartTime)
+  }
+
+  /** Generate jobs and perform checkpoint for the given `time`.  */
+  private def generateJobs(time: Time) {
+    SparkEnv.set(ssc.env)
+    logInfo("\n-----------------------------------------------------\n")
+    jobScheduler.runJobs(time, graph.generateJobs(time))
+    latestTime = time
+    doCheckpoint(time)
+  }
+
+  /**
+   * On batch completion, clear old metadata and checkpoint computation.
+   */
+  private[streaming] def onBatchCompletion(time: Time) {
+    ssc.graph.clearOldMetadata(time)
+    doCheckpoint(time)
+  }
+
+  /** Perform checkpoint for the give `time`. */
+  private def doCheckpoint(time: Time) = synchronized {
+    if (checkpointWriter != null && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
+      logInfo("Checkpointing graph for time " + time)
+      ssc.graph.updateCheckpointData(time)
+      checkpointWriter.write(new Checkpoint(ssc, time))
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/097e120c/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
new file mode 100644
index 0000000..14906fd
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.scheduler
+
+import org.apache.spark.Logging
+import org.apache.spark.SparkEnv
+import java.util.concurrent.{TimeUnit, ConcurrentHashMap, Executors}
+import scala.collection.mutable.HashSet
+import org.apache.spark.streaming._
+
+
+private[streaming]
+class JobScheduler(val ssc: StreamingContext) extends Logging {
+
+  initLogging()
+
+  val jobSets = new ConcurrentHashMap[Time, JobSet]
+  val numConcurrentJobs = System.getProperty("spark.streaming.concurrentJobs", "1").toInt
+  val executor = Executors.newFixedThreadPool(numConcurrentJobs)
+  val generator = new JobGenerator(this)
+  val listenerBus = new StreamingListenerBus()
+
+  def clock = generator.clock
+
+  def start() {
+    generator.start()
+  }
+
+  def stop() {
+    generator.stop()
+    executor.shutdown()
+    if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+      executor.shutdownNow()
+    }
+  }
+
+  def runJobs(time: Time, jobs: Seq[Job]) {
+    if (jobs.isEmpty) {
+      logInfo("No jobs added for time " + time)
+    } else {
+      val jobSet = new JobSet(time, jobs)
+      jobSets.put(time, jobSet)
+      jobSet.jobs.foreach(job => executor.execute(new JobHandler(job)))
+      logInfo("Added jobs for time " + time)
+    }
+  }
+
+  def getPendingTimes(): Array[Time] = {
+    jobSets.keySet.toArray(new Array[Time](0))
+  }
+
+  private def beforeJobStart(job: Job) {
+    val jobSet = jobSets.get(job.time)
+    if (!jobSet.hasStarted) {
+      listenerBus.post(StreamingListenerBatchStarted(jobSet.toBatchInfo()))
+    }
+    jobSet.beforeJobStart(job)
+    logInfo("Starting job " + job.id + " from job set of time " + jobSet.time)
+    SparkEnv.set(generator.ssc.env)
+  }
+
+  private def afterJobEnd(job: Job) {
+    val jobSet = jobSets.get(job.time)
+    jobSet.afterJobStop(job)
+    logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
+    if (jobSet.hasCompleted) {
+      listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo()))
+      jobSets.remove(jobSet.time)
+      generator.onBatchCompletion(jobSet.time)
+      logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
+        jobSet.totalDelay / 1000.0, jobSet.time.toString,
+        jobSet.processingDelay / 1000.0
+      ))
+    }
+  }
+
+  class JobHandler(job: Job) extends Runnable {
+    def run() {
+      beforeJobStart(job)
+      try {
+        job.run()
+      } catch {
+        case e: Exception =>
+          logError("Running " + job + " failed", e)
+      }
+      afterJobEnd(job)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/097e120c/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
new file mode 100644
index 0000000..05233d0
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.scheduler
+
+import scala.collection.mutable.HashSet
+import org.apache.spark.streaming.Time
+
+private[streaming]
+case class JobSet(time: Time, jobs: Seq[Job]) {
+
+  private val incompleteJobs = new HashSet[Job]()
+  var submissionTime = System.currentTimeMillis()
+  var processingStartTime = -1L
+  var processingEndTime = -1L
+
+  jobs.zipWithIndex.foreach { case (job, i) => job.setId(i) }
+  incompleteJobs ++= jobs
+
+  def beforeJobStart(job: Job) {
+    if (processingStartTime < 0) processingStartTime = System.currentTimeMillis()
+  }
+
+  def afterJobStop(job: Job) {
+    incompleteJobs -= job
+    if (hasCompleted) processingEndTime = System.currentTimeMillis()
+  }
+
+  def hasStarted() = (processingStartTime > 0)
+
+  def hasCompleted() = incompleteJobs.isEmpty
+
+  def processingDelay = processingEndTime - processingStartTime
+
+  def totalDelay = {
+    processingEndTime - time.milliseconds
+  }
+
+  def toBatchInfo(): BatchInfo = {
+    new BatchInfo(
+      time,
+      submissionTime,
+      if (processingStartTime >= 0 ) Some(processingStartTime) else None,
+      if (processingEndTime >= 0 ) Some(processingEndTime) else None
+    )
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/097e120c/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
new file mode 100644
index 0000000..c759302
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.scheduler
+
+import org.apache.spark.streaming.dstream.{NetworkInputDStream, NetworkReceiver}
+import org.apache.spark.streaming.dstream.{StopReceiver, ReportBlock, ReportError}
+import org.apache.spark.Logging
+import org.apache.spark.SparkEnv
+import org.apache.spark.SparkContext._
+
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.Queue
+
+import akka.actor._
+import akka.pattern.ask
+import akka.util.duration._
+import akka.dispatch._
+import org.apache.spark.storage.BlockId
+import org.apache.spark.streaming.{Time, StreamingContext}
+
+private[streaming] sealed trait NetworkInputTrackerMessage
+private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef) extends NetworkInputTrackerMessage
+private[streaming] case class AddBlocks(streamId: Int, blockIds: Seq[BlockId], metadata: Any) extends NetworkInputTrackerMessage
+private[streaming] case class DeregisterReceiver(streamId: Int, msg: String) extends NetworkInputTrackerMessage
+
+/**
+ * This class manages the execution of the receivers of NetworkInputDStreams.
+ */
+private[streaming]
+class NetworkInputTracker(
+    @transient ssc: StreamingContext,
+    @transient networkInputStreams: Array[NetworkInputDStream[_]])
+  extends Logging {
+
+  val networkInputStreamMap = Map(networkInputStreams.map(x => (x.id, x)): _*)
+  val receiverExecutor = new ReceiverExecutor()
+  val receiverInfo = new HashMap[Int, ActorRef]
+  val receivedBlockIds = new HashMap[Int, Queue[BlockId]]
+  val timeout = 5000.milliseconds
+
+  var currentTime: Time = null
+
+  /** Start the actor and receiver execution thread. */
+  def start() {
+    ssc.env.actorSystem.actorOf(Props(new NetworkInputTrackerActor), "NetworkInputTracker")
+    receiverExecutor.start()
+  }
+
+  /** Stop the receiver execution thread. */
+  def stop() {
+    // TODO: stop the actor as well
+    receiverExecutor.interrupt()
+    receiverExecutor.stopReceivers()
+  }
+
+  /** Return all the blocks received from a receiver. */
+  def getBlockIds(receiverId: Int, time: Time): Array[BlockId] = synchronized {
+    val queue =  receivedBlockIds.synchronized {
+      receivedBlockIds.getOrElse(receiverId, new Queue[BlockId]())
+    }
+    val result = queue.synchronized {
+      queue.dequeueAll(x => true)
+    }
+    logInfo("Stream " + receiverId + " received " + result.size + " blocks")
+    result.toArray
+  }
+
+  /** Actor to receive messages from the receivers. */
+  private class NetworkInputTrackerActor extends Actor {
+    def receive = {
+      case RegisterReceiver(streamId, receiverActor) => {
+        if (!networkInputStreamMap.contains(streamId)) {
+          throw new Exception("Register received for unexpected id " + streamId)
+        }
+        receiverInfo += ((streamId, receiverActor))
+        logInfo("Registered receiver for network stream " + streamId + " from " + sender.path.address)
+        sender ! true
+      }
+      case AddBlocks(streamId, blockIds, metadata) => {
+        val tmp = receivedBlockIds.synchronized {
+          if (!receivedBlockIds.contains(streamId)) {
+            receivedBlockIds += ((streamId, new Queue[BlockId]))
+          }
+          receivedBlockIds(streamId)
+        }
+        tmp.synchronized {
+          tmp ++= blockIds
+        }
+        networkInputStreamMap(streamId).addMetadata(metadata)
+      }
+      case DeregisterReceiver(streamId, msg) => {
+        receiverInfo -= streamId
+        logError("De-registered receiver for network stream " + streamId
+          + " with message " + msg)
+        //TODO: Do something about the corresponding NetworkInputDStream
+      }
+    }
+  }
+
+  /** This thread class runs all the receivers on the cluster.  */
+  class ReceiverExecutor extends Thread {
+    val env = ssc.env
+
+    override def run() {
+      try {
+        SparkEnv.set(env)
+        startReceivers()
+      } catch {
+        case ie: InterruptedException => logInfo("ReceiverExecutor interrupted")
+      } finally {
+        stopReceivers()
+      }
+    }
+
+    /**
+     * Get the receivers from the NetworkInputDStreams, distributes them to the
+     * worker nodes as a parallel collection, and runs them.
+     */
+    def startReceivers() {
+      val receivers = networkInputStreams.map(nis => {
+        val rcvr = nis.getReceiver()
+        rcvr.setStreamId(nis.id)
+        rcvr
+      })
+
+      // Right now, we only honor preferences if all receivers have them
+      val hasLocationPreferences = receivers.map(_.getLocationPreference().isDefined).reduce(_ && _)
+
+      // Create the parallel collection of receivers to distributed them on the worker nodes
+      val tempRDD =
+        if (hasLocationPreferences) {
+          val receiversWithPreferences = receivers.map(r => (r, Seq(r.getLocationPreference().toString)))
+          ssc.sc.makeRDD[NetworkReceiver[_]](receiversWithPreferences)
+        }
+        else {
+          ssc.sc.makeRDD(receivers, receivers.size)
+        }
+
+      // Function to start the receiver on the worker node
+      val startReceiver = (iterator: Iterator[NetworkReceiver[_]]) => {
+        if (!iterator.hasNext) {
+          throw new Exception("Could not start receiver as details not found.")
+        }
+        iterator.next().start()
+      }
+      // Run the dummy Spark job to ensure that all slaves have registered.
+      // This avoids all the receivers to be scheduled on the same node.
+      ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
+
+      // Distribute the receivers and start them
+      ssc.sparkContext.runJob(tempRDD, startReceiver)
+    }
+
+    /** Stops the receivers. */
+    def stopReceivers() {
+      // Signal the receivers to stop
+      receiverInfo.values.foreach(_ ! StopReceiver)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/097e120c/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
new file mode 100644
index 0000000..49fd0d2
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.scheduler
+
+sealed trait StreamingListenerEvent
+
+case class StreamingListenerBatchCompleted(batchInfo: BatchInfo) extends StreamingListenerEvent
+
+case class StreamingListenerBatchStarted(batchInfo: BatchInfo) extends StreamingListenerEvent
+
+trait StreamingListener {
+
+  /**
+   * Called when processing of a batch has completed
+   */
+  def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { }
+
+  /**
+   * Called when processing of a batch has started
+   */
+  def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/097e120c/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
new file mode 100644
index 0000000..324e491
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.scheduler
+
+import org.apache.spark.Logging
+import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+import java.util.concurrent.LinkedBlockingQueue
+
+/** Asynchronously passes StreamingListenerEvents to registered StreamingListeners. */
+private[spark] class StreamingListenerBus() extends Logging {
+  private val listeners = new ArrayBuffer[StreamingListener]() with SynchronizedBuffer[StreamingListener]
+
+  /* Cap the capacity of the SparkListenerEvent queue so we get an explicit error (rather than
+   * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
+  private val EVENT_QUEUE_CAPACITY = 10000
+  private val eventQueue = new LinkedBlockingQueue[StreamingListenerEvent](EVENT_QUEUE_CAPACITY)
+  private var queueFullErrorMessageLogged = false
+
+  new Thread("StreamingListenerBus") {
+    setDaemon(true)
+    override def run() {
+      while (true) {
+        val event = eventQueue.take
+        event match {
+          case batchStarted: StreamingListenerBatchStarted =>
+            listeners.foreach(_.onBatchStarted(batchStarted))
+          case batchCompleted: StreamingListenerBatchCompleted =>
+            listeners.foreach(_.onBatchCompleted(batchCompleted))
+          case _ =>
+        }
+      }
+    }
+  }.start()
+
+  def addListener(listener: StreamingListener) {
+    listeners += listener
+  }
+
+  def post(event: StreamingListenerEvent) {
+    val eventAdded = eventQueue.offer(event)
+    if (!eventAdded && !queueFullErrorMessageLogged) {
+      logError("Dropping SparkListenerEvent because no remaining room in event queue. " +
+        "This likely means one of the SparkListeners is too slow and cannot keep up with the " +
+        "rate at which tasks are being started by the scheduler.")
+      queueFullErrorMessageLogged = true
+    }
+  }
+
+  /**
+   * Waits until there are no more events in the queue, or until the specified time has elapsed.
+   * Used for testing only. Returns true if the queue has emptied, or false if the specified time
+   * elapsed before the queue emptied.
+   */
+  def waitUntilEmpty(timeoutMillis: Int): Boolean = {
+    val finishTime = System.currentTimeMillis + timeoutMillis
+    while (!eventQueue.isEmpty()) {
+      if (System.currentTimeMillis > finishTime) {
+        return false
+      }
+      /* Sleep rather than using wait/notify, because this is used only for testing and wait/notify
+       * add overhead in the general case. */
+      Thread.sleep(10)
+    }
+    return true
+  }
+}
\ No newline at end of file

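The post() method above relies on LinkedBlockingQueue.offer, which returns false (rather than blocking) once the bounded queue is full; this is why a slow listener leads to dropped events instead of back-pressure. A tiny standalone sketch of that behavior, with illustrative names only:

    import java.util.concurrent.LinkedBlockingQueue

    object BoundedQueueDemo {
      def main(args: Array[String]) {
        val queue = new LinkedBlockingQueue[String](2)   // capacity of 2
        println(queue.offer("event-1"))  // true
        println(queue.offer("event-2"))  // true
        println(queue.offer("event-3"))  // false: queue full, so the event is dropped
      }
    }
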
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/097e120c/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 259ef16..b35ca00 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -26,18 +26,6 @@ import util.ManualClock
 
 class BasicOperationsSuite extends TestSuiteBase {
 
-  override def framework() = "BasicOperationsSuite"
-
-  before {
-    System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
-  }
-
-  after {
-    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
-    System.clearProperty("spark.driver.port")
-    System.clearProperty("spark.hostPort")
-  }
-
   test("map") {
     val input = Seq(1 to 4, 5 to 8, 9 to 12)
     testOperation(

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/097e120c/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index beb2083..c93075e 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -34,31 +34,25 @@ import com.google.common.io.Files
  * the checkpointing of a DStream's RDDs as well as the checkpointing of
  * the whole DStream graph.
  */
-class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
+class CheckpointSuite extends TestSuiteBase {
 
-  System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
+  var ssc: StreamingContext = null
+
+  override def batchDuration = Milliseconds(500)
+
+  override def actuallyWait = true // to allow checkpoints to be written
 
-  before {
+  override def beforeFunction() {
+    super.beforeFunction()
     FileUtils.deleteDirectory(new File(checkpointDir))
   }
 
-  after {
+  override def afterFunction() {
+    super.afterFunction()
     if (ssc != null) ssc.stop()
     FileUtils.deleteDirectory(new File(checkpointDir))
-
-    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
-    System.clearProperty("spark.driver.port")
-    System.clearProperty("spark.hostPort")
   }
 
-  var ssc: StreamingContext = null
-
-  override def framework = "CheckpointSuite"
-
-  override def batchDuration = Milliseconds(500)
-
-  override def actuallyWait = true
-
   test("basic rdd checkpoints + dstream graph checkpoint recovery") {
 
     assert(batchDuration === Milliseconds(500), "batchDuration for this test must be 500 milliseconds")

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/097e120c/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala
index 6337c53..da9b04d 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala
@@ -32,17 +32,22 @@ import collection.mutable.ArrayBuffer
  * This testsuite tests master failures at random times while the stream is running using
  * the real clock.
  */
-class FailureSuite extends FunSuite with BeforeAndAfter with Logging {
+class FailureSuite extends TestSuiteBase with Logging {
 
   var directory = "FailureSuite"
   val numBatches = 30
-  val batchDuration = Milliseconds(1000)
 
-  before {
+  override def batchDuration = Milliseconds(1000)
+
+  override def useManualClock = false
+
+  override def beforeFunction() {
+    super.beforeFunction()
     FileUtils.deleteDirectory(new File(directory))
   }
 
-  after {
+  override def afterFunction() {
+    super.afterFunction()
     FileUtils.deleteDirectory(new File(directory))
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/097e120c/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 7dc82de..62a9f12 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -50,18 +50,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
 
   val testPort = 9999
 
-  override def checkpointDir = "checkpoint"
-
-  before {
-    System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
-  }
-
-  after {
-    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
-    System.clearProperty("spark.driver.port")
-    System.clearProperty("spark.hostPort")
-  }
-
   test("socket input stream") {
     // Start the server
     val testServer = new TestServer()

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/097e120c/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
new file mode 100644
index 0000000..826c839
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import org.apache.spark.streaming.scheduler._
+import scala.collection.mutable.ArrayBuffer
+import org.scalatest.matchers.ShouldMatchers
+
+class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers {
+
+  val input = (1 to 4).map(Seq(_)).toSeq
+  val operation = (d: DStream[Int]) => d.map(x => x)
+
+  // To make sure that the processing start and end times in collected
+  // information are different for successive batches
+  override def batchDuration = Milliseconds(100)
+  override def actuallyWait = true
+
+  test("basic BatchInfo generation") {
+    val ssc = setupStreams(input, operation)
+    val collector = new BatchInfoCollector
+    ssc.addListener(collector)
+    runStreams(ssc, input.size, input.size)
+    val batchInfos = collector.batchInfos
+    batchInfos should have size 4
+
+    batchInfos.foreach(info => {
+      info.schedulingDelay should not be None
+      info.processingDelay should not be None
+      info.totalDelay should not be None
+      info.schedulingDelay.get should be >= 0L
+      info.processingDelay.get should be >= 0L
+      info.totalDelay.get should be >= 0L
+    })
+
+    isInIncreasingOrder(batchInfos.map(_.submissionTime)) should be (true)
+    isInIncreasingOrder(batchInfos.map(_.processingStartTime.get)) should be (true)
+    isInIncreasingOrder(batchInfos.map(_.processingEndTime.get)) should be (true)
+  }
+
+  /** Check if a sequence of numbers is in increasing order */
+  def isInIncreasingOrder(seq: Seq[Long]): Boolean = {
+    for(i <- 1 until seq.size) {
+      if (seq(i - 1) > seq(i)) return false
+    }
+    true
+  }
+
+  /** Listener that collects information on processed batches */
+  class BatchInfoCollector extends StreamingListener {
+    val batchInfos = new ArrayBuffer[BatchInfo]
+    override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
+      batchInfos += batchCompleted.batchInfo
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/097e120c/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 8c8c359..fbbeb8f 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -109,7 +109,7 @@ class TestOutputStreamWithPartitions[T: ClassManifest](parent: DStream[T],
 trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
 
   // Name of the framework for Spark context
-  def framework = "TestSuiteBase"
+  def framework = this.getClass.getSimpleName
 
   // Master for Spark context
   def master = "local[2]"
@@ -126,9 +126,39 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
   // Maximum time to wait before the test times out
   def maxWaitTimeMillis = 10000
 
+  // Whether to use manual clock or not
+  def useManualClock = true
+
   // Whether to actually wait in real time before changing manual clock
   def actuallyWait = false
 
+  // Default before function for any streaming test suite. Override this
+  // if you want to add your own setup to "before" (i.e., don't call before { } directly)
+  def beforeFunction() {
+    if (useManualClock) {
+      System.setProperty(
+        "spark.streaming.clock",
+        "org.apache.spark.streaming.util.ManualClock"
+      )
+    } else {
+      System.clearProperty("spark.streaming.clock")
+    }
+    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+    System.clearProperty("spark.driver.port")
+    System.clearProperty("spark.hostPort")
+  }
+
+  // Default after function for any streaming test suite. Override this
+  // if you want to add your own cleanup to "after" (i.e., don't call after { } directly)
+  def afterFunction() {
+    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+    System.clearProperty("spark.driver.port")
+    System.clearProperty("spark.hostPort")
+  }
+
+  before(beforeFunction)
+  after(afterFunction)
+
   /**
    * Set up required DStreams to test the DStream operation using the two sequences
    * of input collections.

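A sketch of how a suite now plugs into these hooks instead of calling before { } / after { } directly; MyStreamingSuite is hypothetical and assumed to live alongside the other suites in the org.apache.spark.streaming test sources:

    package org.apache.spark.streaming

    class MyStreamingSuite extends TestSuiteBase {
      override def useManualClock = false            // run against the real clock
      override def batchDuration = Milliseconds(500)

      override def beforeFunction() {
        super.beforeFunction()                       // keep the clock/port handling
        // suite-specific setup goes here
      }

      override def afterFunction() {
        super.afterFunction()
        // suite-specific cleanup goes here
      }
    }
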
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/097e120c/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
index f50e05c..6b4aaef 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
@@ -22,19 +22,9 @@ import collection.mutable.ArrayBuffer
 
 class WindowOperationsSuite extends TestSuiteBase {
 
-  System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
+  override def maxWaitTimeMillis = 20000  // large window tests can sometimes take longer
 
-  override def framework = "WindowOperationsSuite"
-
-  override def maxWaitTimeMillis = 20000
-
-  override def batchDuration = Seconds(1)
-
-  after {
-    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
-    System.clearProperty("spark.driver.port")
-    System.clearProperty("spark.hostPort")
-  }
+  override def batchDuration = Seconds(1)  // making sure it's visible in this class
 
   val largerSlideInput = Seq(
     Seq(("a", 1)),


[2/9] git commit: Added StatsReportListener to generate processing time statistics across multiple batches.

Posted by ma...@apache.org.
Added StatsReportListener to generate processing time statistics across multiple batches.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/b80ec056
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/b80ec056
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/b80ec056

Branch: refs/heads/master
Commit: b80ec05635132f96772545803a10a1bbfa1250e7
Parents: 097e120
Author: Tathagata Das <ta...@gmail.com>
Authored: Wed Dec 18 15:35:24 2013 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Wed Dec 18 15:35:24 2013 -0800

----------------------------------------------------------------------
 .../apache/spark/scheduler/SparkListener.scala  |  5 +--
 .../streaming/scheduler/JobScheduler.scala      |  2 +-
 .../streaming/scheduler/StreamingListener.scala | 45 +++++++++++++++++++-
 3 files changed, 46 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b80ec056/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 2c5d874..ee63b3c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -131,8 +131,8 @@ object StatsReportListener extends Logging {
 
   def showDistribution(heading: String, d: Distribution, formatNumber: Double => String) {
     val stats = d.statCounter
-    logInfo(heading + stats)
     val quantiles = d.getQuantiles(probabilities).map{formatNumber}
+    logInfo(heading + stats)
     logInfo(percentilesHeader)
     logInfo("\t" + quantiles.mkString("\t"))
   }
@@ -173,8 +173,6 @@ object StatsReportListener extends Logging {
     showMillisDistribution(heading, extractLongDistribution(stage, getMetric))
   }
 
-
-
   val seconds = 1000L
   val minutes = seconds * 60
   val hours = minutes * 60
@@ -198,7 +196,6 @@ object StatsReportListener extends Logging {
 }
 
 
-
 case class RuntimePercentage(executorPct: Double, fetchPct: Option[Double], other: Double)
 object RuntimePercentage {
   def apply(totalTime: Long, metrics: TaskMetrics): RuntimePercentage = {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b80ec056/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 14906fd..69930f3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -79,13 +79,13 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
     jobSet.afterJobStop(job)
     logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
     if (jobSet.hasCompleted) {
-      listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo()))
       jobSets.remove(jobSet.time)
       generator.onBatchCompletion(jobSet.time)
       logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
         jobSet.totalDelay / 1000.0, jobSet.time.toString,
         jobSet.processingDelay / 1000.0
       ))
+      listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo()))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b80ec056/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
index 49fd0d2..5647ffa 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
@@ -17,14 +17,22 @@
 
 package org.apache.spark.streaming.scheduler
 
+import scala.collection.mutable.Queue
+import org.apache.spark.util.Distribution
+
+/** Base trait for events related to StreamingListener */
 sealed trait StreamingListenerEvent
 
 case class StreamingListenerBatchCompleted(batchInfo: BatchInfo) extends StreamingListenerEvent
 
 case class StreamingListenerBatchStarted(batchInfo: BatchInfo) extends StreamingListenerEvent
 
-trait StreamingListener {
 
+/**
+ * A listener interface for receiving information about an ongoing streaming
+ * computation.
+ */
+trait StreamingListener {
   /**
    * Called when processing of a batch has completed
    */
@@ -34,4 +42,39 @@ trait StreamingListener {
    * Called when processing of a batch has started
    */
   def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { }
+}
+
+
+/**
+ * A simple StreamingListener that logs summary statistics across Spark Streaming batches
+ * @param numBatchInfos Number of last batches to consider for generating statistics (default: 10)
+ */
+class StatsReportListener(numBatchInfos: Int = 10) extends StreamingListener {
+
+  import org.apache.spark
+
+  val batchInfos = new Queue[BatchInfo]()
+
+  override def onBatchCompleted(batchStarted: StreamingListenerBatchCompleted) {
+    addToQueue(batchStarted.batchInfo)
+    printStats()
+  }
+
+  def addToQueue(newPoint: BatchInfo) {
+    batchInfos.enqueue(newPoint)
+    if (batchInfos.size > numBatchInfos) batchInfos.dequeue()
+  }
+
+  def printStats() {
+    showMillisDistribution("Total delay: ", _.totalDelay)
+    showMillisDistribution("Processing time: ", _.processingDelay)
+  }
+
+  def showMillisDistribution(heading: String, getMetric: BatchInfo => Option[Long]) {
+    spark.scheduler.StatsReportListener.showMillisDistribution(heading, extractDistribution(getMetric))
+  }
+
+  def extractDistribution(getMetric: BatchInfo => Option[Long]): Option[Distribution] = {
+    Distribution(batchInfos.flatMap(getMetric(_)).map(_.toDouble))
+  }
 }
\ No newline at end of file


[3/9] git commit: Merge branch 'apache-master' into scheduler-update

Posted by ma...@apache.org.
Merge branch 'apache-master' into scheduler-update

Conflicts:
	streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
	streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/e93b391d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/e93b391d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/e93b391d

Branch: refs/heads/master
Commit: e93b391d75a1c2e17ad93caff39e8fc34d640935
Parents: b80ec05 5ea1872
Author: Tathagata Das <ta...@gmail.com>
Authored: Wed Dec 18 17:51:14 2013 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Wed Dec 18 17:51:14 2013 -0800

----------------------------------------------------------------------
 .gitignore                                      |    1 +
 README.md                                       |   14 +-
 assembly/pom.xml                                |   16 +-
 bagel/pom.xml                                   |   12 +-
 bin/compute-classpath.cmd                       |    2 +-
 bin/compute-classpath.sh                        |    2 +-
 core/pom.xml                                    |   35 +-
 .../apache/spark/network/netty/FileClient.java  |    2 -
 .../apache/spark/network/netty/FileServer.java  |    1 -
 .../org/apache/spark/MapOutputTracker.scala     |   29 +-
 .../scala/org/apache/spark/Partitioner.scala    |    8 +-
 .../scala/org/apache/spark/SparkContext.scala   |   77 +-
 .../main/scala/org/apache/spark/SparkEnv.scala  |   13 +-
 .../main/scala/org/apache/spark/TaskState.scala |    3 +-
 .../apache/spark/api/java/JavaDoubleRDD.scala   |    9 +-
 .../org/apache/spark/api/java/JavaPairRDD.scala |   77 +-
 .../org/apache/spark/api/java/JavaRDD.scala     |    7 +-
 .../org/apache/spark/api/java/JavaRDDLike.scala |   32 +-
 .../spark/api/java/JavaSparkContext.scala       |   60 +-
 .../java/JavaSparkContextVarargsWorkaround.java |    1 -
 .../api/java/function/FlatMapFunction.scala     |    4 +-
 .../api/java/function/FlatMapFunction2.scala    |    4 +-
 .../spark/api/java/function/Function.java       |    8 +-
 .../spark/api/java/function/Function2.java      |    8 +-
 .../spark/api/java/function/Function3.java      |    8 +-
 .../api/java/function/PairFlatMapFunction.java  |   12 +-
 .../spark/api/java/function/PairFunction.java   |   12 +-
 .../org/apache/spark/api/python/PythonRDD.scala |   10 +-
 .../spark/api/python/PythonWorkerFactory.scala  |    4 +-
 .../org/apache/spark/deploy/ExecutorState.scala |    3 +-
 .../apache/spark/deploy/LocalSparkCluster.scala |   13 +-
 .../org/apache/spark/deploy/client/Client.scala |   48 +-
 .../spark/deploy/master/ApplicationState.scala  |    3 +-
 .../master/FileSystemPersistenceEngine.scala    |    6 +-
 .../org/apache/spark/deploy/master/Master.scala |   69 +-
 .../spark/deploy/master/RecoveryState.scala     |    4 +-
 .../spark/deploy/master/WorkerState.scala       |    4 +-
 .../master/ZooKeeperPersistenceEngine.scala     |    6 +-
 .../deploy/master/ui/ApplicationPage.scala      |    5 +-
 .../spark/deploy/master/ui/IndexPage.scala      |    4 +-
 .../spark/deploy/master/ui/MasterWebUI.scala    |    2 +-
 .../org/apache/spark/deploy/worker/Worker.scala |   48 +-
 .../spark/deploy/worker/ui/IndexPage.scala      |    5 +-
 .../spark/deploy/worker/ui/WorkerWebUI.scala    |   13 +-
 .../executor/CoarseGrainedExecutorBackend.scala |   23 +-
 .../org/apache/spark/executor/Executor.scala    |    2 +-
 .../spark/network/ConnectionManager.scala       |    8 +-
 .../spark/network/ConnectionManagerTest.scala   |    4 +-
 .../org/apache/spark/rdd/AsyncRDDActions.scala  |    3 +-
 .../scala/org/apache/spark/rdd/BlockRDD.scala   |    4 +-
 .../org/apache/spark/rdd/CartesianRDD.scala     |    3 +-
 .../org/apache/spark/rdd/CheckpointRDD.scala    |   14 +-
 .../org/apache/spark/rdd/CoalescedRDD.scala     |    3 +-
 .../apache/spark/rdd/DoubleRDDFunctions.scala   |    5 +-
 .../scala/org/apache/spark/rdd/EmptyRDD.scala   |    5 +-
 .../org/apache/spark/rdd/FilteredRDD.scala      |    3 +-
 .../org/apache/spark/rdd/FlatMappedRDD.scala    |    3 +-
 .../scala/org/apache/spark/rdd/GlommedRDD.scala |    3 +-
 .../scala/org/apache/spark/rdd/JdbcRDD.scala    |    4 +-
 .../org/apache/spark/rdd/MapPartitionsRDD.scala |    4 +-
 .../scala/org/apache/spark/rdd/MappedRDD.scala  |    4 +-
 .../apache/spark/rdd/OrderedRDDFunctions.scala  |   10 +-
 .../org/apache/spark/rdd/PairRDDFunctions.scala |   33 +-
 .../spark/rdd/ParallelCollectionRDD.scala       |    8 +-
 .../apache/spark/rdd/PartitionPruningRDD.scala  |    6 +-
 .../scala/org/apache/spark/rdd/PipedRDD.scala   |    3 +-
 .../main/scala/org/apache/spark/rdd/RDD.scala   |   64 +-
 .../apache/spark/rdd/RDDCheckpointData.scala    |    4 +-
 .../scala/org/apache/spark/rdd/SampledRDD.scala |    5 +-
 .../spark/rdd/SequenceFileRDDFunctions.scala    |   11 +-
 .../org/apache/spark/rdd/ShuffledRDD.scala      |    6 +-
 .../org/apache/spark/rdd/SubtractedRDD.scala    |    5 +-
 .../scala/org/apache/spark/rdd/UnionRDD.scala   |    7 +-
 .../apache/spark/rdd/ZippedPartitionsRDD.scala  |    9 +-
 .../scala/org/apache/spark/rdd/ZippedRDD.scala  |    6 +-
 .../apache/spark/scheduler/DAGScheduler.scala   |   13 +-
 .../apache/spark/scheduler/SchedulingMode.scala |    2 +-
 .../apache/spark/scheduler/TaskLocality.scala   |    4 +-
 .../scheduler/cluster/ClusterScheduler.scala    |    5 +-
 .../cluster/ClusterTaskSetManager.scala         |    6 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala |   23 +-
 .../cluster/SimrSchedulerBackend.scala          |    2 +-
 .../cluster/SparkDeploySchedulerBackend.scala   |    2 +-
 .../scheduler/cluster/TaskResultGetter.scala    |    4 +-
 .../mesos/CoarseMesosSchedulerBackend.scala     |    2 +-
 .../org/apache/spark/storage/BlockManager.scala |    7 +-
 .../spark/storage/BlockManagerMaster.scala      |   16 +-
 .../spark/storage/BlockManagerMasterActor.scala |    7 +-
 .../apache/spark/storage/ThreadingTest.scala    |    2 +-
 .../apache/spark/ui/jobs/JobProgressUI.scala    |    2 +-
 .../spark/ui/storage/BlockManagerUI.scala       |    2 +-
 .../scala/org/apache/spark/util/AkkaUtils.scala |   79 +-
 .../spark/util/BoundedPriorityQueue.scala       |    2 +
 .../spark/util/IndestructibleActorSystem.scala  |   68 +
 .../org/apache/spark/util/MetadataCleaner.scala |    3 +-
 .../apache/spark/util/TimeStampedHashMap.scala  |    2 +-
 .../scala/org/apache/spark/util/Utils.scala     |    5 +-
 .../spark/util/collection/OpenHashMap.scala     |    3 +-
 .../spark/util/collection/OpenHashSet.scala     |   11 +-
 .../collection/PrimitiveKeyOpenHashMap.scala    |    7 +-
 .../spark/util/collection/PrimitiveVector.scala |    4 +-
 .../org/apache/spark/AccumulatorSuite.scala     |   32 +-
 .../org/apache/spark/CheckpointSuite.scala      |    5 +-
 .../org/apache/spark/DistributedSuite.scala     |    5 +-
 .../scala/org/apache/spark/DriverSuite.scala    |    2 +-
 .../apache/spark/MapOutputTrackerSuite.scala    |   14 +-
 .../scala/org/apache/spark/UnpersistSuite.scala |    2 +-
 .../scala/org/apache/spark/rdd/RDDSuite.scala   |    8 +-
 .../spark/scheduler/SparkListenerSuite.scala    |    2 +-
 .../cluster/ClusterTaskSetManagerSuite.scala    |    2 +-
 .../cluster/TaskResultGetterSuite.scala         |    4 +-
 .../org/apache/spark/storage/BlockIdSuite.scala |    2 +-
 .../spark/storage/BlockManagerSuite.scala       |    2 +-
 .../scala/org/apache/spark/ui/UISuite.scala     |    1 -
 .../apache/spark/util/SizeEstimatorSuite.scala  |   72 +-
 docs/_config.yml                                |    2 +-
 docs/_plugins/copy_api_dirs.rb                  |    2 +-
 docs/configuration.md                           |   23 +-
 docs/running-on-yarn.md                         |   12 +-
 docs/scala-programming-guide.md                 |    2 +-
 ec2/spark_ec2.py                                |    4 +-
 examples/pom.xml                                |   26 +-
 .../org/apache/spark/examples/JavaLogQuery.java |    2 +-
 .../org/apache/spark/examples/JavaPageRank.java |    3 +-
 .../apache/spark/examples/JavaWordCount.java    |    2 +-
 .../apache/spark/mllib/examples/JavaALS.java    |    1 -
 .../streaming/examples/ActorWordCount.scala     |    7 +-
 .../streaming/examples/ZeroMQWordCount.scala    |    8 +-
 mllib/pom.xml                                   |   12 +-
 .../spark/mllib/util/MFDataGenerator.scala      |    2 +-
 .../spark/mllib/clustering/JavaKMeansSuite.java |    4 +-
 .../mllib/recommendation/JavaALSSuite.java      |    2 -
 new-yarn/pom.xml                                |    6 +-
 .../spark/deploy/yarn/ClientArguments.scala     |    1 +
 .../spark/deploy/yarn/WorkerLauncher.scala      |   13 +-
 .../deploy/yarn/YarnAllocationHandler.scala     |    2 +-
 pom.xml                                         |  179 +-
 project/SparkBuild.scala                        |  135 +-
 pyspark                                         |   10 +-
 pyspark2.cmd                                    |    2 +-
 python/pyspark/rdd.py                           |    4 +-
 repl-bin/pom.xml                                |    8 +-
 repl-bin/src/deb/bin/run                        |    2 +-
 repl/lib/scala-jline.jar                        |  Bin 158463 -> 0 bytes
 repl/pom.xml                                    |   18 +-
 .../main/scala/org/apache/spark/repl/Main.scala |    8 +-
 .../org/apache/spark/repl/SparkExprTyper.scala  |  109 ++
 .../org/apache/spark/repl/SparkILoop.scala      |  944 +++++-----
 .../org/apache/spark/repl/SparkILoopInit.scala  |  143 ++
 .../org/apache/spark/repl/SparkIMain.scala      | 1681 ++++++++++--------
 .../org/apache/spark/repl/SparkISettings.scala  |   63 -
 .../org/apache/spark/repl/SparkImports.scala    |  108 +-
 .../spark/repl/SparkJLineCompletion.scala       |  206 ++-
 .../apache/spark/repl/SparkJLineReader.scala    |   65 +-
 .../apache/spark/repl/SparkMemberHandlers.scala |  109 +-
 .../scala/org/apache/spark/repl/ReplSuite.scala |  178 +-
 run-example                                     |    2 +-
 run-example2.cmd                                |    2 +-
 spark-class                                     |    6 +-
 streaming/pom.xml                               |   22 +-
 .../org/apache/spark/streaming/DStream.scala    |   37 +-
 .../spark/streaming/DStreamCheckpointData.scala |    6 +-
 .../spark/streaming/PairDStreamFunctions.scala  |   63 +-
 .../spark/streaming/StreamingContext.scala      |   45 +-
 .../spark/streaming/api/java/JavaDStream.scala  |    8 +-
 .../streaming/api/java/JavaDStreamLike.scala    |   79 +-
 .../streaming/api/java/JavaPairDStream.scala    |   93 +-
 .../api/java/JavaStreamingContext.scala         |  108 +-
 .../dstream/ConstantInputDStream.scala          |    3 +-
 .../streaming/dstream/FileInputDStream.scala    |   12 +-
 .../streaming/dstream/FilteredDStream.scala     |    3 +-
 .../dstream/FlatMapValuedDStream.scala          |    3 +-
 .../streaming/dstream/FlatMappedDStream.scala   |    3 +-
 .../streaming/dstream/FlumeInputDStream.scala   |    3 +-
 .../streaming/dstream/ForEachDStream.scala      |    3 +-
 .../streaming/dstream/GlommedDStream.scala      |    3 +-
 .../spark/streaming/dstream/InputDStream.scala  |    4 +-
 .../streaming/dstream/KafkaInputDStream.scala   |   23 +-
 .../streaming/dstream/MQTTInputDStream.scala    |    3 +-
 .../dstream/MapPartitionedDStream.scala         |    3 +-
 .../streaming/dstream/MapValuedDStream.scala    |    3 +-
 .../spark/streaming/dstream/MappedDStream.scala |    3 +-
 .../streaming/dstream/NetworkInputDStream.scala |   13 +-
 .../dstream/PluggableInputDStream.scala         |    3 +-
 .../streaming/dstream/QueueInputDStream.scala   |    4 +-
 .../streaming/dstream/RawInputDStream.scala     |    4 +-
 .../dstream/ReducedWindowedDStream.scala        |    9 +-
 .../streaming/dstream/ShuffledDStream.scala     |    3 +-
 .../streaming/dstream/SocketInputDStream.scala  |    6 +-
 .../spark/streaming/dstream/StateDStream.scala  |    4 +-
 .../streaming/dstream/TransformedDStream.scala  |    3 +-
 .../spark/streaming/dstream/UnionDStream.scala  |    5 +-
 .../streaming/dstream/WindowedDStream.scala     |    7 +-
 .../streaming/receivers/ActorReceiver.scala     |   35 +-
 .../streaming/receivers/ZeroMQReceiver.scala    |   13 +-
 .../scheduler/NetworkInputTracker.scala         |    2 +-
 .../streaming/util/MasterFailureTest.scala      |   45 +-
 .../apache/spark/streaming/JavaAPISuite.java    |   88 +-
 .../apache/spark/streaming/JavaTestUtils.scala  |   22 +-
 .../spark/streaming/CheckpointSuite.scala       |   36 +-
 .../apache/spark/streaming/TestSuiteBase.scala  |   29 +-
 tools/pom.xml                                   |   12 +-
 .../tools/JavaAPICompletenessChecker.scala      |    4 +-
 yarn/pom.xml                                    |   10 +-
 .../spark/deploy/yarn/ClientArguments.scala     |    1 +
 .../spark/deploy/yarn/WorkerLauncher.scala      |   17 +-
 .../deploy/yarn/YarnAllocationHandler.scala     |    4 +-
 207 files changed, 3497 insertions(+), 2945 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e93b391d/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
----------------------------------------------------------------------
diff --cc streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
index 8001c49,329d2b5..a78d396
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
@@@ -23,9 -26,10 +23,10 @@@ import org.apache.spark.streaming.sched
  import org.apache.spark.Logging
  import org.apache.spark.rdd.RDD
  import org.apache.spark.storage.StorageLevel
 +import org.apache.spark.util.MetadataCleaner
  
 -import scala.collection.mutable.ArrayBuffer
  import scala.collection.mutable.HashMap
+ import scala.reflect.ClassTag
  
  import java.io.{ObjectInputStream, IOException, ObjectOutputStream}
  

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e93b391d/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --cc streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 83f1cad,d2c4fde..fedbbde
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@@ -46,9 -47,9 +47,9 @@@ import org.apache.hadoop.mapreduce.lib.
  import org.apache.hadoop.fs.Path
  import twitter4j.Status
  import twitter4j.auth.Authorization
 +import org.apache.spark.streaming.scheduler._
- 
+ import akka.util.ByteString
  
 -
  /**
   * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
   * information (such as, cluster URL and job name) to internally create a SparkContext, it provides

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e93b391d/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
----------------------------------------------------------------------
diff --cc streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
index 0072248,98b14cb..364abcd
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
@@@ -18,11 -18,11 +18,12 @@@
  package org.apache.spark.streaming.dstream
  
  import org.apache.spark.rdd.RDD
 -import org.apache.spark.streaming.{Duration, DStream, Job, Time}
 +import org.apache.spark.streaming.{Duration, DStream, Time}
 +import org.apache.spark.streaming.scheduler.Job
+ import scala.reflect.ClassTag
  
  private[streaming]
- class ForEachDStream[T: ClassManifest] (
+ class ForEachDStream[T: ClassTag] (
      parent: DStream[T],
      foreachFunc: (RDD[T], Time) => Unit
    ) extends DStream[Unit](parent.ssc) {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e93b391d/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e93b391d/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
----------------------------------------------------------------------
diff --cc streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
index c759302,0000000..abff55d
mode 100644,000000..100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
@@@ -1,175 -1,0 +1,175 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.spark.streaming.scheduler
 +
 +import org.apache.spark.streaming.dstream.{NetworkInputDStream, NetworkReceiver}
 +import org.apache.spark.streaming.dstream.{StopReceiver, ReportBlock, ReportError}
 +import org.apache.spark.Logging
 +import org.apache.spark.SparkEnv
 +import org.apache.spark.SparkContext._
 +
 +import scala.collection.mutable.HashMap
 +import scala.collection.mutable.Queue
++import scala.concurrent.duration._
 +
 +import akka.actor._
 +import akka.pattern.ask
- import akka.util.duration._
 +import akka.dispatch._
 +import org.apache.spark.storage.BlockId
 +import org.apache.spark.streaming.{Time, StreamingContext}
 +
 +private[streaming] sealed trait NetworkInputTrackerMessage
 +private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef) extends NetworkInputTrackerMessage
 +private[streaming] case class AddBlocks(streamId: Int, blockIds: Seq[BlockId], metadata: Any) extends NetworkInputTrackerMessage
 +private[streaming] case class DeregisterReceiver(streamId: Int, msg: String) extends NetworkInputTrackerMessage
 +
 +/**
 + * This class manages the execution of the receivers of NetworkInputDStreams.
 + */
 +private[streaming]
 +class NetworkInputTracker(
 +    @transient ssc: StreamingContext,
 +    @transient networkInputStreams: Array[NetworkInputDStream[_]])
 +  extends Logging {
 +
 +  val networkInputStreamMap = Map(networkInputStreams.map(x => (x.id, x)): _*)
 +  val receiverExecutor = new ReceiverExecutor()
 +  val receiverInfo = new HashMap[Int, ActorRef]
 +  val receivedBlockIds = new HashMap[Int, Queue[BlockId]]
 +  val timeout = 5000.milliseconds
 +
 +  var currentTime: Time = null
 +
 +  /** Start the actor and receiver execution thread. */
 +  def start() {
 +    ssc.env.actorSystem.actorOf(Props(new NetworkInputTrackerActor), "NetworkInputTracker")
 +    receiverExecutor.start()
 +  }
 +
 +  /** Stop the receiver execution thread. */
 +  def stop() {
 +    // TODO: stop the actor as well
 +    receiverExecutor.interrupt()
 +    receiverExecutor.stopReceivers()
 +  }
 +
 +  /** Return all the blocks received from a receiver. */
 +  def getBlockIds(receiverId: Int, time: Time): Array[BlockId] = synchronized {
 +    val queue =  receivedBlockIds.synchronized {
 +      receivedBlockIds.getOrElse(receiverId, new Queue[BlockId]())
 +    }
 +    val result = queue.synchronized {
 +      queue.dequeueAll(x => true)
 +    }
 +    logInfo("Stream " + receiverId + " received " + result.size + " blocks")
 +    result.toArray
 +  }
 +
 +  /** Actor to receive messages from the receivers. */
 +  private class NetworkInputTrackerActor extends Actor {
 +    def receive = {
 +      case RegisterReceiver(streamId, receiverActor) => {
 +        if (!networkInputStreamMap.contains(streamId)) {
 +          throw new Exception("Register received for unexpected id " + streamId)
 +        }
 +        receiverInfo += ((streamId, receiverActor))
 +        logInfo("Registered receiver for network stream " + streamId + " from " + sender.path.address)
 +        sender ! true
 +      }
 +      case AddBlocks(streamId, blockIds, metadata) => {
 +        val tmp = receivedBlockIds.synchronized {
 +          if (!receivedBlockIds.contains(streamId)) {
 +            receivedBlockIds += ((streamId, new Queue[BlockId]))
 +          }
 +          receivedBlockIds(streamId)
 +        }
 +        tmp.synchronized {
 +          tmp ++= blockIds
 +        }
 +        networkInputStreamMap(streamId).addMetadata(metadata)
 +      }
 +      case DeregisterReceiver(streamId, msg) => {
 +        receiverInfo -= streamId
 +        logError("De-registered receiver for network stream " + streamId
 +          + " with message " + msg)
 +        //TODO: Do something about the corresponding NetworkInputDStream
 +      }
 +    }
 +  }
 +
 +  /** This thread class runs all the receivers on the cluster.  */
 +  class ReceiverExecutor extends Thread {
 +    val env = ssc.env
 +
 +    override def run() {
 +      try {
 +        SparkEnv.set(env)
 +        startReceivers()
 +      } catch {
 +        case ie: InterruptedException => logInfo("ReceiverExecutor interrupted")
 +      } finally {
 +        stopReceivers()
 +      }
 +    }
 +
 +    /**
 +     * Gets the receivers from the NetworkInputDStreams, distributes them to the
 +     * worker nodes as a parallel collection, and runs them.
 +     */
 +    def startReceivers() {
 +      val receivers = networkInputStreams.map(nis => {
 +        val rcvr = nis.getReceiver()
 +        rcvr.setStreamId(nis.id)
 +        rcvr
 +      })
 +
 +      // Right now, we only honor preferences if all receivers have them
 +      val hasLocationPreferences = receivers.map(_.getLocationPreference().isDefined).reduce(_ && _)
 +
 +      // Create the parallel collection of receivers to distribute them on the worker nodes
 +      val tempRDD =
 +        if (hasLocationPreferences) {
 +          val receiversWithPreferences = receivers.map(r => (r, Seq(r.getLocationPreference().toString)))
 +          ssc.sc.makeRDD[NetworkReceiver[_]](receiversWithPreferences)
 +        }
 +        else {
 +          ssc.sc.makeRDD(receivers, receivers.size)
 +        }
 +
 +      // Function to start the receiver on the worker node
 +      val startReceiver = (iterator: Iterator[NetworkReceiver[_]]) => {
 +        if (!iterator.hasNext) {
 +          throw new Exception("Could not start receiver as details not found.")
 +        }
 +        iterator.next().start()
 +      }
 +      // Run the dummy Spark job to ensure that all slaves have registered.
 +      // This prevents all the receivers from being scheduled on the same node.
 +      ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
 +
 +      // Distribute the receivers and start them
 +      ssc.sparkContext.runJob(tempRDD, startReceiver)
 +    }
 +
 +    /** Stops the receivers. */
 +    def stopReceivers() {
 +      // Signal the receivers to stop
 +      receiverInfo.values.foreach(_ ! StopReceiver)
 +    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e93b391d/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e93b391d/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
----------------------------------------------------------------------


[4/9] git commit: Minor changes.

Posted by ma...@apache.org.
Minor changes.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/ec71b445
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/ec71b445
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/ec71b445

Branch: refs/heads/master
Commit: ec71b445ad0440e84c4b4909e4faf75aba0f13d7
Parents: e93b391
Author: Tathagata Das <ta...@gmail.com>
Authored: Wed Dec 18 23:39:28 2013 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Wed Dec 18 23:39:28 2013 -0800

----------------------------------------------------------------------
 .../apache/spark/scheduler/SparkListenerBus.scala  |  1 -
 .../apache/spark/streaming/StreamingContext.scala  | 10 ++++++----
 .../streaming/api/java/JavaStreamingContext.scala  |  8 ++++++++
 .../spark/streaming/scheduler/BatchInfo.scala      |  7 +++----
 .../org/apache/spark/streaming/scheduler/Job.scala | 14 ++++----------
 .../spark/streaming/scheduler/JobGenerator.scala   |  4 ++++
 .../spark/streaming/scheduler/JobScheduler.scala   |  4 +++-
 .../streaming/scheduler/StreamingListener.scala    | 17 ++++++-----------
 .../streaming/scheduler/StreamingListenerBus.scala |  2 +-
 .../spark/streaming/StreamingListenerSuite.scala   |  2 +-
 10 files changed, 36 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ec71b445/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index d5824e7..85687ea 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -91,4 +91,3 @@ private[spark] class SparkListenerBus() extends Logging {
     return true
   }
 }
-

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ec71b445/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index fedbbde..41da028 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -513,7 +513,10 @@ class StreamingContext private (
     graph.addOutputStream(outputStream)
   }
 
-  def addListener(streamingListener: StreamingListener) {
+  /** Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] object for
+    * receiving system events related to streaming.
+    */
+  def addStreamingListener(streamingListener: StreamingListener) {
     scheduler.listenerBus.addListener(streamingListener)
   }
 
@@ -532,20 +535,19 @@ class StreamingContext private (
    * Start the execution of the streams.
    */
   def start() {
-
     validate()
 
+    // Get the network input streams
     val networkInputStreams = graph.getInputStreams().filter(s => s match {
         case n: NetworkInputDStream[_] => true
         case _ => false
       }).map(_.asInstanceOf[NetworkInputDStream[_]]).toArray
 
+    // Start the network input tracker (must start before receivers)
     if (networkInputStreams.length > 0) {
-      // Start the network input tracker (must start before receivers)
       networkInputTracker = new NetworkInputTracker(this, networkInputStreams)
       networkInputTracker.start()
     }
-
     Thread.sleep(1000)
 
     // Start the scheduler

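For reference, a sketch of how an application wires in listeners through the new method; the context construction and socket source here are illustrative, and StatsReportListener comes from the earlier commit in this series:

    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.scheduler.StatsReportListener

    object ListenerDemo {
      def main(args: Array[String]) {
        val ssc = new StreamingContext("local[2]", "ListenerDemo", Seconds(1))
        // Attach listeners before start(); StatsReportListener keeps the last 10 batches.
        ssc.addStreamingListener(new StatsReportListener(numBatchInfos = 10))
        val lines = ssc.socketTextStream("localhost", 9999)
        lines.count().print()
        ssc.start()
      }
    }
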
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ec71b445/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 80dcf87..78d318c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -39,6 +39,7 @@ import org.apache.spark.api.java.function.{Function => JFunction, Function2 => J
 import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext, JavaRDD}
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.scheduler.StreamingListener
 
 /**
  * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
@@ -687,6 +688,13 @@ class JavaStreamingContext(val ssc: StreamingContext) {
     ssc.remember(duration)
   }
 
+  /** Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] object for
+    * receiving system events related to streaming.
+    */
+  def addStreamingListener(streamingListener: StreamingListener) {
+    ssc.addStreamingListener(streamingListener)
+  }
+
   /**
    * Starts the execution of the streams.
    */

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ec71b445/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
index 798598a..88e4af5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
@@ -19,6 +19,9 @@ package org.apache.spark.streaming.scheduler
 
 import org.apache.spark.streaming.Time
 
+/**
+ * Class containing information about a completed batch.
+ */
 case class BatchInfo(
     batchTime: Time,
     submissionTime: Long,
@@ -32,7 +35,3 @@ case class BatchInfo(
 
   def totalDelay = schedulingDelay.zip(processingDelay).map(x => x._1 + x._2).headOption
 }
-
-
-
-

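A small sketch of how the delay fields relate, assuming the remaining constructor fields are processingStartTime and processingEndTime (as exercised by StreamingListenerSuite above) and the usual definitions (scheduling delay = processing start - submission, processing delay = processing end - processing start):

    import org.apache.spark.streaming.Time
    import org.apache.spark.streaming.scheduler.BatchInfo

    // Assumed field names for illustration; the delays are Options and totalDelay is their sum.
    val info = BatchInfo(
      batchTime = Time(1000),
      submissionTime = 1000L,
      processingStartTime = Some(1200L),
      processingEndTime = Some(1700L)
    )
    // info.schedulingDelay == Some(200), info.processingDelay == Some(500),
    // info.totalDelay == Some(700)
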
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ec71b445/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
index bca5e1f..7341bfb 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
@@ -17,9 +17,11 @@
 
 package org.apache.spark.streaming.scheduler
 
-import java.util.concurrent.atomic.AtomicLong
 import org.apache.spark.streaming.Time
 
+/**
+ * Class representing a Spark computation. It may contain multiple Spark jobs.
+ */
 private[streaming]
 class Job(val time: Time, func: () => _) {
   var id: String = _
@@ -36,12 +38,4 @@ class Job(val time: Time, func: () => _) {
   }
 
   override def toString = id
-}
-/*
-private[streaming]
-object Job {
-  val id = new AtomicLong(0)
-
-  def getNewId() = id.getAndIncrement()
-}
-*/
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ec71b445/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 5d3ce9c..1cd0b9b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -22,6 +22,10 @@ import org.apache.spark.Logging
 import org.apache.spark.streaming.{Checkpoint, Time, CheckpointWriter}
 import org.apache.spark.streaming.util.{ManualClock, RecurringTimer, Clock}
 
+/**
+ * This class generates jobs from DStreams, and also drives the checkpointing and cleanup
+ * of DStream metadata.
+ */
 private[streaming]
 class JobGenerator(jobScheduler: JobScheduler) extends Logging {
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ec71b445/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 69930f3..33c5322 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -23,7 +23,9 @@ import java.util.concurrent.{TimeUnit, ConcurrentHashMap, Executors}
 import scala.collection.mutable.HashSet
 import org.apache.spark.streaming._
 
-
+/**
+ * This class drives the generation of Spark jobs from the DStreams.
+ */
 private[streaming]
 class JobScheduler(val ssc: StreamingContext) extends Logging {
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ec71b445/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
index 5647ffa..36225e1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
@@ -50,19 +50,13 @@ trait StreamingListener {
  * @param numBatchInfos Number of last batches to consider for generating statistics (default: 10)
  */
 class StatsReportListener(numBatchInfos: Int = 10) extends StreamingListener {
-
-  import org.apache.spark
-
+  // Queue containing latest completed batches
   val batchInfos = new Queue[BatchInfo]()
 
   override def onBatchCompleted(batchStarted: StreamingListenerBatchCompleted) {
-    addToQueue(batchStarted.batchInfo)
-    printStats()
-  }
-
-  def addToQueue(newPoint: BatchInfo) {
-    batchInfos.enqueue(newPoint)
+    batchInfos.enqueue(batchStarted.batchInfo)
     if (batchInfos.size > numBatchInfos) batchInfos.dequeue()
+    printStats()
   }
 
   def printStats() {
@@ -71,10 +65,11 @@ class StatsReportListener(numBatchInfos: Int = 10) extends StreamingListener {
   }
 
   def showMillisDistribution(heading: String, getMetric: BatchInfo => Option[Long]) {
-    spark.scheduler.StatsReportListener.showMillisDistribution(heading, extractDistribution(getMetric))
+    org.apache.spark.scheduler.StatsReportListener.showMillisDistribution(
+      heading, extractDistribution(getMetric))
   }
 
   def extractDistribution(getMetric: BatchInfo => Option[Long]): Option[Distribution] = {
     Distribution(batchInfos.flatMap(getMetric(_)).map(_.toDouble))
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ec71b445/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
index 324e491..110a20f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
@@ -78,4 +78,4 @@ private[spark] class StreamingListenerBus() extends Logging {
     }
     return true
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ec71b445/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index 826c839..16410a2 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -34,7 +34,7 @@ class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers{
   test("basic BatchInfo generation") {
     val ssc = setupStreams(input, operation)
     val collector = new BatchInfoCollector
-    ssc.addListener(collector)
+    ssc.addStreamingListener(collector)
     runStreams(ssc, input.size, input.size)
     val batchInfos = collector.batchInfos
     batchInfos should have size 4


[5/9] git commit: Minor updates based on comments on PR 277.

Posted by ma...@apache.org.
Minor updates based on comments on PR 277.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/3ddbdbfb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/3ddbdbfb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/3ddbdbfb

Branch: refs/heads/master
Commit: 3ddbdbfbc71486cd5076d875f82796a880d2dccb
Parents: ec71b44
Author: Tathagata Das <ta...@gmail.com>
Authored: Fri Dec 20 19:51:37 2013 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Fri Dec 20 19:51:37 2013 -0800

----------------------------------------------------------------------
 .../org/apache/spark/streaming/scheduler/JobScheduler.scala      | 4 +++-
 .../main/scala/org/apache/spark/streaming/scheduler/JobSet.scala | 3 +++
 2 files changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/3ddbdbfb/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 33c5322..9511ccf 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -24,7 +24,8 @@ import scala.collection.mutable.HashSet
 import org.apache.spark.streaming._
 
 /**
- * This class drives the generation of Spark jobs from the DStreams.
+ * This class schedules jobs to be run on Spark. It uses the JobGenerator to generate
+ * the jobs and runs them using a thread pool.
  */
 private[streaming]
 class JobScheduler(val ssc: StreamingContext) extends Logging {
@@ -91,6 +92,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
     }
   }
 
+  private[streaming]
   class JobHandler(job: Job) extends Runnable {
     def run() {
       beforeJobStart(job)

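A hedged sketch of the pattern the JobHandler hunk above hints at: each job is wrapped in a Runnable that invokes start/completion hooks around the job body and is handed to a fixed-size thread pool. The Job case class, the hook names and the pool size below are simplified stand-ins, not the actual Spark scheduler types.

import java.util.concurrent.Executors

object JobHandlerSketch {
  // Simplified stand-in for the real streaming Job class.
  final case class Job(id: Int, body: () => Unit)

  // A single thread means at most one job runs at a time.
  private val jobExecutor = Executors.newFixedThreadPool(1)

  private def beforeJobStart(job: Job) { println("job " + job.id + " started") }
  private def afterJobEnd(job: Job) { println("job " + job.id + " finished") }

  class JobHandler(job: Job) extends Runnable {
    def run() {
      beforeJobStart(job)
      try { job.body() } finally { afterJobEnd(job) }
    }
  }

  def runJob(job: Job) { jobExecutor.execute(new JobHandler(job)) }

  def main(args: Array[String]) {
    (0 until 3).foreach(i => runJob(Job(i, () => Thread.sleep(100))))
    jobExecutor.shutdown()
  }
}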
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/3ddbdbfb/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
index 05233d0..cf7431a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
@@ -20,6 +20,9 @@ package org.apache.spark.streaming.scheduler
 import scala.collection.mutable.HashSet
 import org.apache.spark.streaming.Time
 
+/** Class representing a set of Jobs
+  * that belong to the same batch.
+  */
 private[streaming]
 case class JobSet(time: Time, jobs: Seq[Job]) {
 


[6/9] git commit: Added comments to BatchInfo and JobSet, based on Patrick's comment on PR 277.

Posted by ma...@apache.org.
Added comments to BatchInfo and JobSet, based on Patrick's comment on PR 277.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/dc3ee6b6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/dc3ee6b6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/dc3ee6b6

Branch: refs/heads/master
Commit: dc3ee6b6122229cd99a133baf10a46dac2f7e9e2
Parents: 3ddbdbf
Author: Tathagata Das <ta...@gmail.com>
Authored: Mon Dec 23 11:30:42 2013 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Mon Dec 23 11:30:42 2013 -0800

----------------------------------------------------------------------
 .../spark/streaming/scheduler/BatchInfo.scala    | 19 +++++++++++++++++++
 .../spark/streaming/scheduler/JobSet.scala       | 10 +++++++---
 2 files changed, 26 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/dc3ee6b6/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
index 88e4af5..e3fb076 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
@@ -21,6 +21,12 @@ import org.apache.spark.streaming.Time
 
 /**
  * Class having information on completed batches.
+ * @param batchTime   Time of the batch
+ * @param submissionTime  Clock time of when the jobs of this batch were submitted to
+ *                        the streaming scheduler queue
+ * @param processingStartTime Clock time of when the first job of this batch started processing
+ * @param processingEndTime Clock time of when the last job of this batch finished processing
+ *
  */
 case class BatchInfo(
     batchTime: Time,
@@ -29,9 +35,22 @@ case class BatchInfo(
     processingEndTime: Option[Long]
   ) {
 
+  /**
+   * Time taken for the first job of this batch to start processing from the time this batch
+   * was submitted to the streaming scheduler. Essentially, it is
+   * `processingStartTime` - `submissionTime`.
+   */
   def schedulingDelay = processingStartTime.map(_ - submissionTime)
 
+  /**
+   * Time taken for all the jobs of this batch to finish processing from the time they started
+   * processing. Essentially, it is `processingEndTime` - `processingStartTime`.
+   */
   def processingDelay = processingEndTime.zip(processingStartTime).map(x => x._1 - x._2).headOption
 
+    /**
+     * Time taken for all the jobs of this batch to finish processing from the time they
+     * were submitted.  Essentially, it is `processingDelay` + `schedulingDelay`.
+     */
   def totalDelay = schedulingDelay.zip(processingDelay).map(x => x._1 + x._2).headOption
 }

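The schedulingDelay, processingDelay and totalDelay definitions above rely on the Option-to-Iterable conversion: zip followed by headOption yields None unless both timestamps are present. A worked example with invented timestamps, mirroring the expressions in the diff:

// Invented timestamps, purely to illustrate the Option arithmetic used by BatchInfo.
val submissionTime = 1000L
val processingStartTime: Option[Long] = Some(1200L)
val processingEndTime: Option[Long] = Some(1700L)

val schedulingDelay = processingStartTime.map(_ - submissionTime)                                  // Some(200)
val processingDelay = processingEndTime.zip(processingStartTime).map(x => x._1 - x._2).headOption  // Some(500)
val totalDelay = schedulingDelay.zip(processingDelay).map(x => x._1 + x._2).headOption             // Some(700)
// If the batch never started processing, processingStartTime is None and all three collapse to None.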
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/dc3ee6b6/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
index cf7431a..5726867 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
@@ -27,9 +27,9 @@ private[streaming]
 case class JobSet(time: Time, jobs: Seq[Job]) {
 
   private val incompleteJobs = new HashSet[Job]()
-  var submissionTime = System.currentTimeMillis()
-  var processingStartTime = -1L
-  var processingEndTime = -1L
+  var submissionTime = System.currentTimeMillis() // when this jobset was submitted
+  var processingStartTime = -1L // when the first job of this jobset started processing
+  var processingEndTime = -1L // when the last job of this jobset finished processing
 
   jobs.zipWithIndex.foreach { case (job, i) => job.setId(i) }
   incompleteJobs ++= jobs
@@ -47,8 +47,12 @@ case class JobSet(time: Time, jobs: Seq[Job]) {
 
   def hasCompleted() = incompleteJobs.isEmpty
 
+  // Time taken to process all the jobs from the time they started processing
+  // (i.e. not including the time they wait in the streaming scheduler queue)
   def processingDelay = processingEndTime - processingStartTime
 
+  // Time taken to process all the jobs from the time they were submitted
+  // (i.e. including the time they wait in the streaming scheduler queue)
   def totalDelay = {
     processingEndTime - time.milliseconds
   }

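Note that JobSet.totalDelay above is measured from the batch time itself (time.milliseconds) rather than from submissionTime, so it also covers any wait before submission. A quick illustration with invented timestamps:

// Invented timestamps, just to show how the two JobSet delays above differ.
val batchTimeMs = 1000L          // time.milliseconds: when the batch was due
val processingStartTime = 1250L  // first job of the jobset started processing
val processingEndTime = 1900L    // last job of the jobset finished processing

val processingDelay = processingEndTime - processingStartTime  // 650 ms of actual processing
val totalDelay = processingEndTime - batchTimeMs               // 900 ms from batch time to completion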

[8/9] git commit: Minor change for PR 277.

Posted by ma...@apache.org.
Minor change for PR 277.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/6eaa0505
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/6eaa0505
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/6eaa0505

Branch: refs/heads/master
Commit: 6eaa0505493511adb040257abc749fcd774bbb68
Parents: f977169
Author: Tathagata Das <ta...@gmail.com>
Authored: Mon Dec 23 15:55:45 2013 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Mon Dec 23 15:55:45 2013 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/streaming/StreamingListenerSuite.scala  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/6eaa0505/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index 16410a2..fa64142 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -21,7 +21,7 @@ import org.apache.spark.streaming.scheduler._
 import scala.collection.mutable.ArrayBuffer
 import org.scalatest.matchers.ShouldMatchers
 
-class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers{
+class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers {
 
   val input = (1 to 4).map(Seq(_)).toSeq
   val operation = (d: DStream[Int]) => d.map(x => x)


[9/9] git commit: Merge pull request #277 from tdas/scheduler-update

Posted by ma...@apache.org.
Merge pull request #277 from tdas/scheduler-update

Refactored the streaming scheduler and added StreamingListener interface

- Refactored the streaming scheduler for cleaner code. Specifically, the JobManager was renamed to JobScheduler, as it does the actual scheduling of Spark jobs to the SparkContext. The earlier Scheduler was renamed to JobGenerator, as it actually generates the jobs from the DStreams. The JobScheduler starts the JobGenerator. Also, moved all the scheduler-related code from the spark.streaming package to the spark.streaming.scheduler package.
- Implemented the StreamingListener interface, similar to SparkListener. The streaming version of StatsReportListener prints the batch processing time statistics (for now). Added StreamingListenerSuite to test it. A hedged usage sketch follows below.
- Refactored the streaming TestSuiteBase to dedupe code in the other streaming test suites.
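A hedged end-to-end sketch of the listener hookup described above. Only addStreamingListener and the scheduler types shown in the diffs are relied on; the app name, socket source and port are placeholders, and the surrounding setup is the usual StreamingContext boilerplate.

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

object ListenerExample {
  def main(args: Array[String]) {
    val ssc = new StreamingContext("local[2]", "ListenerExample", Seconds(1))

    // Register a listener before starting the context.
    ssc.addStreamingListener(new StreamingListener {
      override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
        val info = batchCompleted.batchInfo
        println("Batch " + info.batchTime + " scheduling delay: " + info.schedulingDelay +
          ", processing delay: " + info.processingDelay)
      }
    })

    val lines = ssc.socketTextStream("localhost", 9999)  // placeholder input source
    lines.count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}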


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/23a9ae6b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/23a9ae6b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/23a9ae6b

Branch: refs/heads/master
Commit: 23a9ae6be33ab2721005b766615660e07eec739f
Parents: 11107c9 6eaa050
Author: Matei Zaharia <ma...@databricks.com>
Authored: Tue Dec 24 00:08:48 2013 -0500
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Tue Dec 24 00:08:48 2013 -0500

----------------------------------------------------------------------
 .../apache/spark/scheduler/SparkListener.scala  |   7 +-
 .../spark/scheduler/SparkListenerBus.scala      |   1 -
 .../org/apache/spark/streaming/Checkpoint.scala |   2 +-
 .../org/apache/spark/streaming/DStream.scala    |  11 +-
 .../apache/spark/streaming/DStreamGraph.scala   |   1 +
 .../scala/org/apache/spark/streaming/Job.scala  |  41 -----
 .../org/apache/spark/streaming/JobManager.scala |  88 ----------
 .../spark/streaming/NetworkInputTracker.scala   | 174 ------------------
 .../org/apache/spark/streaming/Scheduler.scala  | 131 --------------
 .../spark/streaming/StreamingContext.scala      |  26 +--
 .../api/java/JavaStreamingContext.scala         |   8 +
 .../streaming/dstream/ForEachDStream.scala      |   3 +-
 .../streaming/dstream/NetworkInputDStream.scala |   1 +
 .../spark/streaming/scheduler/BatchInfo.scala   |  55 ++++++
 .../apache/spark/streaming/scheduler/Job.scala  |  41 +++++
 .../streaming/scheduler/JobGenerator.scala      | 131 ++++++++++++++
 .../streaming/scheduler/JobScheduler.scala      | 108 ++++++++++++
 .../spark/streaming/scheduler/JobSet.scala      |  68 +++++++
 .../scheduler/NetworkInputTracker.scala         | 175 +++++++++++++++++++
 .../streaming/scheduler/StreamingListener.scala |  75 ++++++++
 .../scheduler/StreamingListenerBus.scala        |  81 +++++++++
 .../spark/streaming/BasicOperationsSuite.scala  |  12 --
 .../spark/streaming/CheckpointSuite.scala       |  26 ++-
 .../apache/spark/streaming/FailureSuite.scala   |  13 +-
 .../spark/streaming/InputStreamsSuite.scala     |  12 --
 .../streaming/StreamingListenerSuite.scala      |  71 ++++++++
 .../apache/spark/streaming/TestSuiteBase.scala  |  32 +++-
 .../spark/streaming/WindowOperationsSuite.scala |  14 +-
 28 files changed, 889 insertions(+), 519 deletions(-)
----------------------------------------------------------------------



[7/9] git commit: Minor formatting fixes.

Posted by ma...@apache.org.
Minor formatting fixes.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/f9771690
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/f9771690
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/f9771690

Branch: refs/heads/master
Commit: f9771690a698b6ce5d29eb36b38bbeb498d1af0d
Parents: dc3ee6b
Author: Tathagata Das <ta...@gmail.com>
Authored: Mon Dec 23 11:32:26 2013 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Mon Dec 23 11:32:26 2013 -0800

----------------------------------------------------------------------
 .../org/apache/spark/streaming/scheduler/BatchInfo.scala    | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f9771690/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
index e3fb076..4e8d07f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
@@ -26,7 +26,6 @@ import org.apache.spark.streaming.Time
  *                        the streaming scheduler queue
  * @param processingStartTime Clock time of when the first job of this batch started processing
  * @param processingEndTime Clock time of when the last job of this batch finished processing
- *
  */
 case class BatchInfo(
     batchTime: Time,
@@ -48,9 +47,9 @@ case class BatchInfo(
    */
   def processingDelay = processingEndTime.zip(processingStartTime).map(x => x._1 - x._2).headOption
 
-    /**
-     * Time taken for all the jobs of this batch to finish processing from the time they
-     * were submitted.  Essentially, it is `processingDelay` + `schedulingDelay`.
-     */
+  /**
+   * Time taken for all the jobs of this batch to finish processing from the time they
+   * were submitted.  Essentially, it is `processingDelay` + `schedulingDelay`.
+   */
   def totalDelay = schedulingDelay.zip(processingDelay).map(x => x._1 + x._2).headOption
 }