Posted to commits@spark.apache.org by rx...@apache.org on 2015/04/20 05:48:39 UTC

spark git commit: [SPARK-6979][Streaming] Replace JobScheduler.eventActor and JobGenerator.eventActor with EventLoop

Repository: spark
Updated Branches:
  refs/heads/master d8e1b7b06 -> c776ee8a6


[SPARK-6979][Streaming] Replace JobScheduler.eventActor and JobGenerator.eventActor with EventLoop

Title says it all.
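
For context: EventLoop (org.apache.spark.util.EventLoop) is a single-threaded
event dispatcher that the classes below now subclass in place of their Akka
actors. A rough standalone sketch of the idea follows (illustrative only;
Spark's real class is an internal utility and also has onStart/onStop hooks
and joins its thread on stop):

import java.util.concurrent.LinkedBlockingDeque
import java.util.concurrent.atomic.AtomicBoolean

import scala.util.control.NonFatal

// Illustrative stand-in for org.apache.spark.util.EventLoop; names and
// shutdown details here are simplified, not Spark's actual implementation.
abstract class SimpleEventLoop[E](name: String) {
  private val queue = new LinkedBlockingDeque[E]()
  private val stopped = new AtomicBoolean(false)

  private val thread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = queue.take() // blocks until an event is posted
          try onReceive(event) catch { case NonFatal(t) => onError(t) }
        }
      } catch {
        case _: InterruptedException => // stop() interrupts a blocked take()
      }
    }
  }

  def start(): Unit = thread.start()
  def post(event: E): Unit = queue.put(event)
  def stop(): Unit = { stopped.set(true); thread.interrupt() }

  protected def onReceive(event: E): Unit
  protected def onError(e: Throwable): Unit
}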

cc rxin tdas

Author: zsxwing <zs...@gmail.com>

Closes #5554 from zsxwing/SPARK-6979 and squashes the following commits:

5304350 [zsxwing] Fix NotSerializableException
e9d3479 [zsxwing] Add blank lines
633e279 [zsxwing] Fix NotSerializableException
e496ace [zsxwing] Replace JobGenerator.eventActor with EventLoop
ec6ec58 [zsxwing] Fix the import order
ce0fa73 [zsxwing] Replace JobScheduler.eventActor with EventLoop


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c776ee8a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c776ee8a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c776ee8a

Branch: refs/heads/master
Commit: c776ee8a6fdcdc463746a815b7686e4e33a874a9
Parents: d8e1b7b
Author: zsxwing <zs...@gmail.com>
Authored: Sun Apr 19 20:48:36 2015 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Sun Apr 19 20:48:36 2015 -0700

----------------------------------------------------------------------
 .../mllib/clustering/StreamingKMeans.scala      |  3 +-
 .../streaming/scheduler/JobGenerator.scala      | 38 ++++++++++---------
 .../streaming/scheduler/JobScheduler.scala      | 40 ++++++++++----------
 3 files changed, 42 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c776ee8a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala
index d4606fd..812014a 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala
@@ -20,8 +20,7 @@ package org.apache.spark.mllib.clustering
 import scala.reflect.ClassTag
 
 import org.apache.spark.Logging
-import org.apache.spark.SparkContext._
-import org.apache.spark.annotation.{Experimental, DeveloperApi}
+import org.apache.spark.annotation.Experimental
 import org.apache.spark.mllib.linalg.{BLAS, Vector, Vectors}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming.dstream.DStream

http://git-wip-us.apache.org/repos/asf/spark/blob/c776ee8a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 58e5663..2467d50 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -19,12 +19,10 @@ package org.apache.spark.streaming.scheduler
 
 import scala.util.{Failure, Success, Try}
 
-import akka.actor.{ActorRef, Props, Actor}
-
 import org.apache.spark.{SparkEnv, Logging}
 import org.apache.spark.streaming.{Checkpoint, CheckpointWriter, Time}
 import org.apache.spark.streaming.util.RecurringTimer
-import org.apache.spark.util.{Clock, ManualClock, Utils}
+import org.apache.spark.util.{Clock, EventLoop, ManualClock}
 
 /** Event classes for JobGenerator */
 private[scheduler] sealed trait JobGeneratorEvent
@@ -58,7 +56,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
   }
 
   private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
-    longTime => eventActor ! GenerateJobs(new Time(longTime)), "JobGenerator")
+    longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
 
   // This is marked lazy so that this is initialized after checkpoint duration has been set
   // in the context and the generator has been started.
@@ -70,22 +68,26 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
     null
   }
 
-  // eventActor is created when generator starts.
+  // eventLoop is created when generator starts.
   // This not being null means the generator has been started and not stopped
-  private var eventActor: ActorRef = null
+  private var eventLoop: EventLoop[JobGeneratorEvent] = null
 
   // last batch whose completion, checkpointing and metadata cleanup have been completed
   private var lastProcessedBatch: Time = null
 
   /** Start generation of jobs */
   def start(): Unit = synchronized {
-    if (eventActor != null) return // generator has already been started
+    if (eventLoop != null) return // generator has already been started
+
+    eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
+      override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)
 
-    eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
-      override def receive: PartialFunction[Any, Unit] = {
-        case event: JobGeneratorEvent =>  processEvent(event)
+      override protected def onError(e: Throwable): Unit = {
+        jobScheduler.reportError("Error in job generator", e)
       }
-    }), "JobGenerator")
+    }
+    eventLoop.start()
+
     if (ssc.isCheckpointPresent) {
       restart()
     } else {
@@ -99,7 +101,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
    * checkpoints written.
    */
   def stop(processReceivedData: Boolean): Unit = synchronized {
-    if (eventActor == null) return // generator has already been stopped
+    if (eventLoop == null) return // generator has already been stopped
 
     if (processReceivedData) {
       logInfo("Stopping JobGenerator gracefully")
@@ -146,9 +148,9 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
       graph.stop()
     }
 
-    // Stop the actor and checkpoint writer
+    // Stop the event loop and checkpoint writer
     if (shouldCheckpoint) checkpointWriter.stop()
-    ssc.env.actorSystem.stop(eventActor)
+    eventLoop.stop()
     logInfo("Stopped JobGenerator")
   }
 
@@ -156,7 +158,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
    * Callback called when a batch has been completely processed.
    */
   def onBatchCompletion(time: Time) {
-    eventActor ! ClearMetadata(time)
+    eventLoop.post(ClearMetadata(time))
   }
 
   /**
@@ -164,7 +166,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
    */
   def onCheckpointCompletion(time: Time, clearCheckpointDataLater: Boolean) {
     if (clearCheckpointDataLater) {
-      eventActor ! ClearCheckpointData(time)
+      eventLoop.post(ClearCheckpointData(time))
     }
   }
 
@@ -247,7 +249,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
       case Failure(e) =>
         jobScheduler.reportError("Error generating jobs for time " + time, e)
     }
-    eventActor ! DoCheckpoint(time, clearCheckpointDataLater = false)
+    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
   }
 
   /** Clear DStream metadata for the given `time`. */
@@ -257,7 +259,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
     // If checkpointing is enabled, then checkpoint,
     // else mark batch to be fully processed
     if (shouldCheckpoint) {
-      eventActor ! DoCheckpoint(time, clearCheckpointDataLater = true)
+      eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = true))
     } else {
       // If checkpointing is not enabled, then delete metadata information about
       // received blocks (block data not saved in any case). Otherwise, wait for

http://git-wip-us.apache.org/repos/asf/spark/blob/c776ee8a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 95f1857..508b892 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -17,13 +17,15 @@
 
 package org.apache.spark.streaming.scheduler
 
-import scala.util.{Failure, Success, Try}
-import scala.collection.JavaConversions._
 import java.util.concurrent.{TimeUnit, ConcurrentHashMap, Executors}
-import akka.actor.{ActorRef, Actor, Props}
-import org.apache.spark.{SparkException, Logging, SparkEnv}
+
+import scala.collection.JavaConversions._
+import scala.util.{Failure, Success}
+
+import org.apache.spark.Logging
 import org.apache.spark.rdd.PairRDDFunctions
 import org.apache.spark.streaming._
+import org.apache.spark.util.EventLoop
 
 
 private[scheduler] sealed trait JobSchedulerEvent
@@ -46,20 +48,20 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
   val listenerBus = new StreamingListenerBus()
 
   // These two are created only when scheduler starts.
-  // eventActor not being null means the scheduler has been started and not stopped
+  // eventLoop not being null means the scheduler has been started and not stopped
   var receiverTracker: ReceiverTracker = null
-  private var eventActor: ActorRef = null
-
+  private var eventLoop: EventLoop[JobSchedulerEvent] = null
 
   def start(): Unit = synchronized {
-    if (eventActor != null) return // scheduler has already been started
+    if (eventLoop != null) return // scheduler has already been started
 
     logDebug("Starting JobScheduler")
-    eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
-      override def receive: PartialFunction[Any, Unit] = {
-        case event: JobSchedulerEvent => processEvent(event)
-      }
-    }), "JobScheduler")
+    eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
+      override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
+
+      override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
+    }
+    eventLoop.start()
 
     listenerBus.start(ssc.sparkContext)
     receiverTracker = new ReceiverTracker(ssc)
@@ -69,7 +71,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
   }
 
   def stop(processAllReceivedData: Boolean): Unit = synchronized {
-    if (eventActor == null) return // scheduler has already been stopped
+    if (eventLoop == null) return // scheduler has already been stopped
     logDebug("Stopping JobScheduler")
 
     // First, stop receiving
@@ -96,8 +98,8 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
 
     // Stop everything else
     listenerBus.stop()
-    ssc.env.actorSystem.stop(eventActor)
-    eventActor = null
+    eventLoop.stop()
+    eventLoop = null
     logInfo("Stopped JobScheduler")
   }
 
@@ -117,7 +119,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
   }
 
   def reportError(msg: String, e: Throwable) {
-    eventActor ! ErrorReported(msg, e)
+    eventLoop.post(ErrorReported(msg, e))
   }
 
   private def processEvent(event: JobSchedulerEvent) {
@@ -172,14 +174,14 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
 
   private class JobHandler(job: Job) extends Runnable {
     def run() {
-      eventActor ! JobStarted(job)
+      eventLoop.post(JobStarted(job))
       // Disable checks for existing output directories in jobs launched by the streaming scheduler,
       // since we may need to write output to an existing directory during checkpoint recovery;
       // see SPARK-4835 for more details.
       PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
         job.run()
       }
-      eventActor ! JobCompleted(job)
+      eventLoop.post(JobCompleted(job))
     }
   }
 }
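
The call-site change is mechanical: messages formerly sent with
"eventActor ! event" become "eventLoop.post(event)", and handling moves from
an actor's receive block to onReceive. Hypothetical usage of the sketch above
(the event names are illustrative, not from this patch):

// Mirrors the sealed-trait event style of JobGeneratorEvent/JobSchedulerEvent.
object EventLoopDemo extends App {
  sealed trait DemoEvent
  case class JobDone(id: Int) extends DemoEvent

  val loop = new SimpleEventLoop[DemoEvent]("demo-loop") {
    override protected def onReceive(event: DemoEvent): Unit = event match {
      case JobDone(id) => println(s"job $id done")
    }
    override protected def onError(e: Throwable): Unit = e.printStackTrace()
  }

  loop.start()
  loop.post(JobDone(1)) // handled on the loop's own thread
  Thread.sleep(100)     // crude: let the loop drain; a real stop() would join
  loop.stop()
}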

