Posted to user@spark.apache.org by Shushant Arora <sh...@gmail.com> on 2015/07/06 13:11:23 UTC

writing to kafka using spark streaming

I have a requirement to write to a Kafka queue from a Spark Streaming
application.

I am using Spark 1.2 Streaming. Since different executors in Spark may be
allocated on each run, instantiating a new Kafka producer on every run
seems like a costly operation. Is there a way to reuse objects in the
processing executors (not in the receivers)?

Re: writing to kafka using spark streaming

Posted by Shushant Arora <sh...@gmail.com>.
When using foreachPartition, the jobs that get created are not displayed on
the driver console, but they are visible on the web UI. On the driver it
only prints a stage progress indicator of the form
[Stage 2:>          (0 + 2) / 5], which then disappears.

I am using foreachPartition as:

kafkaStream.foreachRDD(new Function<JavaPairRDD<byte[], byte[]>, Void>() {
    @Override
    public Void call(JavaPairRDD<byte[], byte[]> v1) throws Exception {
        v1.foreachPartition(new VoidFunction<Iterator<Tuple2<byte[], byte[]>>>() {
            @Override
            public void call(Iterator<Tuple2<byte[], byte[]>> t) throws Exception {
                // One producer instance per executor, reused for every record in the partition.
                SparkKafkaProducer producer = SparkKafkaProducer.getInstance();
                while (t.hasNext()) {
                    Tuple2<byte[], byte[]> tuple = t.next();
                    // The real message is built from tuple._2(); that processing is elided here.
                    byte[] msg = tuple._2();
                    producer.sendMsg(msg);
                }
            }
        });
        return null;
    }
});
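
(SparkKafkaProducer above is the poster's own wrapper class; its implementation
is not shown in the thread. As a rough illustration only, a lazily initialised,
per-executor-JVM singleton along the following lines would let the producer be
reused across batches. This sketch assumes the new Kafka producer client,
org.apache.kafka.clients.producer.KafkaProducer; the broker list and topic name
are placeholders.)

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SparkKafkaProducer {

    // One producer per executor JVM, created lazily and reused across tasks and batches.
    private static SparkKafkaProducer instance;
    private final KafkaProducer<byte[], byte[]> producer;

    private SparkKafkaProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // placeholder broker list
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        this.producer = new KafkaProducer<byte[], byte[]>(props);
    }

    public static synchronized SparkKafkaProducer getInstance() {
        if (instance == null) {
            instance = new SparkKafkaProducer();
        }
        return instance;
    }

    public void sendMsg(byte[] msg) {
        // Fire-and-forget send; "out-topic" is a placeholder topic name.
        producer.send(new ProducerRecord<byte[], byte[]>("out-topic", msg));
    }
}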

1. Why are the jobs not displayed on the driver console? Is the call
function in the above snippet executed on each worker, once for each
partition?

2. On the web UI as well, no job is displayed when the input source (the
Kafka queue) has no new messages. But when I used mapPartitions, jobs were
created and shown on both the web UI and the driver for every batch,
whether or not the input had data. Is this the expected behaviour of
foreachPartition - that it ignores empty partitions, or that no partitions
are even created when the input source is empty?

On Tue, Jul 7, 2015 at 12:44 AM, Tathagata Das <td...@databricks.com> wrote:

> Both have the same efficiency. The primary difference is that one is a
> transformation (hence lazy, and requiring another action to actually
> execute), and the other is an action.
> But it may be a slightly better design in general to have
> "transformations" be purely functional (that is, have no external side
> effects) and all the non-functional stuff be "actions" (e.g.,
> saveAsHadoopFile is an action).
>
>
> On Mon, Jul 6, 2015 at 12:09 PM, Shushant Arora <shushantarora09@gmail.com
> > wrote:
>
>> What's the difference between foreachPartition and mapPartitions for a
>> DStream? Both work at partition granularity.
>>
>> One is a transformation and the other is an action, but if I call an
>> action after mapPartitions anyway, which one is more efficient and
>> recommended?
>>
>> On Tue, Jul 7, 2015 at 12:21 AM, Tathagata Das <td...@databricks.com>
>> wrote:
>>
>>> Yeah, creating a new producer at the granularity of partitions may not
>>> be that costly.
>>>
>>> On Mon, Jul 6, 2015 at 6:40 AM, Cody Koeninger <co...@koeninger.org>
>>> wrote:
>>>
>>>> Use foreachPartition, and allocate whatever the costly resource is once
>>>> per partition.
>>>>
>>>> On Mon, Jul 6, 2015 at 6:11 AM, Shushant Arora <
>>>> shushantarora09@gmail.com> wrote:
>>>>
>>>>> I have a requirement to write to a Kafka queue from a Spark Streaming
>>>>> application.
>>>>>
>>>>> I am using Spark 1.2 Streaming. Since different executors in Spark may be
>>>>> allocated on each run, instantiating a new Kafka producer on every run
>>>>> seems like a costly operation. Is there a way to reuse objects in the
>>>>> processing executors (not in the receivers)?
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>

Re: writing to kafka using spark streaming

Posted by Tathagata Das <td...@databricks.com>.
Both have the same efficiency. The primary difference is that one is a
transformation (hence lazy, and requiring another action to actually
execute), and the other is an action.
But it may be a slightly better design in general to have "transformations"
be purely functional (that is, have no external side effects) and all the
non-functional stuff be "actions" (e.g., saveAsHadoopFile is an action).
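
(To illustrate the lazy-versus-eager distinction concretely, here is a small
sketch that is not from the thread; it reuses the kafkaStream from the earlier
snippet. The mapPartitions result does nothing until an action such as count()
is invoked on it, whereas foreachPartition runs as soon as it is called.)

// additional imports assumed (besides those already used in the earlier snippet):
import java.util.Collections;
import java.util.Iterator;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import scala.Tuple2;

kafkaStream.foreachRDD(new Function<JavaPairRDD<byte[], byte[]>, Void>() {
    @Override
    public Void call(JavaPairRDD<byte[], byte[]> rdd) throws Exception {
        // mapPartitions is a transformation: this statement alone schedules no work.
        JavaRDD<Long> counts = rdd.mapPartitions(
            new FlatMapFunction<Iterator<Tuple2<byte[], byte[]>>, Long>() {
                @Override
                public Iterable<Long> call(Iterator<Tuple2<byte[], byte[]>> it) {
                    long n = 0;
                    while (it.hasNext()) { it.next(); n++; }
                    return Collections.singletonList(n);
                }
            });
        counts.count(); // only this action makes the mapPartitions work actually run

        // foreachPartition, by contrast, is itself an action and executes
        // immediately on the RDD of each batch.
        return null;
    }
});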


On Mon, Jul 6, 2015 at 12:09 PM, Shushant Arora <sh...@gmail.com>
wrote:

> What's the difference between foreachPartition and mapPartitions for a
> DStream? Both work at partition granularity.
>
> One is a transformation and the other is an action, but if I call an
> action after mapPartitions anyway, which one is more efficient and
> recommended?
>
> On Tue, Jul 7, 2015 at 12:21 AM, Tathagata Das <td...@databricks.com>
> wrote:
>
>> Yeah, creating a new producer at the granularity of partitions may not be
>> that costly.
>>
>> On Mon, Jul 6, 2015 at 6:40 AM, Cody Koeninger <co...@koeninger.org>
>> wrote:
>>
>>> Use foreachPartition, and allocate whatever the costly resource is once
>>> per partition.
>>>
>>> On Mon, Jul 6, 2015 at 6:11 AM, Shushant Arora <
>>> shushantarora09@gmail.com> wrote:
>>>
>>>> I have a requirement to write to a Kafka queue from a Spark Streaming
>>>> application.
>>>>
>>>> I am using Spark 1.2 Streaming. Since different executors in Spark may be
>>>> allocated on each run, instantiating a new Kafka producer on every run
>>>> seems like a costly operation. Is there a way to reuse objects in the
>>>> processing executors (not in the receivers)?
>>>>
>>>>
>>>>
>>>
>>
>

Re: writing to kafka using spark streaming

Posted by Shushant Arora <sh...@gmail.com>.
What's the difference between foreachPartition and mapPartitions for a
DStream? Both work at partition granularity.

One is a transformation and the other is an action, but if I call an action
after mapPartitions anyway, which one is more efficient and recommended?

On Tue, Jul 7, 2015 at 12:21 AM, Tathagata Das <td...@databricks.com> wrote:

> Yeah, creating a new producer at the granularity of partitions may not be
> that costly.
>
> On Mon, Jul 6, 2015 at 6:40 AM, Cody Koeninger <co...@koeninger.org> wrote:
>
>> Use foreachPartition, and allocate whatever the costly resource is once
>> per partition.
>>
>> On Mon, Jul 6, 2015 at 6:11 AM, Shushant Arora <shushantarora09@gmail.com
>> > wrote:
>>
>>> I have a requirement to write to a Kafka queue from a Spark Streaming
>>> application.
>>>
>>> I am using Spark 1.2 Streaming. Since different executors in Spark may be
>>> allocated on each run, instantiating a new Kafka producer on every run
>>> seems like a costly operation. Is there a way to reuse objects in the
>>> processing executors (not in the receivers)?
>>>
>>>
>>>
>>
>

Re: writing to kafka using spark streaming

Posted by Tathagata Das <td...@databricks.com>.
Yeah, creating a new producer at the granularity of partitions may not be
that costly.

On Mon, Jul 6, 2015 at 6:40 AM, Cody Koeninger <co...@koeninger.org> wrote:

> Use foreachPartition, and allocate whatever the costly resource is once
> per partition.
>
> On Mon, Jul 6, 2015 at 6:11 AM, Shushant Arora <sh...@gmail.com>
> wrote:
>
>> I have a requirement to write to a Kafka queue from a Spark Streaming
>> application.
>>
>> I am using Spark 1.2 Streaming. Since different executors in Spark may be
>> allocated on each run, instantiating a new Kafka producer on every run
>> seems like a costly operation. Is there a way to reuse objects in the
>> processing executors (not in the receivers)?
>>
>>
>>
>

Re: writing to kafka using spark streaming

Posted by Cody Koeninger <co...@koeninger.org>.
Use foreachPartition, and allocate whatever the costly resource is once per
partition.
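
(A minimal sketch of that pattern, not from the thread: create and close one
producer per partition inside foreachPartition. It again assumes the new Kafka
producer client, org.apache.kafka.clients.producer.KafkaProducer; the broker
list and topic name are placeholders.)

import java.util.Iterator;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

// inside foreachRDD, for the JavaPairRDD<byte[], byte[]> of each batch:
rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<byte[], byte[]>>>() {
    @Override
    public void call(Iterator<Tuple2<byte[], byte[]>> it) throws Exception {
        // Create the costly resource once per partition, not once per record.
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // placeholder broker list
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer<byte[], byte[]> producer =
                new KafkaProducer<byte[], byte[]>(props);
        try {
            while (it.hasNext()) {
                byte[] value = it.next()._2();
                producer.send(new ProducerRecord<byte[], byte[]>("out-topic", value));
            }
        } finally {
            producer.close(); // flush pending sends and release connections
        }
    }
});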

On Mon, Jul 6, 2015 at 6:11 AM, Shushant Arora <sh...@gmail.com>
wrote:

> I have a requirement to write to a Kafka queue from a Spark Streaming
> application.
>
> I am using Spark 1.2 Streaming. Since different executors in Spark may be
> allocated on each run, instantiating a new Kafka producer on every run
> seems like a costly operation. Is there a way to reuse objects in the
> processing executors (not in the receivers)?
>
>
>