Posted to issues@spark.apache.org by "Tongzhou Zhou (JIRA)" <ji...@apache.org> on 2017/09/02 20:13:00 UTC

[jira] [Updated] (SPARK-21899) sortBy triggers a new Job

     [ https://issues.apache.org/jira/browse/SPARK-21899?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tongzhou Zhou updated SPARK-21899:
----------------------------------
    Target Version/s: 2.2.0, 2.1.0  (was: 2.1.0, 2.2.0)
             Summary: sortBy triggers a new Job   (was: sortBy trigger a new Job )

> sortBy triggers a new Job 
> --------------------------
>
>                 Key: SPARK-21899
>                 URL: https://issues.apache.org/jira/browse/SPARK-21899
>             Project: Spark
>          Issue Type: Bug
>          Components: Java API
>    Affects Versions: 2.2.0
>            Reporter: Tongzhou Zhou
>
> I'm new to Spark and tried out sortBy in a small proof-of-concept project, but found something unexpected: sortBy does not behave like a pure transformation. It triggers a Job of its own, and {color:red}the later collect call then recomputes everything from the beginning.{color}
> The result is correct, but the input file is read twice!
> I suspect sortBy involves an internal action; could someone clarify this?
> Here's my simple Java main function code: 
> {code:java}
> import java.util.Arrays;
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.JavaSparkContext;
>
> public static void main(String[] args) {
>     SparkConf conf = new SparkConf().setAppName("Simple Application").setMaster("local[*]");
>     JavaSparkContext sc = new JavaSparkContext(conf);
>     String logFile = "input1048576.txt";
>     JavaRDD<String> rdd1 = sc.textFile(logFile, 2);
>     JavaRDD<String> rdd2 = rdd1.flatMap(record -> Arrays.asList(record.split(" ")).iterator());
>     JavaRDD<String> rdd3 = rdd2.sortBy(record -> Integer.parseInt(record), true, 2);  // weird: sortBy itself triggers a job
>     rdd3.collect(); // action here
>     sc.stop();
> }
> {code}
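> For context, a likely explanation (not confirmed in this ticket): sortBy delegates to sortByKey, whose RangePartitioner runs a sampling job over the input to choose partition boundaries, and that sampling job is the first job in the log below. A minimal workaround sketch, assuming that is the cause: caching the upstream RDD lets both the sampling job and the shuffle job read from memory instead of re-reading the file.
> {code:java}
> // Sketch only: persist the words RDD so the sampling pass inside sortBy
> // and the later shuffle/collect both hit the cache, not the input file.
> JavaRDD<String> words = sc.textFile("input1048576.txt", 2)
>         .flatMap(line -> Arrays.asList(line.split(" ")).iterator())
>         .cache();                       // materialized by the first job that touches it
> JavaRDD<String> sorted = words.sortBy(Integer::parseInt, true, 2);
> sorted.collect();                       // should not re-read the input from disk
> words.unpersist();
> {code}
> Note this trades memory for I/O; for a file this small the recomputation is cheap, but the pattern matters on large inputs.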
> {color:red}17/09/02 12:58:06 INFO SparkContext: Starting job: sortBy at sparkTest.java:21
> 17/09/02 12:58:06 INFO DAGScheduler: Got job 0 (sortBy at sparkTest.java:21) with 2 output partitions{color}
> 17/09/02 12:58:06 INFO DAGScheduler: Final stage: ResultStage 0 (sortBy at sparkTest.java:21)
> 17/09/02 12:58:06 INFO DAGScheduler: Parents of final stage: List()
> 17/09/02 12:58:06 INFO DAGScheduler: Missing parents: List()
> 17/09/02 12:58:06 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[5] at sortBy at sparkTest.java:21), which has no missing parents
> 17/09/02 12:58:06 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 5.3 KB, free 912.1 MB)
> 17/09/02 12:58:06 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.9 KB, free 912.1 MB)
> 17/09/02 12:58:06 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.1.17:60007 (size: 2.9 KB, free: 912.3 MB)
> 17/09/02 12:58:06 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006
> 17/09/02 12:58:06 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 0 (MapPartitionsRDD[5] at sortBy at sparkTest.java:21) (first 15 tasks are for partitions Vector(0, 1))
> 17/09/02 12:58:06 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
> 17/09/02 12:58:06 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 4883 bytes)
> 17/09/02 12:58:06 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, executor driver, partition 1, PROCESS_LOCAL, 4883 bytes)
> 17/09/02 12:58:06 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
> 17/09/02 12:58:06 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
> {color:red}17/09/02 12:58:06 INFO HadoopRDD: Input split: file:/Users/Joy4fun/Documents/workspace/Joy-app/input1048576.txt:0+5495823
> 17/09/02 12:58:06 INFO HadoopRDD: Input split: file:/Users/Joy4fun/Documents/workspace/Joy-app/input1048576.txt:5495823+5495824{color}
> 17/09/02 12:58:06 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 1036 bytes result sent to driver
> 17/09/02 12:58:06 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 210 ms on localhost (executor driver) (1/2)
> 17/09/02 12:58:07 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1722 bytes result sent to driver
> 17/09/02 12:58:07 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1087 ms on localhost (executor driver) (2/2)
> 17/09/02 12:58:07 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
> 17/09/02 12:58:07 INFO DAGScheduler: ResultStage 0 (sortBy at sparkTest.java:21) finished in 1.163 s
> 17/09/02 12:58:07 INFO DAGScheduler: Job 0 finished: sortBy at sparkTest.java:21, took 1.306695 s
> {color:red}17/09/02 12:58:07 INFO SparkContext: Starting job: collect at sparkTest.java:23{color}
> 17/09/02 12:58:07 INFO DAGScheduler: Registering RDD 3 (sortBy at sparkTest.java:21)
> 17/09/02 12:58:07 INFO DAGScheduler: Got job 1 (collect at sparkTest.java:23) with 2 output partitions
> 17/09/02 12:58:07 INFO DAGScheduler: Final stage: ResultStage 2 (collect at sparkTest.java:23)
> 17/09/02 12:58:07 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 1)
> 17/09/02 12:58:07 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 1)
> 17/09/02 12:58:07 INFO DAGScheduler: Submitting ShuffleMapStage 1 (MapPartitionsRDD[3] at sortBy at sparkTest.java:21), which has no missing parents
> 17/09/02 12:58:07 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 6.1 KB, free 912.1 MB)
> 17/09/02 12:58:07 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 3.4 KB, free 912.1 MB)
> 17/09/02 12:58:07 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.1.17:60007 (size: 3.4 KB, free: 912.3 MB)
> 17/09/02 12:58:07 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1006
> 17/09/02 12:58:07 INFO DAGScheduler: Submitting 2 missing tasks from ShuffleMapStage 1 (MapPartitionsRDD[3] at sortBy at sparkTest.java:21) (first 15 tasks are for partitions Vector(0, 1))
> 17/09/02 12:58:07 INFO TaskSchedulerImpl: Adding task set 1.0 with 2 tasks
> 17/09/02 12:58:07 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, localhost, executor driver, partition 0, PROCESS_LOCAL, 4872 bytes)
> 17/09/02 12:58:07 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, localhost, executor driver, partition 1, PROCESS_LOCAL, 4872 bytes)
> 17/09/02 12:58:07 INFO Executor: Running task 0.0 in stage 1.0 (TID 2)
> 17/09/02 12:58:07 INFO Executor: Running task 1.0 in stage 1.0 (TID 3)
> {color:red}17/09/02 12:58:07 INFO HadoopRDD: Input split: file:/Users/Joy4fun/Documents/workspace/Joy-app/input1048576.txt:5495823+5495824
> 17/09/02 12:58:07 INFO HadoopRDD: Input split: file:/Users/Joy4fun/Documents/workspace/Joy-app/input1048576.txt:0+5495823{color}
> 17/09/02 12:58:07 INFO Executor: Finished task 1.0 in stage 1.0 (TID 3). 853 bytes result sent to driver
> 17/09/02 12:58:07 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 43 ms on localhost (executor driver) (1/2)
> 17/09/02 12:58:08 INFO Executor: Finished task 0.0 in stage 1.0 (TID 2). 1025 bytes result sent to driver
> 17/09/02 12:58:08 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 1570 ms on localhost (executor driver) (2/2)
> 17/09/02 12:58:08 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
> 17/09/02 12:58:08 INFO DAGScheduler: ShuffleMapStage 1 (sortBy at sparkTest.java:21) finished in 1.571 s
> 17/09/02 12:58:08 INFO DAGScheduler: looking for newly runnable stages
> 17/09/02 12:58:08 INFO DAGScheduler: running: Set()
> 17/09/02 12:58:08 INFO DAGScheduler: waiting: Set(ResultStage 2)
> 17/09/02 12:58:08 INFO DAGScheduler: failed: Set()
> 17/09/02 12:58:08 INFO DAGScheduler: Submitting ResultStage 2 (MapPartitionsRDD[7] at sortBy at sparkTest.java:21), which has no missing parents
> 17/09/02 12:58:08 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 3.9 KB, free 912.0 MB)
> 17/09/02 12:58:08 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 2.2 KB, free 912.0 MB)
> 17/09/02 12:58:08 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 192.168.1.17:60007 (size: 2.2 KB, free: 912.3 MB)
> 17/09/02 12:58:08 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:1006
> 17/09/02 12:58:08 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 2 (MapPartitionsRDD[7] at sortBy at sparkTest.java:21) (first 15 tasks are for partitions Vector(0, 1))
> 17/09/02 12:58:08 INFO TaskSchedulerImpl: Adding task set 2.0 with 2 tasks
> 17/09/02 12:58:08 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 4, localhost, executor driver, partition 0, ANY, 4621 bytes)
> 17/09/02 12:58:08 INFO TaskSetManager: Starting task 1.0 in stage 2.0 (TID 5, localhost, executor driver, partition 1, ANY, 4621 bytes)
> 17/09/02 12:58:08 INFO Executor: Running task 0.0 in stage 2.0 (TID 4)
> 17/09/02 12:58:08 INFO Executor: Running task 1.0 in stage 2.0 (TID 5)
> 17/09/02 12:58:08 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 2 blocks
> 17/09/02 12:58:08 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 2 blocks
> 17/09/02 12:58:08 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 5 ms
> 17/09/02 12:58:08 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 5 ms
> 17/09/02 12:58:09 INFO BlockManagerInfo: Removed broadcast_2_piece0 on 192.168.1.17:60007 in memory (size: 3.4 KB, free: 912.3 MB)
> 17/09/02 12:58:09 INFO BlockManagerInfo: Removed broadcast_1_piece0 on 192.168.1.17:60007 in memory (size: 2.9 KB, free: 912.3 MB)
> 17/09/02 12:58:11 INFO MemoryStore: Block taskresult_5 stored as bytes in memory (estimated size 5.7 MB, free 906.3 MB)
> 17/09/02 12:58:11 INFO BlockManagerInfo: Added taskresult_5 in memory on 192.168.1.17:60007 (size: 5.7 MB, free: 906.5 MB)
> 17/09/02 12:58:11 INFO Executor: Finished task 1.0 in stage 2.0 (TID 5). 6011268 bytes result sent via BlockManager)
> 17/09/02 12:58:11 INFO TransportClientFactory: Successfully created connection to /192.168.1.17:60007 after 31 ms (0 ms spent in bootstraps)
> 17/09/02 12:58:12 INFO TaskSetManager: Finished task 1.0 in stage 2.0 (TID 5) in 3512 ms on localhost (executor driver) (1/2)
> 17/09/02 12:58:12 INFO BlockManagerInfo: Removed taskresult_5 on 192.168.1.17:60007 in memory (size: 5.7 MB, free: 912.3 MB)
> 17/09/02 12:58:12 INFO MemoryStore: Block taskresult_4 stored as bytes in memory (estimated size 6.8 MB, free 905.3 MB)
> 17/09/02 12:58:12 INFO BlockManagerInfo: Added taskresult_4 in memory on 192.168.1.17:60007 (size: 6.8 MB, free: 905.5 MB)
> 17/09/02 12:58:12 INFO Executor: Finished task 0.0 in stage 2.0 (TID 4). 7143804 bytes result sent via BlockManager)
> 17/09/02 12:58:12 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 4) in 3661 ms on localhost (executor driver) (2/2)
> 17/09/02 12:58:12 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool 
> 17/09/02 12:58:12 INFO BlockManagerInfo: Removed taskresult_4 on 192.168.1.17:60007 in memory (size: 6.8 MB, free: 912.3 MB)
> 17/09/02 12:58:12 INFO DAGScheduler: ResultStage 2 (collect at sparkTest.java:23) finished in 3.662 s
> 17/09/02 12:58:12 INFO DAGScheduler: Job 1 finished: collect at sparkTest.java:23, took 5.278293 s



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
