Posted to issues@spark.apache.org by "Sean Owen (JIRA)" <ji...@apache.org> on 2016/01/20 16:52:39 UTC
[jira] [Resolved] (SPARK-3100) Spark RDD partitions are not running
in the workers as per locality information given by each partition.
[ https://issues.apache.org/jira/browse/SPARK-3100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sean Owen resolved SPARK-3100.
------------------------------
Resolution: Not A Problem
I don't think this is a bug, as described. It is not generally true that a job with 4 tasks will execute on 4 different executors.
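As a rough illustration of why this holds: preferred locations are hints to the scheduler, which waits a bounded time for a free slot at the preferred locality level and then falls back to less local executors. A minimal sketch of tuning that wait, assuming the standard `spark.locality.wait` settings (values in milliseconds; the numbers and the object name `LocalityConfSketch` are arbitrary examples, not taken from the report):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object LocalityConfSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("spark://master:7077") // master URL used in the report
      .setAppName("SampleSpark")
      // Time to wait for a slot at one locality level before relaxing
      // to the next level. The default is 3000 ms; 10000 is only an example.
      .set("spark.locality.wait", "10000")
      // Optional per-level override for node-local placement.
      .set("spark.locality.wait.node", "10000")

    val sc = new SparkContext(conf)
    try {
      // ... build the RDD and run the job here ...
    } finally {
      sc.stop()
    }
  }
}
```

Even with a longer wait, a task can only be placed on a preferred host if an executor on that host has already registered when the task is offered, so this setting alone does not guarantee one task per executor.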
> Spark RDD partitions are not running in the workers as per locality information given by each partition.
> --------------------------------------------------------------------------------------------------------
>
> Key: SPARK-3100
> URL: https://issues.apache.org/jira/browse/SPARK-3100
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 1.0.0
> Environment: Running in Spark Standalone Cluster
> Reporter: Ravindra Pesala(Old.Don't assign to it)
>
> I created a simple custom RDD (SampleRDD.scala) with 4 splits, one for each of 4 workers.
> When I run this RDD on a Spark standalone cluster with 4 workers (the master machine also hosts one worker), all partitions run on a single node, even though I have given locality preferences in my SampleRDD program.
> *Sample Code*
> {code}
> import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
> import org.apache.spark.rdd.RDD
>
> // A partition that carries the host names it would prefer to run on.
> class SamplePartition(rddId: Int, val idx: Int, val tableSplit: Seq[String])
>   extends Partition {
>   override def hashCode(): Int = 41 * (41 + rddId) + idx
>   override val index: Int = idx
> }
>
> class SampleRDD[K, V](sc: SparkContext, keyClass: KeyVal[K, V])
>   extends RDD[(K, V)](sc, Nil)
>   with Logging {
>
>   // One partition per host, each tagged with a single preferred host.
>   override def getPartitions: Array[Partition] = {
>     val hosts = Array("master", "slave1", "slave2", "slave3")
>     val result = new Array[Partition](4)
>     for (i <- 0 until result.length) {
>       result(i) = new SamplePartition(id, i, Array(hosts(i)))
>     }
>     result
>   }
>
>   override def compute(theSplit: Partition, context: TaskContext) = {
>     val iter = new Iterator[(K, V)] {
>       val split = theSplit.asInstanceOf[SamplePartition]
>       logInfo("Executed task for the split" + split.tableSplit)
>
>       // Register an on-task-completion callback to close the input stream.
>       context.addOnCompleteCallback(() => close())
>       var havePair = false
>       var finished = false
>
>       override def hasNext: Boolean = {
>         if (!finished && !havePair) {
>           // There is no real reader in this sample, so the stream is marked
>           // finished on the first call and each partition yields no records.
>           finished = !false
>           havePair = !finished
>         }
>         !finished
>       }
>
>       override def next(): (K, V) = {
>         if (!hasNext) {
>           throw new java.util.NoSuchElementException("End of stream")
>         }
>         havePair = false
>         val key = new Key()
>         val value = new Value()
>         keyClass.getKey(key, value)
>       }
>
>       private def close() {
>         try {
>           // reader.close()
>         } catch {
>           case e: Exception => logWarning("Exception in RecordReader.close()", e)
>         }
>       }
>     }
>     iter
>   }
>
>   // Report the hosts stored in the partition as its locality preference.
>   override def getPreferredLocations(split: Partition): Seq[String] = {
>     val theSplit = split.asInstanceOf[SamplePartition]
>     val s = theSplit.tableSplit.filter(_ != "localhost")
>     logInfo("Host Name : " + s(0))
>     s
>   }
> }
>
> trait KeyVal[K, V] extends Serializable {
>   def getKey(key: Key, value: Value): (K, V)
> }
>
> class KeyValImpl extends KeyVal[Key, Value] {
>   override def getKey(key: Key, value: Value) = (key, value)
> }
>
> case class Key()
> case class Value()
>
> object SampleRDD {
>   def main(args: Array[String]): Unit = {
>     // Collect the jar containing this class so it can be shipped to the executors.
>     val d = SparkContext.jarOfClass(this.getClass)
>     val ar = new Array[String](d.size)
>     var i = 0
>     d.foreach { p =>
>       ar(i) = p
>       i = i + 1
>     }
>     val sc = new SparkContext("spark://master:7077", "SampleSpark", "/opt/spark-1.0.0-rc3/", ar)
>     val rdd = new SampleRDD(sc, new KeyValImpl())
>     rdd.collect
>   }
> }
> {code}
> This is the log it produces:
> {code}
> INFO 18-08 16:38:33,382 - Executor updated: app-20140818163833-0005/0 is now RUNNING
> INFO 18-08 16:38:33,382 - Executor updated: app-20140818163833-0005/2 is now RUNNING
> INFO 18-08 16:38:33,383 - Executor updated: app-20140818163833-0005/1 is now RUNNING
> INFO 18-08 16:38:33,385 - Executor updated: app-20140818163833-0005/3 is now RUNNING
> INFO 18-08 16:38:34,976 - Registered executor: Actor akka.tcp://sparkExecutor@master:47563/user/Executor#-398354094 with ID 0
> INFO 18-08 16:38:34,984 - Starting task 0.0:0 as TID 0 on executor 0: master (PROCESS_LOCAL)
> INFO 18-08 16:38:34,989 - Serialized task 0.0:0 as 1261 bytes in 3 ms
> INFO 18-08 16:38:34,992 - Starting task 0.0:1 as TID 1 on executor 0: master (PROCESS_LOCAL)
> INFO 18-08 16:38:34,993 - Serialized task 0.0:1 as 1261 bytes in 0 ms
> INFO 18-08 16:38:34,993 - Starting task 0.0:2 as TID 2 on executor 0: master (PROCESS_LOCAL)
> INFO 18-08 16:38:34,993 - Serialized task 0.0:2 as 1261 bytes in 0 ms
> INFO 18-08 16:38:34,994 - Starting task 0.0:3 as TID 3 on executor 0: master (PROCESS_LOCAL)
> INFO 18-08 16:38:34,994 - Serialized task 0.0:3 as 1261 bytes in 0 ms
> INFO 18-08 16:38:35,174 - Registering block manager master:42125 with 294.4 MB RAM
> INFO 18-08 16:38:35,296 - Registered executor: Actor akka.tcp://sparkExecutor@slave1:31726/user/Executor#492173410 with ID 2
> INFO 18-08 16:38:35,302 - Registered executor: Actor akka.tcp://sparkExecutor@slave2:25769/user/Executor#1762839887 with ID 1
> INFO 18-08 16:38:35,317 - Registered executor: Actor akka.tcp://sparkExecutor@slave3:51032/user/Executor#981476000 with ID 3
> {code}
> Here all the tasks are assigned to master only, even though I have specified locality preferences.
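The timestamps in the log above suggest one reason for this: all four tasks were launched at 16:38:34, when only executor 0 (on master) had registered, and the executors on slave1, slave2 and slave3 registered at 16:38:35. A common workaround is to hold the job until the expected executors are up. Below is a minimal sketch of that idea. `ExecutorWait` and `awaitExecutors` are hypothetical names, not part of Spark, and using `sc.getExecutorMemoryStatus` as a registration count is an assumption noted in the comments.

```scala
import scala.annotation.tailrec

object ExecutorWait {
  /**
   * Polls `registered` until it reports at least `expected`, or until
   * `timeoutMs` has elapsed. Returns true if the target was reached.
   *
   * `registered` is a by-name parameter, so it is re-evaluated on every poll.
   */
  def awaitExecutors(expected: Int, timeoutMs: Long, pollMs: Long = 100L)
                    (registered: => Int): Boolean = {
    val deadline = System.currentTimeMillis() + timeoutMs

    @tailrec
    def loop(): Boolean =
      if (registered >= expected) true
      else if (System.currentTimeMillis() >= deadline) false
      else {
        Thread.sleep(pollMs)
        loop()
      }

    loop()
  }
}

// Possible use with a SparkContext `sc` (assumption: getExecutorMemoryStatus
// has one entry per registered block manager, including the driver's, so
// one is subtracted to approximate the executor count):
//
//   val ready = ExecutorWait.awaitExecutors(expected = 4, timeoutMs = 10000L) {
//     sc.getExecutorMemoryStatus.size - 1
//   }
//   if (ready) rdd.collect
```

Later Spark releases also expose `spark.scheduler.minRegisteredResourcesRatio` and `spark.scheduler.maxRegisteredResourcesWaitingTime`, which delay scheduling until enough resources have registered. Neither approach turns locality preferences into a guarantee; they only make it more likely that the preferred hosts are available when tasks are offered.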