Posted to user@spark.apache.org by Dibyendu Bhattacharya <di...@gmail.com> on 2014/09/03 19:38:55 UTC

Re: Low Level Kafka Consumer for Spark

Hi,

Sorry for the slight delay. As discussed in this thread, I have modified the
Kafka-Spark-Consumer (https://github.com/dibbhatt/kafka-spark-consumer)
code to have a dedicated Receiver for every topic partition. You can see an
example of how to create a union of these receivers
in consumer.kafka.client.Consumer.java.

Thanks to Chris for suggesting this change.

Regards,
Dibyendu


On Mon, Sep 1, 2014 at 2:55 AM, RodrigoB <ro...@aspect.com> wrote:

> Just a comment on the recovery part.
>
> Is it correct to say that currently Spark Streaming recovery design does
> not
> consider re-computations (upon metadata lineage recovery) that depend on
> blocks of data of the received stream?
> https://issues.apache.org/jira/browse/SPARK-1647
>
> Just to illustrate a real use case (mine):
> - We have object states which have a Duration field per state which is
> incremented on every batch interval. This object state is also reset to 0
> upon incoming state-changing events. Let's suppose there is at least one
> event since the last data checkpoint. This will lead to inconsistency upon
> driver recovery: the Duration field will get incremented from the data
> checkpoint version until the recovery moment, but the state change event
> will never be re-processed... so in the end we have the old state with the
> wrong Duration value.
> To make things worse, let's imagine we're dumping the Duration increases
> somewhere... which means we're spreading the problem across our system.
> Re-computation awareness is something I've commented on in another thread and
> would rather treat separately.
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-checkpoint-recovery-causes-IO-re-execution-td12568.html#a13205
>
> Re-computations do occur, but the only RDDs that are recovered are the
> ones
> from the data checkpoint. This is what we've seen. It is not enough by itself
> to ensure recovery of computed data, and this partial recovery leads to
> inconsistency in some cases.
>
> Roger - I share the same question with you - I'm just not sure if the
> replicated data really gets persisted on every batch. The execution lineage
> is checkpointed, but if we have big chunks of data being consumed by the
> Receiver node on, let's say, a per-second basis, then having it persisted to HDFS
> every second could be a big challenge for keeping JVM performance - maybe
> that could be the reason why it's not really implemented... assuming it isn't.
>
> Dibyendu had a great effort with the offset controlling code but the
> general
> state consistent recovery feels to me like another big issue to address.
>
> I plan on having a dive into the Streaming code and try to at least
> contribute with some ideas. Some more insight from anyone on the dev team
> will be very appreciated.
>
> tnks,
> Rod
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p13208.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>
>

Re: Low Level Kafka Consumer for Spark

Posted by Luis Ángel Vicente Sánchez <la...@gmail.com>.
My main complaint about the WAL mechanism in the new reliable Kafka receiver
is that you have to enable checkpointing and, for some reason, even if
spark.cleaner.ttl is set to a reasonable value, only the metadata is
cleaned periodically. In my tests, using a folder in my filesystem as the
checkpoint folder, the receivedMetaData folder remains almost constant in
size but the receivedData folder keeps growing; spark.cleaner.ttl
was configured to 300 seconds.

2014-12-03 10:13 GMT+00:00 Dibyendu Bhattacharya <
dibyendu.bhattachary@gmail.com>:

> Hi,
>
> Yes, as Jerry mentioned, the Spark -3129 (
> https://issues.apache.org/jira/browse/SPARK-3129) enabled the WAL feature
> which solves the Driver failure problem. The way 3129 is designed , it
> solved the driver failure problem agnostic of the source of the stream (
> like Kafka or Flume etc) But with just 3129 you can not achieve complete
> solution for data loss. You need a reliable receiver which should also
> solves the data loss issue on receiver failure.
>
> The Low Level Consumer (https://github.com/dibbhatt/kafka-spark-consumer)
> for which this email thread was started has solved that problem with Kafka
> Low Level API.
>
> And Spark-4062 as Jerry mentioned also recently solved the same problem
> using Kafka High Level API.
>
> On the Kafka High Level Consumer API approach , I would like to mention
> that Kafka 0.8 has some issue as mentioned in this wiki (
> https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Client+Re-Design)
> where consumer re-balance sometime fails and that is one of the key reason
> Kafka is re-writing consumer API in Kafka 0.9.
>
> I know there are few folks already have faced this re-balancing issues
> while using Kafka High Level API , and If you ask my opinion, we at Pearson
> are still using the Low Level Consumer as this seems to be more robust and
> performant and we have been using this for few months without any issue
> ..and also I may be little biased :)
>
> Regards,
> Dibyendu
>
>
>
> On Wed, Dec 3, 2014 at 7:04 AM, Shao, Saisai <sa...@intel.com>
> wrote:
>
>> Hi Rod,
>>
>> The purpose of introducing  WAL mechanism in Spark Streaming as a general
>> solution is to make all the receivers be benefit from this mechanism.
>>
>> Though as you said, external sources like Kafka have their own checkpoint
>> mechanism, instead of storing data in WAL, we can only store metadata to
>> WAL, and recover from the last committed offsets. But this requires
>> sophisticated design of Kafka receiver with low-level API involved, also we
>> need to take care of rebalance and fault tolerance things by ourselves. So
>> right now instead of implementing a whole new receiver, we choose to
>> implement a simple one, though the performance is not so good, it's much
>> easier to understand and maintain.
>>
>> The design purpose and implementation of reliable Kafka receiver can be
>> found in (https://issues.apache.org/jira/browse/SPARK-4062). And in
>> future, to improve the reliable Kafka receiver like what you mentioned is
>> on our scheduler.
>>
>> Thanks
>> Jerry
>>
>>
>> -----Original Message-----
>> From: RodrigoB [mailto:rodrigo.boavida@aspect.com]
>> Sent: Wednesday, December 3, 2014 5:44 AM
>> To: user@spark.incubator.apache.org
>> Subject: Re: Low Level Kafka Consumer for Spark
>>
>> Dibyendu,
>>
>> Just to make sure I will not be misunderstood - My concerns are referring
>> to the Spark upcoming solution and not yours. I would to gather the
>> perspective of someone which implemented recovery with Kafka a different
>> way.
>>
>> Tnks,
>> Rod
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p20196.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org For additional
>> commands, e-mail: user-help@spark.apache.org
>>
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>> For additional commands, e-mail: user-help@spark.apache.org
>>
>>
>

Re: Low Level Kafka Consumer for Spark

Posted by Dibyendu Bhattacharya <di...@gmail.com>.
Hi,

Yes, as Jerry mentioned, SPARK-3129 (
https://issues.apache.org/jira/browse/SPARK-3129) enabled the WAL feature,
which solves the driver failure problem. The way 3129 is designed, it
solves the driver failure problem regardless of the source of the stream
(Kafka, Flume, etc.). But 3129 alone is not a complete solution for data
loss. You also need a reliable receiver, which solves the data loss issue
on receiver failure.

The Low Level Consumer (https://github.com/dibbhatt/kafka-spark-consumer),
for which this email thread was started, has solved that problem with the
Kafka Low Level API.

And SPARK-4062, as Jerry mentioned, recently solved the same problem
using the Kafka High Level API.

On the Kafka High Level Consumer API approach, I would like to mention
that Kafka 0.8 has some issues, described in this wiki (
https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Client+Re-Design),
where consumer re-balance sometimes fails, and that is one of the key reasons
Kafka is rewriting the consumer API in Kafka 0.9.

I know a few folks have already faced these re-balancing issues
while using the Kafka High Level API, and if you ask my opinion, we at Pearson
are still using the Low Level Consumer, as it seems to be more robust and
performant, and we have been using it for a few months without any issue
... and also I may be a little biased :)

Regards,
Dibyendu



On Wed, Dec 3, 2014 at 7:04 AM, Shao, Saisai <sa...@intel.com> wrote:

> Hi Rod,
>
> The purpose of introducing  WAL mechanism in Spark Streaming as a general
> solution is to make all the receivers be benefit from this mechanism.
>
> Though as you said, external sources like Kafka have their own checkpoint
> mechanism, instead of storing data in WAL, we can only store metadata to
> WAL, and recover from the last committed offsets. But this requires
> sophisticated design of Kafka receiver with low-level API involved, also we
> need to take care of rebalance and fault tolerance things by ourselves. So
> right now instead of implementing a whole new receiver, we choose to
> implement a simple one, though the performance is not so good, it's much
> easier to understand and maintain.
>
> The design purpose and implementation of reliable Kafka receiver can be
> found in (https://issues.apache.org/jira/browse/SPARK-4062). And in
> future, to improve the reliable Kafka receiver like what you mentioned is
> on our scheduler.
>
> Thanks
> Jerry
>
>
> -----Original Message-----
> From: RodrigoB [mailto:rodrigo.boavida@aspect.com]
> Sent: Wednesday, December 3, 2014 5:44 AM
> To: user@spark.incubator.apache.org
> Subject: Re: Low Level Kafka Consumer for Spark
>
> Dibyendu,
>
> Just to make sure I will not be misunderstood - My concerns are referring
> to the Spark upcoming solution and not yours. I would to gather the
> perspective of someone which implemented recovery with Kafka a different
> way.
>
> Tnks,
> Rod
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p20196.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org For additional
> commands, e-mail: user-help@spark.apache.org
>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>
>

RE: Low Level Kafka Consumer for Spark

Posted by "Shao, Saisai" <sa...@intel.com>.
Hi Rod,

The purpose of introducing the WAL mechanism in Spark Streaming as a general solution is to let all receivers benefit from it.

Though, as you said, external sources like Kafka have their own checkpoint mechanism, so instead of storing data in the WAL we could store only metadata in the WAL and recover from the last committed offsets. But this requires a sophisticated design of the Kafka receiver with the low-level API involved, and we would also need to take care of rebalancing and fault tolerance ourselves. So right now, instead of implementing a whole new receiver, we chose to implement a simple one; though its performance is not so good, it is much easier to understand and maintain.

The design purpose and implementation of the reliable Kafka receiver can be found in SPARK-4062 (https://issues.apache.org/jira/browse/SPARK-4062). Improving the reliable Kafka receiver along the lines you mentioned is on our schedule for the future.

Thanks
Jerry


-----Original Message-----
From: RodrigoB [mailto:rodrigo.boavida@aspect.com] 
Sent: Wednesday, December 3, 2014 5:44 AM
To: user@spark.incubator.apache.org
Subject: Re: Low Level Kafka Consumer for Spark

Dibyendu,

Just to make sure I will not be misunderstood - My concerns are referring to the Spark upcoming solution and not yours. I would to gather the perspective of someone which implemented recovery with Kafka a different way.

Tnks,
Rod



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p20196.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Low Level Kafka Consumer for Spark

Posted by RodrigoB <ro...@aspect.com>.
Dibyendu,

Just to make sure I will not be misunderstood - my concerns refer to
the upcoming Spark solution, not yours. I would like to gather the perspective
of someone who implemented recovery with Kafka in a different way.

Tnks,
Rod



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p20196.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Low Level Kafka Consumer for Spark

Posted by RodrigoB <ro...@aspect.com>.
Hi Dibyendu,

What are your thoughts on keeping this solution (or not),
considering that Spark Streaming v1.2 will have built-in recoverability of
the received data?
https://issues.apache.org/jira/browse/SPARK-1647

I'm concerned about the added complexity and performance overhead of
writing large amounts of data into HDFS on a small batch interval.
https://docs.google.com/document/d/1vTCB5qVfyxQPlHuv8rit9-zjdttlgaSrMgfCDQlCJIM/edit?pli=1#
<http://apache-spark-user-list.1001560.n3.nabble.com/file/n20181/spark_streaming_v.png>

I think the whole solution is well designed and thought out, but I'm not sure
it really fits all needs with checkpoint-based technologies like Flume or
Kafka, since it hides the management of the offset from the user code. If,
instead of saving received data into HDFS, the ReceiverHandler
saved some metadata (such as the offset in the case of Kafka) specified by the
custom receiver passed into the StreamingContext, then upon driver restart
that metadata could be used by the custom receiver to recover the point from
which it should start receiving data once more.

Anyone's comments will be greatly appreciated.

Tnks,
Rod
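
The idea in the message above (persist only receiver-specified metadata, such
as Kafka offsets, instead of the received data) can be sketched roughly as
follows. This is a plain-Java illustration, not Spark code: the class
OffsetMetadataLog, its methods, and the in-memory map standing in for durable
storage are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a receiver records only the last stored offset per
// topic partition and reads it back after a restart to decide where to resume.
// The in-memory map stands in for durable storage (e.g. HDFS or ZooKeeper).
class OffsetMetadataLog {
    private final Map<String, Long> lastStoredOffset = new HashMap<>();

    private static String key(String topic, int partition) {
        return topic + "-" + partition;
    }

    // Called by the receiver after the message at `offset` has been stored.
    // Keeps the highest offset seen so far for the partition.
    void record(String topic, int partition, long offset) {
        lastStoredOffset.merge(key(topic, partition), offset, Long::max);
    }

    // Called on restart: resume just after the last recorded offset,
    // or from `earliest` if nothing was recorded for this partition.
    long resumeFrom(String topic, int partition, long earliest) {
        Long last = lastStoredOffset.get(key(topic, partition));
        return last == null ? earliest : last + 1;
    }
}
```

The amount written per batch is then a few numbers per partition rather than
the payload itself, which is the trade-off the message is arguing for.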



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p20181.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Low Level Kafka Consumer for Spark

Posted by Dibyendu Bhattacharya <di...@gmail.com>.
Hi Tim,

I have not tried persisting the RDD.

There is some discussion of rate limiting in Spark Streaming in this
thread:

http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-rate-limiting-from-kafka-td8590.html

There is a pull request, https://github.com/apache/spark/pull/945/files, to
fix this rate limiting issue at the BlockGenerator level.

But while testing under heavy load, this fix did not solve my problem, so I
had to build rate limiting into the Kafka consumer. I will make it
configurable soon.
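
One simple way to build rate limiting into a consumer is to cap the number of
messages fetched per fixed time window. The sketch below illustrates that
general technique only; it is not the code in kafka-spark-consumer. The class
FetchRateLimiter and its parameters are hypothetical, and the clock is passed
in explicitly so the behaviour is easy to check.

```java
// Hypothetical fixed-window limiter: at most `maxPerWindow` messages may be
// fetched in each window of `windowMillis` milliseconds.
class FetchRateLimiter {
    private final int maxPerWindow;
    private final long windowMillis;
    private long windowStart;
    private int usedInWindow;

    FetchRateLimiter(int maxPerWindow, long windowMillis, long nowMillis) {
        this.maxPerWindow = maxPerWindow;
        this.windowMillis = windowMillis;
        this.windowStart = nowMillis;
    }

    // Returns how many of the `requested` messages may be fetched now.
    // The caller fetches only that many and asks again later for the rest.
    int acquire(int requested, long nowMillis) {
        if (nowMillis - windowStart >= windowMillis) {
            windowStart = nowMillis;   // a new window begins, quota resets
            usedInWindow = 0;
        }
        int granted = Math.min(requested, maxPerWindow - usedInWindow);
        usedInWindow += granted;
        return granted;
    }
}
```

With a limit of 100 messages per 1000 ms, a fetch loop would call
acquire(batchSize, System.currentTimeMillis()) before each fetch and pull only
the granted number of messages from the partition.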

If this is not done, I can see blocks getting dropped, which leads to job
failure.

I have raised this in another thread:

https://mail.google.com/mail/u/1/?tab=wm#search/Serious/148650fd829cd239
But I have not yet got an answer on whether this (blocks getting dropped
and the job failing) is a bug.



Dib


On Mon, Sep 15, 2014 at 10:33 PM, Tim Smith <se...@gmail.com> wrote:

> Hi Dibyendu,
>
> I am a little confused about the need for rate limiting input from
> kafka. If the stream coming in from kafka has higher message/second
> rate than what a Spark job can process then it should simply build a
> backlog in Spark if the RDDs are cached on disk using persist().
> Right?
>
> Thanks,
>
> Tim
>
>
> On Mon, Sep 15, 2014 at 4:33 AM, Dibyendu Bhattacharya
> <di...@gmail.com> wrote:
> > Hi Alon,
> >
> > No this will not be guarantee that same set of messages will come in same
> > RDD. This fix just re-play the messages from last processed offset in
> same
> > order. Again this is just a interim fix we needed to solve our use case
> . If
> > you do not need this message re-play feature, just do not perform the
> ack (
> > Acknowledgement) call in the Driver code. Then the processed messages
> will
> > not be written to ZK and hence replay will not happen.
> >
> > Regards,
> > Dibyendu
> >
> > On Mon, Sep 15, 2014 at 4:48 PM, Alon Pe'er <al...@supersonicads.com>
> > wrote:
> >>
> >> Hi Dibyendu,
> >>
> >> Thanks for your great work!
> >>
> >> I'm new to Spark Streaming, so I just want to make sure I understand
> >> Driver
> >> failure issue correctly.
> >>
> >> In my use case, I want to make sure that messages coming in from Kafka
> are
> >> always broken into the same set of RDDs, meaning that if a set of
> messages
> >> are assigned to one RDD, and the Driver dies before this RDD is
> processed,
> >> then once the Driver recovers, the same set of messages are assigned to
> a
> >> single RDD, instead of arbitrarily repartitioning the messages across
> >> different RDDs.
> >>
> >> Does your Receiver guarantee this behavior, until the problem is fixed
> in
> >> Spark 1.2?
> >>
> >> Regards,
> >> Alon
> >>
> >>
> >>
> >> --
> >> View this message in context:
> >>
> http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p14233.html
> >> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >>
> >> ---------------------------------------------------------------------
> >> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> >> For additional commands, e-mail: user-help@spark.apache.org
> >>
> >
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>
>

Re: Low Level Kafka Consumer for Spark

Posted by Tim Smith <se...@gmail.com>.
Hi Dibyendu,

I am a little confused about the need for rate limiting input from
kafka. If the stream coming in from kafka has higher message/second
rate than what a Spark job can process then it should simply build a
backlog in Spark if the RDDs are cached on disk using persist().
Right?

Thanks,

Tim


On Mon, Sep 15, 2014 at 4:33 AM, Dibyendu Bhattacharya
<di...@gmail.com> wrote:
> Hi Alon,
>
> No this will not be guarantee that same set of messages will come in same
> RDD. This fix just re-play the messages from last processed offset in same
> order. Again this is just a interim fix we needed to solve our use case . If
> you do not need this message re-play feature, just do not perform the ack (
> Acknowledgement) call in the Driver code. Then the processed messages will
> not be written to ZK and hence replay will not happen.
>
> Regards,
> Dibyendu
>
> On Mon, Sep 15, 2014 at 4:48 PM, Alon Pe'er <al...@supersonicads.com>
> wrote:
>>
>> Hi Dibyendu,
>>
>> Thanks for your great work!
>>
>> I'm new to Spark Streaming, so I just want to make sure I understand
>> Driver
>> failure issue correctly.
>>
>> In my use case, I want to make sure that messages coming in from Kafka are
>> always broken into the same set of RDDs, meaning that if a set of messages
>> are assigned to one RDD, and the Driver dies before this RDD is processed,
>> then once the Driver recovers, the same set of messages are assigned to a
>> single RDD, instead of arbitrarily repartitioning the messages across
>> different RDDs.
>>
>> Does your Receiver guarantee this behavior, until the problem is fixed in
>> Spark 1.2?
>>
>> Regards,
>> Alon
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p14233.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>> For additional commands, e-mail: user-help@spark.apache.org
>>
>



Re: Low Level Kafka Consumer for Spark

Posted by Dibyendu Bhattacharya <di...@gmail.com>.
Hi Alon,

No, this will not guarantee that the same set of messages will come in the
same RDD. This fix just replays the messages from the last processed offset
in the same order. Again, this is just an interim fix we needed to solve our
use case. If you do not need this message replay feature, just do not perform
the ack (acknowledgement) call in the driver code. Then the offsets of the
processed messages will not be written to ZK and hence replay will not happen.
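
The replay behaviour described above can be sketched as a small decision
function. This is an illustration under assumptions, not the code in
kafka-spark-consumer: ReplayOffsets and startOffset are hypothetical names,
"committed" stands for the offset the receiver has fetched up to, and
"processed" for the offset the driver acknowledged to ZK (absent if the driver
never performs the ack call).

```java
import java.util.OptionalLong;

class ReplayOffsets {
    // Picks the offset a restarted receiver should fetch from.
    static long startOffset(long committed, OptionalLong processed) {
        // No ack was ever written: nothing to replay, continue from committed.
        if (!processed.isPresent()) {
            return committed;
        }
        // The driver acknowledged less than was fetched: replay from the
        // processed offset. Messages after it may be delivered again.
        return Math.min(committed, processed.getAsLong());
    }
}
```

Because replay starts from the processed offset, downstream code has to
tolerate duplicates, and nothing here makes the replayed messages land in the
same RDDs as before.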

Regards,
Dibyendu

On Mon, Sep 15, 2014 at 4:48 PM, Alon Pe'er <al...@supersonicads.com>
wrote:

> Hi Dibyendu,
>
> Thanks for your great work!
>
> I'm new to Spark Streaming, so I just want to make sure I understand Driver
> failure issue correctly.
>
> In my use case, I want to make sure that messages coming in from Kafka are
> always broken into the same set of RDDs, meaning that if a set of messages
> are assigned to one RDD, and the Driver dies before this RDD is processed,
> then once the Driver recovers, the same set of messages are assigned to a
> single RDD, instead of arbitrarily repartitioning the messages across
> different RDDs.
>
> Does your Receiver guarantee this behavior, until the problem is fixed in
> Spark 1.2?
>
> Regards,
> Alon
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p14233.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>
>

Re: Low Level Kafka Consumer for Spark

Posted by Alon Pe'er <al...@supersonicads.com>.
Hi Dibyendu,

Thanks for your great work!

I'm new to Spark Streaming, so I just want to make sure I understand the
Driver failure issue correctly.

In my use case, I want to make sure that messages coming in from Kafka are
always broken into the same set of RDDs, meaning that if a set of messages
are assigned to one RDD, and the Driver dies before this RDD is processed,
then once the Driver recovers, the same set of messages are assigned to a
single RDD, instead of arbitrarily repartitioning the messages across
different RDDs.

Does your Receiver guarantee this behavior, until the problem is fixed in
Spark 1.2?

Regards,
Alon



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p14233.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Low Level Kafka Consumer for Spark

Posted by Tim Smith <se...@gmail.com>.
Thanks TD. Someone already pointed out to me that /repartition(...)/ isn't
the right way. You have to /val partedStream = repartition(...)/. Would be
nice to have it fixed in the docs.
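
The pitfall here is that repartition returns a new stream and leaves the
original untouched. The plain-Java sketch below mimics that behaviour with a
hypothetical immutable MiniStream class (not a Spark type) to show why the
return value must be captured.

```java
// Hypothetical stand-in for a DStream: transformations return a new
// object and never modify the one they are called on.
final class MiniStream {
    private final int partitions;

    MiniStream(int partitions) {
        this.partitions = partitions;
    }

    // Returns a new stream with `n` partitions; `this` is left unchanged.
    MiniStream repartition(int n) {
        return new MiniStream(n);
    }

    int partitions() {
        return partitions;
    }
}
```

Calling input.repartition(100) and then continuing to use input still works
on the original partitioning; only the returned object has 100 partitions.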




On Fri, Sep 5, 2014 at 10:44 AM, Tathagata Das <ta...@gmail.com>
wrote:

> Some thoughts on this thread to clarify the doubts.
>
> 1. Driver recovery: The current version (1.1, to be released) does not recover
> raw data that has been received but not processed. This is because when the
> driver dies, the executors die, and so does the raw data that was stored in
> them. Only for HDFS is the data not lost on driver recovery, as the data is
> already present reliably in HDFS. This is something we want to fix by Spark
> 1.2 (3 months from now). Regarding recovery by replaying the data from
> Kafka, it is possible but tricky. Our goal is to provide a strong guarantee,
> exactly-once semantics in all transformations. To guarantee this for all
> kinds of streaming computations, stateful and non-stateful, it
> is required that the data be replayed through Kafka in exactly the same order,
> and that the underlying blocks of data in Spark be regenerated in the exact way
> they would have been if there was no driver failure. This is quite tricky to
> implement and requires manipulation of ZooKeeper offsets, etc., which is hard to
> do with the high-level consumer that KafkaUtils uses. Dibyendu's low-level
> Kafka receiver may enable such approaches in the future. For now we
> definitely plan to solve the first problem very soon.
>
> 3. Repartitioning: I am trying to understand the repartition issue. One
> common mistake I have seen is that developers repartition a stream but do not
> use the repartitioned stream.
>
> WRONG:
> inputDStream.repartition(100)
> inputDStream.map(...).count().print()
>
> RIGHT:
> val repartitionedDStream = inputDStream.repartition(100)
> repartitionedDStream.map(...).count().print()
>
> Not sure if this helps solve the problem that you are all facing. I am
> going to add this to the streaming programming guide to make sure this
> common mistake is avoided.
>
> TD
>
>
>
>
> On Wed, Sep 3, 2014 at 10:38 AM, Dibyendu Bhattacharya <
> dibyendu.bhattachary@gmail.com> wrote:
>
>> Hi,
>>
>> Sorry for little delay . As discussed in this thread, I have modified the
>> Kafka-Spark-Consumer ( https://github.com/dibbhatt/kafka-spark-consumer)
>> code to have dedicated Receiver for every Topic Partition. You can see the
>> example howto create Union of these receivers
>> in consumer.kafka.client.Consumer.java .
>>
>> Thanks to Chris for suggesting this change.
>>
>> Regards,
>> Dibyendu
>>
>>
>> On Mon, Sep 1, 2014 at 2:55 AM, RodrigoB <ro...@aspect.com>
>> wrote:
>>
>>> Just a comment on the recovery part.
>>>
>>> Is it correct to say that currently Spark Streaming recovery design does
>>> not
>>> consider re-computations (upon metadata lineage recovery) that depend on
>>> blocks of data of the received stream?
>>> https://issues.apache.org/jira/browse/SPARK-1647
>>>
>>> Just to illustrate a real use case (mine):
>>> - We have object states which have a Duration field per state which is
>>> incremented on every batch interval. Also this object state is reset to 0
>>> upon incoming state changing events. Let's supposed there is at least one
>>> event since the last data checkpoint. This will lead to inconsistency
>>> upon
>>> driver recovery: The Duration field will get incremented from the data
>>> checkpoint version until the recovery moment, but the state change event
>>> will never be re-processed...so in the end we have the old state with the
>>> wrong Duration value.
>>> To make things worst, let's imagine we're dumping the Duration increases
>>> somewhere...which means we're spreading the problem across our system.
>>> Re-computation awareness is something I've commented on another thread
>>> and
>>> rather treat it separately.
>>>
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-checkpoint-recovery-causes-IO-re-execution-td12568.html#a13205
>>>
>>> Re-computations do occur, but the only RDD's that are recovered are the
>>> ones
>>> from the data checkpoint. This is what we've seen. Is not enough by
>>> itself
>>> to ensure recovery of computed data and this partial recovery leads to
>>> inconsistency in some cases.
>>>
>>> Roger - I share the same question with you - I'm just not sure if the
>>> replicated data really gets persisted on every batch. The execution
>>> lineage
>>> is checkpointed, but if we have big chunks of data being consumed to
>>> Receiver node on let's say a second bases then having it persisted to
>>> HDFS
>>> every second could be a big challenge for keeping JVM performance - maybe
>>> that could be reason why it's not really implemented...assuming it isn't.
>>>
>>> Dibyendu had a great effort with the offset controlling code but the
>>> general
>>> state consistent recovery feels to me like another big issue to address.
>>>
>>> I plan on having a dive into the Streaming code and try to at least
>>> contribute with some ideas. Some more insight from anyone on the dev team
>>> will be very appreciated.
>>>
>>> tnks,
>>> Rod
>>>
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p13208.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>> For additional commands, e-mail: user-help@spark.apache.org
>>>
>>>
>>
>

Re: Low Level Kafka Consumer for Spark

Posted by Dibyendu Bhattacharya <di...@gmail.com>.
Hi ,

The latest changes, with Kafka message replay by manipulating the ZK offset,
seem to be working fine for us. This gives us some relief until the actual
issue is fixed in Spark 1.2.

I have a question on how Spark processes the received data. The logic I
used is basically to pull messages from individual partitions using
dedicated Receivers and do a union of these streams. After that I
process this union stream.

Today I wanted to test this consumer with our internal Kafka cluster, which
has around 50 million records. With this huge backlog I found Spark only
running the Receiver task and not running the processing task (or rather
doing it very slowly). Is this an issue with the consumer or an issue
on the Spark side? Ideally, when Receivers durably write data to the "Store",
the processing should start in parallel. Why does the processing task need
to wait until the Receiver consumes all 50 million messages? Or maybe I
am doing something wrong. I can share the driver log if you want.

In the driver I can see only "storage.BlockManagerInfo: Added input..." type
messages, but I hardly see any "scheduler.TaskSetManager: Starting task..."
messages. I see data getting written to the target system at a very slow
pace.


Regards,
Dibyendu






On Mon, Sep 8, 2014 at 12:08 AM, Dibyendu Bhattacharya <
dibyendu.bhattachary@gmail.com> wrote:

> Hi Tathagata,
>
> I have managed to implement the logic into the Kafka-Spark consumer to
> recover from Driver failure. This is just a interim fix till actual fix is
> done from Spark side.
>
> The logic is something like this.
>
> 1. When the individual Receivers start for every topic partition, they
> write the Kafka messages along with certain metadata to the Block Store. This
> metadata contains the message offset, partition id, topic name
> and consumer id. You can see this logic in the PartitionManager.java next()
> method.
>
> 2. In the driver code (Consumer.java), I am creating the union of all
> these individual DStreams and processing the data using a foreachRDD call.
> In the driver code, I receive the RDD, which contains the Kafka
> messages along with the metadata details, and periodically
> commit the "processed" offset of the Kafka messages into ZK.
>
> 3. When the driver stops and restarts, the Receivers start again, and
> this time in PartitionManager.java I check the actual
> "committed" offset for the partition and the actual "processed"
> offset of the same partition. This logic is in the PartitionManager
> constructor.
>
> If this is a Receiver restart and the "processed" offset is less than the
> "committed" offset, I start fetching again from the "processed" offset.
> This may lead to duplicate records, but our system can handle duplicates.
>
> I have tested with multiple driver kill/stops and I found no data loss in
> Kafka consumer.
>
> In the Driver code, I have not done any "checkpointing" yet, will test
> that tomorrow.
>
>
> One interesting thing I found, if I do "repartition" of original stream ,
> I can still see the issue of data loss in this logic. What I believe,
> during re- partitioning Spark might be changing the order of RDDs the way
> it generated from Kafka stream. So during re-partition case, even when I am
> committing processed offset, but as this is not in order I still see issue.
> Not sure if this understanding is correct, but not able to find any other
> explanation.
>
> But if I do not use repartition this solution works fine.
>
> I can make this as configurable, so that when actual fix is available ,
> this feature in consumer can be turned off as this is an overhead for the
> consumer . Let me know what you think..
>
> Regards,
> Dibyendu
>
>
>
>
> On Fri, Sep 5, 2014 at 11:14 PM, Tathagata Das <
> tathagata.das1565@gmail.com> wrote:
>
>> Some thoughts on this thread to clarify the doubts.
>>
>> 1. Driver recovery: The current (1.1 to be released) does not recover the
>> raw data that has been received but not processes. This is because when the
>> driver dies, the executors die and so does the raw data that was stored in
>> it. Only for HDFS, the data is not lost by driver recovery as the data is
>> already present reliably in HDFS. This is something we want to fix by Spark
>> 1.2 (3 month from now). Regarding recovery by replaying the data from
>> Kafka, it is possible but tricky. Our goal is to provide strong guarantee,
>> exactly-once semantics in all transformations. To guarantee this for all
>> kinds of streaming computations stateful and not-stateful computations, it
>> is requires that the data be replayed through Kafka in exactly same order,
>> and the underlying blocks of data in Spark be regenerated in the exact way
>> as it would have if there was no driver failure. This is quite tricky to
>> implement, requires manipulation of zookeeper offsets, etc, that is hard to
>> do with the high level consumer that KafkaUtil uses. Dibyendu's low level
>> Kafka receiver may enable such approaches in the future. For now we
>> definitely plan to solve the first problem very very soon.
>>
>> 3. Repartitioning: I am trying to understand the repartition issue. One
>> common mistake I have seen is that developers repartition a stream but not
>> use the repartitioned stream.
>>
>> WRONG:
>> inputDstream.repartition(100)
>> inputDstream.map(...).count().print()
>>
>> RIGHT:
>> val repartitionedDStream = inputDStream.repartitoin(100)
>> repartitionedDStream.map(...).count().print()
>>
>> Not sure if this helps solve the problem that you all the facing. I am
>> going to add this to the stremaing programming guide to make sure this
>> common mistake is avoided.
>>
>> TD
>>
>>
>>
>>
>> On Wed, Sep 3, 2014 at 10:38 AM, Dibyendu Bhattacharya <
>> dibyendu.bhattachary@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Sorry for little delay . As discussed in this thread, I have modified
>>> the Kafka-Spark-Consumer (
>>> https://github.com/dibbhatt/kafka-spark-consumer) code to have
>>> dedicated Receiver for every Topic Partition. You can see the example howto
>>> create Union of these receivers in consumer.kafka.client.Consumer.java .
>>>
>>> Thanks to Chris for suggesting this change.
>>>
>>> Regards,
>>> Dibyendu
>>>
>>>
>>> On Mon, Sep 1, 2014 at 2:55 AM, RodrigoB <ro...@aspect.com>
>>> wrote:
>>>
>>>> Just a comment on the recovery part.
>>>>
>>>> Is it correct to say that currently Spark Streaming recovery design
>>>> does not
>>>> consider re-computations (upon metadata lineage recovery) that depend on
>>>> blocks of data of the received stream?
>>>> https://issues.apache.org/jira/browse/SPARK-1647
>>>>
>>>> Just to illustrate a real use case (mine):
>>>> - We have object states which have a Duration field per state which is
>>>> incremented on every batch interval. Also this object state is reset to
>>>> 0
>>>> upon incoming state changing events. Let's supposed there is at least
>>>> one
>>>> event since the last data checkpoint. This will lead to inconsistency
>>>> upon
>>>> driver recovery: The Duration field will get incremented from the data
>>>> checkpoint version until the recovery moment, but the state change event
>>>> will never be re-processed...so in the end we have the old state with
>>>> the
>>>> wrong Duration value.
>>>> To make things worst, let's imagine we're dumping the Duration increases
>>>> somewhere...which means we're spreading the problem across our system.
>>>> Re-computation awareness is something I've commented on another thread
>>>> and
>>>> rather treat it separately.
>>>>
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-checkpoint-recovery-causes-IO-re-execution-td12568.html#a13205
>>>>
>>>> Re-computations do occur, but the only RDD's that are recovered are the
>>>> ones
>>>> from the data checkpoint. This is what we've seen. Is not enough by
>>>> itself
>>>> to ensure recovery of computed data and this partial recovery leads to
>>>> inconsistency in some cases.
>>>>
>>>> Roger - I share the same question with you - I'm just not sure if the
>>>> replicated data really gets persisted on every batch. The execution
>>>> lineage
>>>> is checkpointed, but if we have big chunks of data being consumed to
>>>> Receiver node on let's say a second bases then having it persisted to
>>>> HDFS
>>>> every second could be a big challenge for keeping JVM performance -
>>>> maybe
>>>> that could be reason why it's not really implemented...assuming it
>>>> isn't.
>>>>
>>>> Dibyendu had a great effort with the offset controlling code but the
>>>> general
>>>> state consistent recovery feels to me like another big issue to address.
>>>>
>>>> I plan on having a dive into the Streaming code and try to at least
>>>> contribute with some ideas. Some more insight from anyone on the dev
>>>> team
>>>> will be very appreciated.
>>>>
>>>> tnks,
>>>> Rod
>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p13208.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>
>>>> ---------------------------------------------------------------------
>>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>>> For additional commands, e-mail: user-help@spark.apache.org
>>>>
>>>>
>>>
>>
>

Re: Low Level Kafka Consumer for Spark

Posted by Dibyendu Bhattacharya <di...@gmail.com>.
Hi Tathagata,

I have managed to implement logic in the Kafka-Spark consumer to recover
from Driver failure. This is just an interim fix until the actual fix is
done on the Spark side.

The logic is something like this.

1. When the individual Receivers start for every topic partition, they
write the Kafka messages along with certain metadata to the Block Store.
This metadata contains the message offset, partition id, topic name and
consumer id. You can see this logic in the PartitionManager.java next()
method.

2. In the driver code (Consumer.java), I am creating the union of all
these individual DStreams and processing the data using a foreachRDD call.
In the driver code, I receive the RDD which contains the Kafka messages
along with the metadata details, and periodically I commit the "processed"
offset of the Kafka messages to ZK.

3. When the driver stops and restarts, the Receivers start again, and
this time in PartitionManager.java I check the actual "committed" offset
for the partition and the actual "processed" offset of the same partition.
This logic is in the PartitionManager constructor.
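
Steps 1 and 2 above can be sketched in plain Java as follows. This is only
an illustration under assumptions: the names MessageMeta and
ProcessedOffsetTracker are hypothetical and are not taken from the
kafka-spark-consumer code, an in-memory map stands in for the ZK node, and
keeping the highest offset per partition is one possible reading of
"processed" offset.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: tracks the "processed" offset per topic partition.
final class ProcessedOffsetTracker {

    /** Hypothetical metadata stored alongside each Kafka message (step 1). */
    static final class MessageMeta {
        final String consumerId;
        final String topic;
        final int partition;
        final long offset;

        MessageMeta(String consumerId, String topic, int partition, long offset) {
            this.consumerId = consumerId;
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Stand-in for ZooKeeper: one entry per consumer/topic/partition.
    private final Map<String, Long> processed = new HashMap<>();

    private static String key(String consumerId, String topic, int partition) {
        return consumerId + "/" + topic + "/" + partition;
    }

    /** Step 2: after a message is processed, remember the highest offset seen. */
    void markProcessed(MessageMeta meta) {
        processed.merge(key(meta.consumerId, meta.topic, meta.partition),
                        meta.offset, Math::max);
    }

    /** Returns the highest processed offset, or -1 if none was recorded. */
    long processedOffset(String consumerId, String topic, int partition) {
        return processed.getOrDefault(key(consumerId, topic, partition), -1L);
    }

    public static void main(String[] args) {
        ProcessedOffsetTracker tracker = new ProcessedOffsetTracker();
        tracker.markProcessed(new MessageMeta("c1", "events", 0, 5L));
        tracker.markProcessed(new MessageMeta("c1", "events", 0, 7L));
        tracker.markProcessed(new MessageMeta("c1", "events", 1, 3L));
        System.out.println(tracker.processedOffset("c1", "events", 0));
        System.out.println(tracker.processedOffset("c1", "events", 1));
    }
}
```

Keying by consumer id, topic and partition mirrors the metadata fields
listed in step 1, so each partition's offset is tracked independently.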

If this is a Receiver restart and the "processed" offset is less than the
"committed" offset, I start fetching again from the "processed" offset.
This may lead to duplicate records, but our system can handle duplicates.
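
The restart rule can be written as a small function. This is a minimal
sketch, not the actual PartitionManager code: the class and method names
are hypothetical, and it assumes "committed" means the offset up to which
the Receiver has already fetched, while "processed" means the offset up to
which the driver has finished processing.

```java
// Hypothetical sketch of the offset chosen when a Receiver restarts.
final class RestartOffset {

    /**
     * Returns the offset to start fetching from after a restart.
     * If processing lags behind fetching, go back to the processed offset,
     * which may replay some messages (duplicates) but avoids skipping any.
     */
    static long startOffset(long committedOffset, long processedOffset) {
        if (processedOffset < committedOffset) {
            return processedOffset;
        }
        return committedOffset;
    }

    public static void main(String[] args) {
        // Fetched up to 120, but only processed up to 100: replay from 100.
        System.out.println(startOffset(120L, 100L));
        // Processing has caught up: continue from the committed offset.
        System.out.println(startOffset(120L, 120L));
    }
}
```

Restarting from the processed offset trades duplicates for completeness,
which matches the at-least-once behaviour described above.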

I have tested with multiple driver kills/stops and found no data loss in
the Kafka consumer.

In the driver code, I have not done any "checkpointing" yet; I will test
that tomorrow.


One interesting thing I found: if I "repartition" the original stream, I
can still see data loss with this logic. I believe that during
repartitioning Spark might be changing the order of the RDDs from the order
in which they were generated from the Kafka stream. So in the repartition
case, even though I am committing the processed offset, the offsets are not
in order and I still see the issue. I am not sure this understanding is
correct, but I have not been able to find any other explanation.
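
To make the suspected failure mode concrete, here is a small simulation in
plain Java. It does not use Spark and does not show that Spark actually
reorders data this way; it only illustrates, under the assumption that the
highest offset seen is committed as "processed", how out-of-order
processing followed by a crash could skip messages. The class and method
names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical simulation of committing "processed" offsets out of order.
final class ReorderReplayDemo {

    /**
     * Processes the first crashAfter entries of arrivalOrder, committing the
     * highest offset seen, then simulates a crash. Returns the offsets below
     * the committed value that were never processed: a restart from the
     * committed "processed" offset would not fetch them again.
     */
    static List<Long> lostOffsets(long[] arrivalOrder, int crashAfter) {
        Set<Long> done = new HashSet<>();
        long processed = -1L;
        for (int i = 0; i < crashAfter; i++) {
            done.add(arrivalOrder[i]);
            processed = Math.max(processed, arrivalOrder[i]);
        }
        List<Long> lost = new ArrayList<>();
        for (long offset = 0; offset < processed; offset++) {
            if (!done.contains(offset)) {
                lost.add(offset);
            }
        }
        return lost;
    }

    public static void main(String[] args) {
        // In order: nothing below the committed offset is missing.
        System.out.println(lostOffsets(new long[] {0, 1, 2, 3, 4}, 3));
        // Reordered: offset 4 is processed before 2 and 3, then the crash.
        System.out.println(lostOffsets(new long[] {0, 1, 4, 2, 3}, 3));
    }
}
```

In the reordered run the committed offset jumps to 4 before offsets 2 and 3
are processed, so a restart from offset 4 would skip them.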

But if I do not use repartition, this solution works fine.

I can make this configurable, so that when the actual fix is available
this feature in the consumer can be turned off, as it is an overhead for
the consumer. Let me know what you think.

Regards,
Dibyendu





Re: Low Level Kafka Consumer for Spark

Posted by Tathagata Das <ta...@gmail.com>.
Some thoughts on this thread to clarify the doubts.

1. Driver recovery: The current version (1.1, to be released) does not
recover the raw data that has been received but not processed. This is
because when the driver dies, the executors die and so does the raw data
that was stored in them. Only for HDFS is the data not lost on driver
recovery, as the data is already present reliably in HDFS. This is
something we want to fix by Spark 1.2 (3 months from now). Regarding
recovery by replaying the data from Kafka, it is possible but tricky. Our
goal is to provide strong guarantees, exactly-once semantics in all
transformations. To guarantee this for all kinds of streaming computations,
stateful and non-stateful, it is required that the data be replayed through
Kafka in exactly the same order, and that the underlying blocks of data in
Spark be regenerated in exactly the way they would have been if there was
no driver failure. This is quite tricky to implement and requires
manipulation of ZooKeeper offsets, etc., which is hard to do with the
high-level consumer that KafkaUtils uses. Dibyendu's low-level Kafka
receiver may enable such approaches in the future. For now we definitely
plan to solve the first problem very soon.

3. Repartitioning: I am trying to understand the repartition issue. One
common mistake I have seen is that developers repartition a stream but do
not use the repartitioned stream.

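WRONG:
// The repartitioned DStream returned here is discarded.
inputDStream.repartition(100)
inputDStream.map(...).count().print()

RIGHT:
// Keep the returned DStream and build on it.
val repartitionedDStream = inputDStream.repartition(100)
repartitionedDStream.map(...).count().print()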

Not sure if this helps solve the problem that you are all facing. I am
going to add this to the streaming programming guide to make sure this
common mistake is avoided.

TD



