Posted to issues@spark.apache.org by "Shixiong Zhu (JIRA)" <ji...@apache.org> on 2016/06/01 19:37:59 UTC
[jira] [Commented] (SPARK-15669) Driver and Executor have different akka configuration in YARN cluster mode
[ https://issues.apache.org/jira/browse/SPARK-15669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15310968#comment-15310968 ]
Shixiong Zhu commented on SPARK-15669:
--------------------------------------
Where did you set "akka.zeromq.poll-timeout"? You need to set it via "SparkConf"; otherwise the configuration cannot be sent to the executors.
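To illustrate the comment above: in Spark 1.x, the ActorSystem configuration is assembled from the entries on the SparkConf whose keys start with "akka.", so a value that only lives in an application config file on the driver never reaches the executors. Below is a minimal, hedged sketch of that key-prefix forwarding, simulated with a plain Scala Map rather than a real SparkConf (the map contents and the object/method names here are illustrative assumptions, not Spark API):

```scala
object AkkaConfForwardingSketch {
  // Hypothetical driver-side settings; in Spark 1.x only the akka.*
  // entries set on the SparkConf are carried over into the executor-side
  // ActorSystem configuration.
  val driverConf: Map[String, String] = Map(
    "spark.app.name"           -> "TestApp",
    "akka.zeromq.poll-timeout" -> "100ms"
  )

  // Sketch of the "akka." prefix filter applied to the conf entries.
  def akkaEntries(conf: Map[String, String]): Map[String, String] =
    conf.filter { case (key, _) => key.startsWith("akka.") }

  def main(args: Array[String]): Unit = {
    val forwarded = akkaEntries(driverConf)
    // The akka.* key survives the filter; the spark.* key does not.
    println(forwarded("akka.zeromq.poll-timeout"))
  }
}
```

In the actual application this would correspond to something like `new SparkConf().setAppName("TestApp").set("akka.zeromq.poll-timeout", "100ms")`, instead of relying on a config file that is only visible to the driver.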
> Driver and Executor have different akka configuration in YARN cluster mode
> --------------------------------------------------------------------------
>
> Key: SPARK-15669
> URL: https://issues.apache.org/jira/browse/SPARK-15669
> Project: Spark
> Issue Type: Bug
> Components: Spark Core, Streaming
> Affects Versions: 1.6.1
> Environment: Scala: 2.10
> Spark: 1.6.1
> Yarn Cluster
> Reporter: Denis Buravlev
> Priority: Critical
>
> I'm trying to run a Spark Streaming application that uses ZeroMQ on a YARN cluster. The application fails with the following message: {{ERROR actor.OneForOneStrategy: No configuration setting found for key 'akka.zeromq'}}.
> The configuration file exists and is available from both the Driver and the Executor.
> After some research I found one interesting thing: if you execute the code
> {{println(Try(SparkEnv.get.actorSystem.settings.config.getString("akka.zeromq.poll-timeout")).toOption.getOrElse("error!"))}}
> you get different results for the Driver and the Executor:
> Driver stdout log: the value from the configuration (e.g. 100ms)
> Executor stdout log: "error!"
> *Is this correct behaviour for Spark?*
> Full code to reproduce:
> {code}
> object Application {
>   def main(args: Array[String]) {
>     val config = ConfigurationManager.getConfig(args)
>     val sparkConf = new SparkConf().setAppName("TestApp")
>     val ssc = new StreamingContext(sparkConf, Seconds(1))
>
>     println("Driver:")
>     println(Try(SparkEnv.get.actorSystem.settings.config.getString("akka.zeromq.poll-timeout")).toOption.getOrElse("error1"))
>
>     val rddQueue = new Queue[RDD[Int]]()
>     val inputStream = ssc.queueStream(rddQueue)
>     val mappedStream = inputStream.map { x =>
>       println("Executor:")
>       println(Try(SparkEnv.get.actorSystem.settings.config.getString("akka.zeromq.poll-timeout")).toOption.getOrElse("error2"))
>       (x % 10, 1)
>     }
>     val reducedStream = mappedStream.reduceByKey(_ + _)
>     reducedStream.print()
>
>     ssc.start()
>     for (i <- 1 to 30) {
>       rddQueue.synchronized {
>         rddQueue += ssc.sparkContext.makeRDD(1 to 1000, 10)
>       }
>       Thread.sleep(1000)
>     }
>     ssc.awaitTermination()
>   }
> }
> {code}
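> Following the suggestion in the comment, the likely fix for the repro above would be to move the akka setting onto the SparkConf itself so Spark ships it to the executors. A hedged fragment (the key and value come from this report; whether Spark 1.6 forwards every "akka."-prefixed conf key is an assumption based on the comment, not verified here):
> {code}
> val sparkConf = new SparkConf()
>   .setAppName("TestApp")
>   // assumption: Spark 1.x forwards conf keys starting with "akka."
>   // into the ActorSystem configuration on both driver and executors
>   .set("akka.zeromq.poll-timeout", "100ms")
> {code}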
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org