Posted to user@spark.apache.org by "OUASSAIDI, Sami" <sa...@mind7.fr> on 2017/03/16 12:16:21 UTC

[Spark Streaming+Kafka][How-to]

Hi all,

I need to control which executor consumes which partitions of a Kafka topic.

Let's say I have two topics, t0 and t1, with two partitions each, and two
executors, e0 and e1. Both executors can be on the same node, so the assign
strategy does not work: on a multi-executor node, scheduling is round-robin
and whichever executor is available first consumes the topic partition.

What I would like is for e0 to consume partition 0 of both t0 and t1, while
e1 consumes partition 1 of both t0 and t1. Is there any way to do this other
than interfering with scheduling? If not, what is the best approach?

The reason is that the executors will write to a Cassandra database. Since
we are in a parallelized context, one executor might "collide" with another
and data would be lost. By assigning a partition to an executor, I want to
force that partition's data to be processed sequentially.

Thanks
Sami
-- 
*Mind7 Consulting*

Sami Ouassaid | Consultant Big Data | sami.ouassaidi@mind7.com

64 Rue Taitbout, 75009 Paris

Re: [Spark Streaming+Kafka][How-to]

Posted by Cody Koeninger <co...@koeninger.org>.
Glad you got it worked out.  That's cool as long as your use case doesn't
actually require e.g. partition 0 to always be scheduled to the same
executor across different batches.

On Tue, Mar 21, 2017 at 7:35 PM, OUASSAIDI, Sami <sa...@mind7.fr>
wrote:

> So it worked quite well with a coalesce, and I was able to find a solution
> to my problem. Although it does not control the executor directly, a good
> workaround was to assign the desired partition to a specific stream through
> the assign strategy and coalesce it to a single partition, then repeat the
> same process for the remaining topics on different streams, and at the end
> do a union of these streams.
>
> PS: No shuffle occurred during the whole thing, since the RDD partitions
> were collapsed into a single one.

Re: [Spark Streaming+Kafka][How-to]

Posted by "OUASSAIDI, Sami" <sa...@mind7.fr>.
So it worked quite well with a coalesce, and I was able to find a solution
to my problem. Although it does not control the executor directly, a good
workaround was to assign the desired partition to a specific stream through
the assign strategy and coalesce it to a single partition, then repeat the
same process for the remaining topics on different streams, and at the end
do a union of these streams.

PS: No shuffle occurred during the whole thing, since the RDD partitions
were collapsed into a single one.
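
A minimal sketch of that workaround with the DStream API, assuming the
spark-streaming-kafka-0-10 integration. The object name, the helper
partitionsFor, the broker address, the group id, the batch interval, and the
println stand-in for the Cassandra write are all illustrative assumptions,
not details from this thread.

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object AssignCoalesceUnion {
  // Hypothetical helper: the same partition index in every topic.
  def partitionsFor(topics: Seq[String], partition: Int): Seq[TopicPartition] =
    topics.map(t => new TopicPartition(t, partition))

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("assign-coalesce-union")
    val ssc = new StreamingContext(conf, Seconds(5)) // batch interval is an assumption

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092", // placeholder broker
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "sharding-poc", // placeholder group id
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = Seq("t0", "t1")

    // One stream per partition index, collapsed to a single RDD partition.
    // coalesce(1) merges partitions without a shuffle.
    def streamFor(partition: Int): DStream[(String, String)] =
      KafkaUtils
        .createDirectStream[String, String](
          ssc,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Assign[String, String](partitionsFor(topics, partition), kafkaParams))
        .map(r => (r.key(), r.value()))
        .transform(rdd => rdd.coalesce(1))

    // After the union, each batch RDD has one partition per stream.
    val merged = streamFor(0).union(streamFor(1))

    merged.foreachRDD { rdd =>
      rdd.foreachPartition { records =>
        // Stand-in for the Cassandra write; records in a task are handled one by one.
        records.foreach { case (k, v) => println(s"$k -> $v") }
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Each task then sees partition p of both t0 and t1 and processes it serially
within a batch. Which executor runs that task can still change from batch to
batch.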

On 17 March 2017 at 8:04 PM, "Michael Armbrust" <mi...@databricks.com>
wrote:

> Another option that would avoid a shuffle would be to use assign and
> coalesce, running two separate streams.
>
> // "assign" takes {"topic":[partitions]}; offsets go in "startingOffsets".
> spark.readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "...")
>   .option("assign", """{"t0":[0],"t1":[0]}""")
>   .option("startingOffsets", """{"t0":{"0":xxxx},"t1":{"0":xxxxx}}""")
>   .load()
>   .coalesce(1)
>   .writeStream
>   .foreach(... code to write to cassandra ...)
>   .start()
>
> spark.readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "...")
>   .option("assign", """{"t0":[1],"t1":[1]}""")
>   .option("startingOffsets", """{"t0":{"1":xxxx},"t1":{"1":xxxxx}}""")
>   .load()
>   .coalesce(1)
>   .writeStream
>   .foreach(... code to write to cassandra ...)
>   .start()

Re: [Spark Streaming+Kafka][How-to]

Posted by Michael Armbrust <mi...@databricks.com>.
Another option that would avoid a shuffle would be to use assign and
coalesce, running two separate streams.

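// Stream 1: partition 0 of both topics.
// "assign" takes {"topic":[partitions]}; starting offsets, if needed,
// go in "startingOffsets" as {"topic":{"partition":offset}}.
spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "...")
  .option("assign", """{"t0":[0],"t1":[0]}""")
  .option("startingOffsets", """{"t0":{"0":xxxx},"t1":{"0":xxxxx}}""")
  .load()
  .coalesce(1)  // single task per batch, no shuffle
  .writeStream
  .foreach(... code to write to cassandra ...)
  .start()

// Stream 2: partition 1 of both topics.
spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "...")
  .option("assign", """{"t0":[1],"t1":[1]}""")
  .option("startingOffsets", """{"t0":{"1":xxxx},"t1":{"1":xxxxx}}""")
  .load()
  .coalesce(1)
  .writeStream
  .foreach(... code to write to cassandra ...)
  .start()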
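
A slightly fuller sketch of the same idea, assuming the Structured Streaming
Kafka source, whose "assign" option takes a JSON object mapping each topic to
a list of partition numbers. The object name, the helper assignJson, the
PrintlnWriter sink, the broker address, and the checkpoint paths are
hypothetical placeholders, not part of the suggestion above.

```scala
import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}
import org.apache.spark.sql.streaming.StreamingQuery

object AssignPerPartitionQueries {
  // Hypothetical helper: builds the "assign" JSON, e.g. {"t0":[0],"t1":[0]}.
  def assignJson(topics: Seq[String], partition: Int): String =
    topics.map(t => "\"" + t + "\":[" + partition + "]").mkString("{", ",", "}")

  // Placeholder sink: a real writer would open a Cassandra session in open()
  // and release it in close().
  class PrintlnWriter extends ForeachWriter[Row] {
    def open(partitionId: Long, version: Long): Boolean = true
    def process(row: Row): Unit = println(row)
    def close(errorOrNull: Throwable): Unit = ()
  }

  // One query per partition index; coalesce(1) keeps each batch in one task.
  def startQuery(spark: SparkSession, partition: Int): StreamingQuery =
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
      .option("assign", assignJson(Seq("t0", "t1"), partition))
      .load()
      .selectExpr("topic", "partition", "offset", "CAST(value AS STRING) AS value")
      .coalesce(1)
      .writeStream
      .option("checkpointLocation", s"/tmp/checkpoints/assign-p$partition") // placeholder path
      .foreach(new PrintlnWriter)
      .start()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("assign-per-partition").getOrCreate()
    Seq(0, 1).foreach(p => startQuery(spark, p))
    // Block until either query stops or fails.
    spark.streams.awaitAnyTermination()
  }
}
```

Generating the option value from a helper keeps the two queries identical
apart from the partition index, which makes it harder for them to overlap by
accident.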

On Fri, Mar 17, 2017 at 7:35 AM, OUASSAIDI, Sami <sa...@mind7.fr>
wrote:

> @Cody: Duly noted.
> @Michael Armbrust: A repartition is out of the question for our project, as
> it would be a fairly expensive operation. We looked into targeting a
> specific executor to avoid this extra cost and to have well-partitioned
> data directly after consuming the Kafka topics. We are also using Spark
> Streaming to save to the Cassandra DB, and we try to keep shuffle operations
> to a strict minimum (ideally none). We are not entirely pleased with our
> current performance, which is why I'm doing a Kafka topic sharding POC, and
> getting an executor to handle the specified partitions is central to it.

Re: [Spark Streaming+Kafka][How-to]

Posted by "OUASSAIDI, Sami" <sa...@mind7.fr>.
@Cody: Duly noted.
@Michael Armbrust: A repartition is out of the question for our project, as
it would be a fairly expensive operation. We looked into targeting a
specific executor to avoid this extra cost and to have well-partitioned
data directly after consuming the Kafka topics. We are also using Spark
Streaming to save to the Cassandra DB, and we try to keep shuffle operations
to a strict minimum (ideally none). We are not entirely pleased with our
current performance, which is why I'm doing a Kafka topic sharding POC, and
getting an executor to handle the specified partitions is central to it.

2017-03-17 9:14 GMT+01:00 Michael Armbrust <mi...@databricks.com>:

> Sorry, typo.  Should be a repartition not a groupBy.
>
>
>> spark.readStream
>>   .format("kafka")
>>   .option("kafka.bootstrap.servers", "...")
>>   .option("subscribe", "t0,t1")
>>   .load()
>>   .repartition($"partition")
>>   .writeStream
>>   .foreach(... code to write to cassandra ...)
>>
>



Re: [Spark Streaming+Kafka][How-to]

Posted by Michael Armbrust <mi...@databricks.com>.
Sorry, typo.  Should be a repartition not a groupBy.


> spark.readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "...")
>   .option("subscribe", "t0,t1")
>   .load()
>   .repartition($"partition")
>   .writeStream
>   .foreach(... code to write to cassandra ...)
>
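
A sketch of the repartition variant as a complete program, under stated
assumptions: the object name, the helper byKafkaPartition, the PrintlnWriter
sink, the broker address, and the checkpoint path are hypothetical. Unlike
coalesce, repartition does shuffle the data. Rows are grouped by partition
number only, so partition 0 of t0 and partition 0 of t1 land in the same task.

```scala
import org.apache.spark.sql.{DataFrame, ForeachWriter, Row, SparkSession}
import org.apache.spark.sql.functions.col

object RepartitionByKafkaPartition {
  // Hash-partition on the Kafka partition number, so that within a batch all
  // rows with the same number are handled by a single task.
  def byKafkaPartition(df: DataFrame): DataFrame = df.repartition(col("partition"))

  // Placeholder sink standing in for the Cassandra writer.
  class PrintlnWriter extends ForeachWriter[Row] {
    def open(partitionId: Long, version: Long): Boolean = true
    def process(row: Row): Unit = println(row)
    def close(errorOrNull: Throwable): Unit = ()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("repartition-by-kafka-partition").getOrCreate()

    val records = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
      .option("subscribe", "t0,t1")
      .load()
      .selectExpr("topic", "partition", "offset", "CAST(value AS STRING) AS value")

    byKafkaPartition(records)
      .writeStream
      .option("checkpointLocation", "/tmp/checkpoints/repartition") // placeholder path
      .foreach(new PrintlnWriter)
      .start()
      .awaitTermination()
  }
}
```

The helper also works on a static DataFrame with a "partition" column, which
makes the grouping easy to check locally without a Kafka cluster.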

Re: [Spark Streaming+Kafka][How-to]

Posted by Michael Armbrust <mi...@databricks.com>.
I think it should be straightforward to express this using structured
streaming.  You could ensure that data from a given partition ID is
processed serially by performing a group by on the partition column.

spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "...")
  .option("subscribe", "t0,t1")
  .load()
  .groupBy($"partition")
  .writeStream
  .foreach(... code to write to cassandra ...)


On Thu, Mar 16, 2017 at 8:10 AM, Cody Koeninger <co...@koeninger.org> wrote:

> Spark just really isn't a good fit for trying to pin particular
> computation to a particular executor, especially if you're relying on that
> for correctness.

Re: [Spark Streaming+Kafka][How-to]

Posted by Cody Koeninger <co...@koeninger.org>.
Spark just really isn't a good fit for trying to pin particular computation
to a particular executor, especially if you're relying on that for
correctness.
