Posted to user@spark.apache.org by Jose Fernandez <jf...@sdl.com> on 2015/03/12 20:27:54 UTC

Handling worker batch processing during driver shutdown

Hi folks,

I have a shutdown hook in my driver which stops the streaming context cleanly. This is great, as workers can finish their current processing unit before shutting down. Unfortunately, each worker contains a batch processor which only flushes every X entries. We’re indexing to different indices in Elasticsearch and using the bulk index request for performance. As far as Spark is concerned, once data is added to the batcher it is considered processed, so our workers are being shut down with data still in the batcher.

Is there any way to coordinate the shutdown with the workers? I haven’t had any luck searching for a solution online. I would appreciate any suggestions you may have.
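
To make the problem concrete, the batching behaviour described above can be sketched roughly as follows. This is a minimal illustration, not the actual indexer: `BulkBatcher`, `threshold`, `send`, and `pending` are hypothetical names, and `send` merely stands in for the Elasticsearch bulk index request.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the batch processor; not a real Elasticsearch client.
final class BulkBatcher[A](threshold: Int, send: Seq[A] => Unit) {
  require(threshold > 0, "threshold must be positive")

  private val buffer = ArrayBuffer.empty[A]

  // Buffer one entry; a bulk request goes out only once `threshold` entries are waiting.
  def add(entry: A): Unit = synchronized {
    buffer += entry
    if (buffer.size >= threshold) flush()
  }

  // Send whatever is buffered, even if the threshold has not been reached.
  def flush(): Unit = synchronized {
    if (buffer.nonEmpty) {
      send(buffer.toList)
      buffer.clear()
    }
  }

  // Number of entries that would be lost if the JVM exited right now.
  def pending: Int = synchronized(buffer.size)
}
```

With a threshold of 3, adding five entries triggers one bulk request and leaves two entries in the buffer until something calls `flush()` explicitly, which is the situation the workers are in at shutdown.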

Thanks :)



SDL PLC confidential, all rights reserved. If you are not the intended recipient of this mail SDL requests and requires that you delete it without acting upon or copying any of its contents, and we further request that you advise us.

SDL PLC is a public limited company registered in England and Wales. Registered number: 02675207.
Registered address: Globe House, Clivemont Road, Maidenhead, Berkshire SL6 7DY, UK.




RE: Handling worker batch processing during driver shutdown

Posted by Tathagata Das <td...@databricks.com>.
Are you running the code before or after stopping the SparkContext? It must
run after stopping the StreamingContext (without stopping the SparkContext)
and before stopping the SparkContext.
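
The ordering described above can be sketched as below. The helper is deliberately generic so that it runs without a cluster; `OrderedShutdown` and its parameter names are hypothetical, and the comments show the Spark calls each step would presumably wrap.

```scala
// Hypothetical helper that only encodes the order of the three shutdown steps.
object OrderedShutdown {
  def run(
      stopStreaming: () => Unit,   // e.g. ssc.stop(stopSparkContext = false, stopGracefully = true)
      flushExecutors: () => Unit,  // e.g. the dummy flush job suggested elsewhere in this thread
      stopSpark: () => Unit        // e.g. ssc.sparkContext.stop()
  ): Unit = {
    try {
      // 1. Stop streaming only, so the SparkContext can still schedule tasks.
      stopStreaming()
      // 2. Run the flush while executors are still reachable.
      flushExecutors()
    } finally {
      // 3. Stop the SparkContext last, even if an earlier step failed.
      stopSpark()
    }
  }
}
```

Passing the steps in as functions is just a design choice for the sketch; in a real driver the three calls could equally be written inline in this order.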

Cc'ing Sean. He may have more insights.
On Mar 13, 2015 11:19 AM, "Jose Fernandez" <jf...@sdl.com> wrote:

> Thanks, I am using 1.2.0 so it looks like I am affected by the bug you
> described.
>
> It also appears that the shutdown hook doesn’t work correctly when the
> driver is running in YARN. According to the logs it looks like the
> SparkContext is closed and the code you suggested is never executed and
> fails silently.
>
> I really appreciate your help, but it looks like I’m back to the drawing
> board on this one.

RE: Handling worker batch processing during driver shutdown

Posted by Jose Fernandez <jf...@sdl.com>.
Thanks. I am using 1.2.0, so it looks like I am affected by the bug you described.

It also appears that the shutdown hook doesn’t work correctly when the driver is running on YARN. According to the logs, it looks like the SparkContext is closed and the code you suggested never runs; it fails silently.

I really appreciate your help, but it looks like I’m back to the drawing board on this one.

From: Tathagata Das [mailto:tdas@databricks.com]
Sent: Thursday, March 12, 2015 7:53 PM
To: Jose Fernandez
Cc: user@spark.apache.org
Subject: Re: Handling worker batch processing during driver shutdown



What version of Spark are you using? You may be hitting a known, already fixed bug where the receivers do not get the stop signal, so a graceful stop (stopGracefully = true) waits indefinitely for the receivers to stop. Try setting stopGracefully to false and see if it works.
This bug should be fixed in Spark 1.2.1.

https://issues.apache.org/jira/browse/SPARK-5035

TD


Re: Handling worker batch processing during driver shutdown

Posted by Tathagata Das <td...@databricks.com>.
What version of Spark are you using? You may be hitting a known, already
fixed bug where the receivers do not get the stop signal, so a graceful stop
(stopGracefully = true) waits indefinitely for the receivers to stop. Try
setting stopGracefully to false and see if it works.
This bug should be fixed in Spark 1.2.1.

https://issues.apache.org/jira/browse/SPARK-5035

TD

On Thu, Mar 12, 2015 at 7:48 PM, Jose Fernandez <jf...@sdl.com> wrote:

> Thanks for the reply!
>
> Theoretically I should be able to do as you suggest as I follow the pool
> design pattern from the documentation, but I don’t seem to be able to run
> any code after .stop() is called.
>
>   override def main(args: Array[String]) {
>     // setup
>     val ssc = new StreamingContext(sparkConf, Seconds(streamTime))
>     val inputStreams = (1 to numReceivers).map(i => ssc.receiverStream(<custom receiver>))
>     val messages = ssc.union(inputStreams)
>
>     messages.foreachRDD { rdd =>
>       rdd.foreachPartition { p =>
>         val indexer = Indexer.getInstance()
>
>         p.foreach(Indexer.process(_) match {
>           case Some(entry) => indexer.index(entry)
>           case None =>
>         })
>
>         Indexer.returnInstance(indexer)
>       }
>     }
>
>     messages.print()
>
>     sys.ShutdownHookThread {
>       logInfo("****************** Shutdown hook triggered ******************")
>       ssc.stop(false, true)
>       logInfo("****************** Shutdown finished ******************")
>       ssc.stop(true)
>     }
>
>     ssc.start()
>     ssc.awaitTermination()
>   }
>
> The first shutdown log message is always displayed, but the second message
> never does. I’ve tried multiple permutations of the stop function calls and
> even used try/catch around it. I’m running in yarn-cluster mode using Spark
> 1.2 on CDH 5.3. I stop the application with yarn application -kill <appID>.

RE: Handling worker batch processing during driver shutdown

Posted by Jose Fernandez <jf...@sdl.com>.
Thanks for the reply!

Theoretically I should be able to do as you suggest as I follow the pool design pattern from the documentation, but I don’t seem to be able to run any code after .stop() is called.

  override def main(args: Array[String]) {
    // setup
    val ssc = new StreamingContext(sparkConf, Seconds(streamTime))
    val inputStreams = (1 to numReceivers).map(i => ssc.receiverStream(<custom receiver>))
    val messages = ssc.union(inputStreams)

    messages.foreachRDD { rdd =>
      rdd.foreachPartition { p =>
        val indexer = Indexer.getInstance()

        p.foreach(Indexer.process(_) match {
          case Some(entry) => indexer.index(entry)
          case None =>
        })

        Indexer.returnInstance(indexer)
      }
    }

    messages.print()

    sys.ShutdownHookThread {
      logInfo("****************** Shutdown hook triggered ******************")
      ssc.stop(false, true)
      logInfo("****************** Shutdown finished ******************")
      ssc.stop(true)
    }

    ssc.start()
    ssc.awaitTermination()
  }

The first shutdown log message is always displayed, but the second never is. I’ve tried multiple permutations of the stop function calls and even wrapped them in try/catch. I’m running in yarn-cluster mode using Spark 1.2 on CDH 5.3. I stop the application with `yarn application -kill <appID>`.


From: Tathagata Das [mailto:tdas@databricks.com]
Sent: Thursday, March 12, 2015 1:29 PM
To: Jose Fernandez
Cc: user@spark.apache.org
Subject: Re: Handling worker batch processing during driver shutdown

Can you access the batcher directly? That is, is there a handle that gives you access to the batchers on the executors when you run a task on that executor? If so, after the StreamingContext has been stopped (but not the SparkContext), you can use `sc.makeRDD()` to run a dummy task like this:

sc.makeRDD(1 to 1000, 1000).foreach { x =>
   Batcher.get().flush()
}

With a large number of tasks and no other jobs running in the system, at least one task should run on each executor and therefore flush its batcher.

TD



Re: Handling worker batch processing during driver shutdown

Posted by Tathagata Das <td...@databricks.com>.
Can you access the batcher directly? That is, is there a handle that gives
you access to the batchers on the executors when you run a task on that
executor? If so, after the StreamingContext has been stopped (but not the
SparkContext), you can use `sc.makeRDD()` to run a dummy task like this:

sc.makeRDD(1 to 1000, 1000).foreach { x =>
   Batcher.get().flush()
}

With a large number of tasks and no other jobs running in the system, at
least one task should run on each executor and therefore flush its
batcher.
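
The snippet above assumes that a task can reach the executor's batcher through something like `Batcher.get()`. A minimal sketch of such a per-JVM handle follows; the names mirror the snippet, but the implementation is hypothetical and is not the indexer pool from the original post. It relies on a Scala `object` being initialised once per JVM, so every task running in the same executor process would see the same instance. Whether every executor actually receives a task is up to the scheduler, so as far as I know the flush is likely rather than guaranteed to reach all of them.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical batcher; in the real job this would wrap the bulk index request.
final class Batcher {
  private val buffer = new ConcurrentLinkedQueue[String]()

  def add(entry: String): Unit = buffer.add(entry)

  // Drain everything currently buffered and return how many entries were written out.
  def flush(): Int = {
    var drained = 0
    while (buffer.poll() != null) drained += 1
    drained
  }
}

object Batcher {
  // One instance per JVM: created lazily on first use and shared by all tasks in that process.
  private lazy val instance = new Batcher

  def get(): Batcher = instance
}
```

Calling `flush()` on an already empty batcher simply returns 0, so it does no harm if several of the dummy tasks land on the same executor.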

TD

On Thu, Mar 12, 2015 at 12:27 PM, Jose Fernandez <jf...@sdl.com> wrote:

> Hi folks,
>
> I have a shutdown hook in my driver which stops the streaming context
> cleanly. This is great as workers can finish their current processing unit
> before shutting down. Unfortunately each worker contains a batch processor
> which only flushes every X entries. We’re indexing to different indices in
> elasticsearch and using the bulk index request for performance. As far as
> Spark is concerned, once data is added to the batcher it is considered
> processed, so our workers are being shut down with data still in the
> batcher.
>
> Is there any way to coordinate the shutdown with the workers? I haven’t
> had any luck searching for a solution online. I would appreciate any
> suggestions you may have.
>
> Thanks :)