Posted to issues@spark.apache.org by "Shixiong Zhu (JIRA)" <ji...@apache.org> on 2016/06/01 19:37:59 UTC

[jira] [Commented] (SPARK-15669) Driver and Executor have different akka configuration in YARN cluster mode

    [ https://issues.apache.org/jira/browse/SPARK-15669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15310968#comment-15310968 ] 

Shixiong Zhu commented on SPARK-15669:
--------------------------------------

Where did you set "akka.zeromq.poll-timeout"? You need to set it via "SparkConf"; otherwise the configuration cannot be sent to the executors.
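
As a rough illustration of the suggestion above, here is a minimal sketch that passes the setting as a plain key/value pair through SparkConf.set. The app name and the "100ms" value are taken from the report below; whether Spark 1.6.1 actually forwards this particular key to the executors is an assumption that this sketch does not verify.

```scala
import org.apache.spark.SparkConf

// Assumption: the akka setting is supplied programmatically on the driver
// instead of only through a configuration file on the classpath.
val sparkConf = new SparkConf()
  .setAppName("TestApp")
  // "100ms" is the example value mentioned in the report, not a recommended default.
  .set("akka.zeromq.poll-timeout", "100ms")
```

The resulting sparkConf would then be passed to the StreamingContext constructor exactly as in the reproduction code.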

> Driver and Executor have different akka configuration in YARN cluster mode
> --------------------------------------------------------------------------
>
>                 Key: SPARK-15669
>                 URL: https://issues.apache.org/jira/browse/SPARK-15669
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core, Streaming
>    Affects Versions: 1.6.1
>         Environment: Scala: 2.10
> Spark: 1.6.1
> Yarn Cluster
>            Reporter: Denis Buravlev
>            Priority: Critical
>
> I'm trying to run a Spark Streaming application that uses ZeroMQ on a YARN cluster. The application fails with the following message: {{ERROR actor.OneForOneStrategy: No configuration setting found for key 'akka.zeromq'}}.
> The configuration file exists and is available to both the Driver and the Executor.
> After some investigation I found one interesting thing: if you execute the code
> {{println(Try(SparkEnv.get.actorSystem.settings.config.getString("akka.zeromq.poll-timeout")).toOption.getOrElse("error!"))}}
> you get different results on the Driver and the Executor:
> Driver stdout log: the value from the configuration (e.g. 100ms)
> Executor stdout log: "error!"
> *Is this the correct behaviour for Spark?*
> Full code to reproduce:
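> {code}
> import scala.collection.mutable.Queue
> import scala.util.Try
> import org.apache.spark.{SparkConf, SparkEnv}
> import org.apache.spark.rdd.RDD
> import org.apache.spark.streaming.{Seconds, StreamingContext}
>
> // ConfigurationManager is the reporter's own helper; its source is not included in the report.
> object Application {
>   def main(args: Array[String]) {
>     val config = ConfigurationManager.getConfig(args)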
>     val sparkConf = new SparkConf().setAppName("TestApp")
>     val ssc = new StreamingContext(sparkConf, Seconds(1))
>     println("Driver:")
>     println(Try(SparkEnv.get.actorSystem.settings.config.getString("akka.zeromq.poll-timeout")).toOption.getOrElse("error1"))
>     val rddQueue = new Queue[RDD[Int]]()
>     val inputStream = ssc.queueStream(rddQueue)
>     val mappedStream = inputStream.map{x =>
>       println("Executor:")
>       println(Try(SparkEnv.get.actorSystem.settings.config.getString("akka.zeromq.poll-timeout")).toOption.getOrElse("error2"))
>       (x % 10, 1)
>     }
>     val reducedStream = mappedStream.reduceByKey(_ + _)
>     reducedStream.print()
>     ssc.start()
>     for (i <- 1 to 30) {
>       rddQueue.synchronized {
>         rddQueue += ssc.sparkContext.makeRDD(1 to 1000, 10)
>       }
>       Thread.sleep(1000)
>     }
>     ssc.awaitTermination()
>   }
> }
> {code}


