Posted to user@spark.apache.org by Kevin Tran <ke...@gmail.com> on 2016/09/07 07:30:37 UTC

call() function being called 3 times

Hi Everyone,
Does anyone know why the call() function is being called *3 times* for
each message that arrives?

    JavaDStream<String> message = messagesDStream.map(
            new Function<Tuple2<String, String>, String>() {
        @Override
        public String call(Tuple2<String, String> tuple2) {
            return tuple2._2();
        }
    });

    message.foreachRDD(rdd -> {
        logger.debug("---> New RDD with " + rdd.partitions().size()
                + " partitions and " + rdd.count() + " records");   // <== 1
        SQLContext sqlContext = new SQLContext(rdd.context());

        JavaRDD<JavaBean> rowRDD = rdd.map(new Function<String, JavaBean>() {
            public JavaBean call(String record) {   // <== being called 3 times

What I tried:
 * *cache()*
 * cleaning up *checkpoint dir*

Thanks,
Kevin.

Re: call() function being called 3 times

Posted by Kevin Tran <ke...@gmail.com>.
It turns out that the call() function runs in different stages:

...
2016-09-07 20:37:21,086 [Executor task launch worker-0] INFO  org.apache.spark.executor.Executor - Running task 0.0 in stage 11.0 (TID 11)
2016-09-07 20:37:21,087 [Executor task launch worker-0] DEBUG org.apache.spark.executor.Executor - Task 11's epoch is 0
...
2016-09-07 20:37:21,096 [Executor task launch worker-0] INFO  org.apache.spark.executor.Executor - Finished task 0.0 in stage 11.0 (TID 11). 2412 bytes result sent to driver
...
<=== call() called here !!
....
2016-09-07 20:37:22,341 [Executor task launch worker-0] INFO  org.apache.spark.executor.Executor - Running task 0.0 in stage 12.0 (TID 12)
2016-09-07 20:37:22,343 [Executor task launch worker-0] DEBUG org.apache.spark.executor.Executor - Task 12's epoch is 0
....
<=== call() called here !!
....
2016-09-07 20:37:22,362 [Executor task launch worker-0] INFO  org.apache.spark.executor.Executor - Finished task 0.0 in stage 12.0 (TID 12). 2518 bytes result sent to driver
....
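
One possible reading of these logs, offered as a guess rather than a
confirmed diagnosis: Spark transformations such as map() are lazy, and each
action on an RDD re-runs the functions in its lineage unless that RDD is
persisted. The rest of the foreachRDD body is not shown above, so the number
of actions is an assumption. The sketch below is a plain-Java analogy with no
Spark API in it; LazyRecompute, parse(), lazyMap(), cached() and callsFor()
are all hypothetical names made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;

class LazyRecompute {
    // Counts invocations of the mapping function (stand-in for call()).
    static final AtomicInteger calls = new AtomicInteger();

    // Hypothetical stand-in for the body of call(String record).
    static String parse(String record) {
        calls.incrementAndGet();
        return record.trim();
    }

    // A lazy "lineage": nothing runs until get(), and every get() re-runs the map.
    static Supplier<List<String>> lazyMap(List<String> input) {
        return () -> input.stream()
                .map(LazyRecompute::parse)
                .collect(Collectors.toList());
    }

    // Memoizing wrapper, playing the role of a persisted result.
    static <T> Supplier<T> cached(Supplier<T> source) {
        return new Supplier<T>() {
            private T value;
            private boolean computed;

            @Override
            public synchronized T get() {
                if (!computed) {
                    value = source.get();
                    computed = true;
                }
                return value;
            }
        };
    }

    // Runs `actions` downstream consumers and returns how often parse() ran.
    static int callsFor(boolean useCache, int actions) {
        calls.set(0);
        Supplier<List<String>> mapped = lazyMap(Arrays.asList("one message"));
        if (useCache) {
            mapped = cached(mapped);
        }
        for (int i = 0; i < actions; i++) {
            mapped.get().size(); // each "action" forces evaluation
        }
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println("uncached, 3 actions: " + callsFor(false, 3) + " calls");
        System.out.println("cached, 3 actions: " + callsFor(true, 3) + " calls");
    }
}
```

With one input message and three consumers, the uncached pipeline invokes
parse() three times and the memoized one invokes it once. If the same thing
is happening here, the persist would presumably need to sit on the RDD that
the later actions actually read (rowRDD in the snippet above), which may be
why the cache() attempt mentioned earlier did not change the count.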

Does anyone have any ideas?



