Posted to user@spark.apache.org by "necro351 ." <ne...@gmail.com> on 2015/03/17 20:31:15 UTC

graceful shutdown not so graceful?

Hi all,

I am trying to do a graceful shutdown of my Spark Streaming job, and it
appears that everything shuts down gracefully except the checkpointing
thread, which continues to run until it crashes.

I looked at the checkpointing code in 1.3.0 (
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala),
and it appears that the write method in CheckpointWriter tries to schedule a
new CheckpointWriteHandler (and gets the exception below) regardless of
the value of 'stopped', which should already be 'true' because the graceful
shutdown has stopped the writer. Is this a bug? Shouldn't the write method
skip scheduling anything when stopped is true?
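To make the suggestion concrete, here is a minimal sketch of a writer that checks a stopped flag before submitting work, assuming a single-threaded executor similar to the one CheckpointWriter uses. GuardedCheckpointWriter is a hypothetical name for illustration; this is not Spark's actual class or its fix. It also catches RejectedExecutionException as a fallback so a late submission is reported as skipped rather than thrown.

```scala
import java.util.concurrent.{Executors, RejectedExecutionException, TimeUnit}

// Hypothetical stand-in for CheckpointWriter; NOT Spark's actual implementation.
class GuardedCheckpointWriter {
  private val executor = Executors.newFixedThreadPool(1)
  // Guarded by this object's lock (both write and stop are synchronized).
  private var stopped = false

  /** Returns true if the task was submitted, false if it was skipped. */
  def write(task: Runnable): Boolean = synchronized {
    if (stopped) {
      false // writer already stopped: do not touch the terminated executor
    } else {
      try {
        executor.execute(task)
        true
      } catch {
        // Fallback in case the executor was shut down by some other path
        case _: RejectedExecutionException => false
      }
    }
  }

  /** Marks the writer stopped, then shuts down and waits for the executor. */
  def stop(): Unit = synchronized {
    if (!stopped) {
      stopped = true
      executor.shutdown()
      executor.awaitTermination(10, TimeUnit.SECONDS)
    }
  }
}
```

With this guard, a checkpoint request that arrives after stop() is simply dropped instead of producing the RejectedExecutionException shown in the log.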

Thanks!

This is what I'm doing:
=================
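import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.json4s.JValue // assumed: JValue here is json4s's JSON AST type
import org.scalatest.{BeforeAndAfterAll, FunSuite}
import scala.collection.mutable.SynchronizedQueue
// ReportHydrator, ApplicationPropertyGenerator and TestInput are
// project-specific helpers that are not shown here.

class TestStreaming extends FunSuite with BeforeAndAfterAll {
  @transient var sc: SparkContext = _
  @transient var ssc: StreamingContext = _

  override def beforeAll() = {
    // Clear properties that a previous context may have left behind
    System.clearProperty("spark.driver.port")
    System.clearProperty("spark.hostPort")
    System.setProperty("spark.cleaner.ttl", "300")
    val sparkConf = new SparkConf().setAppName("testSpark").setMaster("local[4]")
    sc = new SparkContext(sparkConf)
    ssc = new StreamingContext(sc, Seconds(1)) // 1-second batch interval
  }

  override def afterAll() = {
    // Stop the StreamingContext and its SparkContext, gracefully
    val stopSparkContext = true
    val stopGracefully = true
    ssc.stop(stopSparkContext, stopGracefully)
    sc = null
    ssc = null
    System.clearProperty("spark.driver.port")
    System.clearProperty("spark.hostPort")
  }

  test("testStreaming") {
    // Feed one RDD of test reports into the stream through a queue
    val rddQueue = new SynchronizedQueue[RDD[JValue]]()
    val inputStream = ssc.queueStream(rddQueue)
    rddQueue += ssc.sparkContext.makeRDD(TestInput.reports("disney"))

    val hydratedReports = ReportHydrator.hydrate(inputStream)

    ApplicationPropertyGenerator.generateFrom(hydratedReports).foreachRDD(rdd => rdd.foreach(println(_)))

    ssc.checkpoint("reports/streaming")
    ssc.start()
  }
}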

This is the output I get when shutting down gracefully (the exception is
about halfway down):
=================================================
15/03/17 12:25:34 INFO Executor: Finished task 3.0 in stage 2.0 (TID 11). 1455 bytes result sent to driver
15/03/17 12:25:34 INFO TaskSetManager: Finished task 3.0 in stage 2.0 (TID 11) in 3019 ms on localhost (4/4)
15/03/17 12:25:34 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
15/03/17 12:25:34 INFO DAGScheduler: Stage 2 (foreachRDD at TestStreaming.scala:42) finished in 3.027 s
15/03/17 12:25:34 INFO DAGScheduler: Job 0 finished: foreachRDD at TestStreaming.scala:42, took 4.492961 s
15/03/17 12:25:34 INFO JobScheduler: Finished job streaming job 1426620330000 ms.0 from job set of time 1426620330000 ms
15/03/17 12:25:34 INFO JobScheduler: Total delay: 4.951 s for time 1426620330000 ms (execution: 4.532 s)
15/03/17 12:25:39 WARN JobGenerator: Timed out while stopping the job generator (timeout = 10000)
15/03/17 12:25:39 INFO JobGenerator: Waited for jobs to be processed and checkpoints to be written
15/03/17 12:25:39 INFO CheckpointWriter: CheckpointWriter executor terminated ? true, waited for 0 ms.
15/03/17 12:25:39 INFO JobGenerator: Stopped JobGenerator
15/03/17 12:25:39 INFO JobGenerator: Checkpointing graph for time 1426620330000 ms
15/03/17 12:25:39 INFO DStreamGraph: Updating checkpoint data for time 1426620330000 ms
15/03/17 12:25:39 INFO DStreamGraph: Updated checkpoint data for time 1426620330000 ms
15/03/17 12:25:39 INFO JobScheduler: Stopped JobScheduler
15/03/17 12:25:39 INFO StreamingContext: StreamingContext stopped successfully
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/streaming/json,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/streaming,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/metrics/json,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/static,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump/json,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/json,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment/json,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd/json,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/json,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool/json,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/json,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/json,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job/json,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/json,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs,null}
15/03/17 12:25:39 ERROR CheckpointWriter: Could not submit checkpoint task to the thread pool executor
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@5cd6e776 rejected from java.util.concurrent.ThreadPoolExecutor@20f62398[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2048)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:821)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1372)
    at org.apache.spark.streaming.CheckpointWriter.write(Checkpoint.scala:188)
    at org.apache.spark.streaming.scheduler.JobGenerator.doCheckpoint(JobGenerator.scala:285)
    at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:176)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:85)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1.aroundReceive(JobGenerator.scala:83)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
15/03/17 12:25:39 INFO SparkUI: Stopped Spark web UI at http://192.168.241.128:4040
15/03/17 12:25:39 INFO DAGScheduler: Stopping DAGScheduler
15/03/17 12:25:39 INFO MapOutputTrackerMasterActor: MapOutputTrackerActor stopped!
15/03/17 12:25:39 INFO MemoryStore: MemoryStore cleared
15/03/17 12:25:39 INFO BlockManager: BlockManager stopped
15/03/17 12:25:39 INFO BlockManagerMaster: BlockManagerMaster stopped
15/03/17 12:25:39 INFO OutputCommitCoordinator$OutputCommitCoordinatorActor: OutputCommitCoordinator stopped!
15/03/17 12:25:39 INFO SparkContext: Successfully stopped SparkContext
15/03/17 12:25:39 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
15/03/17 12:25:39 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
[info] TestStreaming:
[info] - testStreaming
[info] Run completed in 14 seconds, 369 milliseconds.
[info] Total number of tests run: 1
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 1, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[success] Total time: 17 s, completed Mar 17, 2015 12:25:39 PM
15/03/17 12:25:39 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
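The log appears to show an ordering problem: the CheckpointWriter executor reports that it terminated and JobGenerator logs "Stopped JobGenerator", yet the generator then still processes a checkpoint for time 1426620330000 ms and hits the terminated executor. The sketch below is a simplified, hypothetical model of that race (ShutdownOrderDemo is an illustrative name, not Spark code): a single-threaded event loop stands in for JobGenerator's event processing and a one-thread pool stands in for CheckpointWriter's executor. It assumes, as the log suggests, that a checkpoint event can still be pending when shutdown begins.

```scala
import java.util.concurrent.{Executors, LinkedBlockingQueue, RejectedExecutionException, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical model of the shutdown race; NOT Spark's JobGenerator/CheckpointWriter.
object ShutdownOrderDemo {
  /** Runs the scenario and returns how many checkpoint submissions were rejected. */
  def run(stopWriterFirst: Boolean): Int = {
    val writerPool = Executors.newFixedThreadPool(1) // stands in for the checkpoint executor
    val events = new LinkedBlockingQueue[String]()   // stands in for the generator's event queue
    val rejected = new AtomicInteger(0)

    // Single-threaded event loop: each "checkpoint" event submits a write task.
    val loop = new Thread(new Runnable {
      def run(): Unit = {
        var done = false
        while (!done) {
          events.take() match {
            case "checkpoint" =>
              try writerPool.execute(new Runnable { def run(): Unit = () })
              catch { case _: RejectedExecutionException => rejected.incrementAndGet() }
            case "stop" => done = true
          }
        }
      }
    })

    events.put("checkpoint") // a checkpoint event is still queued when shutdown starts
    if (stopWriterFirst) {
      // Writer is stopped while the event is pending: the later submit is rejected.
      writerPool.shutdown()
      writerPool.awaitTermination(10, TimeUnit.SECONDS)
      loop.start()
      events.put("stop")
      loop.join()
    } else {
      // Drain the event loop first, then stop the writer: nothing is rejected.
      loop.start()
      events.put("stop")
      loop.join()
      writerPool.shutdown()
      writerPool.awaitTermination(10, TimeUnit.SECONDS)
    }
    rejected.get()
  }
}
```

In this model `ShutdownOrderDemo.run(stopWriterFirst = true)` reports one rejected submission and `run(stopWriterFirst = false)` reports none, which is why either draining pending events before stopping the writer, or having the writer ignore requests once it is stopped, would avoid the error.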