Posted to issues@spark.apache.org by "Ravindra Pesala (JIRA)" <ji...@apache.org> on 2014/08/18 16:16:25 UTC

[jira] [Commented] (SPARK-3100) Spark RDD partitions are not running in the workers as per locality information given by each partition.

    [ https://issues.apache.org/jira/browse/SPARK-3100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14100664#comment-14100664 ] 

Ravindra Pesala commented on SPARK-3100:
----------------------------------------

It seems there is a synchronization issue between task allocation and executor registration. As per the log in the issue description, the tasks were all allocated while only a single executor had registered, so the scheduler had no choice but to place every task on that executor; the remaining executors only began registering after those tasks had already started.

I added a 5-second sleep in my driver after creating the SparkContext, and then it started working properly:

    val sc = new SparkContext("spark://master:7077", "SampleSpark", "/opt/spark-1.0.0-rc3/", ar)
    *Thread.sleep(5000)*
    val rdd = new SampleRDD(sc, new KeyValImpl())
    rdd.collect
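
A hard-coded sleep is of course a fragile workaround. Newer Spark releases expose scheduler settings that make the driver wait for executors before submitting tasks; the sketch below assumes spark.scheduler.minRegisteredResourcesRatio and spark.scheduler.maxRegisteredResourcesWaitingTime (the key names and accepted value formats differ across versions, so treat this as illustrative rather than a fix for 1.0.0):

    // Sketch: wait for executor registration via configuration, not Thread.sleep.
    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setMaster("spark://master:7077")
      .setAppName("SampleSpark")
      // Hold back task submission until the full fraction of expected
      // resources has registered...
      .set("spark.scheduler.minRegisteredResourcesRatio", "1.0")
      // ...or until this much time has passed, whichever comes first.
      .set("spark.scheduler.maxRegisteredResourcesWaitingTime", "30s")
    val sc = new SparkContext(conf)
    val rdd = new SampleRDD(sc, new KeyValImpl())
    rdd.collect

With the sleep in place, every task was started NODE_LOCAL on its preferred host: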

INFO  18-08 19:44:36,301 - Registered executor: Actor[akka.tcp://sparkExecutor@master:32457/user/Executor#-966982652] with ID 0
INFO  18-08 19:44:36,505 - Registering block manager master:59964 with 294.4 MB RAM
INFO  18-08 19:44:36,578 - Registered executor: Actor[akka.tcp://sparkExecutor@slave1:11653/user/Executor#909834981] with ID 1
INFO  18-08 19:44:36,591 - Registered executor: Actor[akka.tcp://sparkExecutor@slave2:59220/user/Executor#301495226] with ID 3
INFO  18-08 19:44:36,643 - Registered executor: Actor[akka.tcp://sparkExecutor@slave3:11232/user/Executor#-1118376183] with ID 2
INFO  18-08 19:44:36,804 - Registering block manager slave1:14596 with 294.4 MB RAM
INFO  18-08 19:44:36,809 - Registering block manager slave2:10418 with 294.4 MB RAM
INFO  18-08 19:44:36,871 - Registering block manager slave3:45973 with 294.4 MB RAM
INFO  18-08 19:44:39,507 - Starting job: collect at SampleRDD.scala:142
INFO  18-08 19:44:39,520 - Got job 0 (collect at SampleRDD.scala:142) with 4 output partitions (allowLocal=false)
INFO  18-08 19:44:39,521 - Final stage: Stage 0(collect at SampleRDD.scala:142)
INFO  18-08 19:44:39,521 - Parents of final stage: List()
INFO  18-08 19:44:39,526 - Missing parents: List()
INFO  18-08 19:44:39,532 - Submitting Stage 0 (SampleRDD[0] at RDD at SampleRDD.scala:28), which has no missing parents
INFO  18-08 19:44:39,537 - Host Name : master
INFO  18-08 19:44:39,539 - Host Name : slave1
INFO  18-08 19:44:39,539 - Host Name : slave2
INFO  18-08 19:44:39,540 - Host Name : slave3
INFO  18-08 19:44:39,563 - Submitting 4 missing tasks from Stage 0 (SampleRDD[0] at RDD at SampleRDD.scala:28)
INFO  18-08 19:44:39,564 - Adding task set 0.0 with 4 tasks
INFO  18-08 19:44:39,579 - Starting task 0.0:2 as TID 0 on executor 3: *slave2 (NODE_LOCAL)*
INFO  18-08 19:44:39,583 - Serialized task 0.0:2 as 1261 bytes in 2 ms
INFO  18-08 19:44:39,585 - Starting task 0.0:0 as TID 1 on executor 0: *master (NODE_LOCAL)*
INFO  18-08 19:44:39,585 - Serialized task 0.0:0 as 1261 bytes in 0 ms
INFO  18-08 19:44:39,586 - Starting task 0.0:1 as TID 2 on executor 1: *slave1 (NODE_LOCAL)*
INFO  18-08 19:44:39,586 - Serialized task 0.0:1 as 1261 bytes in 0 ms
INFO  18-08 19:44:39,587 - Starting task 0.0:3 as TID 3 on executor 2: *slave3 (NODE_LOCAL)*
INFO  18-08 19:44:39,587 - Serialized task 0.0:3 as 1261 bytes in 0 ms
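
To double-check what the scheduler sees for each partition, one can also query its view directly. A minimal sketch, assuming SparkContext.getPreferredLocs (a @DeveloperApi, so not a stable interface):

    // Print the locations the scheduler will consider for each partition.
    val rdd = new SampleRDD(sc, new KeyValImpl())
    rdd.partitions.foreach { p =>
      // getPreferredLocs consults the RDD's getPreferredLocations plus any cached copies.
      println(s"partition ${p.index} -> ${sc.getPreferredLocs(rdd, p.index)}")
    }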


> Spark RDD partitions are not running in the workers as per locality information given by each partition.
> --------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-3100
>                 URL: https://issues.apache.org/jira/browse/SPARK-3100
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.0.0
>         Environment: Running in Spark Standalone Cluster
>            Reporter: Ravindra Pesala
>
> I created a simple custom RDD (SampleRDD.scala) and created 4 splits for 4 workers.
> When I run this RDD in a Spark standalone cluster with 4 workers (even the master machine runs one worker), all partitions run on one node only, even though I have given locality preferences in my SampleRDD program.
> *Sample Code*
> class SamplePartition(rddId: Int, val idx: Int, val tableSplit: Seq[String])
>   extends Partition {
>   override def hashCode(): Int = 41 * (41 + rddId) + idx
>   override val index: Int = idx
> }
> class SampleRDD[K, V](
>     sc: SparkContext, keyClass: KeyVal[K, V])
>   extends RDD[(K, V)](sc, Nil)
>   with Logging {
>   override def getPartitions: Array[Partition] = {
>     val hosts = Array("master", "slave1", "slave2", "slave3")
>     val result = new Array[Partition](4)
>     for (i <- 0 until result.length) {
>       result(i) = new SamplePartition(id, i, Array(hosts(i)))
>     }
>     result
>   }
>
>   override def compute(theSplit: Partition, context: TaskContext) = {
>     val iter = new Iterator[(K,V)] {
>       val split = theSplit.asInstanceOf[SamplePartition]
>       logInfo("Executed task for the split" + split.tableSplit)
>     
>       // Register an on-task-completion callback to close the input stream.
>       context.addOnCompleteCallback(() => close())
>       var havePair = false
>       var finished = false
>       override def hasNext: Boolean = {
>         if (!finished && !havePair) 
>         {
>           finished = true
>           havePair = false
>         }
>         !finished
>       }
>       override def next(): (K,V) = {
>         if (!hasNext) {
>           throw new java.util.NoSuchElementException("End of stream")
>         }
>         havePair = false
>         val key = new Key()
>         val value = new Value()
>         keyClass.getKey(key, value)
>       }
>       private def close() {
>         try {
> //          reader.close()
>         } catch {
>           case e: Exception => logWarning("Exception in RecordReader.close()", e)
>         }
>       }
>     }
>     iter
>   }
>   
>   override def getPreferredLocations(split: Partition): Seq[String] = {
>     val theSplit = split.asInstanceOf[SamplePartition]
>     val s = theSplit.tableSplit.filter(_ != "localhost")
>     logInfo("Host Name : " + s(0))
>     s
>   }
> }
> trait KeyVal[K, V] extends Serializable {
>   def getKey(key: Key, value: Value): (K, V)
> }
> class KeyValImpl extends KeyVal[Key, Value] {
>   override def getKey(key: Key, value: Value) = (key, value)
> }
> case class Key()
> case class Value()
> object SampleRDD {
>   def main(args: Array[String]): Unit = {
>     // Ship the jar containing this class (if any) to the executors.
>     val ar = SparkContext.jarOfClass(this.getClass).toList
>     val sc = new SparkContext("spark://master:7077", "SampleSpark", "/opt/spark-1.0.0-rc3/", ar)
>     val rdd = new SampleRDD(sc, new KeyValImpl())
>     rdd.collect
>   }
> }
> The following log shows all four tasks being started PROCESS_LOCAL on the single executor (ID 0, on master) that had registered by then:
> INFO  18-08 16:38:33,382 - Executor updated: app-20140818163833-0005/0 is now RUNNING
> INFO  18-08 16:38:33,382 - Executor updated: app-20140818163833-0005/2 is now RUNNING
> INFO  18-08 16:38:33,383 - Executor updated: app-20140818163833-0005/1 is now RUNNING
> INFO  18-08 16:38:33,385 - Executor updated: app-20140818163833-0005/3 is now RUNNING
> INFO  18-08 16:38:34,976 - Registered executor: Actor akka.tcp://sparkExecutor@master:47563/user/Executor#-398354094 with ID 0
> INFO  18-08 16:38:34,984 - Starting task 0.0:0 as TID 0 on executor 0: *master (PROCESS_LOCAL)*
> INFO  18-08 16:38:34,989 - Serialized task 0.0:0 as 1261 bytes in 3 ms
> INFO  18-08 16:38:34,992 - Starting task 0.0:1 as TID 1 on executor 0: *master (PROCESS_LOCAL)*
> INFO  18-08 16:38:34,993 - Serialized task 0.0:1 as 1261 bytes in 0 ms
> INFO  18-08 16:38:34,993 - Starting task 0.0:2 as TID 2 on executor 0: *master (PROCESS_LOCAL)*
> INFO  18-08 16:38:34,993 - Serialized task 0.0:2 as 1261 bytes in 0 ms
> INFO  18-08 16:38:34,994 - Starting task 0.0:3 as TID 3 on executor 0: *master (PROCESS_LOCAL)*
> INFO  18-08 16:38:34,994 - Serialized task 0.0:3 as 1261 bytes in 0 ms
> INFO  18-08 16:38:35,174 - Registering block manager master:42125 with 294.4 MB RAM
> INFO  18-08 16:38:35,296 - Registered executor: Actor akka.tcp://sparkExecutor@slave1:31726/user/Executor#492173410 with ID 2
> INFO  18-08 16:38:35,302 - Registered executor: Actor akka.tcp://sparkExecutor@slave2:25769/user/Executor#1762839887 with ID 1
> INFO  18-08 16:38:35,317 - Registered executor: Actor akka.tcp://sparkExecutor@slave3:51032/user/Executor#981476000 with ID 3


