Posted to user@spark.apache.org by Wisc Forum <wi...@gmail.com> on 2013/10/22 06:18:49 UTC

Spark map function question

Hi, we have tried integrating Spark with our existing code and are seeing some issues.

The issue is that when we use the call below (where func is a function that processes elem)

rdd.map{ elem => {func.apply(elem)} }

in the log, I see that the apply function is invoked a few times for the same element elem instead of once.
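
To make this concrete, a stripped-down, self-contained version of what we do looks roughly like this (MyDataType and the println are just stand-ins for our real type and processing; sparkContext is our existing SparkContext):

case class MyDataType(id: Int)

// stand-in for our real func; the println is how we spot the repeats in the executor logs
val func = (elem: MyDataType) => {
  println("apply called for " + elem)  // in our job this message shows up several times for the same elem
  elem                                 // stand-in for the real processing
}

val rdd = sparkContext.parallelize(Seq(MyDataType(1), MyDataType(2), MyDataType(3)))
val result: org.apache.spark.rdd.RDD[MyDataType] = rdd.map { elem => func.apply(elem) }
result.count()  // the messages above only appear once an action runs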

When I execute this in a sequential way (see below), everything works just fine.

sparkContext.parallelize(rdd.toArray.map(elem => proj.apply(elem)))

(the only reason I used sparkContext.parallelize in the above line is that the method has to return RDD[MyDataType])

Why does this happen? Does the map function require something special from the RDD?

Thanks,
Xiaobing

Re: Spark map function question

Posted by Mark Hamstra <ma...@clearstorydata.com>.
How can you possibly expect anyone to give you meaningful advice when you
have told us nothing about what 'func' or 'proj' does?  All I can tell from
your post is that you are using unusual syntax; but that on its own isn't
sufficient to cause a problem:

scala> val func: Int => Int = i => i + 1
func: Int => Int = <function1>

scala> val rdd = sc.parallelize(List(1, 2, 3, 4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:12

scala> rdd.map { elem => {func.apply(elem)} }.collect
13/10/21 21:32:48 INFO spark.SparkContext: Starting job: collect at <console>:17
13/10/21 21:32:48 INFO scheduler.DAGScheduler: Got job 0 (collect at <console>:17) with 1 output partitions (allowLocal=false)
13/10/21 21:32:48 INFO scheduler.DAGScheduler: Final stage: Stage 0 (collect at <console>:17)
13/10/21 21:32:48 INFO scheduler.DAGScheduler: Parents of final stage: List()
13/10/21 21:32:48 INFO scheduler.DAGScheduler: Missing parents: List()
13/10/21 21:32:48 INFO scheduler.DAGScheduler: Submitting Stage 0 (MappedRDD[1] at map at <console>:17), which has no missing parents
13/10/21 21:32:48 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 0 (MappedRDD[1] at map at <console>:17)
13/10/21 21:32:48 INFO local.LocalTaskSetManager: Size of task 0 is 1848 bytes
13/10/21 21:32:48 INFO executor.Executor: Running task ID 0
13/10/21 21:32:48 INFO executor.Executor: Serialized size of result for 0 is 434
13/10/21 21:32:48 INFO executor.Executor: Sending result for 0 directly to driver
13/10/21 21:32:48 INFO local.LocalScheduler: Remove TaskSet 0.0 from pool
13/10/21 21:32:48 INFO scheduler.DAGScheduler: Completed ResultTask(0, 0)
13/10/21 21:32:48 INFO executor.Executor: Finished task ID 0
13/10/21 21:32:48 INFO scheduler.DAGScheduler: Stage 0 (collect at <console>:17) finished in 0.127 s
13/10/21 21:32:48 INFO spark.SparkContext: Job finished: collect at <console>:17, took 0.219938 s
res0: Array[Int] = Array(2, 3, 4, 5)
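
One quick way to see how many times func is actually invoked is to count the calls with an accumulator (a sketch against the same sc, rdd, and func as above; note that the count includes any re-executions, e.g. from task retries, speculative execution, or from re-running the map when another action is applied to an uncached RDD, which is the usual way a map function ends up running more than once per element):

val calls = sc.accumulator(0)                            // driver-side counter (Spark 0.8-era API)
val counted = rdd.map { elem => calls += 1; func(elem) } // nothing runs yet; map is lazy
counted.collect()                                        // the action triggers the actual work
println("func was invoked " + calls.value + " times")    // read the total back on the driver

If the count matches the element count for a single action but grows when the same mapped RDD feeds several actions, caching it (counted.cache()) is the usual way to avoid the recomputation.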

