Posted to dev@spark.apache.org by Cody Koeninger <co...@koeninger.org> on 2014/12/18 16:07:05 UTC

Which committers care about Kafka?

Now that 1.2 is finalized...  who are the go-to people to get some
long-standing Kafka related issues resolved?

The existing api is neither sufficiently safe nor flexible for our production
use.  I don't think we're alone in this viewpoint, because I've seen
several different patches and libraries to fix the same things we've been
running into.

Regarding flexibility

https://issues.apache.org/jira/browse/SPARK-3146

has been outstanding since August, and IMHO an equivalent of this is
absolutely necessary.  We wrote a similar patch ourselves, then found that
PR and have been running it in production.  We wouldn't be able to get our
jobs done without it.  It also allows users to solve a whole class of
problems for themselves (e.g. SPARK-2388, arbitrary delay of messages, etc).

Regarding safety, I understand the motivation behind WriteAheadLog as a
general solution for streaming unreliable sources, but Kafka already is a
reliable source.  I think there's a need for an api that treats it as
such.  Even aside from the performance issues of duplicating the
write-ahead log in kafka into another write-ahead log in hdfs, I need
exactly-once semantics in the face of failure (I've had failures that
prevented reloading a spark streaming checkpoint, for instance).

I've got an implementation I've been using

https://github.com/koeninger/spark-1/tree/kafkaRdd/external/kafka
/src/main/scala/org/apache/spark/rdd/kafka

Tresata has something similar at https://github.com/tresata/spark-kafka,
and I know there were earlier attempts based on Storm code.

Trying to distribute these kinds of fixes as libraries rather than patches
to Spark is problematic, because large portions of the implementation are
private[spark].

I'd like to help, but I need to know whose attention to get.

Re: Which committers care about Kafka?

Posted by Cody Koeninger <co...@koeninger.org>.
Assuming you're talking about spark.streaming.receiver.maxRate, I just
updated my PR to configure rate limiting based on that setting.  So
hopefully that's issue 1 sorted.
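(For illustration only, not the actual PR code: one way such a cap could look is to
clamp each partition's offset range by a maximum message count derived from that
setting. OffsetRange and maxMessagesPerPartition below are made-up names.)

    // Illustrative sketch: cap how many messages one RDD partition may cover,
    // in the spirit of spark.streaming.receiver.maxRate. Names are hypothetical.
    case class OffsetRange(topic: String, partition: Int,
                           fromOffset: Long, untilOffset: Long)

    def clamp(range: OffsetRange, maxMessagesPerPartition: Long): OffsetRange =
      if (maxMessagesPerPartition <= 0) range   // non-positive means no limit
      else range.copy(untilOffset =
        math.min(range.untilOffset, range.fromOffset + maxMessagesPerPartition))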

Regarding issue 3 (the odd semantics of stateful or windowed operations in
the face of failure), as far as I can tell my solution is no worse than
existing classes such as FileStream that use InputDStream directly rather
than a receiver.  Can we get some specific cases that are a concern?

Regarding the WAL solutions TD mentioned, one of their disadvantages is
that they rely on checkpointing, unlike my approach.  As I noted in this
thread and in the jira ticket, I need something that can recover even when
a checkpoint is lost, and I've already seen multiple situations in
production where a checkpoint cannot be recovered (e.g. because code needs
to be updated).
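(A rough sketch of what checkpoint-free recovery can look like: on startup, read the
last committed offsets from the application's own store instead of a Spark checkpoint.
Plain JDBC, with a hypothetical offsets table; illustrative only.)

    // Sketch: load starting offsets from the application's own store on startup,
    // so a lost Spark checkpoint does not prevent recovery. Table and column
    // names are made up for illustration.
    import java.sql.DriverManager

    def loadStartingOffsets(jdbcUrl: String): Map[(String, Int), Long] = {
      val conn = DriverManager.getConnection(jdbcUrl)
      try {
        val rs = conn.createStatement().executeQuery(
          "SELECT topic, kafka_partition, until_offset FROM offsets")
        val acc = scala.collection.mutable.Map.empty[(String, Int), Long]
        while (rs.next()) {
          acc((rs.getString("topic"), rs.getInt("kafka_partition"))) =
            rs.getLong("until_offset")
        }
        acc.toMap
      } finally {
        conn.close()
      }
    }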

On Mon, Dec 29, 2014 at 7:50 PM, Shao, Saisai <sa...@intel.com> wrote:

>  Hi Cody,
>
>
>
> From my understanding rate control is an optional configuration in Spark
> Streaming and is disabled by default, so users can reach maximum throughput
> without any configuration.
>
>
>
> The reason why rate control is so important in stream processing is
> that Spark Streaming and other streaming frameworks are easily prone to
> unexpected behavior and failures due to network bursts and other
> uncontrollable ingestion rates.
>
>
>
> Especially for Spark Streaming, a large amount of data to process will
> increase the processing time, which will further delay ongoing jobs and
> finally lead to failure.
>
>
>
> Thanks
>
> Jerry
>
>
>
> *From:* Cody Koeninger [mailto:cody@koeninger.org]
> *Sent:* Tuesday, December 30, 2014 6:50 AM
> *To:* Tathagata Das
> *Cc:* Hari Shreedharan; Shao, Saisai; Sean McNamara; Patrick Wendell;
> Luis Ángel Vicente Sánchez; Dibyendu Bhattacharya; dev@spark.apache.org;
> Koert Kuipers
>
> *Subject:* Re: Which committers care about Kafka?
>
>
>
> Can you give a little more clarification on exactly what is meant by
>
>
>
> 1. Data rate control
>
>
>
> If someone wants to clamp the maximum number of messages per RDD partition
> in my solution, it would be very straightforward to do so.
>
>
>
> Regarding the holy grail, I'm pretty certain you can't have end-to-end
> transactional semantics without the client code being in charge of offset
> state.  That means the client code is going to also need to be in charge of
> setting up an initial state for updateStateByKey that makes sense; as long
> as they can do that, the job should be safe to restart from arbitrary
> failures.
>
>
>
> On Mon, Dec 29, 2014 at 4:33 PM, Tathagata Das <
> tathagata.das1565@gmail.com> wrote:
>
> Hey all,
>
> Some wrap up thoughts on this thread.
>
> Let me first reiterate what Patrick said, that Kafka is super super
> important as it forms the largest fraction of Spark Streaming user
> base. So we really want to improve the Kafka + Spark Streaming
> integration. To this end, some of the things that need to be
> considered can be broadly classified into the following, to help
> facilitate the discussion.
>
> 1. Data rate control
> 2. Receiver failure semantics - partially achieving this gives
> at-least once, completely achieving this gives exactly-once
> 3. Driver failure semantics - partially achieving this gives at-least
> once, completely achieving this gives exactly-once
>
> Here is a run down of what is achieved by different implementations
> (based on what I think).
>
> 1. Prior to WAL in Spark 1.2, the KafkaReceiver could handle 3, could
> handle 1 partially (some duplicate data), and could NOT handle 2 (all
> previously received data lost).
>
> 2. In Spark 1.2 with WAL enabled, Saisai's ReliableKafkaReceiver
> can handle 3, and can almost completely handle 1 and 2 (except a few corner
> cases which prevent it from completely guaranteeing exactly-once).
>
> 3. I believe Dibyendu's solution (correct me if I am wrong) can handle
> 1 and 2 perfectly. And 3 can be partially solved with WAL, or possibly
> completely solved by extending the solution further.
>
> 4. Cody's solution (again, correct me if I am wrong) does not use
> receivers at all (so it eliminates 2). It can handle 3 completely for
> simple operations like map and filter, but I am not sure if it works
> completely for stateful ops like windows and updateStateByKey. Also it
> does not handle 1.
>
> The real challenge for Kafka is in achieving 3 completely for stateful
> operations while also handling 1.  (i.e., use receivers, but still get
> driver failure guarantees). Solving this will give us our holy grail
> solution, and this is what I want to achieve.
>
> On that note, Cody submitted a PR on his style of achieving
> exactly-once semantics - https://github.com/apache/spark/pull/3798 . I
> am reviewing it. Please follow the PR if you are interested.
>
> TD
>
>
> On Wed, Dec 24, 2014 at 11:59 PM, Cody Koeninger <co...@koeninger.org>
> wrote:
> > The conversation was mostly getting TD up to speed on this thread since
> he
> > had just gotten back from his trip and hadn't seen it.
> >
> > The jira has a summary of the requirements we discussed, I'm sure TD or
> > Patrick can add to the ticket if I missed something.
> > On Dec 25, 2014 1:54 AM, "Hari Shreedharan" <hs...@cloudera.com>
> > wrote:
> >
> >> In general such discussions happen on, or are posted to, the dev lists.
> >> Could you please post a summary? Thanks.
> >>
> >> Thanks,
> >> Hari
> >>
> >>
> >> On Wed, Dec 24, 2014 at 11:46 PM, Cody Koeninger <co...@koeninger.org>
> >> wrote:
> >>
> >>>  After a long talk with Patrick and TD (thanks guys), I opened the
> >>> following jira
> >>>
> >>> https://issues.apache.org/jira/browse/SPARK-4964
> >>>
> >>> The sample PR has an implementation for the batch and the dstream case, and
> a
> >>> link to a project with example usage.
> >>>
> >>> On Fri, Dec 19, 2014 at 4:36 PM, Koert Kuipers <ko...@tresata.com>
> wrote:
> >>>
> >>>> yup, we at tresata do the idempotent store the same way. very simple
> >>>> approach.
> >>>>
> >>>> On Fri, Dec 19, 2014 at 5:32 PM, Cody Koeninger <co...@koeninger.org>
> >>>> wrote:
> >>>>>
> >>>>> That KafkaRDD code is dead simple.
> >>>>>
> >>>>> Given a user specified map
> >>>>>
> >>>>> (topic1, partition0) -> (startingOffset, endingOffset)
> >>>>> (topic1, partition1) -> (startingOffset, endingOffset)
> >>>>> ...
> >>>>> turn each one of those entries into a partition of an rdd, using the
> >>>>> simple
> >>>>> consumer.
> >>>>> That's it.  No recovery logic, no state, nothing - for any failures,
> >>>>> bail
> >>>>> on the rdd and let it retry.
> >>>>> Spark stays out of the business of being a distributed database.
> >>>>>
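(A bare-bones sketch of that idea, for illustration; this is not the code from the
linked branch, and the names below are made up.)

    // Each user-specified (topic, partition) -> (startingOffset, endingOffset)
    // entry becomes exactly one RDD partition; compute() for that partition would
    // read [startingOffset, endingOffset) with Kafka's SimpleConsumer and simply
    // fail the task on any error, letting Spark retry it.
    import org.apache.spark.Partition

    case class KafkaRddPartition(index: Int, topic: String, kafkaPartition: Int,
                                 startingOffset: Long, endingOffset: Long) extends Partition

    def toPartitions(offsets: Map[(String, Int), (Long, Long)]): Array[Partition] =
      offsets.toSeq.zipWithIndex.map {
        case (((topic, part), (start, end)), i) =>
          KafkaRddPartition(i, topic, part, start, end)
      }.toArray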
> >>>>> The client code does any transformation it wants, then stores the
> data
> >>>>> and
> >>>>> offsets.  There are two ways of doing this, either based on
> idempotence
> >>>>> or
> >>>>> a transactional data store.
> >>>>>
> >>>>> For idempotent stores:
> >>>>>
> >>>>> 1. manipulate data
> >>>>> 2. save data to store
> >>>>> 3. save ending offsets to the same store
> >>>>>
> >>>>> If you fail between 2 and 3, the offsets haven't been stored, you
> start
> >>>>> again at the same beginning offsets, do the same calculations in the
> >>>>> same
> >>>>> order, overwrite the same data, all is good.
> >>>>>
> >>>>>
> >>>>> For transactional stores:
> >>>>>
> >>>>> 1. manipulate data
> >>>>> 2. begin transaction
> >>>>> 3. save data to the store
> >>>>> 4. save offsets
> >>>>> 5. commit transaction
> >>>>>
> >>>>> If you fail before 5, the transaction rolls back.  To make this less
> >>>>> heavyweight, you can write the data outside the transaction and then
> >>>>> update
> >>>>> a pointer to the current data inside the transaction.
> >>>>>
> >>>>>
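(A minimal, self-contained sketch of the transactional variant with plain JDBC; the
table and column names are made up, and this only shows the ordering of the steps,
not a production sink.)

    import java.sql.{Connection, DriverManager}

    // Data and ending offsets commit (or roll back) together, so a failure
    // before the commit leaves the previously stored offsets untouched.
    def saveBatch(jdbcUrl: String, topic: String, partition: Int,
                  untilOffset: Long, rows: Seq[(String, String)]): Unit = {
      val conn: Connection = DriverManager.getConnection(jdbcUrl)
      try {
        conn.setAutoCommit(false)                              // 2. begin transaction
        val ins = conn.prepareStatement("INSERT INTO results (k, v) VALUES (?, ?)")
        rows.foreach { case (k, v) =>                          // 3. save data
          ins.setString(1, k); ins.setString(2, v); ins.executeUpdate()
        }
        val off = conn.prepareStatement(
          "UPDATE offsets SET until_offset = ? WHERE topic = ? AND kafka_partition = ?")
        off.setLong(1, untilOffset); off.setString(2, topic); off.setInt(3, partition)
        off.executeUpdate()                                    // 4. save offsets
        conn.commit()                                          // 5. commit transaction
      } catch {
        case e: Exception => conn.rollback(); throw e          // fail before 5: roll back
      } finally {
        conn.close()
      }
    }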
> >>>>> Again, spark has nothing much to do with guaranteeing exactly once.
> In
> >>>>> fact, the current streaming api actively impedes my ability to do the
> >>>>> above.  I'm just suggesting providing an api that doesn't get in the
> >>>>> way of
> >>>>> exactly-once.
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>> On Fri, Dec 19, 2014 at 3:57 PM, Hari Shreedharan <
> >>>>> hshreedharan@cloudera.com
> >>>>> > wrote:
> >>>>>
> >>>>> > Can you explain your basic algorithm for the once-only-delivery? It
> >>>>> > is quite a bit of very Kafka-specific code that would take more time
> >>>>> > to read than I can currently afford. If you can explain your algorithm
> >>>>> > a bit, it might help.
> >>>>> >
> >>>>> > Thanks,
> >>>>> > Hari
> >>>>> >
> >>>>> >
> >>>>> > On Fri, Dec 19, 2014 at 1:48 PM, Cody Koeninger <
> cody@koeninger.org>
> >>>>> > wrote:
> >>>>> >
> >>>>> >>
> >>>>> >> The problems you guys are discussing come from trying to store
> state
> >>>>> in
> >>>>> >> spark, so don't do that.  Spark isn't a distributed database.
> >>>>> >>
> >>>>> >> Just map kafka partitions directly to rdds, let user code specify
> >>>>> the
> >>>>> >> range of offsets explicitly, and let them be in charge of
> committing
> >>>>> >> offsets.
> >>>>> >>
> >>>>> >> Using the simple consumer isn't that bad, I'm already using this
> in
> >>>>> >> production with the code I linked to, and tresata apparently has
> >>>>> been as
> >>>>> >> well.  Again, for everyone saying this is impossible, have you
> read
> >>>>> either
> >>>>> >> of those implementations and looked at the approach?
> >>>>> >>
> >>>>> >>
> >>>>> >>
> >>>>> >> On Fri, Dec 19, 2014 at 2:27 PM, Sean McNamara <
> >>>>> >> Sean.McNamara@webtrends.com> wrote:
> >>>>> >>
> >>>>> >>> Please feel free to correct me if I’m wrong, but I think the
> exactly
> >>>>> >>> once spark streaming semantics can easily be solved using
> >>>>> updateStateByKey.
> >>>>> >>> Make the key going into updateStateByKey be a hash of the event,
> or
> >>>>> pluck
> >>>>> >>> off some uuid from the message.  The updateFunc would only emit
> the
> >>>>> message
> >>>>> >>> if the key did not exist, and the user has complete control over
> >>>>> the window
> >>>>> >>> of time / state lifecycle for detecting duplicates.  It also
> makes
> >>>>> it
> >>>>> >>> really easy to detect and take action (alert?) when you DO see a
> >>>>> duplicate,
> >>>>> >>> or make memory tradeoffs within an error bound using a sketch
> >>>>> algorithm.
> >>>>> >>> The kafka simple consumer is insanely complex, if possible I
> think
> >>>>> it would
> >>>>> >>> be better (and vastly more flexible) to get reliability using the
> >>>>> >>> primitives that spark so elegantly provides.
> >>>>> >>>
> >>>>> >>> Cheers,
> >>>>> >>>
> >>>>> >>> Sean
> >>>>> >>>
> >>>>> >>>
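(A rough sketch of that dedup idea, assuming each message carries a unique id; state
cleanup/TTL and the checkpoint directory that updateStateByKey requires are left out,
so this is illustrative only.)

    import org.apache.spark.streaming.StreamingContext._   // pair-DStream ops in Spark 1.2
    import org.apache.spark.streaming.dstream.DStream

    // State per id: the payload plus whether this id first appeared in the
    // current batch. Duplicates find existing state and are never re-emitted.
    case class Seen(payload: String, firstSeenThisBatch: Boolean)

    def dedup(byId: DStream[(String, String)]): DStream[(String, String)] = {
      val state = byId.updateStateByKey[Seen] { (newVals: Seq[String], prev: Option[Seen]) =>
        prev match {
          case Some(s) => Some(s.copy(firstSeenThisBatch = false))           // already seen
          case None    => newVals.headOption.map(v => Seen(v, firstSeenThisBatch = true))
        }
      }
      // Only ids appearing for the first time in this batch flow downstream.
      state.filter { case (_, s) => s.firstSeenThisBatch }
           .map { case (id, s) => (id, s.payload) }
    }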
> >>>>> >>> > On Dec 19, 2014, at 12:06 PM, Hari Shreedharan <
> >>>>> >>> hshreedharan@cloudera.com> wrote:
> >>>>> >>> >
> >>>>> >>> > Hi Dibyendu,
> >>>>> >>> >
> >>>>> >>> > Thanks for the details on the implementation. But I still do not
> >>>>> >>> > believe that it guarantees no duplicates - what they achieve is that
> >>>>> >>> > the same batch is processed exactly the same way every time (but note
> >>>>> >>> > it may be processed more than once) - so it depends on the operation
> >>>>> >>> > being idempotent. I
> >>>>> believe
> >>>>> >>> > Trident uses ZK to keep track of the transactions - a batch
> can be
> >>>>> >>> > processed multiple times in failure scenarios (for example, the
> >>>>> >>> transaction
> >>>>> >>> > is processed but before ZK is updated the machine fails,
> causing a
> >>>>> >>> "new"
> >>>>> >>> > node to process it again).
> >>>>> >>> >
> >>>>> >>> > I don't think it is impossible to do this in Spark Streaming as
> >>>>> well
> >>>>> >>> and
> >>>>> >>> > I'd be really interested in working on it at some point in the
> >>>>> near
> >>>>> >>> future.
> >>>>> >>> >
> >>>>> >>> > On Fri, Dec 19, 2014 at 1:44 AM, Dibyendu Bhattacharya <
> >>>>> >>> > dibyendu.bhattachary@gmail.com> wrote:
> >>>>> >>> >
> >>>>> >>> >> Hi,
> >>>>> >>> >>
> >>>>> >>> >> Thanks to Jerry for mentioning the Kafka Spout for Trident. Storm
> >>>>> >>> >> Trident achieves the exactly-once guarantee by processing tuples in
> >>>>> >>> >> batches and assigning the same transaction-id to a given batch. The
> >>>>> >>> >> replay of a given batch with a transaction-id will have the exact
> >>>>> >>> >> same set of tuples, and replays of batches happen in the exact same
> >>>>> >>> >> order as before the failure.
> >>>>> >>> >>
> >>>>> >>> >> With this paradigm, if the downstream system processes data for a
> >>>>> >>> >> given batch with a given transaction-id, and during a failure the
> >>>>> >>> >> same batch is emitted again, you can check whether that
> >>>>> >>> >> transaction-id has already been processed and hence can guarantee
> >>>>> >>> >> exactly-once semantics.
> >>>>> >>> >>
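(A tiny sketch of that check, purely illustrative: the downstream writer records each
batch's transaction-id and skips a replayed batch whose id it has already committed.
The names are made up.)

    import scala.collection.mutable

    // Skip a replayed batch whose transaction-id was already recorded; record the
    // id only after the batch has been written, so a failure in between simply
    // causes a harmless re-run of the same batch.
    def processOnce(txId: String, batch: Seq[String],
                    committedTxIds: mutable.Set[String])(write: String => Unit): Unit =
      if (!committedTxIds.contains(txId)) {
        batch.foreach(write)
        committedTxIds += txId
      }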
> >>>>> >>> >> And this can only be achieved in Spark if we use Low Level
> Kafka
> >>>>> >>> consumer
> >>>>> >>> >> API to process the offsets. This low level Kafka Consumer (
> >>>>> >>> >> https://github.com/dibbhatt/kafka-spark-consumer) has
> >>>>> implemented the
> >>>>> >>> >> Spark Kafka consumer which uses Kafka Low Level APIs . All of
> the
> >>>>> >>> Kafka
> >>>>> >>> >> related logic has been taken from Storm-Kafka spout and which
> >>>>> manages
> >>>>> >>> all
> >>>>> >>> >> Kafka re-balance and fault tolerant aspects and Kafka metadata
> >>>>> >>> managements.
> >>>>> >>> >>
> >>>>> >>> >> Presently this Consumer maintains that during Receiver
> failure,
> >>>>> it
> >>>>> >>> will
> >>>>> >>> >> re-emit the exact same Block with same set of messages . Every
> >>>>> >>> message have
> >>>>> >>> >> the details of its partition, offset and topic related details
> >>>>> which
> >>>>> >>> can
> >>>>> >>> >> tackle the SPARK-3146.
> >>>>> >>> >>
> >>>>> >>> >> As this Low Level consumer has complete control over the Kafka
> >>>>> >>> Offsets ,
> >>>>> >>> >> we can implement Trident like feature on top of it like having
> >>>>> >>> implement a
> >>>>> >>> >> transaction-id for a given block , and re-emit the same block
> >>>>> with
> >>>>> >>> same set
> >>>>> >>> >> of message during Driver failure.
> >>>>> >>> >>
> >>>>> >>> >> Regards,
> >>>>> >>> >> Dibyendu
> >>>>> >>> >>
> >>>>> >>> >>
> >>>>> >>> >> On Fri, Dec 19, 2014 at 7:33 AM, Shao, Saisai <
> >>>>> saisai.shao@intel.com>
> >>>>> >>> >> wrote:
> >>>>> >>> >>>
> >>>>> >>> >>> Hi all,
> >>>>> >>> >>>
> >>>>> >>> >>> I agree with Hari that Strong exact-once semantics is very
> hard
> >>>>> to
> >>>>> >>> >>> guarantee, especially in the failure situation. From my
> >>>>> >>> understanding even
> >>>>> >>> >>> current implementation of ReliableKafkaReceiver cannot fully
> >>>>> >>> guarantee the
> >>>>> >>> >>> exact once semantics once failed, first is the ordering of
> data
> >>>>> >>> replaying
> >>>>> >>> >>> from last checkpoint, this is hard to guarantee when multiple
> >>>>> >>> partitions
> >>>>> >>> >>> are injected in; second is the design complexity of achieving
> >>>>> this,
> >>>>> >>> you can
> >>>>> >>> >>> refer to the Kafka Spout in Trident, we have to dig into the
> >>>>> very
> >>>>> >>> details
> >>>>> >>> >>> of Kafka metadata management system to achieve this, not to
> say
> >>>>> >>> rebalance
> >>>>> >>> >>> and fault-tolerance.
> >>>>> >>> >>>
> >>>>> >>> >>> Thanks
> >>>>> >>> >>> Jerry
> >>>>> >>> >>>
> >>>>> >>> >>> -----Original Message-----
> >>>>> >>> >>> From: Luis Ángel Vicente Sánchez [mailto:
> >>>>> langel.groups@gmail.com]
> >>>>> >>> >>> Sent: Friday, December 19, 2014 5:57 AM
> >>>>> >>> >>> To: Cody Koeninger
> >>>>> >>> >>> Cc: Hari Shreedharan; Patrick Wendell; dev@spark.apache.org
> >>>>> >>> >>> Subject: Re: Which committers care about Kafka?
> >>>>> >>> >>>
> >>>>> >>> >>> But idempotency is not that easy to achieve sometimes. A strong
> >>>>> >>> >>> only-once semantic through a proper API would be super useful; but
> >>>>> >>> >>> I'm not implying this is easy to achieve.
> >>>>> >>> >>> On 18 Dec 2014 21:52, "Cody Koeninger" <co...@koeninger.org>
> >>>>> wrote:
> >>>>> >>> >>>
> >>>>> >>> >>>> If the downstream store for the output data is idempotent or
> >>>>> >>> >>>> transactional, and that downstream store also is the system
> of
> >>>>> >>> record
> >>>>> >>> >>>> for kafka offsets, then you have exactly-once semantics.
> >>>>> Commit
> >>>>> >>> >>>> offsets with / after the data is stored.  On any failure,
> >>>>> restart
> >>>>> >>> from
> >>>>> >>> >>> the last committed offsets.
> >>>>> >>> >>>>
> >>>>> >>> >>>> Yes, this approach is biased towards the etl-like use cases
> >>>>> rather
> >>>>> >>> >>>> than near-realtime-analytics use cases.
> >>>>> >>> >>>>
> >>>>> >>> >>>> On Thu, Dec 18, 2014 at 3:27 PM, Hari Shreedharan <
> >>>>> >>> >>>> hshreedharan@cloudera.com
> >>>>> >>> >>>>> wrote:
> >>>>> >>> >>>>>
> >>>>> >>> >>>>> I get what you are saying. But getting exactly once right
> is
> >>>>> an
> >>>>> >>> >>>>> extremely hard problem - especially in presence of failure.
> >>>>> The
> >>>>> >>> >>>>> issue is failures
> >>>>> >>> >>>> can
> >>>>> >>> >>>>> happen in a bunch of places. For example, before the
> >>>>> notification
> >>>>> >>> of
> >>>>> >>> >>>>> downstream store being successful reaches the receiver that
> >>>>> updates
> >>>>> >>> >>>>> the offsets, the node fails. The store was successful, but
> >>>>> >>> >>>>> duplicates came in either way. This is something worth
> >>>>> discussing
> >>>>> >>> by
> >>>>> >>> >>>>> itself - but without uuids etc this might not really be
> >>>>> solved even
> >>>>> >>> >>> when you think it is.
> >>>>> >>> >>>>>
> >>>>> >>> >>>>> Anyway, I will look at the links. Even I am interested in
> all
> >>>>> of
> >>>>> >>> the
> >>>>> >>> >>>>> features you mentioned - no HDFS WAL for Kafka and
> once-only
> >>>>> >>> >>>>> delivery,
> >>>>> >>> >>>> but
> >>>>> >>> >>>>> I doubt the latter is really possible to guarantee -
> though I
> >>>>> >>> really
> >>>>> >>> >>>> would
> >>>>> >>> >>>>> love to have that!
> >>>>> >>> >>>>>
> >>>>> >>> >>>>> Thanks,
> >>>>> >>> >>>>> Hari
> >>>>> >>> >>>>>
> >>>>> >>> >>>>>
> >>>>> >>> >>>>> On Thu, Dec 18, 2014 at 12:26 PM, Cody Koeninger
> >>>>> >>> >>>>> <co...@koeninger.org>
> >>>>> >>> >>>>> wrote:
> >>>>> >>> >>>>>
> >>>>> >>> >>>>>> Thanks for the replies.
> >>>>> >>> >>>>>>
> >>>>> >>> >>>>>> Regarding skipping WAL, it's not just about optimization.
> >>>>> If you
> >>>>> >>> >>>>>> actually want exactly-once semantics, you need control of
> >>>>> kafka
> >>>>> >>> >>>>>> offsets
> >>>>> >>> >>>> as
> >>>>> >>> >>>>>> well, including the ability to not use zookeeper as the
> >>>>> system of
> >>>>> >>> >>>>>> record for offsets.  Kafka already is a reliable system
> that
> >>>>> has
> >>>>> >>> >>>>>> strong
> >>>>> >>> >>>> ordering
> >>>>> >>> >>>>>> guarantees (within a partition) and does not mandate the
> use
> >>>>> of
> >>>>> >>> >>>> zookeeper
> >>>>> >>> >>>>>> to store offsets.  I think there should be a spark api
> that
> >>>>> acts
> >>>>> >>> as
> >>>>> >>> >>>>>> a
> >>>>> >>> >>>> very
> >>>>> >>> >>>>>> simple intermediary between Kafka and the user's choice of
> >>>>> >>> >>>>>> downstream
> >>>>> >>> >>>> store.
> >>>>> >>> >>>>>>
> >>>>> >>> >>>>>> Take a look at the links I posted - if there's already
> been 2
> >>>>> >>> >>>> independent
> >>>>> >>> >>>>>> implementations of the idea, chances are it's something
> >>>>> people
> >>>>> >>> need.
> >>>>> >>> >>>>>>
> >>>>> >>> >>>>>> On Thu, Dec 18, 2014 at 1:44 PM, Hari Shreedharan <
> >>>>> >>> >>>>>> hshreedharan@cloudera.com> wrote:
> >>>>> >>> >>>>>>>
> >>>>> >>> >>>>>>> Hi Cody,
> >>>>> >>> >>>>>>>
> >>>>> >>> >>>>>>> I am an absolute +1 on SPARK-3146. I think we can
> implement
> >>>>> >>> >>>>>>> something pretty simple and lightweight for that one.
> >>>>> >>> >>>>>>>
> >>>>> >>> >>>>>>> For the Kafka DStream skipping the WAL implementation -
> >>>>> this is
> >>>>> >>> >>>>>>> something I discussed with TD a few weeks ago. Though it
> is
> >>>>> a
> >>>>> >>> good
> >>>>> >>> >>>> idea to
> >>>>> >>> >>>>>>> implement this to avoid unnecessary HDFS writes, it is an
> >>>>> >>> >>>> optimization. For
> >>>>> >>> >>>>>>> that reason, we must be careful in implementation. There
> >>>>> are a
> >>>>> >>> >>>>>>> couple
> >>>>> >>> >>>> of
> >>>>> >>> >>>>>>> issues that we need to ensure works properly -
> specifically
> >>>>> >>> >>> ordering.
> >>>>> >>> >>>> To
> >>>>> >>> >>>>>>> ensure we pull messages from different topics and
> >>>>> partitions in
> >>>>> >>> >>>>>>> the
> >>>>> >>> >>>> same
> >>>>> >>> >>>>>>> order after failure, we’d still have to persist the
> >>>>> metadata to
> >>>>> >>> >>>>>>> HDFS
> >>>>> >>> >>>> (or
> >>>>> >>> >>>>>>> some other system) - this metadata must contain the
> order of
> >>>>> >>> >>>>>>> messages consumed, so we know how to re-read the
> messages.
> >>>>> I am
> >>>>> >>> >>>>>>> planning to
> >>>>> >>> >>>> explore
> >>>>> >>> >>>>>>> this once I have some time (probably in Jan). In
> addition,
> >>>>> we
> >>>>> >>> must
> >>>>> >>> >>>>>>> also ensure bucketing functions work fine as well. I will
> >>>>> file a
> >>>>> >>> >>>>>>> placeholder jira for this one.
> >>>>> >>> >>>>>>>
> >>>>> >>> >>>>>>> I also wrote an API to write data back to Kafka a while
> >>>>> back -
> >>>>> >>> >>>>>>> https://github.com/apache/spark/pull/2994 . I am hoping
> >>>>> that
> >>>>> >>> this
> >>>>> >>> >>>>>>> will get pulled in soon, as this is something I know
> people
> >>>>> want.
> >>>>> >>> >>>>>>> I am open
> >>>>> >>> >>>> to
> >>>>> >>> >>>>>>> feedback on that - anything that I can do to make it
> better.
> >>>>> >>> >>>>>>>
> >>>>> >>> >>>>>>> Thanks,
> >>>>> >>> >>>>>>> Hari
> >>>>> >>> >>>>>>>
> >>>>> >>> >>>>>>>
> >>>>> >>> >>>>>>> On Thu, Dec 18, 2014 at 11:14 AM, Patrick Wendell
> >>>>> >>> >>>>>>> <pw...@gmail.com>
> >>>>> >>> >>>>>>> wrote:
> >>>>> >>> >>>>>>>
> >>>>> >>> >>>>>>>> Hey Cody,
> >>>>> >>> >>>>>>>>
> >>>>> >>> >>>>>>>> Thanks for reaching out with this. The lead on streaming
> >>>>> is TD -
> >>>>> >>> >>>>>>>> he is traveling this week though so I can respond a bit.
> >>>>> To the
> >>>>> >>> >>>>>>>> high level point of whether Kafka is important - it
> >>>>> definitely
> >>>>> >>> >>>>>>>> is. Something like 80% of Spark Streaming deployments
> >>>>> >>> >>>>>>>> (anecdotally) ingest data from Kafka. Also, good support
> >>>>> for
> >>>>> >>> >>>>>>>> Kafka is something we generally want in Spark and not a
> >>>>> library.
> >>>>> >>> >>>>>>>> In some cases IIRC there were user libraries that used
> >>>>> unstable
> >>>>> >>> >>>>>>>> Kafka API's and we were somewhat waiting on Kafka to
> >>>>> stabilize
> >>>>> >>> >>>>>>>> them to merge things upstream. Otherwise users wouldn't
> be
> >>>>> able
> >>>>> >>> >>>>>>>> to use newer Kakfa versions. This is a high level
> >>>>> impression
> >>>>> >>> only
> >>>>> >>> >>>>>>>> though, I haven't talked to TD about this recently so
> it's
> >>>>> worth
> >>>>> >>> >>> revisiting given the developments in Kafka.
> >>>>> >>> >>>>>>>>
> >>>>> >>> >>>>>>>> Please do bring things up like this on the dev list if
> >>>>> there are
> >>>>> >>> >>>>>>>> blockers for your usage - thanks for pinging it.
> >>>>> >>> >>>>>>>>
> >>>>> >>> >>>>>>>> - Patrick
> >>>>> >>> >>>>>>>>
> >>>>> >>> >>>>>>>> On Thu, Dec 18, 2014 at 7:07 AM, Cody Koeninger
> >>>>> >>> >>>>>>>> <co...@koeninger.org>
> >>>>> >>> >>>>>>>> wrote:
> >>>>> >>> >>>>>>>>> Now that 1.2 is finalized... who are the go-to people
> to
> >>>>> get
> >>>>> >>> >>>>>>>>> some long-standing Kafka related issues resolved?
> >>>>> >>> >>>>>>>>>
> >>>>> >>> >>>>>>>>> The existing api is not sufficiently safe nor flexible
> >>>>> for our
> >>>>> >>> >>>>>>>> production
> >>>>> >>> >>>>>>>>> use. I don't think we're alone in this viewpoint,
> because
> >>>>> I've
> >>>>> >>> >>>>>>>>> seen several different patches and libraries to fix the
> >>>>> same
> >>>>> >>> >>>>>>>>> things we've
> >>>>> >>> >>>>>>>> been
> >>>>> >>> >>>>>>>>> running into.
> >>>>> >>> >>>>>>>>>
> >>>>> >>> >>>>>>>>> Regarding flexibility
> >>>>> >>> >>>>>>>>>
> >>>>> >>> >>>>>>>>> https://issues.apache.org/jira/browse/SPARK-3146
> >>>>> >>> >>>>>>>>>
> >>>>> >>> >>>>>>>>> has been outstanding since August, and IMHO an
> equivalent
> >>>>> of
> >>>>> >>> >>>>>>>>> this is absolutely necessary. We wrote a similar patch
> >>>>> >>> >>>>>>>>> ourselves, then found
> >>>>> >>> >>>>>>>> that
> >>>>> >>> >>>>>>>>> PR and have been running it in production. We wouldn't
> be
> >>>>> able
> >>>>> >>> >>>>>>>>> to
> >>>>> >>> >>>> get
> >>>>> >>> >>>>>>>> our
> >>>>> >>> >>>>>>>>> jobs done without it. It also allows users to solve a
> >>>>> whole
> >>>>> >>> >>>>>>>>> class of problems for themselves (e.g. SPARK-2388,
> >>>>> arbitrary
> >>>>> >>> >>>>>>>>> delay of
> >>>>> >>> >>>>>>>> messages, etc).
> >>>>> >>> >>>>>>>>>
> >>>>> >>> >>>>>>>>> Regarding safety, I understand the motivation behind
> >>>>> >>> >>>>>>>>> WriteAheadLog
> >>>>> >>> >>>> as
> >>>>> >>> >>>>>>>> a
> >>>>> >>> >>>>>>>>> general solution for streaming unreliable sources, but
> >>>>> Kafka
> >>>>> >>> >>>>>>>>> already
> >>>>> >>> >>>>>>>> is a
> >>>>> >>> >>>>>>>>> reliable source. I think there's a need for an api that
> >>>>> treats
> >>>>> >>> >>>>>>>>> it as such. Even aside from the performance issues of
> >>>>> >>> >>>>>>>>> duplicating the write-ahead log in kafka into another
> >>>>> >>> >>>>>>>>> write-ahead log in hdfs, I
> >>>>> >>> >>>> need
> >>>>> >>> >>>>>>>>> exactly-once semantics in the face of failure (I've had
> >>>>> >>> >>>>>>>>> failures
> >>>>> >>> >>>> that
> >>>>> >>> >>>>>>>>> prevented reloading a spark streaming checkpoint, for
> >>>>> >>> instance).
> >>>>> >>> >>>>>>>>>
> >>>>> >>> >>>>>>>>> I've got an implementation i've been using
> >>>>> >>> >>>>>>>>>
> >>>>> >>> >>>>>>>>>
> >>>>> >>> https://github.com/koeninger/spark-1/tree/kafkaRdd/external/kaf
> >>>>> >>> >>>>>>>>> ka /src/main/scala/org/apache/spark/rdd/kafka
> >>>>> >>> >>>>>>>>>
> >>>>> >>> >>>>>>>>> Tresata has something similar at
> >>>>> >>> >>>>>>>> https://github.com/tresata/spark-kafka,
> >>>>> >>> >>>>>>>>> and I know there were earlier attempts based on Storm
> >>>>> code.
> >>>>> >>> >>>>>>>>>
> >>>>> >>> >>>>>>>>> Trying to distribute these kinds of fixes as libraries
> >>>>> rather
> >>>>> >>> >>>>>>>>> than
> >>>>> >>> >>>>>>>> patches
> >>>>> >>> >>>>>>>>> to Spark is problematic, because large portions of the
> >>>>> >>> >>>> implementation
> >>>>> >>> >>>>>>>> are
> >>>>> >>> >>>>>>>>> private[spark].
> >>>>> >>> >>>>>>>>>
> >>>>> >>> >>>>>>>>> I'd like to help, but i need to know whose attention to
> >>>>> get.
> >>>>> >>> >>>>>>>>
> >>>>> >>> >>>>>>>>
> >>>>> >>> -----------------------------------------------------------------
> >>>>> >>> >>>>>>>> ---- To unsubscribe, e-mail:
> >>>>> dev-unsubscribe@spark.apache.org
> >>>>> >>> For
> >>>>> >>> >>>>>>>> additional commands, e-mail: dev-help@spark.apache.org
> >>>>> >>> >>>>>>>>
> >>>>> >>> >>>>>>>>
> >>>>> >>> >>>>>>>
> >>>>> >>> >>>>>
> >>>>> >>> >>>>
> >>>>> >>> >>>
> >>>>> >>> >>
> >>>>> >>>
> >>>>> >>>
> >>>>> >>
> >>>>> >
> >>>>>
> >>>>
> >>>
> >>
>
>
>

RE: Which committers care about Kafka?

Posted by "Shao, Saisai" <sa...@intel.com>.
Hi Cody,

From my understanding rate control is an optional configuration in Spark Streaming and is disabled by default, so users can reach maximum throughput without any configuration.

The reason why rate control is so important in stream processing is that Spark Streaming and other streaming frameworks are easily prone to unexpected behavior and failures due to network bursts and other uncontrollable ingestion rates.

Especially for Spark Streaming, a large amount of data to process will increase the processing time, which will further delay ongoing jobs and finally lead to failure.

Thanks
Jerry

From: Cody Koeninger [mailto:cody@koeninger.org]
Sent: Tuesday, December 30, 2014 6:50 AM
To: Tathagata Das
Cc: Hari Shreedharan; Shao, Saisai; Sean McNamara; Patrick Wendell; Luis Ángel Vicente Sánchez; Dibyendu Bhattacharya; dev@spark.apache.org; Koert Kuipers
Subject: Re: Which committers care about Kafka?

Can you give a little more clarification on exactly what is meant by

1. Data rate control

If someone wants to clamp the maximum number of messages per RDD partition in my solution, it would be very straightforward to do so.

Regarding the holy grail, I'm pretty certain you can't have end-to-end transactional semantics without the client code being in charge of offset state.  That means the client code is going to also need to be in charge of setting up an initial state for updateStateByKey that makes sense; as long as they can do that, the job should be safe to restart from arbitrary failures.

On Mon, Dec 29, 2014 at 4:33 PM, Tathagata Das <ta...@gmail.com>> wrote:
Hey all,

Some wrap up thoughts on this thread.

Let me first reiterate what Patrick said, that Kafka is super super
important as it forms the largest fraction of Spark Streaming user
base. So we really want to improve the Kafka + Spark Streaming
integration. To this end, some of the things that need to be
considered can be broadly classified into the following, to help
facilitate the discussion.

1. Data rate control
2. Receiver failure semantics - partially achieving this gives
at-least once, completely achieving this gives exactly-once
3. Driver failure semantics - partially achieving this gives at-least
once, completely achieving this gives exactly-once

Here is a run down of what is achieved by different implementations
(based on what I think).

1. Prior to WAL in Spark 1.2, the KafkaReceiver could handle 3, could
handle 1 partially (some duplicate data), and could NOT handle 2 (all
previously received data lost).

2. In Spark 1.2 with WAL enabled, Saisai's ReliableKafkaReceiver
can handle 3, and can almost completely handle 1 and 2 (except a few corner
cases which prevent it from completely guaranteeing exactly-once).

3. I believe Dibyendu's solution (correct me if I am wrong) can handle
1 and 2 perfectly. And 3 can be partially solved with WAL, or possibly
completely solved by extending the solution further.

4. Cody's solution (again, correct me if I am wrong) does not use
receivers at all (so it eliminates 2). It can handle 3 completely for
simple operations like map and filter, but I am not sure if it works
completely for stateful ops like windows and updateStateByKey. Also it
does not handle 1.

The real challenge for Kafka is in achieving 3 completely for stateful
operations while also handling 1.  (i.e., use receivers, but still get
driver failure guarantees). Solving this will give us our holy grail
solution, and this is what I want to achieve.

On that note, Cody submitted a PR on his style of achieving
exactly-once semantics - https://github.com/apache/spark/pull/3798 . I
am reviewing it. Please follow the PR if you are interested.

TD

On Wed, Dec 24, 2014 at 11:59 PM, Cody Koeninger <co...@koeninger.org>> wrote:
> The conversation was mostly getting TD up to speed on this thread since he
> had just gotten back from his trip and hadn't seen it.
>
> The jira has a summary of the requirements we discussed, I'm sure TD or
> Patrick can add to the ticket if I missed something.
> On Dec 25, 2014 1:54 AM, "Hari Shreedharan" <hs...@cloudera.com>>
> wrote:
>
>> In general such discussions happen on, or are posted to, the dev lists.
>> Could you please post a summary? Thanks.
>>
>> Thanks,
>> Hari
>>
>>
>> On Wed, Dec 24, 2014 at 11:46 PM, Cody Koeninger <co...@koeninger.org>>
>> wrote:
>>
>>>  After a long talk with Patrick and TD (thanks guys), I opened the
>>> following jira
>>>
>>> https://issues.apache.org/jira/browse/SPARK-4964
>>>
>>> The sample PR has an implementation for the batch and the dstream case, and a
>>> link to a project with example usage.
>>>
>>> On Fri, Dec 19, 2014 at 4:36 PM, Koert Kuipers <ko...@tresata.com>> wrote:
>>>
>>>> yup, we at tresata do the idempotent store the same way. very simple
>>>> approach.
>>>>
>>>> On Fri, Dec 19, 2014 at 5:32 PM, Cody Koeninger <co...@koeninger.org>>
>>>> wrote:
>>>>>
>>>>> That KafkaRDD code is dead simple.
>>>>>
>>>>> Given a user specified map
>>>>>
>>>>> (topic1, partition0) -> (startingOffset, endingOffset)
>>>>> (topic1, partition1) -> (startingOffset, endingOffset)
>>>>> ...
>>>>> turn each one of those entries into a partition of an rdd, using the
>>>>> simple
>>>>> consumer.
>>>>> That's it.  No recovery logic, no state, nothing - for any failures,
>>>>> bail
>>>>> on the rdd and let it retry.
>>>>> Spark stays out of the business of being a distributed database.
>>>>>
>>>>> The client code does any transformation it wants, then stores the data
>>>>> and
>>>>> offsets.  There are two ways of doing this, either based on idempotence
>>>>> or
>>>>> a transactional data store.
>>>>>
>>>>> For idempotent stores:
>>>>>
>>>>> 1.manipulate data
>>>>> 2.save data to store
>>>>> 3.save ending offsets to the same store
>>>>>
>>>>> If you fail between 2 and 3, the offsets haven't been stored, you start
>>>>> again at the same beginning offsets, do the same calculations in the
>>>>> same
>>>>> order, overwrite the same data, all is good.
>>>>>
>>>>>
>>>>> For transactional stores:
>>>>>
>>>>> 1. manipulate data
>>>>> 2. begin transaction
>>>>> 3. save data to the store
>>>>> 4. save offsets
>>>>> 5. commit transaction
>>>>>
>>>>> If you fail before 5, the transaction rolls back.  To make this less
>>>>> heavyweight, you can write the data outside the transaction and then
>>>>> update
>>>>> a pointer to the current data inside the transaction.
>>>>>
>>>>>
>>>>> Again, spark has nothing much to do with guaranteeing exactly once.  In
>>>>> fact, the current streaming api actively impedes my ability to do the
>>>>> above.  I'm just suggesting providing an api that doesn't get in the
>>>>> way of
>>>>> exactly-once.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Dec 19, 2014 at 3:57 PM, Hari Shreedharan <
>>>>> hshreedharan@cloudera.com<ma...@cloudera.com>
>>>>> > wrote:
>>>>>
>>>>> > Can you explain your basic algorithm for the once-only-delivery? It is
>>>>> > quite a bit of very Kafka-specific code, that would take more time to
>>>>> read
>>>>> > than I can currently afford? If you can explain your algorithm a bit,
>>>>> it
>>>>> > might help.
>>>>> >
>>>>> > Thanks,
>>>>> > Hari
>>>>> >
>>>>> >
>>>>> > On Fri, Dec 19, 2014 at 1:48 PM, Cody Koeninger <co...@koeninger.org>>
>>>>> > wrote:
>>>>> >
>>>>> >>
>>>>> >> The problems you guys are discussing come from trying to store state
>>>>> in
>>>>> >> spark, so don't do that.  Spark isn't a distributed database.
>>>>> >>
>>>>> >> Just map kafka partitions directly to rdds, let user code specify
>>>>> the
>>>>> >> range of offsets explicitly, and let them be in charge of committing
>>>>> >> offsets.
>>>>> >>
>>>>> >> Using the simple consumer isn't that bad, I'm already using this in
>>>>> >> production with the code I linked to, and tresata apparently has
>>>>> been as
>>>>> >> well.  Again, for everyone saying this is impossible, have you read
>>>>> either
>>>>> >> of those implementations and looked at the approach?
>>>>> >>
>>>>> >>
>>>>> >>
>>>>> >> On Fri, Dec 19, 2014 at 2:27 PM, Sean McNamara <
>>>>> >> Sean.McNamara@webtrends.com<ma...@webtrends.com>> wrote:
>>>>> >>
>>>>> >>> Please feel free to correct me if I’m wrong, but I think the exactly
>>>>> >>> once spark streaming semantics can easily be solved using
>>>>> updateStateByKey.
>>>>> >>> Make the key going into updateStateByKey be a hash of the event, or
>>>>> pluck
>>>>> >>> off some uuid from the message.  The updateFunc would only emit the
>>>>> message
>>>>> >>> if the key did not exist, and the user has complete control over
>>>>> the window
>>>>> >>> of time / state lifecycle for detecting duplicates.  It also makes
>>>>> it
>>>>> >>> really easy to detect and take action (alert?) when you DO see a
>>>>> duplicate,
>>>>> >>> or make memory tradeoffs within an error bound using a sketch
>>>>> algorithm.
>>>>> >>> The kafka simple consumer is insanely complex, if possible I think
>>>>> it would
>>>>> >>> be better (and vastly more flexible) to get reliability using the
>>>>> >>> primitives that spark so elegantly provides.
>>>>> >>>
>>>>> >>> Cheers,
>>>>> >>>
>>>>> >>> Sean
>>>>> >>>
>>>>> >>>
>>>>> >>> > On Dec 19, 2014, at 12:06 PM, Hari Shreedharan <
>>>>> >>> hshreedharan@cloudera.com<ma...@cloudera.com>> wrote:
>>>>> >>> >
>>>>> >>> > Hi Dibyendu,
>>>>> >>> >
>>>>> >>> > Thanks for the details on the implementation. But I still do not
>>>>> >>> believe
>>>>> >>> > that it is no duplicates - what they achieve is that the same
>>>>> batch is
>>>>> >>> > processed exactly the same way every time (but see it may be
>>>>> processed
>>>>> >>> more
>>>>> >>> > than once) - so it depends on the operation being idempotent. I
>>>>> believe
>>>>> >>> > Trident uses ZK to keep track of the transactions - a batch can be
>>>>> >>> > processed multiple times in failure scenarios (for example, the
>>>>> >>> transaction
>>>>> >>> > is processed but before ZK is updated the machine fails, causing a
>>>>> >>> "new"
>>>>> >>> > node to process it again).
>>>>> >>> >
>>>>> >>> > I don't think it is impossible to do this in Spark Streaming as
>>>>> well
>>>>> >>> and
>>>>> >>> > I'd be really interested in working on it at some point in the
>>>>> near
>>>>> >>> future.
>>>>> >>> >
>>>>> >>> > On Fri, Dec 19, 2014 at 1:44 AM, Dibyendu Bhattacharya <
>>>>> >>> > dibyendu.bhattachary@gmail.com<ma...@gmail.com>> wrote:
>>>>> >>> >
>>>>> >>> >> Hi,
>>>>> >>> >>
>>>>> >>> >> Thanks to Jerry for mentioning the Kafka Spout for Trident. The
>>>>> Storm
>>>>> >>> >> Trident has done the exact-once guarantee by processing the
>>>>> tuple in a
>>>>> >>> >> batch  and assigning same transaction-id for a given batch . The
>>>>> >>> replay for
>>>>> >>> >> a given batch with a transaction-id will have exact same set of
>>>>> >>> tuples and
>>>>> >>> >> replay of batches happen in exact same order before the failure.
>>>>> >>> >>
>>>>> >>> >> Having this paradigm, if downstream system process data for a
>>>>> given
>>>>> >>> batch
>>>>> >>> >> for having a given transaction-id , and if during failure if same
>>>>> >>> batch is
>>>>> >>> >> again emitted , you can check if same transaction-id is already
>>>>> >>> processed
>>>>> >>> >> or not and hence can guarantee exact once semantics.
>>>>> >>> >>
>>>>> >>> >> And this can only be achieved in Spark if we use Low Level Kafka
>>>>> >>> consumer
>>>>> >>> >> API to process the offsets. This low level Kafka Consumer (
>>>>> >>> >> https://github.com/dibbhatt/kafka-spark-consumer) has
>>>>> implemented the
>>>>> >>> >> Spark Kafka consumer which uses Kafka Low Level APIs . All of the
>>>>> >>> Kafka
>>>>> >>> >> related logic has been taken from Storm-Kafka spout and which
>>>>> manages
>>>>> >>> all
>>>>> >>> >> Kafka re-balance and fault tolerant aspects and Kafka metadata
>>>>> >>> managements.
>>>>> >>> >>
>>>>> >>> >> Presently this Consumer maintains that during Receiver failure,
>>>>> it
>>>>> >>> will
>>>>> >>> >> re-emit the exact same Block with same set of messages . Every
>>>>> >>> message have
>>>>> >>> >> the details of its partition, offset and topic related details
>>>>> which
>>>>> >>> can
>>>>> >>> >> tackle the SPARK-3146.
>>>>> >>> >>
>>>>> >>> >> As this Low Level consumer has complete control over the Kafka
>>>>> >>> Offsets ,
>>>>> >>> >> we can implement Trident like feature on top of it like having
>>>>> >>> implement a
>>>>> >>> >> transaction-id for a given block , and re-emit the same block
>>>>> with
>>>>> >>> same set
>>>>> >>> >> of message during Driver failure.
>>>>> >>> >>
>>>>> >>> >> Regards,
>>>>> >>> >> Dibyendu
>>>>> >>> >>
>>>>> >>> >>
>>>>> >>> >> On Fri, Dec 19, 2014 at 7:33 AM, Shao, Saisai <
>>>>> saisai.shao@intel.com<ma...@intel.com>>
>>>>> >>> >> wrote:
>>>>> >>> >>>
>>>>> >>> >>> Hi all,
>>>>> >>> >>>
>>>>> >>> >>> I agree with Hari that Strong exact-once semantics is very hard
>>>>> to
>>>>> >>> >>> guarantee, especially in the failure situation. From my
>>>>> >>> understanding even
>>>>> >>> >>> current implementation of ReliableKafkaReceiver cannot fully
>>>>> >>> guarantee the
>>>>> >>> >>> exact once semantics once failed, first is the ordering of data
>>>>> >>> replaying
>>>>> >>> >>> from last checkpoint, this is hard to guarantee when multiple
>>>>> >>> partitions
>>>>> >>> >>> are injected in; second is the design complexity of achieving
>>>>> this,
>>>>> >>> you can
>>>>> >>> >>> refer to the Kafka Spout in Trident, we have to dig into the
>>>>> very
>>>>> >>> details
>>>>> >>> >>> of Kafka metadata management system to achieve this, not to say
>>>>> >>> rebalance
>>>>> >>> >>> and fault-tolerance.
>>>>> >>> >>>
>>>>> >>> >>> Thanks
>>>>> >>> >>> Jerry
>>>>> >>> >>>
>>>>> >>> >>> -----Original Message-----
>>>>> >>> >>> From: Luis Ángel Vicente Sánchez [mailto:
>>>>> langel.groups@gmail.com<ma...@gmail.com>]
>>>>> >>> >>> Sent: Friday, December 19, 2014 5:57 AM
>>>>> >>> >>> To: Cody Koeninger
>>>>> >>> >>> Cc: Hari Shreedharan; Patrick Wendell; dev@spark.apache.org<ma...@spark.apache.org>
>>>>> >>> >>> Subject: Re: Which committers care about Kafka?
>>>>> >>> >>>
>>>>> >>> >>> But idempotency is not that easy to achieve sometimes. A strong
>>>>> >>> >>> only-once semantic through a proper API would be super useful; but I'm
>>>>> >>> >>> not implying this is easy to achieve.
>>>>> >>> >>> On 18 Dec 2014 21:52, "Cody Koeninger" <co...@koeninger.org>>
>>>>> wrote:
>>>>> >>> >>>
>>>>> >>> >>>> If the downstream store for the output data is idempotent or
>>>>> >>> >>>> transactional, and that downstream store also is the system of
>>>>> >>> record
>>>>> >>> >>>> for kafka offsets, then you have exactly-once semantics.
>>>>> Commit
>>>>> >>> >>>> offsets with / after the data is stored.  On any failure,
>>>>> restart
>>>>> >>> from
>>>>> >>> >>> the last committed offsets.
>>>>> >>> >>>>
>>>>> >>> >>>> Yes, this approach is biased towards the etl-like use cases
>>>>> rather
>>>>> >>> >>>> than near-realtime-analytics use cases.
>>>>> >>> >>>>
>>>>> >>> >>>> On Thu, Dec 18, 2014 at 3:27 PM, Hari Shreedharan <
>>>>> >>> >>>> hshreedharan@cloudera.com<ma...@cloudera.com>
>>>>> >>> >>>>> wrote:
>>>>> >>> >>>>>
>>>>> >>> >>>>> I get what you are saying. But getting exactly once right is
>>>>> an
>>>>> >>> >>>>> extremely hard problem - especially in presence of failure.
>>>>> The
>>>>> >>> >>>>> issue is failures
>>>>> >>> >>>> can
>>>>> >>> >>>>> happen in a bunch of places. For example, before the
>>>>> notification
>>>>> >>> of
>>>>> >>> >>>>> downstream store being successful reaches the receiver that
>>>>> updates
>>>>> >>> >>>>> the offsets, the node fails. The store was successful, but
>>>>> >>> >>>>> duplicates came in either way. This is something worth
>>>>> discussing
>>>>> >>> by
>>>>> >>> >>>>> itself - but without uuids etc this might not really be
>>>>> solved even
>>>>> >>> >>> when you think it is.
>>>>> >>> >>>>>
>>>>> >>> >>>>> Anyway, I will look at the links. Even I am interested in all
>>>>> of
>>>>> >>> the
>>>>> >>> >>>>> features you mentioned - no HDFS WAL for Kafka and once-only
>>>>> >>> >>>>> delivery,
>>>>> >>> >>>> but
>>>>> >>> >>>>> I doubt the latter is really possible to guarantee - though I
>>>>> >>> really
>>>>> >>> >>>> would
>>>>> >>> >>>>> love to have that!
>>>>> >>> >>>>>
>>>>> >>> >>>>> Thanks,
>>>>> >>> >>>>> Hari
>>>>> >>> >>>>>
>>>>> >>> >>>>>
>>>>> >>> >>>>> On Thu, Dec 18, 2014 at 12:26 PM, Cody Koeninger
>>>>> >>> >>>>> <co...@koeninger.org>>
>>>>> >>> >>>>> wrote:
>>>>> >>> >>>>>
>>>>> >>> >>>>>> Thanks for the replies.
>>>>> >>> >>>>>>
>>>>> >>> >>>>>> Regarding skipping WAL, it's not just about optimization.
>>>>> If you
>>>>> >>> >>>>>> actually want exactly-once semantics, you need control of
>>>>> kafka
>>>>> >>> >>>>>> offsets
>>>>> >>> >>>> as
>>>>> >>> >>>>>> well, including the ability to not use zookeeper as the
>>>>> system of
>>>>> >>> >>>>>> record for offsets.  Kafka already is a reliable system that
>>>>> has
>>>>> >>> >>>>>> strong
>>>>> >>> >>>> ordering
>>>>> >>> >>>>>> guarantees (within a partition) and does not mandate the use
>>>>> of
>>>>> >>> >>>> zookeeper
>>>>> >>> >>>>>> to store offsets.  I think there should be a spark api that
>>>>> acts
>>>>> >>> as
>>>>> >>> >>>>>> a
>>>>> >>> >>>> very
>>>>> >>> >>>>>> simple intermediary between Kafka and the user's choice of
>>>>> >>> >>>>>> downstream
>>>>> >>> >>>> store.
>>>>> >>> >>>>>>
>>>>> >>> >>>>>> Take a look at the links I posted - if there's already been 2
>>>>> >>> >>>> independent
>>>>> >>> >>>>>> implementations of the idea, chances are it's something
>>>>> people
>>>>> >>> need.
>>>>> >>> >>>>>>
>>>>> >>> >>>>>> On Thu, Dec 18, 2014 at 1:44 PM, Hari Shreedharan <
>>>>> >>> >>>>>> hshreedharan@cloudera.com<ma...@cloudera.com>> wrote:
>>>>> >>> >>>>>>>
>>>>> >>> >>>>>>> Hi Cody,
>>>>> >>> >>>>>>>
>>>>> >>> >>>>>>> I am an absolute +1 on SPARK-3146. I think we can implement
>>>>> >>> >>>>>>> something pretty simple and lightweight for that one.
>>>>> >>> >>>>>>>
>>>>> >>> >>>>>>> For the Kafka DStream skipping the WAL implementation -
>>>>> this is
>>>>> >>> >>>>>>> something I discussed with TD a few weeks ago. Though it is
>>>>> a
>>>>> >>> good
>>>>> >>> >>>> idea to
>>>>> >>> >>>>>>> implement this to avoid unnecessary HDFS writes, it is an
>>>>> >>> >>>> optimization. For
>>>>> >>> >>>>>>> that reason, we must be careful in implementation. There
>>>>> are a
>>>>> >>> >>>>>>> couple
>>>>> >>> >>>> of
>>>>> >>> >>>>>>> issues that we need to ensure works properly - specifically
>>>>> >>> >>> ordering.
>>>>> >>> >>>> To
>>>>> >>> >>>>>>> ensure we pull messages from different topics and
>>>>> partitions in
>>>>> >>> >>>>>>> the
>>>>> >>> >>>> same
>>>>> >>> >>>>>>> order after failure, we’d still have to persist the
>>>>> metadata to
>>>>> >>> >>>>>>> HDFS
>>>>> >>> >>>> (or
>>>>> >>> >>>>>>> some other system) - this metadata must contain the order of
>>>>> >>> >>>>>>> messages consumed, so we know how to re-read the messages.
>>>>> I am
>>>>> >>> >>>>>>> planning to
>>>>> >>> >>>> explore
>>>>> >>> >>>>>>> this once I have some time (probably in Jan). In addition,
>>>>> we
>>>>> >>> must
>>>>> >>> >>>>>>> also ensure bucketing functions work fine as well. I will
>>>>> file a
>>>>> >>> >>>>>>> placeholder jira for this one.
>>>>> >>> >>>>>>>
>>>>> >>> >>>>>>> I also wrote an API to write data back to Kafka a while
>>>>> back -
>>>>> >>> >>>>>>> https://github.com/apache/spark/pull/2994 . I am hoping
>>>>> that
>>>>> >>> this
>>>>> >>> >>>>>>> will get pulled in soon, as this is something I know people
>>>>> want.
>>>>> >>> >>>>>>> I am open
>>>>> >>> >>>> to
>>>>> >>> >>>>>>> feedback on that - anything that I can do to make it better.
>>>>> >>> >>>>>>>
>>>>> >>> >>>>>>> Thanks,
>>>>> >>> >>>>>>> Hari
>>>>> >>> >>>>>>>
>>>>> >>> >>>>>>>
>>>>> >>> >>>>>>> On Thu, Dec 18, 2014 at 11:14 AM, Patrick Wendell
>>>>> >>> >>>>>>> <pw...@gmail.com>>
>>>>> >>> >>>>>>> wrote:
>>>>> >>> >>>>>>>
>>>>> >>> >>>>>>>> Hey Cody,
>>>>> >>> >>>>>>>>
>>>>> >>> >>>>>>>> Thanks for reaching out with this. The lead on streaming
>>>>> is TD -
>>>>> >>> >>>>>>>> he is traveling this week though so I can respond a bit.
>>>>> To the
>>>>> >>> >>>>>>>> high level point of whether Kafka is important - it
>>>>> definitely
>>>>> >>> >>>>>>>> is. Something like 80% of Spark Streaming deployments
>>>>> >>> >>>>>>>> (anecdotally) ingest data from Kafka. Also, good support
>>>>> for
>>>>> >>> >>>>>>>> Kafka is something we generally want in Spark and not a
>>>>> library.
>>>>> >>> >>>>>>>> In some cases IIRC there were user libraries that used
>>>>> unstable
>>>>> >>> >>>>>>>> Kafka API's and we were somewhat waiting on Kafka to
>>>>> stabilize
>>>>> >>> >>>>>>>> them to merge things upstream. Otherwise users wouldn't be
>>>>> able
>>>>> >>> >>>>>>>> to use newer Kakfa versions. This is a high level
>>>>> impression
>>>>> >>> only
>>>>> >>> >>>>>>>> though, I haven't talked to TD about this recently so it's
>>>>> worth
>>>>> >>> >>> revisiting given the developments in Kafka.
>>>>> >>> >>>>>>>>
>>>>> >>> >>>>>>>> Please do bring things up like this on the dev list if
>>>>> there are
>>>>> >>> >>>>>>>> blockers for your usage - thanks for pinging it.
>>>>> >>> >>>>>>>>
>>>>> >>> >>>>>>>> - Patrick
>>>>> >>> >>>>>>>>
>>>>> >>> >>>>>>>> On Thu, Dec 18, 2014 at 7:07 AM, Cody Koeninger
>>>>> >>> >>>>>>>> <co...@koeninger.org>>
>>>>> >>> >>>>>>>> wrote:
>>>>> >>> >>>>>>>>> Now that 1.2 is finalized... who are the go-to people to
>>>>> get
>>>>> >>> >>>>>>>>> some long-standing Kafka related issues resolved?
>>>>> >>> >>>>>>>>>
>>>>> >>> >>>>>>>>> The existing api is not sufficiently safe nor flexible
>>>>> for our
>>>>> >>> >>>>>>>> production
>>>>> >>> >>>>>>>>> use. I don't think we're alone in this viewpoint, because
>>>>> I've
>>>>> >>> >>>>>>>>> seen several different patches and libraries to fix the
>>>>> same
>>>>> >>> >>>>>>>>> things we've
>>>>> >>> >>>>>>>> been
>>>>> >>> >>>>>>>>> running into.
>>>>> >>> >>>>>>>>>
>>>>> >>> >>>>>>>>> Regarding flexibility
>>>>> >>> >>>>>>>>>
>>>>> >>> >>>>>>>>> https://issues.apache.org/jira/browse/SPARK-3146
>>>>> >>> >>>>>>>>>
>>>>> >>> >>>>>>>>> has been outstanding since August, and IMHO an equivalent
>>>>> of
>>>>> >>> >>>>>>>>> this is absolutely necessary. We wrote a similar patch
>>>>> >>> >>>>>>>>> ourselves, then found
>>>>> >>> >>>>>>>> that
>>>>> >>> >>>>>>>>> PR and have been running it in production. We wouldn't be
>>>>> able
>>>>> >>> >>>>>>>>> to
>>>>> >>> >>>> get
>>>>> >>> >>>>>>>> our
>>>>> >>> >>>>>>>>> jobs done without it. It also allows users to solve a
>>>>> whole
>>>>> >>> >>>>>>>>> class of problems for themselves (e.g. SPARK-2388,
>>>>> arbitrary
>>>>> >>> >>>>>>>>> delay of
>>>>> >>> >>>>>>>> messages, etc).
>>>>> >>> >>>>>>>>>
>>>>> >>> >>>>>>>>> Regarding safety, I understand the motivation behind
>>>>> >>> >>>>>>>>> WriteAheadLog
>>>>> >>> >>>> as
>>>>> >>> >>>>>>>> a
>>>>> >>> >>>>>>>>> general solution for streaming unreliable sources, but
>>>>> Kafka
>>>>> >>> >>>>>>>>> already
>>>>> >>> >>>>>>>> is a
>>>>> >>> >>>>>>>>> reliable source. I think there's a need for an api that
>>>>> treats
>>>>> >>> >>>>>>>>> it as such. Even aside from the performance issues of
>>>>> >>> >>>>>>>>> duplicating the write-ahead log in kafka into another
>>>>> >>> >>>>>>>>> write-ahead log in hdfs, I
>>>>> >>> >>>> need
>>>>> >>> >>>>>>>>> exactly-once semantics in the face of failure (I've had
>>>>> >>> >>>>>>>>> failures
>>>>> >>> >>>> that
>>>>> >>> >>>>>>>>> prevented reloading a spark streaming checkpoint, for
>>>>> >>> instance).
>>>>> >>> >>>>>>>>>
>>>>> >>> >>>>>>>>> I've got an implementation i've been using
>>>>> >>> >>>>>>>>>
>>>>> >>> >>>>>>>>>
>>>>> >>> https://github.com/koeninger/spark-1/tree/kafkaRdd/external/kaf
>>>>> >>> >>>>>>>>> ka /src/main/scala/org/apache/spark/rdd/kafka
>>>>> >>> >>>>>>>>>
>>>>> >>> >>>>>>>>> Tresata has something similar at
>>>>> >>> >>>>>>>> https://github.com/tresata/spark-kafka,
>>>>> >>> >>>>>>>>> and I know there were earlier attempts based on Storm
>>>>> code.
>>>>> >>> >>>>>>>>>
>>>>> >>> >>>>>>>>> Trying to distribute these kinds of fixes as libraries
>>>>> rather
>>>>> >>> >>>>>>>>> than
>>>>> >>> >>>>>>>> patches
>>>>> >>> >>>>>>>>> to Spark is problematic, because large portions of the
>>>>> >>> >>>> implementation
>>>>> >>> >>>>>>>> are
>>>>> >>> >>>>>>>>> private[spark].
>>>>> >>> >>>>>>>>>
>>>>> >>> >>>>>>>>> I'd like to help, but i need to know whose attention to
>>>>> get.
>>>>> >>> >>>>>>>>
>>>>> >>> >>>>>>>>
>>>>> >>> -----------------------------------------------------------------
>>>>> >>> >>>>>>>> ---- To unsubscribe, e-mail:
>>>>> dev-unsubscribe@spark.apache.org<ma...@spark.apache.org>
>>>>> >>> For
>>>>> >>> >>>>>>>> additional commands, e-mail: dev-help@spark.apache.org<ma...@spark.apache.org>
>>>>> >>> >>>>>>>>
>>>>> >>> >>>>>>>>
>>>>> >>> >>>>>>>
>>>>> >>> >>>>>
>>>>> >>> >>>>
>>>>> >>> >>>
>>>>> >>> >>
>>>>> >>>
>>>>> >>>
>>>>> >>
>>>>> >
>>>>>
>>>>
>>>
>>


Re: Which committers care about Kafka?

Posted by Cody Koeninger <co...@koeninger.org>.
Can you give a little more clarification on exactly what is meant by

1. Data rate control

If someone wants to clamp the maximum number of messages per RDD partition
in my solution, it would be very straightforward to do so.

Regarding the holy grail, I'm pretty certain you can't have end-to-end
transactional semantics without the client code being in charge of offset
state.  That means the client code is going to also need to be in charge of
setting up an initial state for updateStateByKey that makes sense; as long
as they can do that, the job should be safe to restart from arbitrary
failures.

On Mon, Dec 29, 2014 at 4:33 PM, Tathagata Das <ta...@gmail.com>
wrote:

> Hey all,
>
> Some wrap up thoughts on this thread.
>
> Let me first reiterate what Patrick said, that Kafka is super super
> important as it forms the largest fraction of Spark Streaming user
> base. So we really want to improve the Kafka + Spark Streaming
> integration. To this end, some of the things that need to be
> considered can be broadly classified into the following, to help
> facilitate the discussion.
>
> 1. Data rate control
> 2. Receiver failure semantics - partially achieving this gives
> at-least once, completely achieving this gives exactly-once
> 3. Driver failure semantics - partially achieving this gives at-least
> once, completely achieving this gives exactly-once
>
> Here is a run down of what is achieved by different implementations
> (based on what I think).
>
> 1. Prior to WAL in Spark 1.2, the KafkaReceiver could handle 3, could
> handle 1 partially (some duplicate data), and could NOT handle 2 (all
> previously received data lost).
>
> 2. In Spark 1.2 with WAL enabled, Saisai's ReliableKafkaReceiver
> can handle 3, and can almost completely handle 1 and 2 (except a few corner
> cases which prevent it from completely guaranteeing exactly-once).
>
> 3. I believe Dibyendu's solution (correct me if I am wrong) can handle
> 1 and 2 perfectly. And 3 can be partially solved with WAL, or possibly
> completely solved by extending the solution further.
>
> 4. Cody's solution (again, correct me if I am wrong) does not use
> receivers at all (so it eliminates 2). It can handle 3 completely for
> simple operations like map and filter, but I am not sure if it works
> completely for stateful ops like windows and updateStateByKey. Also it
> does not handle 1.
>
> The real challenge for Kafka is in achieving 3 completely for stateful
> operations while also handling 1.  (i.e., use receivers, but still get
> driver failure guarantees). Solving this will give us our holy grail
> solution, and this is what I want to achieve.
>
> On that note, Cody submitted a PR on his style of achieving
> exactly-once semantics - https://github.com/apache/spark/pull/3798 . I
> am reviewing it. Please follow the PR if you are interested.
>
> TD
>

Re: Which committers care about Kafka?

Posted by Tathagata Das <ta...@gmail.com>.
Hey all,

Some wrap up thoughts on this thread.

Let me first reiterate what Patrick said, that Kafka is super super
important as it forms the largest fraction of the Spark Streaming user
base. So we really want to improve the Kafka + Spark Streaming
integration. To this end, some of the things that need to be
considered can be broadly classified into the following to sort of
facilitate the discussion.

1. Data rate control
2. Receiver failure semantics - partially achieving this gives
at-least once, completely achieving this gives exactly-once
3. Driver failure semantics - partially achieving this gives at-least
once, completely achieving this gives exactly-once

Here is a run down of what is achieved by different implementations
(based on what I think).

1. Prior to WAL in Spark 1.2, the KafkaReceiver could handle 3, could
handle 1 partially (some duplicate data), and could NOT handle 2 (all
previously received data lost).

2. In Spark 1.2 with WAL enabled, Saisai's ReliableKafkaReceiver
can handle 3, can almost completely handle 1 and 2 (except for a few
corner cases which prevent it from completely guaranteeing exactly-once).

3. I believe Dibyendu's solution (correct me if I am wrong) can handle
1 and 2 perfectly. And 3 can be partially solved with WAL, or possibly
completely solved by extending the solution further.

4. Cody's solution (again, correct me if I am wrong) does not use
receivers at all (so eliminates 2). It can handle 3 completely for
simple operations like map and filter, but I am not sure if it works
completely for stateful ops like windows and updateStateByKey. Also it
does not handle 1.

The real challenge for Kafka is in achieving 3 completely for stateful
operations while also handling 1.  (i.e., use receivers, but still get
driver failure guarantees). Solving this will give us our holy grail
solution, and this is what I want to achieve.

On that note, Cody submitted a PR on his style of achieving
exactly-once semantics - https://github.com/apache/spark/pull/3798 . I
am reviewing it. Please follow the PR if you are interested.

TD

On Wed, Dec 24, 2014 at 11:59 PM, Cody Koeninger <co...@koeninger.org> wrote:
> The conversation was mostly getting TD up to speed on this thread since he
> had just gotten back from his trip and hadn't seen it.
>
> The jira has a summary of the requirements we discussed, I'm sure TD or
> Patrick can add to the ticket if I missed something.
> On Dec 25, 2014 1:54 AM, "Hari Shreedharan" <hs...@cloudera.com>
> wrote:
>
>> In general such discussions happen or is posted on the dev lists. Could
>> you please post a summary? Thanks.
>>
>> Thanks,
>> Hari
>>
>>
>> On Wed, Dec 24, 2014 at 11:46 PM, Cody Koeninger <co...@koeninger.org>
>> wrote:
>>
>>>  After a long talk with Patrick and TD (thanks guys), I opened the
>>> following jira
>>>
>>> https://issues.apache.org/jira/browse/SPARK-4964
>>>
>>> Sample PR has an implementation for the batch and the dstream case, and a
>>> link to a project with example usage.
>>>
>>> On Fri, Dec 19, 2014 at 4:36 PM, Koert Kuipers <ko...@tresata.com> wrote:
>>>
>>>> yup, we at tresata do the idempotent store the same way. very simple
>>>> approach.
>>>>
>>>> On Fri, Dec 19, 2014 at 5:32 PM, Cody Koeninger <co...@koeninger.org>
>>>> wrote:
>>>>>
>>>>> That KafkaRDD code is dead simple.
>>>>>
>>>>> Given a user specified map
>>>>>
>>>>> (topic1, partition0) -> (startingOffset, endingOffset)
>>>>> (topic1, partition1) -> (startingOffset, endingOffset)
>>>>> ...
>>>>> turn each one of those entries into a partition of an rdd, using the
>>>>> simple
>>>>> consumer.
>>>>> That's it.  No recovery logic, no state, nothing - for any failures,
>>>>> bail
>>>>> on the rdd and let it retry.
>>>>> Spark stays out of the business of being a distributed database.
>>>>>
>>>>> The client code does any transformation it wants, then stores the data
>>>>> and
>>>>> offsets.  There are two ways of doing this, either based on idempotence
>>>>> or
>>>>> a transactional data store.
>>>>>
>>>>> For idempotent stores:
>>>>>
>>>>> 1.manipulate data
>>>>> 2.save data to store
>>>>> 3.save ending offsets to the same store
>>>>>
>>>>> If you fail between 2 and 3, the offsets haven't been stored, you start
>>>>> again at the same beginning offsets, do the same calculations in the
>>>>> same
>>>>> order, overwrite the same data, all is good.
>>>>>
>>>>>
>>>>> For transactional stores:
>>>>>
>>>>> 1. manipulate data
>>>>> 2. begin transaction
>>>>> 3. save data to the store
>>>>> 4. save offsets
>>>>> 5. commit transaction
>>>>>
>>>>> If you fail before 5, the transaction rolls back.  To make this less
>>>>> heavyweight, you can write the data outside the transaction and then
>>>>> update
>>>>> a pointer to the current data inside the transaction.
>>>>>
>>>>>
>>>>> Again, spark has nothing much to do with guaranteeing exactly once.  In
>>>>> fact, the current streaming api actively impedes my ability to do the
>>>>> above.  I'm just suggesting providing an api that doesn't get in the
>>>>> way of
>>>>> exactly-once.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Dec 19, 2014 at 3:57 PM, Hari Shreedharan <
>>>>> hshreedharan@cloudera.com
>>>>> > wrote:
>>>>>
>>>>> > Can you explain your basic algorithm for the once-only-delivery? It is
>>>>> > quite a bit of very Kafka-specific code, that would take more time to
>>>>> read
>>>>> > than I can currently afford? If you can explain your algorithm a bit,
>>>>> it
>>>>> > might help.
>>>>> >
>>>>> > Thanks,
>>>>> > Hari
>>>>> >
>>>>> >
>>>>> > On Fri, Dec 19, 2014 at 1:48 PM, Cody Koeninger <co...@koeninger.org>
>>>>> > wrote:
>>>>> >
>>>>> >>
>>>>> >> The problems you guys are discussing come from trying to store state
>>>>> in
>>>>> >> spark, so don't do that.  Spark isn't a distributed database.
>>>>> >>
>>>>> >> Just map kafka partitions directly to rdds, let user code specify
>>>>> the
>>>>> >> range of offsets explicitly, and let them be in charge of committing
>>>>> >> offsets.
>>>>> >>
>>>>> >> Using the simple consumer isn't that bad, I'm already using this in
>>>>> >> production with the code I linked to, and tresata apparently has
>>>>> been as
>>>>> >> well.  Again, for everyone saying this is impossible, have you read
>>>>> either
>>>>> >> of those implementations and looked at the approach?
>>>>> >>
>>>>> >>
>>>>> >>
>>>>> >> On Fri, Dec 19, 2014 at 2:27 PM, Sean McNamara <
>>>>> >> Sean.McNamara@webtrends.com> wrote:
>>>>> >>
>>>>> >>> Please feel free to correct me if I’m wrong, but I think the exactly
>>>>> >>> once spark streaming semantics can easily be solved using
>>>>> updateStateByKey.
>>>>> >>> Make the key going into updateStateByKey be a hash of the event, or
>>>>> pluck
>>>>> >>> off some uuid from the message.  The updateFunc would only emit the
>>>>> message
>>>>> >>> if the key did not exist, and the user has complete control over
>>>>> the window
>>>>> >>> of time / state lifecycle for detecting duplicates.  It also makes
>>>>> it
>>>>> >>> really easy to detect and take action (alert?) when you DO see a
>>>>> duplicate,
>>>>> >>> or make memory tradeoffs within an error bound using a sketch
>>>>> algorithm.
>>>>> >>> The kafka simple consumer is insanely complex, if possible I think
>>>>> it would
>>>>> >>> be better (and vastly more flexible) to get reliability using the
>>>>> >>> primitives that spark so elegantly provides.
>>>>> >>>
>>>>> >>> Cheers,
>>>>> >>>
>>>>> >>> Sean
>>>>> >>>
>>>>> >>>
>>>>> >>> > On Dec 19, 2014, at 12:06 PM, Hari Shreedharan <
>>>>> >>> hshreedharan@cloudera.com> wrote:
>>>>> >>> >
>>>>> >>> > Hi Dibyendu,
>>>>> >>> >
>>>>> >>> > Thanks for the details on the implementation. But I still do not
>>>>> >>> believe
>>>>> >>> > that it is no duplicates - what they achieve is that the same
>>>>> batch is
>>>>> >>> > processed exactly the same way every time (but see it may be
>>>>> processed
>>>>> >>> more
>>>>> >>> > than once) - so it depends on the operation being idempotent. I
>>>>> believe
>>>>> >>> > Trident uses ZK to keep track of the transactions - a batch can be
>>>>> >>> > processed multiple times in failure scenarios (for example, the
>>>>> >>> transaction
>>>>> >>> > is processed but before ZK is updated the machine fails, causing a
>>>>> >>> "new"
>>>>> >>> > node to process it again).
>>>>> >>> >
>>>>> >>> > I don't think it is impossible to do this in Spark Streaming as
>>>>> well
>>>>> >>> and
>>>>> >>> > I'd be really interested in working on it at some point in the
>>>>> near
>>>>> >>> future.
>>>>> >>> >
>>>>> >>> > On Fri, Dec 19, 2014 at 1:44 AM, Dibyendu Bhattacharya <
>>>>> >>> > dibyendu.bhattachary@gmail.com> wrote:
>>>>> >>> >
>>>>> >>> >> Hi,
>>>>> >>> >>
>>>>> >>> >> Thanks to Jerry for mentioning the Kafka Spout for Trident. The
>>>>> Storm
>>>>> >>> >> Trident has done the exact-once guarantee by processing the
>>>>> tuple in a
>>>>> >>> >> batch  and assigning same transaction-id for a given batch . The
>>>>> >>> replay for
>>>>> >>> >> a given batch with a transaction-id will have exact same set of
>>>>> >>> tuples and
>>>>> >>> >> replay of batches happen in exact same order before the failure.
>>>>> >>> >>
>>>>> >>> >> Having this paradigm, if downstream system process data for a
>>>>> given
>>>>> >>> batch
>>>>> >>> >> for having a given transaction-id , and if during failure if same
>>>>> >>> batch is
>>>>> >>> >> again emitted , you can check if same transaction-id is already
>>>>> >>> processed
>>>>> >>> >> or not and hence can guarantee exact once semantics.
>>>>> >>> >>
>>>>> >>> >> And this can only be achieved in Spark if we use Low Level Kafka
>>>>> >>> consumer
>>>>> >>> >> API to process the offsets. This low level Kafka Consumer (
>>>>> >>> >> https://github.com/dibbhatt/kafka-spark-consumer) has
>>>>> implemented the
>>>>> >>> >> Spark Kafka consumer which uses Kafka Low Level APIs . All of the
>>>>> >>> Kafka
>>>>> >>> >> related logic has been taken from Storm-Kafka spout and which
>>>>> manages
>>>>> >>> all
>>>>> >>> >> Kafka re-balance and fault tolerant aspects and Kafka metadata
>>>>> >>> managements.
>>>>> >>> >>
>>>>> >>> >> Presently this Consumer maintains that during Receiver failure,
>>>>> it
>>>>> >>> will
>>>>> >>> >> re-emit the exact same Block with same set of messages . Every
>>>>> >>> message have
>>>>> >>> >> the details of its partition, offset and topic related details
>>>>> which
>>>>> >>> can
>>>>> >>> >> tackle the SPARK-3146.
>>>>> >>> >>
>>>>> >>> >> As this Low Level consumer has complete control over the Kafka
>>>>> >>> Offsets ,
>>>>> >>> >> we can implement Trident like feature on top of it like having
>>>>> >>> implement a
>>>>> >>> >> transaction-id for a given block , and re-emit the same block
>>>>> with
>>>>> >>> same set
>>>>> >>> >> of message during Driver failure.
>>>>> >>> >>
>>>>> >>> >> Regards,
>>>>> >>> >> Dibyendu
>>>>> >>> >>
>>>>> >>> >>
>>>>> >>> >> On Fri, Dec 19, 2014 at 7:33 AM, Shao, Saisai <
>>>>> saisai.shao@intel.com>
>>>>> >>> >> wrote:
>>>>> >>> >>>
>>>>> >>> >>> Hi all,
>>>>> >>> >>>
>>>>> >>> >>> I agree with Hari that Strong exact-once semantics is very hard
>>>>> to
>>>>> >>> >>> guarantee, especially in the failure situation. From my
>>>>> >>> understanding even
>>>>> >>> >>> current implementation of ReliableKafkaReceiver cannot fully
>>>>> >>> guarantee the
>>>>> >>> >>> exact once semantics once failed, first is the ordering of data
>>>>> >>> replaying
>>>>> >>> >>> from last checkpoint, this is hard to guarantee when multiple
>>>>> >>> partitions
>>>>> >>> >>> are injected in; second is the design complexity of achieving
>>>>> this,
>>>>> >>> you can
>>>>> >>> >>> refer to the Kafka Spout in Trident, we have to dig into the
>>>>> very
>>>>> >>> details
>>>>> >>> >>> of Kafka metadata management system to achieve this, not to say
>>>>> >>> rebalance
>>>>> >>> >>> and fault-tolerance.
>>>>> >>> >>>
>>>>> >>> >>> Thanks
>>>>> >>> >>> Jerry
>>>>> >>> >>>
>>>>> >>> >>> -----Original Message-----
>>>>> >>> >>> From: Luis Ángel Vicente Sánchez [mailto:
>>>>> langel.groups@gmail.com]
>>>>> >>> >>> Sent: Friday, December 19, 2014 5:57 AM
>>>>> >>> >>> To: Cody Koeninger
>>>>> >>> >>> Cc: Hari Shreedharan; Patrick Wendell; dev@spark.apache.org
>>>>> >>> >>> Subject: Re: Which committers care about Kafka?
>>>>> >>> >>>
>>>>> >>> >>> But idempotency is not that easy to achieve sometimes. A strong
>>>>> only
>>>>> >>> once
>>>>> >>> >>> semantic through a proper API would be super useful; but I'm not
>>>>> >>> implying
>>>>> >>> >>> this is easy to achieve.
>>>>> >>> >>> On 18 Dec 2014 21:52, "Cody Koeninger" <co...@koeninger.org>
>>>>> wrote:
>>>>> >>> >>>
>>>>> >>> >>>> If the downstream store for the output data is idempotent or
>>>>> >>> >>>> transactional, and that downstream store also is the system of
>>>>> >>> record
>>>>> >>> >>>> for kafka offsets, then you have exactly-once semantics.
>>>>> Commit
>>>>> >>> >>>> offsets with / after the data is stored.  On any failure,
>>>>> restart
>>>>> >>> from
>>>>> >>> >>> the last committed offsets.
>>>>> >>> >>>>
>>>>> >>> >>>> Yes, this approach is biased towards the etl-like use cases
>>>>> rather
>>>>> >>> >>>> than near-realtime-analytics use cases.
>>>>> >>> >>>>
>>>>> >>> >>>> On Thu, Dec 18, 2014 at 3:27 PM, Hari Shreedharan <
>>>>> >>> >>>> hshreedharan@cloudera.com
>>>>> >>> >>>>> wrote:
>>>>> >>> >>>>>
>>>>> >>> >>>>> I get what you are saying. But getting exactly once right is
>>>>> an
>>>>> >>> >>>>> extremely hard problem - especially in presence of failure.
>>>>> The
>>>>> >>> >>>>> issue is failures
>>>>> >>> >>>> can
>>>>> >>> >>>>> happen in a bunch of places. For example, before the
>>>>> notification
>>>>> >>> of
>>>>> >>> >>>>> downstream store being successful reaches the receiver that
>>>>> updates
>>>>> >>> >>>>> the offsets, the node fails. The store was successful, but
>>>>> >>> >>>>> duplicates came in either way. This is something worth
>>>>> discussing
>>>>> >>> by
>>>>> >>> >>>>> itself - but without uuids etc this might not really be
>>>>> solved even
>>>>> >>> >>> when you think it is.
>>>>> >>> >>>>>
>>>>> >>> >>>>> Anyway, I will look at the links. Even I am interested in all
>>>>> of
>>>>> >>> the
>>>>> >>> >>>>> features you mentioned - no HDFS WAL for Kafka and once-only
>>>>> >>> >>>>> delivery,
>>>>> >>> >>>> but
>>>>> >>> >>>>> I doubt the latter is really possible to guarantee - though I
>>>>> >>> really
>>>>> >>> >>>> would
>>>>> >>> >>>>> love to have that!
>>>>> >>> >>>>>
>>>>> >>> >>>>> Thanks,
>>>>> >>> >>>>> Hari
>>>>> >>> >>>>>
>>>>> >>> >>>>>
>>>>> >>> >>>>> On Thu, Dec 18, 2014 at 12:26 PM, Cody Koeninger
>>>>> >>> >>>>> <co...@koeninger.org>
>>>>> >>> >>>>> wrote:
>>>>> >>> >>>>>
>>>>> >>> >>>>>> Thanks for the replies.
>>>>> >>> >>>>>>
>>>>> >>> >>>>>> Regarding skipping WAL, it's not just about optimization.
>>>>> If you
>>>>> >>> >>>>>> actually want exactly-once semantics, you need control of
>>>>> kafka
>>>>> >>> >>>>>> offsets
>>>>> >>> >>>> as
>>>>> >>> >>>>>> well, including the ability to not use zookeeper as the
>>>>> system of
>>>>> >>> >>>>>> record for offsets.  Kafka already is a reliable system that
>>>>> has
>>>>> >>> >>>>>> strong
>>>>> >>> >>>> ordering
>>>>> >>> >>>>>> guarantees (within a partition) and does not mandate the use
>>>>> of
>>>>> >>> >>>> zookeeper
>>>>> >>> >>>>>> to store offsets.  I think there should be a spark api that
>>>>> acts
>>>>> >>> as
>>>>> >>> >>>>>> a
>>>>> >>> >>>> very
>>>>> >>> >>>>>> simple intermediary between Kafka and the user's choice of
>>>>> >>> >>>>>> downstream
>>>>> >>> >>>> store.
>>>>> >>> >>>>>>
>>>>> >>> >>>>>> Take a look at the links I posted - if there's already been 2
>>>>> >>> >>>> independent
>>>>> >>> >>>>>> implementations of the idea, chances are it's something
>>>>> people
>>>>> >>> need.
>>>>> >>> >>>>>>
>>>>> >>> >>>>>> On Thu, Dec 18, 2014 at 1:44 PM, Hari Shreedharan <
>>>>> >>> >>>>>> hshreedharan@cloudera.com> wrote:
>>>>> >>> >>>>>>>
>>>>> >>> >>>>>>> Hi Cody,
>>>>> >>> >>>>>>>
>>>>> >>> >>>>>>> I am an absolute +1 on SPARK-3146. I think we can implement
>>>>> >>> >>>>>>> something pretty simple and lightweight for that one.
>>>>> >>> >>>>>>>
>>>>> >>> >>>>>>> For the Kafka DStream skipping the WAL implementation -
>>>>> this is
>>>>> >>> >>>>>>> something I discussed with TD a few weeks ago. Though it is
>>>>> a
>>>>> >>> good
>>>>> >>> >>>> idea to
>>>>> >>> >>>>>>> implement this to avoid unnecessary HDFS writes, it is an
>>>>> >>> >>>> optimization. For
>>>>> >>> >>>>>>> that reason, we must be careful in implementation. There
>>>>> are a
>>>>> >>> >>>>>>> couple
>>>>> >>> >>>> of
>>>>> >>> >>>>>>> issues that we need to ensure works properly - specifically
>>>>> >>> >>> ordering.
>>>>> >>> >>>> To
>>>>> >>> >>>>>>> ensure we pull messages from different topics and
>>>>> partitions in
>>>>> >>> >>>>>>> the
>>>>> >>> >>>> same
>>>>> >>> >>>>>>> order after failure, we’d still have to persist the
>>>>> metadata to
>>>>> >>> >>>>>>> HDFS
>>>>> >>> >>>> (or
>>>>> >>> >>>>>>> some other system) - this metadata must contain the order of
>>>>> >>> >>>>>>> messages consumed, so we know how to re-read the messages.
>>>>> I am
>>>>> >>> >>>>>>> planning to
>>>>> >>> >>>> explore
>>>>> >>> >>>>>>> this once I have some time (probably in Jan). In addition,
>>>>> we
>>>>> >>> must
>>>>> >>> >>>>>>> also ensure bucketing functions work fine as well. I will
>>>>> file a
>>>>> >>> >>>>>>> placeholder jira for this one.
>>>>> >>> >>>>>>>
>>>>> >>> >>>>>>> I also wrote an API to write data back to Kafka a while
>>>>> back -
>>>>> >>> >>>>>>> https://github.com/apache/spark/pull/2994 . I am hoping
>>>>> that
>>>>> >>> this
>>>>> >>> >>>>>>> will get pulled in soon, as this is something I know people
>>>>> want.
>>>>> >>> >>>>>>> I am open
>>>>> >>> >>>> to
>>>>> >>> >>>>>>> feedback on that - anything that I can do to make it better.
>>>>> >>> >>>>>>>
>>>>> >>> >>>>>>> Thanks,
>>>>> >>> >>>>>>> Hari
>>>>> >>> >>>>>>>
>>>>> >>> >>>>>>>
>>>>> >>> >>>>>>> On Thu, Dec 18, 2014 at 11:14 AM, Patrick Wendell
>>>>> >>> >>>>>>> <pw...@gmail.com>
>>>>> >>> >>>>>>> wrote:
>>>>> >>> >>>>>>>
>>>>> >>> >>>>>>>> Hey Cody,
>>>>> >>> >>>>>>>>
>>>>> >>> >>>>>>>> Thanks for reaching out with this. The lead on streaming
>>>>> is TD -
>>>>> >>> >>>>>>>> he is traveling this week though so I can respond a bit.
>>>>> To the
>>>>> >>> >>>>>>>> high level point of whether Kafka is important - it
>>>>> definitely
>>>>> >>> >>>>>>>> is. Something like 80% of Spark Streaming deployments
>>>>> >>> >>>>>>>> (anecdotally) ingest data from Kafka. Also, good support
>>>>> for
>>>>> >>> >>>>>>>> Kafka is something we generally want in Spark and not a
>>>>> library.
>>>>> >>> >>>>>>>> In some cases IIRC there were user libraries that used
>>>>> unstable
>>>>> >>> >>>>>>>> Kafka API's and we were somewhat waiting on Kafka to
>>>>> stabilize
>>>>> >>> >>>>>>>> them to merge things upstream. Otherwise users wouldn't be
>>>>> able
>>>>> >>> >>>>>>>> to use newer Kafka versions. This is a high level
>>>>> impression
>>>>> >>> only
>>>>> >>> >>>>>>>> though, I haven't talked to TD about this recently so it's
>>>>> worth
>>>>> >>> >>> revisiting given the developments in Kafka.
>>>>> >>> >>>>>>>>
>>>>> >>> >>>>>>>> Please do bring things up like this on the dev list if
>>>>> there are
>>>>> >>> >>>>>>>> blockers for your usage - thanks for pinging it.
>>>>> >>> >>>>>>>>
>>>>> >>> >>>>>>>> - Patrick
>>>>> >>> >>>>>>>>
>>>>> >>> >>>>>>>> On Thu, Dec 18, 2014 at 7:07 AM, Cody Koeninger
>>>>> >>> >>>>>>>> <co...@koeninger.org>
>>>>> >>> >>>>>>>> wrote:
>>>>> >>> >>>>>>>>> Now that 1.2 is finalized... who are the go-to people to
>>>>> get
>>>>> >>> >>>>>>>>> some long-standing Kafka related issues resolved?
>>>>> >>> >>>>>>>>>
>>>>> >>> >>>>>>>>> The existing api is not sufficiently safe nor flexible
>>>>> for our
>>>>> >>> >>>>>>>> production
>>>>> >>> >>>>>>>>> use. I don't think we're alone in this viewpoint, because
>>>>> I've
>>>>> >>> >>>>>>>>> seen several different patches and libraries to fix the
>>>>> same
>>>>> >>> >>>>>>>>> things we've
>>>>> >>> >>>>>>>> been
>>>>> >>> >>>>>>>>> running into.
>>>>> >>> >>>>>>>>>
>>>>> >>> >>>>>>>>> Regarding flexibility
>>>>> >>> >>>>>>>>>
>>>>> >>> >>>>>>>>> https://issues.apache.org/jira/browse/SPARK-3146
>>>>> >>> >>>>>>>>>
>>>>> >>> >>>>>>>>> has been outstanding since August, and IMHO an equivalent
>>>>> of
>>>>> >>> >>>>>>>>> this is absolutely necessary. We wrote a similar patch
>>>>> >>> >>>>>>>>> ourselves, then found
>>>>> >>> >>>>>>>> that
>>>>> >>> >>>>>>>>> PR and have been running it in production. We wouldn't be
>>>>> able
>>>>> >>> >>>>>>>>> to
>>>>> >>> >>>> get
>>>>> >>> >>>>>>>> our
>>>>> >>> >>>>>>>>> jobs done without it. It also allows users to solve a
>>>>> whole
>>>>> >>> >>>>>>>>> class of problems for themselves (e.g. SPARK-2388,
>>>>> arbitrary
>>>>> >>> >>>>>>>>> delay of
>>>>> >>> >>>>>>>> messages, etc).
>>>>> >>> >>>>>>>>>
>>>>> >>> >>>>>>>>> Regarding safety, I understand the motivation behind
>>>>> >>> >>>>>>>>> WriteAheadLog
>>>>> >>> >>>> as
>>>>> >>> >>>>>>>> a
>>>>> >>> >>>>>>>>> general solution for streaming unreliable sources, but
>>>>> Kafka
>>>>> >>> >>>>>>>>> already
>>>>> >>> >>>>>>>> is a
>>>>> >>> >>>>>>>>> reliable source. I think there's a need for an api that
>>>>> treats
>>>>> >>> >>>>>>>>> it as such. Even aside from the performance issues of
>>>>> >>> >>>>>>>>> duplicating the write-ahead log in kafka into another
>>>>> >>> >>>>>>>>> write-ahead log in hdfs, I
>>>>> >>> >>>> need
>>>>> >>> >>>>>>>>> exactly-once semantics in the face of failure (I've had
>>>>> >>> >>>>>>>>> failures
>>>>> >>> >>>> that
>>>>> >>> >>>>>>>>> prevented reloading a spark streaming checkpoint, for
>>>>> >>> instance).
>>>>> >>> >>>>>>>>>
>>>>> >>> >>>>>>>>> I've got an implementation i've been using
>>>>> >>> >>>>>>>>>
>>>>> >>> >>>>>>>>>
>>>>> >>> https://github.com/koeninger/spark-1/tree/kafkaRdd/external/kaf
>>>>> >>> >>>>>>>>> ka /src/main/scala/org/apache/spark/rdd/kafka
>>>>> >>> >>>>>>>>>
>>>>> >>> >>>>>>>>> Tresata has something similar at
>>>>> >>> >>>>>>>> https://github.com/tresata/spark-kafka,
>>>>> >>> >>>>>>>>> and I know there were earlier attempts based on Storm
>>>>> code.
>>>>> >>> >>>>>>>>>
>>>>> >>> >>>>>>>>> Trying to distribute these kinds of fixes as libraries
>>>>> rather
>>>>> >>> >>>>>>>>> than
>>>>> >>> >>>>>>>> patches
>>>>> >>> >>>>>>>>> to Spark is problematic, because large portions of the
>>>>> >>> >>>> implementation
>>>>> >>> >>>>>>>> are
>>>>> >>> >>>>>>>>> private[spark].
>>>>> >>> >>>>>>>>>
>>>>> >>> >>>>>>>>> I'd like to help, but i need to know whose attention to
>>>>> get.
>>>>> >>> >>>>>>>>
>>>>> >>> >>>>>>>>
>>>>> >>> -----------------------------------------------------------------
>>>>> >>> >>>>>>>> ---- To unsubscribe, e-mail:
>>>>> dev-unsubscribe@spark.apache.org
>>>>> >>> For
>>>>> >>> >>>>>>>> additional commands, e-mail: dev-help@spark.apache.org
>>>>> >>> >>>>>>>>
>>>>> >>> >>>>>>>>
>>>>> >>> >>>>>>>
>>>>> >>> >>>>>
>>>>> >>> >>>>
>>>>> >>> >>>
>>>>> >>> >>
>>>>> >>>
>>>>> >>>
>>>>> >>
>>>>> >
>>>>>
>>>>
>>>
>>

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
For additional commands, e-mail: dev-help@spark.apache.org


Re: Which committers care about Kafka?

Posted by Cody Koeninger <co...@koeninger.org>.
The conversation was mostly getting TD up to speed on this thread since he
had just gotten back from his trip and hadn't seen it.

The jira has a summary of the requirements we discussed, I'm sure TD or
Patrick can add to the ticket if I missed something.
On Dec 25, 2014 1:54 AM, "Hari Shreedharan" <hs...@cloudera.com>
wrote:

> In general such discussions happen or is posted on the dev lists. Could
> you please post a summary? Thanks.
>
> Thanks,
> Hari
>
>
> On Wed, Dec 24, 2014 at 11:46 PM, Cody Koeninger <co...@koeninger.org>
> wrote:
>
>>  After a long talk with Patrick and TD (thanks guys), I opened the
>> following jira
>>
>> https://issues.apache.org/jira/browse/SPARK-4964
>>
>> Sample PR has an implementation for the batch and the dstream case, and a
>> link to a project with example usage.
>>
>> On Fri, Dec 19, 2014 at 4:36 PM, Koert Kuipers <ko...@tresata.com> wrote:
>>
>>> yup, we at tresata do the idempotent store the same way. very simple
>>> approach.
>>>
>>> On Fri, Dec 19, 2014 at 5:32 PM, Cody Koeninger <co...@koeninger.org>
>>> wrote:
>>>>
>>>> That KafkaRDD code is dead simple.
>>>>
>>>> Given a user specified map
>>>>
>>>> (topic1, partition0) -> (startingOffset, endingOffset)
>>>> (topic1, partition1) -> (startingOffset, endingOffset)
>>>> ...
>>>> turn each one of those entries into a partition of an rdd, using the
>>>> simple
>>>> consumer.
>>>> That's it.  No recovery logic, no state, nothing - for any failures,
>>>> bail
>>>> on the rdd and let it retry.
>>>> Spark stays out of the business of being a distributed database.
>>>>
>>>> The client code does any transformation it wants, then stores the data
>>>> and
>>>> offsets.  There are two ways of doing this, either based on idempotence
>>>> or
>>>> a transactional data store.
>>>>
>>>> For idempotent stores:
>>>>
>>>> 1.manipulate data
>>>> 2.save data to store
>>>> 3.save ending offsets to the same store
>>>>
>>>> If you fail between 2 and 3, the offsets haven't been stored, you start
>>>> again at the same beginning offsets, do the same calculations in the
>>>> same
>>>> order, overwrite the same data, all is good.
>>>>
>>>>
>>>> For transactional stores:
>>>>
>>>> 1. manipulate data
>>>> 2. begin transaction
>>>> 3. save data to the store
>>>> 4. save offsets
>>>> 5. commit transaction
>>>>
>>>> If you fail before 5, the transaction rolls back.  To make this less
>>>> heavyweight, you can write the data outside the transaction and then
>>>> update
>>>> a pointer to the current data inside the transaction.
>>>>
>>>>
>>>> Again, spark has nothing much to do with guaranteeing exactly once.  In
>>>> fact, the current streaming api actively impedes my ability to do the
>>>> above.  I'm just suggesting providing an api that doesn't get in the
>>>> way of
>>>> exactly-once.
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Fri, Dec 19, 2014 at 3:57 PM, Hari Shreedharan <
>>>> hshreedharan@cloudera.com
>>>> > wrote:
>>>>
>>>> > Can you explain your basic algorithm for the once-only-delivery? It is
>>>> > quite a bit of very Kafka-specific code, that would take more time to
>>>> read
>>>> > than I can currently afford? If you can explain your algorithm a bit,
>>>> it
>>>> > might help.
>>>> >
>>>> > Thanks,
>>>> > Hari
>>>> >
>>>> >
>>>> > On Fri, Dec 19, 2014 at 1:48 PM, Cody Koeninger <co...@koeninger.org>
>>>> > wrote:
>>>> >
>>>> >>
>>>> >> The problems you guys are discussing come from trying to store state
>>>> in
>>>> >> spark, so don't do that.  Spark isn't a distributed database.
>>>> >>
>>>> >> Just map kafka partitions directly to rdds, let user code specify
>>>> the
>>>> >> range of offsets explicitly, and let them be in charge of committing
>>>> >> offsets.
>>>> >>
>>>> >> Using the simple consumer isn't that bad, I'm already using this in
>>>> >> production with the code I linked to, and tresata apparently has
>>>> been as
>>>> >> well.  Again, for everyone saying this is impossible, have you read
>>>> either
>>>> >> of those implementations and looked at the approach?
>>>> >>
>>>> >>
>>>> >>
>>>> >> On Fri, Dec 19, 2014 at 2:27 PM, Sean McNamara <
>>>> >> Sean.McNamara@webtrends.com> wrote:
>>>> >>
>>>> >>> Please feel free to correct me if I’m wrong, but I think the exactly
>>>> >>> once spark streaming semantics can easily be solved using
>>>> updateStateByKey.
>>>> >>> Make the key going into updateStateByKey be a hash of the event, or
>>>> pluck
>>>> >>> off some uuid from the message.  The updateFunc would only emit the
>>>> message
>>>> >>> if the key did not exist, and the user has complete control over
>>>> the window
>>>> >>> of time / state lifecycle for detecting duplicates.  It also makes
>>>> it
>>>> >>> really easy to detect and take action (alert?) when you DO see a
>>>> duplicate,
>>>> >>> or make memory tradeoffs within an error bound using a sketch
>>>> algorithm.
>>>> >>> The kafka simple consumer is insanely complex, if possible I think
>>>> it would
>>>> >>> be better (and vastly more flexible) to get reliability using the
>>>> >>> primitives that spark so elegantly provides.
>>>> >>>
>>>> >>> Cheers,
>>>> >>>
>>>> >>> Sean
>>>> >>>
>>>> >>>
>>>> >>> > On Dec 19, 2014, at 12:06 PM, Hari Shreedharan <
>>>> >>> hshreedharan@cloudera.com> wrote:
>>>> >>> >
>>>> >>> > Hi Dibyendu,
>>>> >>> >
>>>> >>> > Thanks for the details on the implementation. But I still do not
>>>> >>> believe
>>>> >>> > that it is no duplicates - what they achieve is that the same
>>>> batch is
>>>> >>> > processed exactly the same way every time (but see it may be
>>>> processed
>>>> >>> more
>>>> >>> > than once) - so it depends on the operation being idempotent. I
>>>> believe
>>>> >>> > Trident uses ZK to keep track of the transactions - a batch can be
>>>> >>> > processed multiple times in failure scenarios (for example, the
>>>> >>> transaction
>>>> >>> > is processed but before ZK is updated the machine fails, causing a
>>>> >>> "new"
>>>> >>> > node to process it again).
>>>> >>> >
>>>> >>> > I don't think it is impossible to do this in Spark Streaming as
>>>> well
>>>> >>> and
>>>> >>> > I'd be really interested in working on it at some point in the
>>>> near
>>>> >>> future.
>>>> >>> >
>>>> >>> > On Fri, Dec 19, 2014 at 1:44 AM, Dibyendu Bhattacharya <
>>>> >>> > dibyendu.bhattachary@gmail.com> wrote:
>>>> >>> >
>>>> >>> >> Hi,
>>>> >>> >>
>>>> >>> >> Thanks to Jerry for mentioning the Kafka Spout for Trident. The
>>>> Storm
>>>> >>> >> Trident has done the exact-once guarantee by processing the
>>>> tuple in a
>>>> >>> >> batch  and assigning same transaction-id for a given batch . The
>>>> >>> replay for
>>>> >>> >> a given batch with a transaction-id will have exact same set of
>>>> >>> tuples and
>>>> >>> >> replay of batches happen in exact same order before the failure.
>>>> >>> >>
>>>> >>> >> Having this paradigm, if downstream system process data for a
>>>> given
>>>> >>> batch
>>>> >>> >> for having a given transaction-id , and if during failure if same
>>>> >>> batch is
>>>> >>> >> again emitted , you can check if same transaction-id is already
>>>> >>> processed
>>>> >>> >> or not and hence can guarantee exact once semantics.
>>>> >>> >>
>>>> >>> >> And this can only be achieved in Spark if we use Low Level Kafka
>>>> >>> consumer
>>>> >>> >> API to process the offsets. This low level Kafka Consumer (
>>>> >>> >> https://github.com/dibbhatt/kafka-spark-consumer) has
>>>> implemented the
>>>> >>> >> Spark Kafka consumer which uses Kafka Low Level APIs . All of the
>>>> >>> Kafka
>>>> >>> >> related logic has been taken from Storm-Kafka spout and which
>>>> manages
>>>> >>> all
>>>> >>> >> Kafka re-balance and fault tolerant aspects and Kafka metadata
>>>> >>> managements.
>>>> >>> >>
>>>> >>> >> Presently this Consumer maintains that during Receiver failure,
>>>> it
>>>> >>> will
>>>> >>> >> re-emit the exact same Block with same set of messages . Every
>>>> >>> message have
>>>> >>> >> the details of its partition, offset and topic related details
>>>> which
>>>> >>> can
>>>> >>> >> tackle the SPARK-3146.
>>>> >>> >>
>>>> >>> >> As this Low Level consumer has complete control over the Kafka
>>>> >>> Offsets ,
>>>> >>> >> we can implement Trident like feature on top of it like having
>>>> >>> implement a
>>>> >>> >> transaction-id for a given block , and re-emit the same block
>>>> with
>>>> >>> same set
>>>> >>> >> of message during Driver failure.
>>>> >>> >>
>>>> >>> >> Regards,
>>>> >>> >> Dibyendu
>>>> >>> >>
>>>> >>> >>
>>>> >>> >> On Fri, Dec 19, 2014 at 7:33 AM, Shao, Saisai <
>>>> saisai.shao@intel.com>
>>>> >>> >> wrote:
>>>> >>> >>>
>>>> >>> >>> Hi all,
>>>> >>> >>>
>>>> >>> >>> I agree with Hari that Strong exact-once semantics is very hard
>>>> to
>>>> >>> >>> guarantee, especially in the failure situation. From my
>>>> >>> understanding even
>>>> >>> >>> current implementation of ReliableKafkaReceiver cannot fully
>>>> >>> guarantee the
>>>> >>> >>> exact once semantics once failed, first is the ordering of data
>>>> >>> replaying
>>>> >>> >>> from last checkpoint, this is hard to guarantee when multiple
>>>> >>> partitions
>>>> >>> >>> are injected in; second is the design complexity of achieving
>>>> this,
>>>> >>> you can
>>>> >>> >>> refer to the Kafka Spout in Trident, we have to dig into the
>>>> very
>>>> >>> details
>>>> >>> >>> of Kafka metadata management system to achieve this, not to say
>>>> >>> rebalance
>>>> >>> >>> and fault-tolerance.
>>>> >>> >>>
>>>> >>> >>> Thanks
>>>> >>> >>> Jerry
>>>> >>> >>>
>>>> >>> >>> -----Original Message-----
>>>> >>> >>> From: Luis Ángel Vicente Sánchez [mailto:
>>>> langel.groups@gmail.com]
>>>> >>> >>> Sent: Friday, December 19, 2014 5:57 AM
>>>> >>> >>> To: Cody Koeninger
>>>> >>> >>> Cc: Hari Shreedharan; Patrick Wendell; dev@spark.apache.org
>>>> >>> >>> Subject: Re: Which committers care about Kafka?
>>>> >>> >>>
>>>> >>> >>> But idempotency is not that easy to achieve sometimes. A strong
>>>> only
>>>> >>> once
>>>> >>> >>> semantic through a proper API would be super useful; but I'm not
>>>> >>> implying
>>>> >>> >>> this is easy to achieve.
>>>> >>> >>> On 18 Dec 2014 21:52, "Cody Koeninger" <co...@koeninger.org>
>>>> wrote:
>>>> >>> >>>
>>>> >>> >>>> If the downstream store for the output data is idempotent or
>>>> >>> >>>> transactional, and that downstream store also is the system of
>>>> >>> record
>>>> >>> >>>> for kafka offsets, then you have exactly-once semantics.
>>>> Commit
>>>> >>> >>>> offsets with / after the data is stored.  On any failure,
>>>> restart
>>>> >>> from
>>>> >>> >>> the last committed offsets.
>>>> >>> >>>>
>>>> >>> >>>> Yes, this approach is biased towards the etl-like use cases
>>>> rather
>>>> >>> >>>> than near-realtime-analytics use cases.
>>>> >>> >>>>
>>>> >>> >>>> On Thu, Dec 18, 2014 at 3:27 PM, Hari Shreedharan <
>>>> >>> >>>> hshreedharan@cloudera.com
>>>> >>> >>>>> wrote:
>>>> >>> >>>>>
>>>> >>> >>>>> I get what you are saying. But getting exactly once right is
>>>> an
>>>> >>> >>>>> extremely hard problem - especially in presence of failure.
>>>> The
>>>> >>> >>>>> issue is failures
>>>> >>> >>>> can
>>>> >>> >>>>> happen in a bunch of places. For example, before the
>>>> notification
>>>> >>> of
>>>> >>> >>>>> downstream store being successful reaches the receiver that
>>>> updates
>>>> >>> >>>>> the offsets, the node fails. The store was successful, but
>>>> >>> >>>>> duplicates came in either way. This is something worth
>>>> discussing
>>>> >>> by
>>>> >>> >>>>> itself - but without uuids etc this might not really be
>>>> solved even
>>>> >>> >>> when you think it is.
>>>> >>> >>>>>
>>>> >>> >>>>> Anyway, I will look at the links. Even I am interested in all
>>>> of
>>>> >>> the
>>>> >>> >>>>> features you mentioned - no HDFS WAL for Kafka and once-only
>>>> >>> >>>>> delivery,
>>>> >>> >>>> but
>>>> >>> >>>>> I doubt the latter is really possible to guarantee - though I
>>>> >>> really
>>>> >>> >>>> would
>>>> >>> >>>>> love to have that!
>>>> >>> >>>>>
>>>> >>> >>>>> Thanks,
>>>> >>> >>>>> Hari
>>>> >>> >>>>>
>>>> >>> >>>>>
>>>> >>> >>>>> On Thu, Dec 18, 2014 at 12:26 PM, Cody Koeninger
>>>> >>> >>>>> <co...@koeninger.org>
>>>> >>> >>>>> wrote:
>>>> >>> >>>>>
>>>> >>> >>>>>> Thanks for the replies.
>>>> >>> >>>>>>
>>>> >>> >>>>>> Regarding skipping WAL, it's not just about optimization.
>>>> If you
>>>> >>> >>>>>> actually want exactly-once semantics, you need control of
>>>> kafka
>>>> >>> >>>>>> offsets
>>>> >>> >>>> as
>>>> >>> >>>>>> well, including the ability to not use zookeeper as the
>>>> system of
>>>> >>> >>>>>> record for offsets.  Kafka already is a reliable system that
>>>> has
>>>> >>> >>>>>> strong
>>>> >>> >>>> ordering
>>>> >>> >>>>>> guarantees (within a partition) and does not mandate the use
>>>> of
>>>> >>> >>>> zookeeper
>>>> >>> >>>>>> to store offsets.  I think there should be a spark api that
>>>> acts
>>>> >>> as
>>>> >>> >>>>>> a
>>>> >>> >>>> very
>>>> >>> >>>>>> simple intermediary between Kafka and the user's choice of
>>>> >>> >>>>>> downstream
>>>> >>> >>>> store.
>>>> >>> >>>>>>
>>>> >>> >>>>>> Take a look at the links I posted - if there's already been 2
>>>> >>> >>>> independent
>>>> >>> >>>>>> implementations of the idea, chances are it's something
>>>> people
>>>> >>> need.
>>>> >>> >>>>>>
>>>> >>> >>>>>> On Thu, Dec 18, 2014 at 1:44 PM, Hari Shreedharan <
>>>> >>> >>>>>> hshreedharan@cloudera.com> wrote:
>>>> >>> >>>>>>>
>>>> >>> >>>>>>> Hi Cody,
>>>> >>> >>>>>>>
>>>> >>> >>>>>>> I am an absolute +1 on SPARK-3146. I think we can implement
>>>> >>> >>>>>>> something pretty simple and lightweight for that one.
>>>> >>> >>>>>>>
>>>> >>> >>>>>>> For the Kafka DStream skipping the WAL implementation -
>>>> this is
>>>> >>> >>>>>>> something I discussed with TD a few weeks ago. Though it is
>>>> a
>>>> >>> good
>>>> >>> >>>> idea to
>>>> >>> >>>>>>> implement this to avoid unnecessary HDFS writes, it is an
>>>> >>> >>>> optimization. For
>>>> >>> >>>>>>> that reason, we must be careful in implementation. There
>>>> are a
>>>> >>> >>>>>>> couple
>>>> >>> >>>> of
>>>> >>> >>>>>>> issues that we need to ensure works properly - specifically
>>>> >>> >>> ordering.
>>>> >>> >>>> To
>>>> >>> >>>>>>> ensure we pull messages from different topics and
>>>> partitions in
>>>> >>> >>>>>>> the
>>>> >>> >>>> same
>>>> >>> >>>>>>> order after failure, we’d still have to persist the
>>>> metadata to
>>>> >>> >>>>>>> HDFS
>>>> >>> >>>> (or
>>>> >>> >>>>>>> some other system) - this metadata must contain the order of
>>>> >>> >>>>>>> messages consumed, so we know how to re-read the messages.
>>>> I am
>>>> >>> >>>>>>> planning to
>>>> >>> >>>> explore
>>>> >>> >>>>>>> this once I have some time (probably in Jan). In addition,
>>>> we
>>>> >>> must
>>>> >>> >>>>>>> also ensure bucketing functions work fine as well. I will
>>>> file a
>>>> >>> >>>>>>> placeholder jira for this one.
>>>> >>> >>>>>>>
>>>> >>> >>>>>>> I also wrote an API to write data back to Kafka a while
>>>> back -
>>>> >>> >>>>>>> https://github.com/apache/spark/pull/2994 . I am hoping
>>>> that
>>>> >>> this
>>>> >>> >>>>>>> will get pulled in soon, as this is something I know people
>>>> want.
>>>> >>> >>>>>>> I am open
>>>> >>> >>>> to
>>>> >>> >>>>>>> feedback on that - anything that I can do to make it better.
>>>> >>> >>>>>>>
>>>> >>> >>>>>>> Thanks,
>>>> >>> >>>>>>> Hari
>>>> >>> >>>>>>>
>>>> >>> >>>>>>>
>>>> >>> >>>>>>> On Thu, Dec 18, 2014 at 11:14 AM, Patrick Wendell
>>>> >>> >>>>>>> <pw...@gmail.com>
>>>> >>> >>>>>>> wrote:
>>>> >>> >>>>>>>
>>>> >>> >>>>>>>> Hey Cody,
>>>> >>> >>>>>>>>
>>>> >>> >>>>>>>> Thanks for reaching out with this. The lead on streaming
>>>> is TD -
>>>> >>> >>>>>>>> he is traveling this week though so I can respond a bit.
>>>> To the
>>>> >>> >>>>>>>> high level point of whether Kafka is important - it
>>>> definitely
>>>> >>> >>>>>>>> is. Something like 80% of Spark Streaming deployments
>>>> >>> >>>>>>>> (anecdotally) ingest data from Kafka. Also, good support
>>>> for
>>>> >>> >>>>>>>> Kafka is something we generally want in Spark and not a
>>>> library.
>>>> >>> >>>>>>>> In some cases IIRC there were user libraries that used
>>>> unstable
>>>> >>> >>>>>>>> Kafka API's and we were somewhat waiting on Kafka to
>>>> stabilize
>>>> >>> >>>>>>>> them to merge things upstream. Otherwise users wouldn't be
>>>> able
>>>> >>> >>>>>>>> to use newer Kakfa versions. This is a high level
>>>> impression
>>>> >>> only
>>>> >>> >>>>>>>> though, I haven't talked to TD about this recently so it's
>>>> worth
>>>> >>> >>> revisiting given the developments in Kafka.
>>>> >>> >>>>>>>>
>>>> >>> >>>>>>>> Please do bring things up like this on the dev list if
>>>> there are
>>>> >>> >>>>>>>> blockers for your usage - thanks for pinging it.
>>>> >>> >>>>>>>>
>>>> >>> >>>>>>>> - Patrick
>>>> >>> >>>>>>>>
>>>> >>> >>>>>>>> On Thu, Dec 18, 2014 at 7:07 AM, Cody Koeninger
>>>> >>> >>>>>>>> <co...@koeninger.org>
>>>> >>> >>>>>>>> wrote:
>>>> >>> >>>>>>>>> Now that 1.2 is finalized... who are the go-to people to
>>>> get
>>>> >>> >>>>>>>>> some long-standing Kafka related issues resolved?
>>>> >>> >>>>>>>>>
>>>> >>> >>>>>>>>> The existing api is not sufficiently safe nor flexible
>>>> for our
>>>> >>> >>>>>>>> production
>>>> >>> >>>>>>>>> use. I don't think we're alone in this viewpoint, because
>>>> I've
>>>> >>> >>>>>>>>> seen several different patches and libraries to fix the
>>>> same
>>>> >>> >>>>>>>>> things we've
>>>> >>> >>>>>>>> been
>>>> >>> >>>>>>>>> running into.
>>>> >>> >>>>>>>>>
>>>> >>> >>>>>>>>> Regarding flexibility
>>>> >>> >>>>>>>>>
>>>> >>> >>>>>>>>> https://issues.apache.org/jira/browse/SPARK-3146
>>>> >>> >>>>>>>>>
>>>> >>> >>>>>>>>> has been outstanding since August, and IMHO an equivalent
>>>> of
>>>> >>> >>>>>>>>> this is absolutely necessary. We wrote a similar patch
>>>> >>> >>>>>>>>> ourselves, then found
>>>> >>> >>>>>>>> that
>>>> >>> >>>>>>>>> PR and have been running it in production. We wouldn't be
>>>> able
>>>> >>> >>>>>>>>> to
>>>> >>> >>>> get
>>>> >>> >>>>>>>> our
>>>> >>> >>>>>>>>> jobs done without it. It also allows users to solve a
>>>> whole
>>>> >>> >>>>>>>>> class of problems for themselves (e.g. SPARK-2388,
>>>> arbitrary
>>>> >>> >>>>>>>>> delay of
>>>> >>> >>>>>>>> messages, etc).
>>>> >>> >>>>>>>>>
>>>> >>> >>>>>>>>> Regarding safety, I understand the motivation behind
>>>> >>> >>>>>>>>> WriteAheadLog
>>>> >>> >>>> as
>>>> >>> >>>>>>>> a
>>>> >>> >>>>>>>>> general solution for streaming unreliable sources, but
>>>> Kafka
>>>> >>> >>>>>>>>> already
>>>> >>> >>>>>>>> is a
>>>> >>> >>>>>>>>> reliable source. I think there's a need for an api that
>>>> treats
>>>> >>> >>>>>>>>> it as such. Even aside from the performance issues of
>>>> >>> >>>>>>>>> duplicating the write-ahead log in kafka into another
>>>> >>> >>>>>>>>> write-ahead log in hdfs, I
>>>> >>> >>>> need
>>>> >>> >>>>>>>>> exactly-once semantics in the face of failure (I've had
>>>> >>> >>>>>>>>> failures
>>>> >>> >>>> that
>>>> >>> >>>>>>>>> prevented reloading a spark streaming checkpoint, for
>>>> >>> instance).
>>>> >>> >>>>>>>>>
>>>> >>> >>>>>>>>> I've got an implementation i've been using
>>>> >>> >>>>>>>>>
>>>> >>> >>>>>>>>>
>>>> >>> https://github.com/koeninger/spark-1/tree/kafkaRdd/external/kaf
>>>> >>> >>>>>>>>> ka /src/main/scala/org/apache/spark/rdd/kafka
>>>> >>> >>>>>>>>>
>>>> >>> >>>>>>>>> Tresata has something similar at
>>>> >>> >>>>>>>> https://github.com/tresata/spark-kafka,
>>>> >>> >>>>>>>>> and I know there were earlier attempts based on Storm
>>>> code.
>>>> >>> >>>>>>>>>
>>>> >>> >>>>>>>>> Trying to distribute these kinds of fixes as libraries
>>>> rather
>>>> >>> >>>>>>>>> than
>>>> >>> >>>>>>>> patches
>>>> >>> >>>>>>>>> to Spark is problematic, because large portions of the
>>>> >>> >>>> implementation
>>>> >>> >>>>>>>> are
>>>> >>> >>>>>>>>> private[spark].
>>>> >>> >>>>>>>>>
>>>> >>> >>>>>>>>> I'd like to help, but i need to know whose attention to
>>>> get.
>>>> >>> >>>>>>>>
>>>> >>> >>>>>>>>
>>>> >>> -----------------------------------------------------------------
>>>> >>> >>>>>>>> ---- To unsubscribe, e-mail:
>>>> dev-unsubscribe@spark.apache.org
>>>> >>> For
>>>> >>> >>>>>>>> additional commands, e-mail: dev-help@spark.apache.org
>>>> >>> >>>>>>>>
>>>> >>> >>>>>>>>
>>>> >>> >>>>>>>
>>>> >>> >>>>>
>>>> >>> >>>>
>>>> >>> >>>
>>>> >>> >>
>>>> >>>
>>>> >>>
>>>> >>
>>>> >
>>>>
>>>
>>
>

Re: Which committers care about Kafka?

Posted by Hari Shreedharan <hs...@cloudera.com>.
In general, such discussions happen on or are posted to the dev lists. Could you please post a summary? Thanks.



Thanks, Hari

On Wed, Dec 24, 2014 at 11:46 PM, Cody Koeninger <co...@koeninger.org>
wrote:

> After a long talk with Patrick and TD (thanks guys), I opened the following
> jira
> https://issues.apache.org/jira/browse/SPARK-4964
> Sample PR has an implementation for the batch and the DStream case, and a
> link to a project with example usage.

Re: Which committers care about Kafka?

Posted by Cody Koeninger <co...@koeninger.org>.
After a long talk with Patrick and TD (thanks guys), I opened the following
jira

https://issues.apache.org/jira/browse/SPARK-4964

Sample PR has an implementation for the batch and the DStream case, and a
link to a project with example usage.
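
A rough sketch of the batch case, assuming an API shaped like the KafkaRDD code
linked earlier in the thread (the names and signature below are illustrative,
not necessarily what the sample PR uses): the caller supplies explicit offsets
per topic and partition, each entry becomes one partition of the resulting RDD,
read with the Kafka simple consumer, and the caller decides where the ending
offsets get committed.

    // Scala sketch; broker list, topic names and offsets are made up.
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")

    // Explicit ranges, in the shape described later in the thread:
    // (topic, partition) -> (startingOffset, endingOffset)
    val offsets: Map[(String, Int), (Long, Long)] = Map(
      ("events", 0) -> (1000L, 2000L),
      ("events", 1) -> (1500L, 2500L))

    // Hypothetical entry point (name and arguments assumed): each map entry
    // becomes one RDD partition, and the DStream case just builds one such
    // RDD per batch interval.
    // val rdd = KafkaRDD(sc, kafkaParams, offsets)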

On Fri, Dec 19, 2014 at 4:36 PM, Koert Kuipers <ko...@tresata.com> wrote:

> yup, we at tresata do the idempotent store the same way. very simple
> approach.
>

Re: Which committers care about Kafka?

Posted by Koert Kuipers <ko...@tresata.com>.
yup, we at tresata do the idempotent store the same way. very simple
approach.
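
A minimal sketch of that idempotent pattern (the store API, key scheme, and
names here are assumptions for illustration, not actual production code): write
the transformed rows under deterministic keys, then write the ending offsets to
the same store last, so a replay of the same offset range simply overwrites
identical data. The steps are spelled out in the quoted message below.

    // Scala sketch with a placeholder key-value store; step 1 (manipulating
    // the data) has already happened upstream, producing `rows`.
    case class OffsetRange(topic: String, partition: Int, from: Long, until: Long)
    case class Row(key: String, value: String)   // key derived deterministically from topic/partition/offset
    trait KeyValueStore { def upsert(key: String, value: String): Unit }

    def saveIdempotently(rows: Seq[Row], ranges: Seq[OffsetRange],
                         store: KeyValueStore): Unit = {
      rows.foreach(r => store.upsert(r.key, r.value))            // 2. save data
      ranges.foreach(r =>                                        // 3. save ending offsets, last
        store.upsert(s"offsets/${r.topic}/${r.partition}", r.until.toString))
      // A failure between the two steps leaves the old offsets in place; the
      // next run replays the same range and overwrites the same rows with the
      // same values.
    }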

On Fri, Dec 19, 2014 at 5:32 PM, Cody Koeninger <co...@koeninger.org> wrote:
>
> That KafkaRDD code is dead simple.
>
> Given a user specified map
>
> (topic1, partition0) -> (startingOffset, endingOffset)
> (topic1, partition1) -> (startingOffset, endingOffset)
> ...
> turn each one of those entries into a partition of an rdd, using the simple
> consumer.
> That's it.  No recovery logic, no state, nothing - for any failures, bail
> on the rdd and let it retry.
> Spark stays out of the business of being a distributed database.
>
> The client code does any transformation it wants, then stores the data and
> offsets.  There are two ways of doing this, either based on idempotence or
> a transactional data store.
>
> For idempotent stores:
>
> 1. manipulate data
> 2. save data to store
> 3. save ending offsets to the same store
>
> If you fail between 2 and 3, the offsets haven't been stored, you start
> again at the same beginning offsets, do the same calculations in the same
> order, overwrite the same data, all is good.
>
>
> For transactional stores:
>
> 1. manipulate data
> 2. begin transaction
> 3. save data to the store
> 4. save offsets
> 5. commit transaction
>
> If you fail before 5, the transaction rolls back.  To make this less
> heavyweight, you can write the data outside the transaction and then update
> a pointer to the current data inside the transaction.
>
>
> Again, spark has nothing much to do with guaranteeing exactly once.  In
> fact, the current streaming api actively impedes my ability to do the
> above.  I'm just suggesting providing an api that doesn't get in the way of
> exactly-once.
>
>
>
>
>
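
The transactional variant described just above, sketched with plain JDBC (the
connection URL, table and column names are made up for this example): data and
the new offsets are written in one transaction, so a failure anywhere before
the commit rolls both back and the batch is reprocessed from the old offsets.
The heavier data write can also be done outside the transaction, with only a
pointer swap inside it, as noted above.

    import java.sql.{Connection, DriverManager}

    // Scala/JDBC sketch of steps 2-5; step 1 (manipulating the data) has
    // already produced `rows`.
    def saveTransactionally(rows: Seq[(String, String)],
                            topic: String, partition: Int, untilOffset: Long): Unit = {
      val conn: Connection = DriverManager.getConnection("jdbc:postgresql://db/example")
      conn.setAutoCommit(false)                                  // 2. begin transaction
      try {
        val insert = conn.prepareStatement("insert into events (k, v) values (?, ?)")
        rows.foreach { case (k, v) =>                            // 3. save data
          insert.setString(1, k); insert.setString(2, v); insert.executeUpdate()
        }
        val offsets = conn.prepareStatement(
          "update kafka_offsets set until_offset = ? where topic = ? and kafka_partition = ?")
        offsets.setLong(1, untilOffset)
        offsets.setString(2, topic)
        offsets.setInt(3, partition)
        offsets.executeUpdate()                                  // 4. save offsets
        conn.commit()                                            // 5. commit transaction
      } catch {
        case e: Exception => conn.rollback(); throw e            // rollback keeps data and offsets consistent
      } finally {
        conn.close()
      }
    }
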
> On Fri, Dec 19, 2014 at 3:57 PM, Hari Shreedharan <
> hshreedharan@cloudera.com
> > wrote:
>
> > Can you explain your basic algorithm for the once-only-delivery? It is
> > quite a bit of very Kafka-specific code, that would take more time to
> read
> > than I can currently afford? If you can explain your algorithm a bit, it
> > might help.
> >
> > Thanks,
> > Hari
> >
> >
> > On Fri, Dec 19, 2014 at 1:48 PM, Cody Koeninger <co...@koeninger.org>
> > wrote:
> >
> >>
> >> The problems you guys are discussing come from trying to store state in
> >> spark, so don't do that.  Spark isn't a distributed database.
> >>
> >> Just map kafka partitions directly to rdds, let user code specify the
> >> range of offsets explicitly, and let them be in charge of committing
> >> offsets.
> >>
> >> Using the simple consumer isn't that bad, I'm already using this in
> >> production with the code I linked to, and tresata apparently has been as
> >> well.  Again, for everyone saying this is impossible, have you read
> either
> >> of those implementations and looked at the approach?
> >>
> >>
> >>
> >> On Fri, Dec 19, 2014 at 2:27 PM, Sean McNamara <
> >> Sean.McNamara@webtrends.com> wrote:
> >>
> >>> Please feel free to correct me if I’m wrong, but I think the exactly
> >>> once spark streaming semantics can easily be solved using
> updateStateByKey.
> >>> Make the key going into updateStateByKey be a hash of the event, or
> pluck
> >>> off some uuid from the message.  The updateFunc would only emit the
> message
> >>> if the key did not exist, and the user has complete control over the
> window
> >>> of time / state lifecycle for detecting duplicates.  It also makes it
> >>> really easy to detect and take action (alert?) when you DO see a
> duplicate,
> >>> or make memory tradeoffs within an error bound using a sketch
> algorithm.
> >>> The kafka simple consumer is insanely complex, if possible I think it
> would
> >>> be better (and vastly more flexible) to get reliability using the
> >>> primitives that spark so elegantly provides.
> >>>
> >>> Cheers,
> >>>
> >>> Sean
> >>>
> >>>
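
For illustration, the updateStateByKey de-duplication suggested in the quoted
message above could look roughly like this (the Event type and its uuid field
are assumptions; state never expires in this sketch, which is exactly the
memory/window tradeoff being discussed, and updateStateByKey requires
checkpointing to be enabled on the StreamingContext):

    import org.apache.spark.streaming.StreamingContext._   // pair-DStream operations
    import org.apache.spark.streaming.dstream.DStream

    case class Event(uuid: String, payload: String)

    // State per uuid: the first event seen and whether the current batch is
    // the one in which it first arrived; only first arrivals pass the filter.
    def dedupe(events: DStream[Event]): DStream[Event] =
      events.map(e => (e.uuid, e))
        .updateStateByKey[(Event, Boolean)] {
          (incoming: Seq[Event], state: Option[(Event, Boolean)]) =>
            state match {
              case Some((e, _)) => Some((e, false))                         // seen before: never emit again
              case None         => incoming.headOption.map(e => (e, true))  // first sighting: emit once
            }
        }
        .filter { case (_, (_, isNew)) => isNew }
        .map { case (_, (e, _)) => e }
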
> >>> > On Dec 19, 2014, at 12:06 PM, Hari Shreedharan <
> >>> hshreedharan@cloudera.com> wrote:
> >>> >
> >>> > Hi Dibyendu,
> >>> >
> >>> > Thanks for the details on the implementation. But I still do not
> >>> believe
> >>> > that it is no duplicates - what they achieve is that the same batch
> is
> >>> > processed exactly the same way every time (but see it may be
> processed
> >>> more
> >>> > than once) - so it depends on the operation being idempotent. I
> believe
> >>> > Trident uses ZK to keep track of the transactions - a batch can be
> >>> > processed multiple times in failure scenarios (for example, the
> >>> transaction
> >>> > is processed but before ZK is updated the machine fails, causing a
> >>> "new"
> >>> > node to process it again).
> >>> >
> >>> > I don't think it is impossible to do this in Spark Streaming as well
> >>> and
> >>> > I'd be really interested in working on it at some point in the near
> >>> future.
> >>> >
> >>> > On Fri, Dec 19, 2014 at 1:44 AM, Dibyendu Bhattacharya <
> >>> > dibyendu.bhattachary@gmail.com> wrote:
> >>> >
> >>> >> Hi,
> >>> >>
> >>> >> Thanks to Jerry for mentioning the Kafka Spout for Trident. The
> Storm
> >>> >> Trident has done the exact-once guarantee by processing the tuple
> in a
> >>> >> batch  and assigning same transaction-id for a given batch . The
> >>> replay for
> >>> >> a given batch with a transaction-id will have exact same set of
> >>> tuples and
> >>> >> replay of batches happen in exact same order before the failure.
> >>> >>
> >>> >> Having this paradigm, if downstream system process data for a given
> >>> batch
> >>> >> for having a given transaction-id , and if during failure if same
> >>> batch is
> >>> >> again emitted , you can check if same transaction-id is already
> >>> processed
> >>> >> or not and hence can guarantee exact once semantics.
> >>> >>
> >>> >> And this can only be achieved in Spark if we use Low Level Kafka
> >>> consumer
> >>> >> API to process the offsets. This low level Kafka Consumer (
> >>> >> https://github.com/dibbhatt/kafka-spark-consumer) has implemented
> the
> >>> >> Spark Kafka consumer which uses Kafka Low Level APIs . All of the
> >>> Kafka
> >>> >> related logic has been taken from Storm-Kafka spout and which
> manages
> >>> all
> >>> >> Kafka re-balance and fault tolerant aspects and Kafka metadata
> >>> managements.
> >>> >>
> >>> >> Presently this Consumer maintains that during Receiver failure, it
> >>> will
> >>> >> re-emit the exact same Block with same set of messages . Every
> >>> message have
> >>> >> the details of its partition, offset and topic related details which
> >>> can
> >>> >> tackle the SPARK-3146.
> >>> >>
> >>> >> As this Low Level consumer has complete control over the Kafka
> >>> Offsets ,
> >>> >> we can implement Trident like feature on top of it like having
> >>> implement a
> >>> >> transaction-id for a given block , and re-emit the same block with
> >>> same set
> >>> >> of message during Driver failure.
> >>> >>
> >>> >> Regards,
> >>> >> Dibyendu
> >>> >>
> >>> >>
> >>> >> On Fri, Dec 19, 2014 at 7:33 AM, Shao, Saisai <
> saisai.shao@intel.com>
> >>> >> wrote:
> >>> >>>
> >>> >>> Hi all,
> >>> >>>
> >>> >>> I agree with Hari that Strong exact-once semantics is very hard to
> >>> >>> guarantee, especially in the failure situation. From my
> >>> understanding even
> >>> >>> current implementation of ReliableKafkaReceiver cannot fully
> >>> guarantee the
> >>> >>> exact once semantics once failed, first is the ordering of data
> >>> replaying
> >>> >>> from last checkpoint, this is hard to guarantee when multiple
> >>> partitions
> >>> >>> are injected in; second is the design complexity of achieving this,
> >>> you can
> >>> >>> refer to the Kafka Spout in Trident, we have to dig into the very
> >>> details
> >>> >>> of Kafka metadata management system to achieve this, not to say
> >>> rebalance
> >>> >>> and fault-tolerance.
> >>> >>>
> >>> >>> Thanks
> >>> >>> Jerry
> >>> >>>
> >>> >>> -----Original Message-----
> >>> >>> From: Luis Ángel Vicente Sánchez [mailto:langel.groups@gmail.com]
> >>> >>> Sent: Friday, December 19, 2014 5:57 AM
> >>> >>> To: Cody Koeninger
> >>> >>> Cc: Hari Shreedharan; Patrick Wendell; dev@spark.apache.org
> >>> >>> Subject: Re: Which committers care about Kafka?
> >>> >>>
> >>> >>> But idempotency is not that easy to achieve sometimes. A strong only
> >>> once
> >>> >>> semantic through a proper API would be super useful; but I'm not
> >>> implying
> >>> >>> this is easy to achieve.
> >>> >>> On 18 Dec 2014 21:52, "Cody Koeninger" <co...@koeninger.org> wrote:
> >>> >>>
> >>> >>>> If the downstream store for the output data is idempotent or
> >>> >>>> transactional, and that downstream store also is the system of
> >>> record
> >>> >>>> for kafka offsets, then you have exactly-once semantics.  Commit
> >>> >>>> offsets with / after the data is stored.  On any failure, restart
> >>> from
> >>> >>> the last committed offsets.
> >>> >>>>
> >>> >>>> Yes, this approach is biased towards the etl-like use cases rather
> >>> >>>> than near-realtime-analytics use cases.
> >>> >>>>
> >>> >>>> On Thu, Dec 18, 2014 at 3:27 PM, Hari Shreedharan <
> >>> >>>> hshreedharan@cloudera.com
> >>> >>>>> wrote:
> >>> >>>>>
> >>> >>>>> I get what you are saying. But getting exactly once right is an
> >>> >>>>> extremely hard problem - especially in presence of failure. The
> >>> >>>>> issue is failures
> >>> >>>> can
> >>> >>>>> happen in a bunch of places. For example, before the notification
> >>> of
> >>> >>>>> downstream store being successful reaches the receiver that
> updates
> >>> >>>>> the offsets, the node fails. The store was successful, but
> >>> >>>>> duplicates came in either way. This is something worth discussing
> >>> by
> >>> >>>>> itself - but without uuids etc this might not really be solved
> even
> >>> >>> when you think it is.
> >>> >>>>>
> >>> >>>>> Anyway, I will look at the links. Even I am interested in all of
> >>> the
> >>> >>>>> features you mentioned - no HDFS WAL for Kafka and once-only
> >>> >>>>> delivery,
> >>> >>>> but
> >>> >>>>> I doubt the latter is really possible to guarantee - though I
> >>> really
> >>> >>>> would
> >>> >>>>> love to have that!
> >>> >>>>>
> >>> >>>>> Thanks,
> >>> >>>>> Hari
> >>> >>>>>
> >>> >>>>>
> >>> >>>>> On Thu, Dec 18, 2014 at 12:26 PM, Cody Koeninger
> >>> >>>>> <co...@koeninger.org>
> >>> >>>>> wrote:
> >>> >>>>>
> >>> >>>>>> Thanks for the replies.
> >>> >>>>>>
> >>> >>>>>> Regarding skipping WAL, it's not just about optimization.  If
> you
> >>> >>>>>> actually want exactly-once semantics, you need control of kafka
> >>> >>>>>> offsets
> >>> >>>> as
> >>> >>>>>> well, including the ability to not use zookeeper as the system
> of
> >>> >>>>>> record for offsets.  Kafka already is a reliable system that has
> >>> >>>>>> strong
> >>> >>>> ordering
> >>> >>>>>> guarantees (within a partition) and does not mandate the use of
> >>> >>>> zookeeper
> >>> >>>>>> to store offsets.  I think there should be a spark api that acts
> >>> as
> >>> >>>>>> a
> >>> >>>> very
> >>> >>>>>> simple intermediary between Kafka and the user's choice of
> >>> >>>>>> downstream
> >>> >>>> store.
> >>> >>>>>>
> >>> >>>>>> Take a look at the links I posted - if there's already been 2
> >>> >>>> independent
> >>> >>>>>> implementations of the idea, chances are it's something people
> >>> need.
> >>> >>>>>>
> >>> >>>>>> On Thu, Dec 18, 2014 at 1:44 PM, Hari Shreedharan <
> >>> >>>>>> hshreedharan@cloudera.com> wrote:
> >>> >>>>>>>
> >>> >>>>>>> Hi Cody,
> >>> >>>>>>>
> >>> >>>>>>> I am an absolute +1 on SPARK-3146. I think we can implement
> >>> >>>>>>> something pretty simple and lightweight for that one.
> >>> >>>>>>>
> >>> >>>>>>> For the Kafka DStream skipping the WAL implementation - this is
> >>> >>>>>>> something I discussed with TD a few weeks ago. Though it is a
> >>> good
> >>> >>>> idea to
> >>> >>>>>>> implement this to avoid unnecessary HDFS writes, it is an
> >>> >>>> optimization. For
> >>> >>>>>>> that reason, we must be careful in implementation. There are a
> >>> >>>>>>> couple
> >>> >>>> of
> >>> >>>>>>> issues that we need to ensure works properly - specifically
> >>> >>> ordering.
> >>> >>>> To
> >>> >>>>>>> ensure we pull messages from different topics and partitions in
> >>> >>>>>>> the
> >>> >>>> same
> >>> >>>>>>> order after failure, we’d still have to persist the metadata to
> >>> >>>>>>> HDFS
> >>> >>>> (or
> >>> >>>>>>> some other system) - this metadata must contain the order of
> >>> >>>>>>> messages consumed, so we know how to re-read the messages. I am
> >>> >>>>>>> planning to
> >>> >>>> explore
> >>> >>>>>>> this once I have some time (probably in Jan). In addition, we
> >>> must
> >>> >>>>>>> also ensure bucketing functions work fine as well. I will file
> a
> >>> >>>>>>> placeholder jira for this one.
> >>> >>>>>>>
> >>> >>>>>>> I also wrote an API to write data back to Kafka a while back -
> >>> >>>>>>> https://github.com/apache/spark/pull/2994 . I am hoping that
> >>> this
> >>> >>>>>>> will get pulled in soon, as this is something I know people
> want.
> >>> >>>>>>> I am open
> >>> >>>> to
> >>> >>>>>>> feedback on that - anything that I can do to make it better.
> >>> >>>>>>>
> >>> >>>>>>> Thanks,
> >>> >>>>>>> Hari
> >>> >>>>>>>
> >>> >>>>>>>
> >>> >>>>>>> On Thu, Dec 18, 2014 at 11:14 AM, Patrick Wendell
> >>> >>>>>>> <pw...@gmail.com>
> >>> >>>>>>> wrote:
> >>> >>>>>>>
> >>> >>>>>>>> Hey Cody,
> >>> >>>>>>>>
> >>> >>>>>>>> Thanks for reaching out with this. The lead on streaming is
> TD -
> >>> >>>>>>>> he is traveling this week though so I can respond a bit. To
> the
> >>> >>>>>>>> high level point of whether Kafka is important - it definitely
> >>> >>>>>>>> is. Something like 80% of Spark Streaming deployments
> >>> >>>>>>>> (anecdotally) ingest data from Kafka. Also, good support for
> >>> >>>>>>>> Kafka is something we generally want in Spark and not a
> library.
> >>> >>>>>>>> In some cases IIRC there were user libraries that used
> unstable
> >>> >>>>>>>> Kafka API's and we were somewhat waiting on Kafka to stabilize
> >>> >>>>>>>> them to merge things upstream. Otherwise users wouldn't be
> able
> >>> >>>>>>>> to use newer Kakfa versions. This is a high level impression
> >>> only
> >>> >>>>>>>> though, I haven't talked to TD about this recently so it's
> worth
> >>> >>> revisiting given the developments in Kafka.
> >>> >>>>>>>>
> >>> >>>>>>>> Please do bring things up like this on the dev list if there
> are
> >>> >>>>>>>> blockers for your usage - thanks for pinging it.
> >>> >>>>>>>>
> >>> >>>>>>>> - Patrick
> >>> >>>>>>>>
> >>> >>>>>>>> On Thu, Dec 18, 2014 at 7:07 AM, Cody Koeninger
> >>> >>>>>>>> <co...@koeninger.org>
> >>> >>>>>>>> wrote:
> >>> >>>>>>>>> Now that 1.2 is finalized... who are the go-to people to get
> >>> >>>>>>>>> some long-standing Kafka related issues resolved?
> >>> >>>>>>>>>
> >>> >>>>>>>>> The existing api is not sufficiently safe nor flexible for
> our
> >>> >>>>>>>> production
> >>> >>>>>>>>> use. I don't think we're alone in this viewpoint, because
> I've
> >>> >>>>>>>>> seen several different patches and libraries to fix the same
> >>> >>>>>>>>> things we've
> >>> >>>>>>>> been
> >>> >>>>>>>>> running into.
> >>> >>>>>>>>>
> >>> >>>>>>>>> Regarding flexibility
> >>> >>>>>>>>>
> >>> >>>>>>>>> https://issues.apache.org/jira/browse/SPARK-3146
> >>> >>>>>>>>>
> >>> >>>>>>>>> has been outstanding since August, and IMHO an equivalent of
> >>> >>>>>>>>> this is absolutely necessary. We wrote a similar patch
> >>> >>>>>>>>> ourselves, then found
> >>> >>>>>>>> that
> >>> >>>>>>>>> PR and have been running it in production. We wouldn't be
> able
> >>> >>>>>>>>> to
> >>> >>>> get
> >>> >>>>>>>> our
> >>> >>>>>>>>> jobs done without it. It also allows users to solve a whole
> >>> >>>>>>>>> class of problems for themselves (e.g. SPARK-2388, arbitrary
> >>> >>>>>>>>> delay of
> >>> >>>>>>>> messages, etc).
> >>> >>>>>>>>>
> >>> >>>>>>>>> Regarding safety, I understand the motivation behind
> >>> >>>>>>>>> WriteAheadLog
> >>> >>>> as
> >>> >>>>>>>> a
> >>> >>>>>>>>> general solution for streaming unreliable sources, but Kafka
> >>> >>>>>>>>> already
> >>> >>>>>>>> is a
> >>> >>>>>>>>> reliable source. I think there's a need for an api that
> treats
> >>> >>>>>>>>> it as such. Even aside from the performance issues of
> >>> >>>>>>>>> duplicating the write-ahead log in kafka into another
> >>> >>>>>>>>> write-ahead log in hdfs, I
> >>> >>>> need
> >>> >>>>>>>>> exactly-once semantics in the face of failure (I've had
> >>> >>>>>>>>> failures
> >>> >>>> that
> >>> >>>>>>>>> prevented reloading a spark streaming checkpoint, for
> >>> instance).
> >>> >>>>>>>>>
> >>> >>>>>>>>> I've got an implementation i've been using
> >>> >>>>>>>>>
> >>> >>>>>>>>>
> >>> https://github.com/koeninger/spark-1/tree/kafkaRdd/external/kaf
> >>> >>>>>>>>> ka /src/main/scala/org/apache/spark/rdd/kafka
> >>> >>>>>>>>>
> >>> >>>>>>>>> Tresata has something similar at
> >>> >>>>>>>> https://github.com/tresata/spark-kafka,
> >>> >>>>>>>>> and I know there were earlier attempts based on Storm code.
> >>> >>>>>>>>>
> >>> >>>>>>>>> Trying to distribute these kinds of fixes as libraries rather
> >>> >>>>>>>>> than
> >>> >>>>>>>> patches
> >>> >>>>>>>>> to Spark is problematic, because large portions of the
> >>> >>>> implementation
> >>> >>>>>>>> are
> >>> >>>>>>>>> private[spark].
> >>> >>>>>>>>>
> >>> >>>>>>>>> I'd like to help, but i need to know whose attention to get.
> >>> >>>>>>>>
> >>> >>>>>>>>
> >>> -----------------------------------------------------------------
> >>> >>>>>>>> ---- To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
> >>> For
> >>> >>>>>>>> additional commands, e-mail: dev-help@spark.apache.org
> >>> >>>>>>>>
> >>> >>>>>>>>
> >>> >>>>>>>
> >>> >>>>>
> >>> >>>>
> >>> >>>
> >>> >>
> >>>
> >>>
> >>
> >
>

Re: Which committers care about Kafka?

Posted by Cody Koeninger <co...@koeninger.org>.
That KafkaRDD code is dead simple.

Given a user-specified map

(topic1, partition0) -> (startingOffset, endingOffset)
(topic1, partition1) -> (startingOffset, endingOffset)
...
turn each one of those entries into a partition of an rdd, using the simple
consumer.
That's it.  No recovery logic, no state, nothing - for any failures, bail
on the rdd and let it retry.
Spark stays out of the business of being a distributed database.
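
A minimal Scala sketch of that idea, for illustration only (it is not the code
from the linked repo; names like OffsetRange and SimpleKafkaRDD are assumptions,
it targets the Kafka 0.8 SimpleConsumer API, and it omits leader lookup, retries,
consumer cleanup, and looping until the end of the range):

    import kafka.api.FetchRequestBuilder
    import kafka.consumer.SimpleConsumer
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{Partition, SparkContext, TaskContext}

    // Hypothetical names, not the actual Spark API.
    case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

    class KafkaRDDPartition(val index: Int, val range: OffsetRange) extends Partition

    // One RDD partition per (topic, partition) -> (startingOffset, endingOffset) entry.
    class SimpleKafkaRDD(sc: SparkContext, brokerHost: String, brokerPort: Int,
                         ranges: Seq[OffsetRange]) extends RDD[Array[Byte]](sc, Nil) {

      override def getPartitions: Array[Partition] =
        ranges.zipWithIndex.map { case (r, i) => new KafkaRDDPartition(i, r): Partition }.toArray

      override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = {
        val range = split.asInstanceOf[KafkaRDDPartition].range
        val consumer = new SimpleConsumer(brokerHost, brokerPort, 30000, 64 * 1024, "kafka-rdd")
        // A real implementation would loop until untilOffset and close the consumer;
        // on any failure the task simply fails and Spark retries the same offset range.
        val request = new FetchRequestBuilder()
          .addFetch(range.topic, range.partition, range.fromOffset, 1024 * 1024)
          .build()
        consumer.fetch(request)
          .messageSet(range.topic, range.partition)
          .iterator
          .filter(m => m.offset >= range.fromOffset && m.offset < range.untilOffset)
          .map { m =>
            val buf = m.message.payload
            val bytes = new Array[Byte](buf.remaining())
            buf.get(bytes)
            bytes
          }
      }
    }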

The client code does any transformation it wants, then stores the data and
offsets.  There are two ways of doing this, either based on idempotence or
a transactional data store.

For idempotent stores:

1. manipulate data
2. save data to store
3. save ending offsets to the same store

If you fail between 2 and 3, the offsets haven't been stored; you start
again at the same beginning offsets, do the same calculations in the same
order, overwrite the same data, and all is good.
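
A rough sketch of those three steps (illustrative only; the in-memory Store
object below is just a stand-in for a real external store such as Cassandra,
HBase, or an upsert-capable RDBMS, and OffsetRange is the hypothetical case
class from the sketch above):

    import scala.collection.concurrent.TrieMap

    // Stand-in for the real downstream store; deterministic keys make replays
    // overwrite the same rows instead of duplicating them.
    object Store {
      val data    = TrieMap.empty[(String, Int, Long), String]  // (topic, partition, offset) -> row
      val offsets = TrieMap.empty[(String, Int), Long]          // (topic, partition) -> next offset to read
    }

    def processIdempotently(range: OffsetRange, batch: Seq[(Long, String)]): Unit = {
      // 1. manipulate data
      val transformed = batch.map { case (offset, msg) => (offset, msg.toUpperCase) }
      // 2. save data to the store, keyed deterministically by (topic, partition, offset)
      transformed.foreach { case (offset, value) =>
        Store.data.put((range.topic, range.partition, offset), value)
      }
      // 3. save the ending offset to the SAME store, only after the data is written.
      //    A crash between 2 and 3 re-runs the batch from the old offset and
      //    overwrites identical rows, so the end result is still exactly-once.
      Store.offsets.put((range.topic, range.partition), range.untilOffset)
    }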


For transactional stores:

1. manipulate data
2. begin transaction
3. save data to the store
4. save offsets
5. commit transaction

If you fail before 5, the transaction rolls back.  To make this less
heavyweight, you can write the data outside the transaction and then update
a pointer to the current data inside the transaction.
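
A JDBC-style sketch of that transactional pattern (illustrative only; the
connection URL, table names, and result type are assumptions, and OffsetRange
again comes from the earlier sketch):

    import java.sql.DriverManager

    // 1. manipulating the data happens upstream; `results` is the already-transformed batch.
    // Data and offsets are written in one transaction, so a failure before commit()
    // rolls back both and the batch can be replayed from the previously stored offset.
    def saveBatchTransactionally(jdbcUrl: String, range: OffsetRange,
                                 results: Seq[(String, Long)]): Unit = {
      val conn = DriverManager.getConnection(jdbcUrl)
      try {
        conn.setAutoCommit(false)                                        // 2. begin transaction
        val insert = conn.prepareStatement("insert into events (k, v) values (?, ?)")
        results.foreach { case (k, v) =>                                 // 3. save data
          insert.setString(1, k); insert.setLong(2, v); insert.executeUpdate()
        }
        val offsets = conn.prepareStatement(
          "update kafka_offsets set until_offset = ? where topic = ? and partition_id = ?")
        offsets.setLong(1, range.untilOffset)                            // 4. save offsets
        offsets.setString(2, range.topic)
        offsets.setInt(3, range.partition)
        offsets.executeUpdate()
        conn.commit()                                                    // 5. commit transaction
      } catch {
        case e: Exception => conn.rollback(); throw e
      } finally {
        conn.close()
      }
    }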


Again, spark has nothing much to do with guaranteeing exactly once.  In
fact, the current streaming api actively impedes my ability to do the
above.  I'm just suggesting providing an api that doesn't get in the way of
exactly-once.





On Fri, Dec 19, 2014 at 3:57 PM, Hari Shreedharan <hshreedharan@cloudera.com
> wrote:

> Can you explain your basic algorithm for the once-only-delivery? It is
> quite a bit of very Kafka-specific code, that would take more time to read
> than I can currently afford? If you can explain your algorithm a bit, it
> might help.
>
> Thanks,
> Hari
>
>
> On Fri, Dec 19, 2014 at 1:48 PM, Cody Koeninger <co...@koeninger.org>
> wrote:
>
>>
>> The problems you guys are discussing come from trying to store state in
>> spark, so don't do that.  Spark isn't a distributed database.
>>
>> Just map kafka partitions directly to rdds, llet user code specify the
>> range of offsets explicitly, and let them be in charge of committing
>> offsets.
>>
>> Using the simple consumer isn't that bad, I'm already using this in
>> production with the code I linked to, and tresata apparently has been as
>> well.  Again, for everyone saying this is impossible, have you read either
>> of those implementations and looked at the approach?
>>
>>
>>
>> On Fri, Dec 19, 2014 at 2:27 PM, Sean McNamara <
>> Sean.McNamara@webtrends.com> wrote:
>>
>>> Please feel free to correct me if I’m wrong, but I think the exactly
>>> once spark streaming semantics can easily be solved using updateStateByKey.
>>> Make the key going into updateStateByKey be a hash of the event, or pluck
>>> off some uuid from the message.  The updateFunc would only emit the message
>>> if the key did not exist, and the user has complete control over the window
>>> of time / state lifecycle for detecting duplicates.  It also makes it
>>> really easy to detect and take action (alert?) when you DO see a duplicate,
>>> or make memory tradeoffs within an error bound using a sketch algorithm.
>>> The kafka simple consumer is insanely complex, if possible I think it would
>>> be better (and vastly more flexible) to get reliability using the
>>> primitives that spark so elegantly provides.
>>>
>>> Cheers,
>>>
>>> Sean
>>>
>>>
>>> > On Dec 19, 2014, at 12:06 PM, Hari Shreedharan <
>>> hshreedharan@cloudera.com> wrote:
>>> >
>>> > Hi Dibyendu,
>>> >
>>> > Thanks for the details on the implementation. But I still do not
>>> believe
>>> > that it is no duplicates - what they achieve is that the same batch is
>>> > processed exactly the same way every time (but see it may be processed
>>> more
>>> > than once) - so it depends on the operation being idempotent. I believe
>>> > Trident uses ZK to keep track of the transactions - a batch can be
>>> > processed multiple times in failure scenarios (for example, the
>>> transaction
>>> > is processed but before ZK is updated the machine fails, causing a
>>> "new"
>>> > node to process it again).
>>> >
>>> > I don't think it is impossible to do this in Spark Streaming as well
>>> and
>>> > I'd be really interested in working on it at some point in the near
>>> future.
>>> >
>>> > On Fri, Dec 19, 2014 at 1:44 AM, Dibyendu Bhattacharya <
>>> > dibyendu.bhattachary@gmail.com> wrote:
>>> >
>>> >> Hi,
>>> >>
>>> >> Thanks to Jerry for mentioning the Kafka Spout for Trident. The Storm
>>> >> Trident has done the exact-once guarantee by processing the tuple in a
>>> >> batch  and assigning same transaction-id for a given batch . The
>>> replay for
>>> >> a given batch with a transaction-id will have exact same set of
>>> tuples and
>>> >> replay of batches happen in exact same order before the failure.
>>> >>
>>> >> Having this paradigm, if downstream system process data for a given
>>> batch
>>> >> for having a given transaction-id , and if during failure if same
>>> batch is
>>> >> again emitted , you can check if same transaction-id is already
>>> processed
>>> >> or not and hence can guarantee exact once semantics.
>>> >>
>>> >> And this can only be achieved in Spark if we use Low Level Kafka
>>> consumer
>>> >> API to process the offsets. This low level Kafka Consumer (
>>> >> https://github.com/dibbhatt/kafka-spark-consumer) has implemented the
>>> >> Spark Kafka consumer which uses Kafka Low Level APIs . All of the
>>> Kafka
>>> >> related logic has been taken from Storm-Kafka spout and which manages
>>> all
>>> >> Kafka re-balance and fault tolerant aspects and Kafka metadata
>>> managements.
>>> >>
>>> >> Presently this Consumer maintains that during Receiver failure, it
>>> will
>>> >> re-emit the exact same Block with same set of messages . Every
>>> message have
>>> >> the details of its partition, offset and topic related details which
>>> can
>>> >> tackle the SPARK-3146.
>>> >>
>>> >> As this Low Level consumer has complete control over the Kafka
>>> Offsets ,
>>> >> we can implement Trident like feature on top of it like having
>>> implement a
>>> >> transaction-id for a given block , and re-emit the same block with
>>> same set
>>> >> of message during Driver failure.
>>> >>
>>> >> Regards,
>>> >> Dibyendu
>>> >>
>>> >>
>>> >> On Fri, Dec 19, 2014 at 7:33 AM, Shao, Saisai <sa...@intel.com>
>>> >> wrote:
>>> >>>
>>> >>> Hi all,
>>> >>>
>>> >>> I agree with Hari that Strong exact-once semantics is very hard to
>>> >>> guarantee, especially in the failure situation. From my
>>> understanding even
>>> >>> current implementation of ReliableKafkaReceiver cannot fully
>>> guarantee the
>>> >>> exact once semantics once failed, first is the ordering of data
>>> replaying
>>> >>> from last checkpoint, this is hard to guarantee when multiple
>>> partitions
>>> >>> are injected in; second is the design complexity of achieving this,
>>> you can
>>> >>> refer to the Kafka Spout in Trident, we have to dig into the very
>>> details
>>> >>> of Kafka metadata management system to achieve this, not to say
>>> rebalance
>>> >>> and fault-tolerance.
>>> >>>
>>> >>> Thanks
>>> >>> Jerry
>>> >>>
>>> >>> -----Original Message-----
>>> >>> From: Luis Ángel Vicente Sánchez [mailto:langel.groups@gmail.com]
>>> >>> Sent: Friday, December 19, 2014 5:57 AM
>>> >>> To: Cody Koeninger
>>> >>> Cc: Hari Shreedharan; Patrick Wendell; dev@spark.apache.org
>>> >>> Subject: Re: Which committers care about Kafka?
>>> >>>
>>> >>> But idempotency is not that easy t achieve sometimes. A strong only
>>> once
>>> >>> semantic through a proper API would  be superuseful; but I'm not
>>> implying
>>> >>> this is easy to achieve.
>>> >>> On 18 Dec 2014 21:52, "Cody Koeninger" <co...@koeninger.org> wrote:
>>> >>>
>>> >>>> If the downstream store for the output data is idempotent or
>>> >>>> transactional, and that downstream store also is the system of
>>> record
>>> >>>> for kafka offsets, then you have exactly-once semantics.  Commit
>>> >>>> offsets with / after the data is stored.  On any failure, restart
>>> from
>>> >>> the last committed offsets.
>>> >>>>
>>> >>>> Yes, this approach is biased towards the etl-like use cases rather
>>> >>>> than near-realtime-analytics use cases.
>>> >>>>
>>> >>>> On Thu, Dec 18, 2014 at 3:27 PM, Hari Shreedharan <
>>> >>>> hshreedharan@cloudera.com
>>> >>>>> wrote:
>>> >>>>>
>>> >>>>> I get what you are saying. But getting exactly once right is an
>>> >>>>> extremely hard problem - especially in presence of failure. The
>>> >>>>> issue is failures
>>> >>>> can
>>> >>>>> happen in a bunch of places. For example, before the notification
>>> of
>>> >>>>> downstream store being successful reaches the receiver that updates
>>> >>>>> the offsets, the node fails. The store was successful, but
>>> >>>>> duplicates came in either way. This is something worth discussing
>>> by
>>> >>>>> itself - but without uuids etc this might not really be solved even
>>> >>> when you think it is.
>>> >>>>>
>>> >>>>> Anyway, I will look at the links. Even I am interested in all of
>>> the
>>> >>>>> features you mentioned - no HDFS WAL for Kafka and once-only
>>> >>>>> delivery,
>>> >>>> but
>>> >>>>> I doubt the latter is really possible to guarantee - though I
>>> really
>>> >>>> would
>>> >>>>> love to have that!
>>> >>>>>
>>> >>>>> Thanks,
>>> >>>>> Hari
>>> >>>>>
>>> >>>>>
>>> >>>>> On Thu, Dec 18, 2014 at 12:26 PM, Cody Koeninger
>>> >>>>> <co...@koeninger.org>
>>> >>>>> wrote:
>>> >>>>>
>>> >>>>>> Thanks for the replies.
>>> >>>>>>
>>> >>>>>> Regarding skipping WAL, it's not just about optimization.  If you
>>> >>>>>> actually want exactly-once semantics, you need control of kafka
>>> >>>>>> offsets
>>> >>>> as
>>> >>>>>> well, including the ability to not use zookeeper as the system of
>>> >>>>>> record for offsets.  Kafka already is a reliable system that has
>>> >>>>>> strong
>>> >>>> ordering
>>> >>>>>> guarantees (within a partition) and does not mandate the use of
>>> >>>> zookeeper
>>> >>>>>> to store offsets.  I think there should be a spark api that acts
>>> as
>>> >>>>>> a
>>> >>>> very
>>> >>>>>> simple intermediary between Kafka and the user's choice of
>>> >>>>>> downstream
>>> >>>> store.
>>> >>>>>>
>>> >>>>>> Take a look at the links I posted - if there's already been 2
>>> >>>> independent
>>> >>>>>> implementations of the idea, chances are it's something people
>>> need.
>>> >>>>>>
>>> >>>>>> On Thu, Dec 18, 2014 at 1:44 PM, Hari Shreedharan <
>>> >>>>>> hshreedharan@cloudera.com> wrote:
>>> >>>>>>>
>>> >>>>>>> Hi Cody,
>>> >>>>>>>
>>> >>>>>>> I am an absolute +1 on SPARK-3146. I think we can implement
>>> >>>>>>> something pretty simple and lightweight for that one.
>>> >>>>>>>
>>> >>>>>>> For the Kafka DStream skipping the WAL implementation - this is
>>> >>>>>>> something I discussed with TD a few weeks ago. Though it is a
>>> good
>>> >>>> idea to
>>> >>>>>>> implement this to avoid unnecessary HDFS writes, it is an
>>> >>>> optimization. For
>>> >>>>>>> that reason, we must be careful in implementation. There are a
>>> >>>>>>> couple
>>> >>>> of
>>> >>>>>>> issues that we need to ensure works properly - specifically
>>> >>> ordering.
>>> >>>> To
>>> >>>>>>> ensure we pull messages from different topics and partitions in
>>> >>>>>>> the
>>> >>>> same
>>> >>>>>>> order after failure, we’d still have to persist the metadata to
>>> >>>>>>> HDFS
>>> >>>> (or
>>> >>>>>>> some other system) - this metadata must contain the order of
>>> >>>>>>> messages consumed, so we know how to re-read the messages. I am
>>> >>>>>>> planning to
>>> >>>> explore
>>> >>>>>>> this once I have some time (probably in Jan). In addition, we
>>> must
>>> >>>>>>> also ensure bucketing functions work fine as well. I will file a
>>> >>>>>>> placeholder jira for this one.
>>> >>>>>>>
>>> >>>>>>> I also wrote an API to write data back to Kafka a while back -
>>> >>>>>>> https://github.com/apache/spark/pull/2994 . I am hoping that
>>> this
>>> >>>>>>> will get pulled in soon, as this is something I know people want.
>>> >>>>>>> I am open
>>> >>>> to
>>> >>>>>>> feedback on that - anything that I can do to make it better.
>>> >>>>>>>
>>> >>>>>>> Thanks,
>>> >>>>>>> Hari
>>> >>>>>>>
>>> >>>>>>>
>>> >>>>>>> On Thu, Dec 18, 2014 at 11:14 AM, Patrick Wendell
>>> >>>>>>> <pw...@gmail.com>
>>> >>>>>>> wrote:
>>> >>>>>>>
>>> >>>>>>>> Hey Cody,
>>> >>>>>>>>
>>> >>>>>>>> Thanks for reaching out with this. The lead on streaming is TD -
>>> >>>>>>>> he is traveling this week though so I can respond a bit. To the
>>> >>>>>>>> high level point of whether Kafka is important - it definitely
>>> >>>>>>>> is. Something like 80% of Spark Streaming deployments
>>> >>>>>>>> (anecdotally) ingest data from Kafka. Also, good support for
>>> >>>>>>>> Kafka is something we generally want in Spark and not a library.
>>> >>>>>>>> In some cases IIRC there were user libraries that used unstable
>>> >>>>>>>> Kafka API's and we were somewhat waiting on Kafka to stabilize
>>> >>>>>>>> them to merge things upstream. Otherwise users wouldn't be able
>>> >>>>>>>> to use newer Kakfa versions. This is a high level impression
>>> only
>>> >>>>>>>> though, I haven't talked to TD about this recently so it's worth
>>> >>> revisiting given the developments in Kafka.
>>> >>>>>>>>
>>> >>>>>>>> Please do bring things up like this on the dev list if there are
>>> >>>>>>>> blockers for your usage - thanks for pinging it.
>>> >>>>>>>>
>>> >>>>>>>> - Patrick
>>> >>>>>>>>
>>> >>>>>>>> On Thu, Dec 18, 2014 at 7:07 AM, Cody Koeninger
>>> >>>>>>>> <co...@koeninger.org>
>>> >>>>>>>> wrote:
>>> >>>>>>>>> Now that 1.2 is finalized... who are the go-to people to get
>>> >>>>>>>>> some long-standing Kafka related issues resolved?
>>> >>>>>>>>>
>>> >>>>>>>>> The existing api is not sufficiently safe nor flexible for our
>>> >>>>>>>> production
>>> >>>>>>>>> use. I don't think we're alone in this viewpoint, because I've
>>> >>>>>>>>> seen several different patches and libraries to fix the same
>>> >>>>>>>>> things we've
>>> >>>>>>>> been
>>> >>>>>>>>> running into.
>>> >>>>>>>>>
>>> >>>>>>>>> Regarding flexibility
>>> >>>>>>>>>
>>> >>>>>>>>> https://issues.apache.org/jira/browse/SPARK-3146
>>> >>>>>>>>>
>>> >>>>>>>>> has been outstanding since August, and IMHO an equivalent of
>>> >>>>>>>>> this is absolutely necessary. We wrote a similar patch
>>> >>>>>>>>> ourselves, then found
>>> >>>>>>>> that
>>> >>>>>>>>> PR and have been running it in production. We wouldn't be able
>>> >>>>>>>>> to
>>> >>>> get
>>> >>>>>>>> our
>>> >>>>>>>>> jobs done without it. It also allows users to solve a whole
>>> >>>>>>>>> class of problems for themselves (e.g. SPARK-2388, arbitrary
>>> >>>>>>>>> delay of
>>> >>>>>>>> messages, etc).
>>> >>>>>>>>>
>>> >>>>>>>>> Regarding safety, I understand the motivation behind
>>> >>>>>>>>> WriteAheadLog
>>> >>>> as
>>> >>>>>>>> a
>>> >>>>>>>>> general solution for streaming unreliable sources, but Kafka
>>> >>>>>>>>> already
>>> >>>>>>>> is a
>>> >>>>>>>>> reliable source. I think there's a need for an api that treats
>>> >>>>>>>>> it as such. Even aside from the performance issues of
>>> >>>>>>>>> duplicating the write-ahead log in kafka into another
>>> >>>>>>>>> write-ahead log in hdfs, I
>>> >>>> need
>>> >>>>>>>>> exactly-once semantics in the face of failure (I've had
>>> >>>>>>>>> failures
>>> >>>> that
>>> >>>>>>>>> prevented reloading a spark streaming checkpoint, for
>>> instance).
>>> >>>>>>>>>
>>> >>>>>>>>> I've got an implementation i've been using
>>> >>>>>>>>>
>>> >>>>>>>>>
>>> https://github.com/koeninger/spark-1/tree/kafkaRdd/external/kaf
>>> >>>>>>>>> ka /src/main/scala/org/apache/spark/rdd/kafka
>>> >>>>>>>>>
>>> >>>>>>>>> Tresata has something similar at
>>> >>>>>>>> https://github.com/tresata/spark-kafka,
>>> >>>>>>>>> and I know there were earlier attempts based on Storm code.
>>> >>>>>>>>>
>>> >>>>>>>>> Trying to distribute these kinds of fixes as libraries rather
>>> >>>>>>>>> than
>>> >>>>>>>> patches
>>> >>>>>>>>> to Spark is problematic, because large portions of the
>>> >>>> implementation
>>> >>>>>>>> are
>>> >>>>>>>>> private[spark].
>>> >>>>>>>>>
>>> >>>>>>>>> I'd like to help, but i need to know whose attention to get.
>>> >>>>>>>>
>>> >>>>>>>>
>>> -----------------------------------------------------------------
>>> >>>>>>>> ---- To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
>>> For
>>> >>>>>>>> additional commands, e-mail: dev-help@spark.apache.org
>>> >>>>>>>>
>>> >>>>>>>>
>>> >>>>>>>
>>> >>>>>
>>> >>>>
>>> >>>
>>> >>
>>>
>>>
>>
>

Re: Which committers care about Kafka?

Posted by Hari Shreedharan <hs...@cloudera.com>.
Can you explain your basic algorithm for the once-only-delivery? It is quite a bit of very Kafka-specific code that would take more time to read than I can currently afford. If you can explain your algorithm a bit, it might help.




Thanks, Hari

On Fri, Dec 19, 2014 at 1:48 PM, Cody Koeninger <co...@koeninger.org>
wrote:

> The problems you guys are discussing come from trying to store state in
> spark, so don't do that.  Spark isn't a distributed database.
> Just map kafka partitions directly to rdds, llet user code specify the
> range of offsets explicitly, and let them be in charge of committing
> offsets.
> Using the simple consumer isn't that bad, I'm already using this in
> production with the code I linked to, and tresata apparently has been as
> well.  Again, for everyone saying this is impossible, have you read either
> of those implementations and looked at the approach?
> On Fri, Dec 19, 2014 at 2:27 PM, Sean McNamara <Se...@webtrends.com>
> wrote:
>> Please feel free to correct me if I’m wrong, but I think the exactly once
>> spark streaming semantics can easily be solved using updateStateByKey. Make
>> the key going into updateStateByKey be a hash of the event, or pluck off
>> some uuid from the message.  The updateFunc would only emit the message if
>> the key did not exist, and the user has complete control over the window of
>> time / state lifecycle for detecting duplicates.  It also makes it really
>> easy to detect and take action (alert?) when you DO see a duplicate, or
>> make memory tradeoffs within an error bound using a sketch algorithm.  The
>> kafka simple consumer is insanely complex, if possible I think it would be
>> better (and vastly more flexible) to get reliability using the primitives
>> that spark so elegantly provides.
>>
>> Cheers,
>>
>> Sean
>>
>>
>> > On Dec 19, 2014, at 12:06 PM, Hari Shreedharan <
>> hshreedharan@cloudera.com> wrote:
>> >
>> > Hi Dibyendu,
>> >
>> > Thanks for the details on the implementation. But I still do not believe
>> > that it is no duplicates - what they achieve is that the same batch is
>> > processed exactly the same way every time (but see it may be processed
>> more
>> > than once) - so it depends on the operation being idempotent. I believe
>> > Trident uses ZK to keep track of the transactions - a batch can be
>> > processed multiple times in failure scenarios (for example, the
>> transaction
>> > is processed but before ZK is updated the machine fails, causing a "new"
>> > node to process it again).
>> >
>> > I don't think it is impossible to do this in Spark Streaming as well and
>> > I'd be really interested in working on it at some point in the near
>> future.
>> >
>> > On Fri, Dec 19, 2014 at 1:44 AM, Dibyendu Bhattacharya <
>> > dibyendu.bhattachary@gmail.com> wrote:
>> >
>> >> Hi,
>> >>
>> >> Thanks to Jerry for mentioning the Kafka Spout for Trident. The Storm
>> >> Trident has done the exact-once guarantee by processing the tuple in a
>> >> batch  and assigning same transaction-id for a given batch . The replay
>> for
>> >> a given batch with a transaction-id will have exact same set of tuples
>> and
>> >> replay of batches happen in exact same order before the failure.
>> >>
>> >> Having this paradigm, if downstream system process data for a given
>> batch
>> >> for having a given transaction-id , and if during failure if same batch
>> is
>> >> again emitted , you can check if same transaction-id is already
>> processed
>> >> or not and hence can guarantee exact once semantics.
>> >>
>> >> And this can only be achieved in Spark if we use Low Level Kafka
>> consumer
>> >> API to process the offsets. This low level Kafka Consumer (
>> >> https://github.com/dibbhatt/kafka-spark-consumer) has implemented the
>> >> Spark Kafka consumer which uses Kafka Low Level APIs . All of the Kafka
>> >> related logic has been taken from Storm-Kafka spout and which manages
>> all
>> >> Kafka re-balance and fault tolerant aspects and Kafka metadata
>> managements.
>> >>
>> >> Presently this Consumer maintains that during Receiver failure, it will
>> >> re-emit the exact same Block with same set of messages . Every message
>> have
>> >> the details of its partition, offset and topic related details which can
>> >> tackle the SPARK-3146.
>> >>
>> >> As this Low Level consumer has complete control over the Kafka Offsets ,
>> >> we can implement Trident like feature on top of it like having
>> implement a
>> >> transaction-id for a given block , and re-emit the same block with same
>> set
>> >> of message during Driver failure.
>> >>
>> >> Regards,
>> >> Dibyendu
>> >>
>> >>
>> >> On Fri, Dec 19, 2014 at 7:33 AM, Shao, Saisai <sa...@intel.com>
>> >> wrote:
>> >>>
>> >>> Hi all,
>> >>>
>> >>> I agree with Hari that Strong exact-once semantics is very hard to
>> >>> guarantee, especially in the failure situation. From my understanding
>> even
>> >>> current implementation of ReliableKafkaReceiver cannot fully guarantee
>> the
>> >>> exact once semantics once failed, first is the ordering of data
>> replaying
>> >>> from last checkpoint, this is hard to guarantee when multiple
>> partitions
>> >>> are injected in; second is the design complexity of achieving this,
>> you can
>> >>> refer to the Kafka Spout in Trident, we have to dig into the very
>> details
>> >>> of Kafka metadata management system to achieve this, not to say
>> rebalance
>> >>> and fault-tolerance.
>> >>>
>> >>> Thanks
>> >>> Jerry
>> >>>
>> >>> -----Original Message-----
>> >>> From: Luis Ángel Vicente Sánchez [mailto:langel.groups@gmail.com]
>> >>> Sent: Friday, December 19, 2014 5:57 AM
>> >>> To: Cody Koeninger
>> >>> Cc: Hari Shreedharan; Patrick Wendell; dev@spark.apache.org
>> >>> Subject: Re: Which committers care about Kafka?
>> >>>
>> >>> But idempotency is not that easy t achieve sometimes. A strong only
>> once
>> >>> semantic through a proper API would  be superuseful; but I'm not
>> implying
>> >>> this is easy to achieve.
>> >>> On 18 Dec 2014 21:52, "Cody Koeninger" <co...@koeninger.org> wrote:
>> >>>
>> >>>> If the downstream store for the output data is idempotent or
>> >>>> transactional, and that downstream store also is the system of record
>> >>>> for kafka offsets, then you have exactly-once semantics.  Commit
>> >>>> offsets with / after the data is stored.  On any failure, restart from
>> >>> the last committed offsets.
>> >>>>
>> >>>> Yes, this approach is biased towards the etl-like use cases rather
>> >>>> than near-realtime-analytics use cases.
>> >>>>
>> >>>> On Thu, Dec 18, 2014 at 3:27 PM, Hari Shreedharan <
>> >>>> hshreedharan@cloudera.com
>> >>>>> wrote:
>> >>>>>
>> >>>>> I get what you are saying. But getting exactly once right is an
>> >>>>> extremely hard problem - especially in presence of failure. The
>> >>>>> issue is failures
>> >>>> can
>> >>>>> happen in a bunch of places. For example, before the notification of
>> >>>>> downstream store being successful reaches the receiver that updates
>> >>>>> the offsets, the node fails. The store was successful, but
>> >>>>> duplicates came in either way. This is something worth discussing by
>> >>>>> itself - but without uuids etc this might not really be solved even
>> >>> when you think it is.
>> >>>>>
>> >>>>> Anyway, I will look at the links. Even I am interested in all of the
>> >>>>> features you mentioned - no HDFS WAL for Kafka and once-only
>> >>>>> delivery,
>> >>>> but
>> >>>>> I doubt the latter is really possible to guarantee - though I really
>> >>>> would
>> >>>>> love to have that!
>> >>>>>
>> >>>>> Thanks,
>> >>>>> Hari
>> >>>>>
>> >>>>>
>> >>>>> On Thu, Dec 18, 2014 at 12:26 PM, Cody Koeninger
>> >>>>> <co...@koeninger.org>
>> >>>>> wrote:
>> >>>>>
>> >>>>>> Thanks for the replies.
>> >>>>>>
>> >>>>>> Regarding skipping WAL, it's not just about optimization.  If you
>> >>>>>> actually want exactly-once semantics, you need control of kafka
>> >>>>>> offsets
>> >>>> as
>> >>>>>> well, including the ability to not use zookeeper as the system of
>> >>>>>> record for offsets.  Kafka already is a reliable system that has
>> >>>>>> strong
>> >>>> ordering
>> >>>>>> guarantees (within a partition) and does not mandate the use of
>> >>>> zookeeper
>> >>>>>> to store offsets.  I think there should be a spark api that acts as
>> >>>>>> a
>> >>>> very
>> >>>>>> simple intermediary between Kafka and the user's choice of
>> >>>>>> downstream
>> >>>> store.
>> >>>>>>
>> >>>>>> Take a look at the links I posted - if there's already been 2
>> >>>> independent
>> >>>>>> implementations of the idea, chances are it's something people need.
>> >>>>>>
>> >>>>>> On Thu, Dec 18, 2014 at 1:44 PM, Hari Shreedharan <
>> >>>>>> hshreedharan@cloudera.com> wrote:
>> >>>>>>>
>> >>>>>>> Hi Cody,
>> >>>>>>>
>> >>>>>>> I am an absolute +1 on SPARK-3146. I think we can implement
>> >>>>>>> something pretty simple and lightweight for that one.
>> >>>>>>>
>> >>>>>>> For the Kafka DStream skipping the WAL implementation - this is
>> >>>>>>> something I discussed with TD a few weeks ago. Though it is a good
>> >>>> idea to
>> >>>>>>> implement this to avoid unnecessary HDFS writes, it is an
>> >>>> optimization. For
>> >>>>>>> that reason, we must be careful in implementation. There are a
>> >>>>>>> couple
>> >>>> of
>> >>>>>>> issues that we need to ensure works properly - specifically
>> >>> ordering.
>> >>>> To
>> >>>>>>> ensure we pull messages from different topics and partitions in
>> >>>>>>> the
>> >>>> same
>> >>>>>>> order after failure, we’d still have to persist the metadata to
>> >>>>>>> HDFS
>> >>>> (or
>> >>>>>>> some other system) - this metadata must contain the order of
>> >>>>>>> messages consumed, so we know how to re-read the messages. I am
>> >>>>>>> planning to
>> >>>> explore
>> >>>>>>> this once I have some time (probably in Jan). In addition, we must
>> >>>>>>> also ensure bucketing functions work fine as well. I will file a
>> >>>>>>> placeholder jira for this one.
>> >>>>>>>
>> >>>>>>> I also wrote an API to write data back to Kafka a while back -
>> >>>>>>> https://github.com/apache/spark/pull/2994 . I am hoping that this
>> >>>>>>> will get pulled in soon, as this is something I know people want.
>> >>>>>>> I am open
>> >>>> to
>> >>>>>>> feedback on that - anything that I can do to make it better.
>> >>>>>>>
>> >>>>>>> Thanks,
>> >>>>>>> Hari
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> On Thu, Dec 18, 2014 at 11:14 AM, Patrick Wendell
>> >>>>>>> <pw...@gmail.com>
>> >>>>>>> wrote:
>> >>>>>>>
>> >>>>>>>> Hey Cody,
>> >>>>>>>>
>> >>>>>>>> Thanks for reaching out with this. The lead on streaming is TD -
>> >>>>>>>> he is traveling this week though so I can respond a bit. To the
>> >>>>>>>> high level point of whether Kafka is important - it definitely
>> >>>>>>>> is. Something like 80% of Spark Streaming deployments
>> >>>>>>>> (anecdotally) ingest data from Kafka. Also, good support for
>> >>>>>>>> Kafka is something we generally want in Spark and not a library.
>> >>>>>>>> In some cases IIRC there were user libraries that used unstable
>> >>>>>>>> Kafka API's and we were somewhat waiting on Kafka to stabilize
>> >>>>>>>> them to merge things upstream. Otherwise users wouldn't be able
>> >>>>>>>> to use newer Kakfa versions. This is a high level impression only
>> >>>>>>>> though, I haven't talked to TD about this recently so it's worth
>> >>> revisiting given the developments in Kafka.
>> >>>>>>>>
>> >>>>>>>> Please do bring things up like this on the dev list if there are
>> >>>>>>>> blockers for your usage - thanks for pinging it.
>> >>>>>>>>
>> >>>>>>>> - Patrick
>> >>>>>>>>
>> >>>>>>>> On Thu, Dec 18, 2014 at 7:07 AM, Cody Koeninger
>> >>>>>>>> <co...@koeninger.org>
>> >>>>>>>> wrote:
>> >>>>>>>>> Now that 1.2 is finalized... who are the go-to people to get
>> >>>>>>>>> some long-standing Kafka related issues resolved?
>> >>>>>>>>>
>> >>>>>>>>> The existing api is not sufficiently safe nor flexible for our
>> >>>>>>>> production
>> >>>>>>>>> use. I don't think we're alone in this viewpoint, because I've
>> >>>>>>>>> seen several different patches and libraries to fix the same
>> >>>>>>>>> things we've
>> >>>>>>>> been
>> >>>>>>>>> running into.
>> >>>>>>>>>
>> >>>>>>>>> Regarding flexibility
>> >>>>>>>>>
>> >>>>>>>>> https://issues.apache.org/jira/browse/SPARK-3146
>> >>>>>>>>>
>> >>>>>>>>> has been outstanding since August, and IMHO an equivalent of
>> >>>>>>>>> this is absolutely necessary. We wrote a similar patch
>> >>>>>>>>> ourselves, then found
>> >>>>>>>> that
>> >>>>>>>>> PR and have been running it in production. We wouldn't be able
>> >>>>>>>>> to
>> >>>> get
>> >>>>>>>> our
>> >>>>>>>>> jobs done without it. It also allows users to solve a whole
>> >>>>>>>>> class of problems for themselves (e.g. SPARK-2388, arbitrary
>> >>>>>>>>> delay of
>> >>>>>>>> messages, etc).
>> >>>>>>>>>
>> >>>>>>>>> Regarding safety, I understand the motivation behind
>> >>>>>>>>> WriteAheadLog
>> >>>> as
>> >>>>>>>> a
>> >>>>>>>>> general solution for streaming unreliable sources, but Kafka
>> >>>>>>>>> already
>> >>>>>>>> is a
>> >>>>>>>>> reliable source. I think there's a need for an api that treats
>> >>>>>>>>> it as such. Even aside from the performance issues of
>> >>>>>>>>> duplicating the write-ahead log in kafka into another
>> >>>>>>>>> write-ahead log in hdfs, I
>> >>>> need
>> >>>>>>>>> exactly-once semantics in the face of failure (I've had
>> >>>>>>>>> failures
>> >>>> that
>> >>>>>>>>> prevented reloading a spark streaming checkpoint, for instance).
>> >>>>>>>>>
>> >>>>>>>>> I've got an implementation i've been using
>> >>>>>>>>>
>> >>>>>>>>> https://github.com/koeninger/spark-1/tree/kafkaRdd/external/kaf
>> >>>>>>>>> ka /src/main/scala/org/apache/spark/rdd/kafka
>> >>>>>>>>>
>> >>>>>>>>> Tresata has something similar at
>> >>>>>>>> https://github.com/tresata/spark-kafka,
>> >>>>>>>>> and I know there were earlier attempts based on Storm code.
>> >>>>>>>>>
>> >>>>>>>>> Trying to distribute these kinds of fixes as libraries rather
>> >>>>>>>>> than
>> >>>>>>>> patches
>> >>>>>>>>> to Spark is problematic, because large portions of the
>> >>>> implementation
>> >>>>>>>> are
>> >>>>>>>>> private[spark].
>> >>>>>>>>>
>> >>>>>>>>> I'd like to help, but i need to know whose attention to get.
>> >>>>>>>>
>> >>>>>>>> -----------------------------------------------------------------
>> >>>>>>>> ---- To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org For
>> >>>>>>>> additional commands, e-mail: dev-help@spark.apache.org
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>
>> >>>>>
>> >>>>
>> >>>
>> >>
>>
>>

Re: Which committers care about Kafka?

Posted by Cody Koeninger <co...@koeninger.org>.
The problems you guys are discussing come from trying to store state in
spark, so don't do that.  Spark isn't a distributed database.

Just map kafka partitions directly to rdds, let user code specify the
range of offsets explicitly, and let them be in charge of committing
offsets.

Using the simple consumer isn't that bad, I'm already using this in
production with the code I linked to, and tresata apparently has been as
well.  Again, for everyone saying this is impossible, have you read either
of those implementations and looked at the approach?



On Fri, Dec 19, 2014 at 2:27 PM, Sean McNamara <Se...@webtrends.com>
wrote:

> Please feel free to correct me if I’m wrong, but I think the exactly once
> spark streaming semantics can easily be solved using updateStateByKey. Make
> the key going into updateStateByKey be a hash of the event, or pluck off
> some uuid from the message.  The updateFunc would only emit the message if
> the key did not exist, and the user has complete control over the window of
> time / state lifecycle for detecting duplicates.  It also makes it really
> easy to detect and take action (alert?) when you DO see a duplicate, or
> make memory tradeoffs within an error bound using a sketch algorithm.  The
> kafka simple consumer is insanely complex, if possible I think it would be
> better (and vastly more flexible) to get reliability using the primitives
> that spark so elegantly provides.
>
> Cheers,
>
> Sean
>
>
> > On Dec 19, 2014, at 12:06 PM, Hari Shreedharan <
> hshreedharan@cloudera.com> wrote:
> >
> > Hi Dibyendu,
> >
> > Thanks for the details on the implementation. But I still do not believe
> > that it is no duplicates - what they achieve is that the same batch is
> > processed exactly the same way every time (but see it may be processed
> more
> > than once) - so it depends on the operation being idempotent. I believe
> > Trident uses ZK to keep track of the transactions - a batch can be
> > processed multiple times in failure scenarios (for example, the
> transaction
> > is processed but before ZK is updated the machine fails, causing a "new"
> > node to process it again).
> >
> > I don't think it is impossible to do this in Spark Streaming as well and
> > I'd be really interested in working on it at some point in the near
> future.
> >
> > On Fri, Dec 19, 2014 at 1:44 AM, Dibyendu Bhattacharya <
> > dibyendu.bhattachary@gmail.com> wrote:
> >
> >> Hi,
> >>
> >> Thanks to Jerry for mentioning the Kafka Spout for Trident. The Storm
> >> Trident has done the exact-once guarantee by processing the tuple in a
> >> batch  and assigning same transaction-id for a given batch . The replay
> for
> >> a given batch with a transaction-id will have exact same set of tuples
> and
> >> replay of batches happen in exact same order before the failure.
> >>
> >> Having this paradigm, if downstream system process data for a given
> batch
> >> for having a given transaction-id , and if during failure if same batch
> is
> >> again emitted , you can check if same transaction-id is already
> processed
> >> or not and hence can guarantee exact once semantics.
> >>
> >> And this can only be achieved in Spark if we use Low Level Kafka
> consumer
> >> API to process the offsets. This low level Kafka Consumer (
> >> https://github.com/dibbhatt/kafka-spark-consumer) has implemented the
> >> Spark Kafka consumer which uses Kafka Low Level APIs . All of the Kafka
> >> related logic has been taken from Storm-Kafka spout and which manages
> all
> >> Kafka re-balance and fault tolerant aspects and Kafka metadata
> managements.
> >>
> >> Presently this Consumer maintains that during Receiver failure, it will
> >> re-emit the exact same Block with same set of messages . Every message
> have
> >> the details of its partition, offset and topic related details which can
> >> tackle the SPARK-3146.
> >>
> >> As this Low Level consumer has complete control over the Kafka Offsets ,
> >> we can implement Trident like feature on top of it like having
> implement a
> >> transaction-id for a given block , and re-emit the same block with same
> set
> >> of message during Driver failure.
> >>
> >> Regards,
> >> Dibyendu
> >>
> >>
> >> On Fri, Dec 19, 2014 at 7:33 AM, Shao, Saisai <sa...@intel.com>
> >> wrote:
> >>>
> >>> Hi all,
> >>>
> >>> I agree with Hari that Strong exact-once semantics is very hard to
> >>> guarantee, especially in the failure situation. From my understanding
> even
> >>> current implementation of ReliableKafkaReceiver cannot fully guarantee
> the
> >>> exact once semantics once failed, first is the ordering of data
> replaying
> >>> from last checkpoint, this is hard to guarantee when multiple
> partitions
> >>> are injected in; second is the design complexity of achieving this,
> you can
> >>> refer to the Kafka Spout in Trident, we have to dig into the very
> details
> >>> of Kafka metadata management system to achieve this, not to say
> rebalance
> >>> and fault-tolerance.
> >>>
> >>> Thanks
> >>> Jerry
> >>>
> >>> -----Original Message-----
> >>> From: Luis Ángel Vicente Sánchez [mailto:langel.groups@gmail.com]
> >>> Sent: Friday, December 19, 2014 5:57 AM
> >>> To: Cody Koeninger
> >>> Cc: Hari Shreedharan; Patrick Wendell; dev@spark.apache.org
> >>> Subject: Re: Which committers care about Kafka?
> >>>
> >>> But idempotency is not that easy t achieve sometimes. A strong only
> once
> >>> semantic through a proper API would  be superuseful; but I'm not
> implying
> >>> this is easy to achieve.
> >>> On 18 Dec 2014 21:52, "Cody Koeninger" <co...@koeninger.org> wrote:
> >>>
> >>>> If the downstream store for the output data is idempotent or
> >>>> transactional, and that downstream store also is the system of record
> >>>> for kafka offsets, then you have exactly-once semantics.  Commit
> >>>> offsets with / after the data is stored.  On any failure, restart from
> >>> the last committed offsets.
> >>>>
> >>>> Yes, this approach is biased towards the etl-like use cases rather
> >>>> than near-realtime-analytics use cases.
> >>>>
> >>>> On Thu, Dec 18, 2014 at 3:27 PM, Hari Shreedharan <
> >>>> hshreedharan@cloudera.com
> >>>>> wrote:
> >>>>>
> >>>>> I get what you are saying. But getting exactly once right is an
> >>>>> extremely hard problem - especially in presence of failure. The
> >>>>> issue is failures
> >>>> can
> >>>>> happen in a bunch of places. For example, before the notification of
> >>>>> downstream store being successful reaches the receiver that updates
> >>>>> the offsets, the node fails. The store was successful, but
> >>>>> duplicates came in either way. This is something worth discussing by
> >>>>> itself - but without uuids etc this might not really be solved even
> >>> when you think it is.
> >>>>>
> >>>>> Anyway, I will look at the links. Even I am interested in all of the
> >>>>> features you mentioned - no HDFS WAL for Kafka and once-only
> >>>>> delivery,
> >>>> but
> >>>>> I doubt the latter is really possible to guarantee - though I really
> >>>> would
> >>>>> love to have that!
> >>>>>
> >>>>> Thanks,
> >>>>> Hari
> >>>>>
> >>>>>
> >>>>> On Thu, Dec 18, 2014 at 12:26 PM, Cody Koeninger
> >>>>> <co...@koeninger.org>
> >>>>> wrote:
> >>>>>
> >>>>>> Thanks for the replies.
> >>>>>>
> >>>>>> Regarding skipping WAL, it's not just about optimization.  If you
> >>>>>> actually want exactly-once semantics, you need control of kafka
> >>>>>> offsets
> >>>> as
> >>>>>> well, including the ability to not use zookeeper as the system of
> >>>>>> record for offsets.  Kafka already is a reliable system that has
> >>>>>> strong
> >>>> ordering
> >>>>>> guarantees (within a partition) and does not mandate the use of
> >>>> zookeeper
> >>>>>> to store offsets.  I think there should be a spark api that acts as
> >>>>>> a
> >>>> very
> >>>>>> simple intermediary between Kafka and the user's choice of
> >>>>>> downstream
> >>>> store.
> >>>>>>
> >>>>>> Take a look at the links I posted - if there's already been 2
> >>>> independent
> >>>>>> implementations of the idea, chances are it's something people need.
> >>>>>>
> >>>>>> On Thu, Dec 18, 2014 at 1:44 PM, Hari Shreedharan <
> >>>>>> hshreedharan@cloudera.com> wrote:
> >>>>>>>
> >>>>>>> Hi Cody,
> >>>>>>>
> >>>>>>> I am an absolute +1 on SPARK-3146. I think we can implement
> >>>>>>> something pretty simple and lightweight for that one.
> >>>>>>>
> >>>>>>> For the Kafka DStream skipping the WAL implementation - this is
> >>>>>>> something I discussed with TD a few weeks ago. Though it is a good
> >>>> idea to
> >>>>>>> implement this to avoid unnecessary HDFS writes, it is an
> >>>> optimization. For
> >>>>>>> that reason, we must be careful in implementation. There are a
> >>>>>>> couple
> >>>> of
> >>>>>>> issues that we need to ensure works properly - specifically
> >>> ordering.
> >>>> To
> >>>>>>> ensure we pull messages from different topics and partitions in
> >>>>>>> the
> >>>> same
> >>>>>>> order after failure, we’d still have to persist the metadata to
> >>>>>>> HDFS
> >>>> (or
> >>>>>>> some other system) - this metadata must contain the order of
> >>>>>>> messages consumed, so we know how to re-read the messages. I am
> >>>>>>> planning to
> >>>> explore
> >>>>>>> this once I have some time (probably in Jan). In addition, we must
> >>>>>>> also ensure bucketing functions work fine as well. I will file a
> >>>>>>> placeholder jira for this one.
> >>>>>>>
> >>>>>>> I also wrote an API to write data back to Kafka a while back -
> >>>>>>> https://github.com/apache/spark/pull/2994 . I am hoping that this
> >>>>>>> will get pulled in soon, as this is something I know people want.
> >>>>>>> I am open
> >>>> to
> >>>>>>> feedback on that - anything that I can do to make it better.
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> Hari
> >>>>>>>
> >>>>>>>
> >>>>>>> On Thu, Dec 18, 2014 at 11:14 AM, Patrick Wendell
> >>>>>>> <pw...@gmail.com>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hey Cody,
> >>>>>>>>
> >>>>>>>> Thanks for reaching out with this. The lead on streaming is TD -
> >>>>>>>> he is traveling this week though so I can respond a bit. To the
> >>>>>>>> high level point of whether Kafka is important - it definitely
> >>>>>>>> is. Something like 80% of Spark Streaming deployments
> >>>>>>>> (anecdotally) ingest data from Kafka. Also, good support for
> >>>>>>>> Kafka is something we generally want in Spark and not a library.
> >>>>>>>> In some cases IIRC there were user libraries that used unstable
> >>>>>>>> Kafka API's and we were somewhat waiting on Kafka to stabilize
> >>>>>>>> them to merge things upstream. Otherwise users wouldn't be able
> >>>>>>>> to use newer Kakfa versions. This is a high level impression only
> >>>>>>>> though, I haven't talked to TD about this recently so it's worth
> >>> revisiting given the developments in Kafka.
> >>>>>>>>
> >>>>>>>> Please do bring things up like this on the dev list if there are
> >>>>>>>> blockers for your usage - thanks for pinging it.
> >>>>>>>>
> >>>>>>>> - Patrick
> >>>>>>>>
> >>>>>>>> On Thu, Dec 18, 2014 at 7:07 AM, Cody Koeninger
> >>>>>>>> <co...@koeninger.org>
> >>>>>>>> wrote:
> >>>>>>>>> Now that 1.2 is finalized... who are the go-to people to get
> >>>>>>>>> some long-standing Kafka related issues resolved?
> >>>>>>>>>
> >>>>>>>>> The existing api is not sufficiently safe nor flexible for our
> >>>>>>>> production
> >>>>>>>>> use. I don't think we're alone in this viewpoint, because I've
> >>>>>>>>> seen several different patches and libraries to fix the same
> >>>>>>>>> things we've
> >>>>>>>> been
> >>>>>>>>> running into.
> >>>>>>>>>
> >>>>>>>>> Regarding flexibility
> >>>>>>>>>
> >>>>>>>>> https://issues.apache.org/jira/browse/SPARK-3146
> >>>>>>>>>
> >>>>>>>>> has been outstanding since August, and IMHO an equivalent of
> >>>>>>>>> this is absolutely necessary. We wrote a similar patch
> >>>>>>>>> ourselves, then found
> >>>>>>>> that
> >>>>>>>>> PR and have been running it in production. We wouldn't be able
> >>>>>>>>> to
> >>>> get
> >>>>>>>> our
> >>>>>>>>> jobs done without it. It also allows users to solve a whole
> >>>>>>>>> class of problems for themselves (e.g. SPARK-2388, arbitrary
> >>>>>>>>> delay of
> >>>>>>>> messages, etc).
> >>>>>>>>>
> >>>>>>>>> Regarding safety, I understand the motivation behind
> >>>>>>>>> WriteAheadLog
> >>>> as
> >>>>>>>> a
> >>>>>>>>> general solution for streaming unreliable sources, but Kafka
> >>>>>>>>> already
> >>>>>>>> is a
> >>>>>>>>> reliable source. I think there's a need for an api that treats
> >>>>>>>>> it as such. Even aside from the performance issues of
> >>>>>>>>> duplicating the write-ahead log in kafka into another
> >>>>>>>>> write-ahead log in hdfs, I
> >>>> need
> >>>>>>>>> exactly-once semantics in the face of failure (I've had
> >>>>>>>>> failures
> >>>> that
> >>>>>>>>> prevented reloading a spark streaming checkpoint, for instance).
> >>>>>>>>>
> >>>>>>>>> I've got an implementation i've been using
> >>>>>>>>>
> >>>>>>>>> https://github.com/koeninger/spark-1/tree/kafkaRdd/external/kaf
> >>>>>>>>> ka /src/main/scala/org/apache/spark/rdd/kafka
> >>>>>>>>>
> >>>>>>>>> Tresata has something similar at
> >>>>>>>> https://github.com/tresata/spark-kafka,
> >>>>>>>>> and I know there were earlier attempts based on Storm code.
> >>>>>>>>>
> >>>>>>>>> Trying to distribute these kinds of fixes as libraries rather
> >>>>>>>>> than
> >>>>>>>> patches
> >>>>>>>>> to Spark is problematic, because large portions of the
> >>>> implementation
> >>>>>>>> are
> >>>>>>>>> private[spark].
> >>>>>>>>>
> >>>>>>>>> I'd like to help, but i need to know whose attention to get.
> >>>>>>>>
> >>>>>>>> -----------------------------------------------------------------
> >>>>>>>> ---- To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org For
> >>>>>>>> additional commands, e-mail: dev-help@spark.apache.org
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>
> >>>>
> >>>
> >>
>
>

Re: Which committers care about Kafka?

Posted by Sean McNamara <Se...@Webtrends.com>.
Please feel free to correct me if I’m wrong, but I think exactly-once Spark Streaming semantics can easily be achieved using updateStateByKey. Make the key going into updateStateByKey be a hash of the event, or pluck off some uuid from the message.  The updateFunc would only emit the message if the key did not exist, and the user has complete control over the window of time / state lifecycle for detecting duplicates.  It also makes it really easy to detect and take action (alert?) when you DO see a duplicate, or make memory tradeoffs within an error bound using a sketch algorithm.  The Kafka simple consumer is insanely complex; if possible I think it would be better (and vastly more flexible) to get reliability using the primitives that Spark so elegantly provides.
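
A minimal sketch of that idea (illustrative only; the key derivation, the
checkpoint path, and the never-expiring state are all assumptions a real job
would tune):

    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.StreamingContext._   // pair DStream ops on Spark 1.2
    import org.apache.spark.streaming.dstream.DStream

    def dedup(ssc: StreamingContext, messages: DStream[String]): DStream[String] = {
      ssc.checkpoint("/tmp/dedup-checkpoint")               // updateStateByKey needs checkpointing

      // Key each message by a hash (or pluck a uuid off the message instead).
      val keyed = messages.map(m => (m.hashCode.toString, m))

      // State per key: (the message, whether this batch saw the key for the first time).
      val updated = keyed.updateStateByKey[(String, Boolean)] {
        (newValues: Seq[String], state: Option[(String, Boolean)]) =>
          state match {
            case Some((msg, _))             => Some((msg, false))           // seen before: duplicate
            case None if newValues.nonEmpty => Some((newValues.head, true)) // first sighting: emit
            case None                       => None
          }
      }

      // Only emit first sightings; a real job would also expire old keys
      // (return None after some window) to bound the state.
      updated.filter { case (_, (_, isNew)) => isNew }
             .map    { case (_, (msg, _))   => msg }
    }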

Cheers,

Sean


> On Dec 19, 2014, at 12:06 PM, Hari Shreedharan <hs...@cloudera.com> wrote:
> 
> Hi Dibyendu,
> 
> Thanks for the details on the implementation. But I still do not believe
> that it is no duplicates - what they achieve is that the same batch is
> processed exactly the same way every time (but see it may be processed more
> than once) - so it depends on the operation being idempotent. I believe
> Trident uses ZK to keep track of the transactions - a batch can be
> processed multiple times in failure scenarios (for example, the transaction
> is processed but before ZK is updated the machine fails, causing a "new"
> node to process it again).
> 
> I don't think it is impossible to do this in Spark Streaming as well and
> I'd be really interested in working on it at some point in the near future.
> 
> On Fri, Dec 19, 2014 at 1:44 AM, Dibyendu Bhattacharya <
> dibyendu.bhattachary@gmail.com> wrote:
> 
>> Hi,
>> 
>> Thanks to Jerry for mentioning the Kafka Spout for Trident. The Storm
>> Trident has done the exact-once guarantee by processing the tuple in a
>> batch  and assigning same transaction-id for a given batch . The replay for
>> a given batch with a transaction-id will have exact same set of tuples and
>> replay of batches happen in exact same order before the failure.
>> 
>> Having this paradigm, if downstream system process data for a given batch
>> for having a given transaction-id , and if during failure if same batch is
>> again emitted , you can check if same transaction-id is already processed
>> or not and hence can guarantee exact once semantics.
>> 
>> And this can only be achieved in Spark if we use Low Level Kafka consumer
>> API to process the offsets. This low level Kafka Consumer (
>> https://github.com/dibbhatt/kafka-spark-consumer) has implemented the
>> Spark Kafka consumer which uses Kafka Low Level APIs . All of the Kafka
>> related logic has been taken from Storm-Kafka spout and which manages all
>> Kafka re-balance and fault tolerant aspects and Kafka metadata managements.
>> 
>> Presently this Consumer maintains that during Receiver failure, it will
>> re-emit the exact same Block with same set of messages . Every message have
>> the details of its partition, offset and topic related details which can
>> tackle the SPARK-3146.
>> 
>> As this Low Level consumer has complete control over the Kafka Offsets ,
>> we can implement Trident like feature on top of it like having implement a
>> transaction-id for a given block , and re-emit the same block with same set
>> of message during Driver failure.
>> 
>> Regards,
>> Dibyendu
>> 
>> 
>> On Fri, Dec 19, 2014 at 7:33 AM, Shao, Saisai <sa...@intel.com>
>> wrote:
>>> 
>>> Hi all,
>>> 
>>> I agree with Hari that Strong exact-once semantics is very hard to
>>> guarantee, especially in the failure situation. From my understanding even
>>> current implementation of ReliableKafkaReceiver cannot fully guarantee the
>>> exact once semantics once failed, first is the ordering of data replaying
>>> from last checkpoint, this is hard to guarantee when multiple partitions
>>> are injected in; second is the design complexity of achieving this, you can
>>> refer to the Kafka Spout in Trident, we have to dig into the very details
>>> of Kafka metadata management system to achieve this, not to say rebalance
>>> and fault-tolerance.
>>> 
>>> Thanks
>>> Jerry
>>> 
>>> -----Original Message-----
>>> From: Luis Ángel Vicente Sánchez [mailto:langel.groups@gmail.com]
>>> Sent: Friday, December 19, 2014 5:57 AM
>>> To: Cody Koeninger
>>> Cc: Hari Shreedharan; Patrick Wendell; dev@spark.apache.org
>>> Subject: Re: Which committers care about Kafka?
>>> 
>>> But idempotency is not that easy t achieve sometimes. A strong only once
>>> semantic through a proper API would  be superuseful; but I'm not implying
>>> this is easy to achieve.
>>> On 18 Dec 2014 21:52, "Cody Koeninger" <co...@koeninger.org> wrote:
>>> 
>>>> If the downstream store for the output data is idempotent or
>>>> transactional, and that downstream store also is the system of record
>>>> for kafka offsets, then you have exactly-once semantics.  Commit
>>>> offsets with / after the data is stored.  On any failure, restart from
>>> the last committed offsets.
>>>> 
>>>> Yes, this approach is biased towards the etl-like use cases rather
>>>> than near-realtime-analytics use cases.
>>>> 
>>>> On Thu, Dec 18, 2014 at 3:27 PM, Hari Shreedharan <
>>>> hshreedharan@cloudera.com
>>>>> wrote:
>>>>> 
>>>>> I get what you are saying. But getting exactly once right is an
>>>>> extremely hard problem - especially in presence of failure. The
>>>>> issue is failures
>>>> can
>>>>> happen in a bunch of places. For example, before the notification of
>>>>> downstream store being successful reaches the receiver that updates
>>>>> the offsets, the node fails. The store was successful, but
>>>>> duplicates came in either way. This is something worth discussing by
>>>>> itself - but without uuids etc this might not really be solved even
>>> when you think it is.
>>>>> 
>>>>> Anyway, I will look at the links. Even I am interested in all of the
>>>>> features you mentioned - no HDFS WAL for Kafka and once-only
>>>>> delivery,
>>>> but
>>>>> I doubt the latter is really possible to guarantee - though I really
>>>> would
>>>>> love to have that!
>>>>> 
>>>>> Thanks,
>>>>> Hari
>>>>> 
>>>>> 
>>>>> On Thu, Dec 18, 2014 at 12:26 PM, Cody Koeninger
>>>>> <co...@koeninger.org>
>>>>> wrote:
>>>>> 
>>>>>> Thanks for the replies.
>>>>>> 
>>>>>> Regarding skipping WAL, it's not just about optimization.  If you
>>>>>> actually want exactly-once semantics, you need control of kafka
>>>>>> offsets
>>>> as
>>>>>> well, including the ability to not use zookeeper as the system of
>>>>>> record for offsets.  Kafka already is a reliable system that has
>>>>>> strong
>>>> ordering
>>>>>> guarantees (within a partition) and does not mandate the use of
>>>> zookeeper
>>>>>> to store offsets.  I think there should be a spark api that acts as
>>>>>> a
>>>> very
>>>>>> simple intermediary between Kafka and the user's choice of
>>>>>> downstream
>>>> store.
>>>>>> 
>>>>>> Take a look at the links I posted - if there's already been 2
>>>> independent
>>>>>> implementations of the idea, chances are it's something people need.
>>>>>> 
>>>>>> On Thu, Dec 18, 2014 at 1:44 PM, Hari Shreedharan <
>>>>>> hshreedharan@cloudera.com> wrote:
>>>>>>> 
>>>>>>> Hi Cody,
>>>>>>> 
>>>>>>> I am an absolute +1 on SPARK-3146. I think we can implement
>>>>>>> something pretty simple and lightweight for that one.
>>>>>>> 
>>>>>>> For the Kafka DStream skipping the WAL implementation - this is
>>>>>>> something I discussed with TD a few weeks ago. Though it is a good
>>>> idea to
>>>>>>> implement this to avoid unnecessary HDFS writes, it is an
>>>> optimization. For
>>>>>>> that reason, we must be careful in implementation. There are a
>>>>>>> couple
>>>> of
>>>>>>> issues that we need to ensure works properly - specifically
>>> ordering.
>>>> To
>>>>>>> ensure we pull messages from different topics and partitions in
>>>>>>> the
>>>> same
>>>>>>> order after failure, we’d still have to persist the metadata to
>>>>>>> HDFS
>>>> (or
>>>>>>> some other system) - this metadata must contain the order of
>>>>>>> messages consumed, so we know how to re-read the messages. I am
>>>>>>> planning to
>>>> explore
>>>>>>> this once I have some time (probably in Jan). In addition, we must
>>>>>>> also ensure bucketing functions work fine as well. I will file a
>>>>>>> placeholder jira for this one.
>>>>>>> 
>>>>>>> I also wrote an API to write data back to Kafka a while back -
>>>>>>> https://github.com/apache/spark/pull/2994 . I am hoping that this
>>>>>>> will get pulled in soon, as this is something I know people want.
>>>>>>> I am open
>>>> to
>>>>>>> feedback on that - anything that I can do to make it better.
>>>>>>> 
>>>>>>> Thanks,
>>>>>>> Hari
>>>>>>> 
>>>>>>> 
>>>>>>> On Thu, Dec 18, 2014 at 11:14 AM, Patrick Wendell
>>>>>>> <pw...@gmail.com>
>>>>>>> wrote:
>>>>>>> 
>>>>>>>> Hey Cody,
>>>>>>>> 
>>>>>>>> Thanks for reaching out with this. The lead on streaming is TD -
>>>>>>>> he is traveling this week though so I can respond a bit. To the
>>>>>>>> high level point of whether Kafka is important - it definitely
>>>>>>>> is. Something like 80% of Spark Streaming deployments
>>>>>>>> (anecdotally) ingest data from Kafka. Also, good support for
>>>>>>>> Kafka is something we generally want in Spark and not a library.
>>>>>>>> In some cases IIRC there were user libraries that used unstable
>>>>>>>> Kafka API's and we were somewhat waiting on Kafka to stabilize
>>>>>>>> them to merge things upstream. Otherwise users wouldn't be able
>>>>>>>> to use newer Kakfa versions. This is a high level impression only
>>>>>>>> though, I haven't talked to TD about this recently so it's worth
>>> revisiting given the developments in Kafka.
>>>>>>>> 
>>>>>>>> Please do bring things up like this on the dev list if there are
>>>>>>>> blockers for your usage - thanks for pinging it.
>>>>>>>> 
>>>>>>>> - Patrick
>>>>>>>> 
>>>>>>>> On Thu, Dec 18, 2014 at 7:07 AM, Cody Koeninger
>>>>>>>> <co...@koeninger.org>
>>>>>>>> wrote:
>>>>>>>>> Now that 1.2 is finalized... who are the go-to people to get
>>>>>>>>> some long-standing Kafka related issues resolved?
>>>>>>>>> 
>>>>>>>>> The existing api is not sufficiently safe nor flexible for our
>>>>>>>> production
>>>>>>>>> use. I don't think we're alone in this viewpoint, because I've
>>>>>>>>> seen several different patches and libraries to fix the same
>>>>>>>>> things we've
>>>>>>>> been
>>>>>>>>> running into.
>>>>>>>>> 
>>>>>>>>> Regarding flexibility
>>>>>>>>> 
>>>>>>>>> https://issues.apache.org/jira/browse/SPARK-3146
>>>>>>>>> 
>>>>>>>>> has been outstanding since August, and IMHO an equivalent of
>>>>>>>>> this is absolutely necessary. We wrote a similar patch
>>>>>>>>> ourselves, then found
>>>>>>>> that
>>>>>>>>> PR and have been running it in production. We wouldn't be able
>>>>>>>>> to
>>>> get
>>>>>>>> our
>>>>>>>>> jobs done without it. It also allows users to solve a whole
>>>>>>>>> class of problems for themselves (e.g. SPARK-2388, arbitrary
>>>>>>>>> delay of
>>>>>>>> messages, etc).
>>>>>>>>> 
>>>>>>>>> Regarding safety, I understand the motivation behind
>>>>>>>>> WriteAheadLog
>>>> as
>>>>>>>> a
>>>>>>>>> general solution for streaming unreliable sources, but Kafka
>>>>>>>>> already
>>>>>>>> is a
>>>>>>>>> reliable source. I think there's a need for an api that treats
>>>>>>>>> it as such. Even aside from the performance issues of
>>>>>>>>> duplicating the write-ahead log in kafka into another
>>>>>>>>> write-ahead log in hdfs, I
>>>> need
>>>>>>>>> exactly-once semantics in the face of failure (I've had
>>>>>>>>> failures
>>>> that
>>>>>>>>> prevented reloading a spark streaming checkpoint, for instance).
>>>>>>>>> 
>>>>>>>>> I've got an implementation i've been using
>>>>>>>>> 
>>>>>>>>> https://github.com/koeninger/spark-1/tree/kafkaRdd/external/kaf
>>>>>>>>> ka /src/main/scala/org/apache/spark/rdd/kafka
>>>>>>>>> 
>>>>>>>>> Tresata has something similar at
>>>>>>>> https://github.com/tresata/spark-kafka,
>>>>>>>>> and I know there were earlier attempts based on Storm code.
>>>>>>>>> 
>>>>>>>>> Trying to distribute these kinds of fixes as libraries rather
>>>>>>>>> than
>>>>>>>> patches
>>>>>>>>> to Spark is problematic, because large portions of the
>>>> implementation
>>>>>>>> are
>>>>>>>>> private[spark].
>>>>>>>>> 
>>>>>>>>> I'd like to help, but i need to know whose attention to get.
>>>>>>>> 
>>>>>>>> -----------------------------------------------------------------
>>>>>>>> ---- To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org For
>>>>>>>> additional commands, e-mail: dev-help@spark.apache.org
>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>> 
>>>> 
>>> 
>> 


Re: Which committers care about Kafka?

Posted by Hari Shreedharan <hs...@cloudera.com>.
Hi Dibyendu,

Thanks for the details on the implementation. But I still do not believe
that it guarantees no duplicates - what they achieve is that the same batch is
processed exactly the same way every time (though it may be processed more
than once) - so it depends on the operation being idempotent. I believe
Trident uses ZK to keep track of the transactions - a batch can be
processed multiple times in failure scenarios (for example, the transaction
is processed but before ZK is updated the machine fails, causing a "new"
node to process it again).

I don't think it is impossible to do this in Spark Streaming as well and
I'd be really interested in working on it at some point in the near future.

On Fri, Dec 19, 2014 at 1:44 AM, Dibyendu Bhattacharya <
dibyendu.bhattachary@gmail.com> wrote:

> Hi,
>
> Thanks to Jerry for mentioning the Kafka Spout for Trident. The Storm
> Trident has done the exact-once guarantee by processing the tuple in a
> batch  and assigning same transaction-id for a given batch . The replay for
> a given batch with a transaction-id will have exact same set of tuples and
> replay of batches happen in exact same order before the failure.
>
> Having this paradigm, if downstream system process data for a given batch
> for having a given transaction-id , and if during failure if same batch is
> again emitted , you can check if same transaction-id is already processed
> or not and hence can guarantee exact once semantics.
>
> And this can only be achieved in Spark if we use Low Level Kafka consumer
> API to process the offsets. This low level Kafka Consumer (
> https://github.com/dibbhatt/kafka-spark-consumer) has implemented the
> Spark Kafka consumer which uses Kafka Low Level APIs . All of the Kafka
> related logic has been taken from Storm-Kafka spout and which manages all
> Kafka re-balance and fault tolerant aspects and Kafka metadata managements.
>
> Presently this Consumer maintains that during Receiver failure, it will
> re-emit the exact same Block with same set of messages . Every message have
> the details of its partition, offset and topic related details which can
> tackle the SPARK-3146.
>
> As this Low Level consumer has complete control over the Kafka Offsets ,
> we can implement Trident like feature on top of it like having implement a
> transaction-id for a given block , and re-emit the same block with same set
> of message during Driver failure.
>
> Regards,
> Dibyendu
>
>
> On Fri, Dec 19, 2014 at 7:33 AM, Shao, Saisai <sa...@intel.com>
> wrote:
>>
>> Hi all,
>>
>> I agree with Hari that Strong exact-once semantics is very hard to
>> guarantee, especially in the failure situation. From my understanding even
>> current implementation of ReliableKafkaReceiver cannot fully guarantee the
>> exact once semantics once failed, first is the ordering of data replaying
>> from last checkpoint, this is hard to guarantee when multiple partitions
>> are injected in; second is the design complexity of achieving this, you can
>> refer to the Kafka Spout in Trident, we have to dig into the very details
>> of Kafka metadata management system to achieve this, not to say rebalance
>> and fault-tolerance.
>>
>> Thanks
>> Jerry
>>
>> -----Original Message-----
>> From: Luis Ángel Vicente Sánchez [mailto:langel.groups@gmail.com]
>> Sent: Friday, December 19, 2014 5:57 AM
>> To: Cody Koeninger
>> Cc: Hari Shreedharan; Patrick Wendell; dev@spark.apache.org
>> Subject: Re: Which committers care about Kafka?
>>
>> But idempotency is not that easy t achieve sometimes. A strong only once
>> semantic through a proper API would  be superuseful; but I'm not implying
>> this is easy to achieve.
>> On 18 Dec 2014 21:52, "Cody Koeninger" <co...@koeninger.org> wrote:
>>
>> > If the downstream store for the output data is idempotent or
>> > transactional, and that downstream store also is the system of record
>> > for kafka offsets, then you have exactly-once semantics.  Commit
>> > offsets with / after the data is stored.  On any failure, restart from
>> the last committed offsets.
>> >
>> > Yes, this approach is biased towards the etl-like use cases rather
>> > than near-realtime-analytics use cases.
>> >
>> > On Thu, Dec 18, 2014 at 3:27 PM, Hari Shreedharan <
>> > hshreedharan@cloudera.com
>> > > wrote:
>> > >
>> > > I get what you are saying. But getting exactly once right is an
>> > > extremely hard problem - especially in presence of failure. The
>> > > issue is failures
>> > can
>> > > happen in a bunch of places. For example, before the notification of
>> > > downstream store being successful reaches the receiver that updates
>> > > the offsets, the node fails. The store was successful, but
>> > > duplicates came in either way. This is something worth discussing by
>> > > itself - but without uuids etc this might not really be solved even
>> when you think it is.
>> > >
>> > > Anyway, I will look at the links. Even I am interested in all of the
>> > > features you mentioned - no HDFS WAL for Kafka and once-only
>> > > delivery,
>> > but
>> > > I doubt the latter is really possible to guarantee - though I really
>> > would
>> > > love to have that!
>> > >
>> > > Thanks,
>> > > Hari
>> > >
>> > >
>> > > On Thu, Dec 18, 2014 at 12:26 PM, Cody Koeninger
>> > > <co...@koeninger.org>
>> > > wrote:
>> > >
>> > >> Thanks for the replies.
>> > >>
>> > >> Regarding skipping WAL, it's not just about optimization.  If you
>> > >> actually want exactly-once semantics, you need control of kafka
>> > >> offsets
>> > as
>> > >> well, including the ability to not use zookeeper as the system of
>> > >> record for offsets.  Kafka already is a reliable system that has
>> > >> strong
>> > ordering
>> > >> guarantees (within a partition) and does not mandate the use of
>> > zookeeper
>> > >> to store offsets.  I think there should be a spark api that acts as
>> > >> a
>> > very
>> > >> simple intermediary between Kafka and the user's choice of
>> > >> downstream
>> > store.
>> > >>
>> > >> Take a look at the links I posted - if there's already been 2
>> > independent
>> > >> implementations of the idea, chances are it's something people need.
>> > >>
>> > >> On Thu, Dec 18, 2014 at 1:44 PM, Hari Shreedharan <
>> > >> hshreedharan@cloudera.com> wrote:
>> > >>>
>> > >>> Hi Cody,
>> > >>>
>> > >>> I am an absolute +1 on SPARK-3146. I think we can implement
>> > >>> something pretty simple and lightweight for that one.
>> > >>>
>> > >>> For the Kafka DStream skipping the WAL implementation - this is
>> > >>> something I discussed with TD a few weeks ago. Though it is a good
>> > idea to
>> > >>> implement this to avoid unnecessary HDFS writes, it is an
>> > optimization. For
>> > >>> that reason, we must be careful in implementation. There are a
>> > >>> couple
>> > of
>> > >>> issues that we need to ensure works properly - specifically
>> ordering.
>> > To
>> > >>> ensure we pull messages from different topics and partitions in
>> > >>> the
>> > same
>> > >>> order after failure, we’d still have to persist the metadata to
>> > >>> HDFS
>> > (or
>> > >>> some other system) - this metadata must contain the order of
>> > >>> messages consumed, so we know how to re-read the messages. I am
>> > >>> planning to
>> > explore
>> > >>> this once I have some time (probably in Jan). In addition, we must
>> > >>> also ensure bucketing functions work fine as well. I will file a
>> > >>> placeholder jira for this one.
>> > >>>
>> > >>> I also wrote an API to write data back to Kafka a while back -
>> > >>> https://github.com/apache/spark/pull/2994 . I am hoping that this
>> > >>> will get pulled in soon, as this is something I know people want.
>> > >>> I am open
>> > to
>> > >>> feedback on that - anything that I can do to make it better.
>> > >>>
>> > >>> Thanks,
>> > >>> Hari
>> > >>>
>> > >>>
>> > >>> On Thu, Dec 18, 2014 at 11:14 AM, Patrick Wendell
>> > >>> <pw...@gmail.com>
>> > >>> wrote:
>> > >>>
>> > >>>>  Hey Cody,
>> > >>>>
>> > >>>> Thanks for reaching out with this. The lead on streaming is TD -
>> > >>>> he is traveling this week though so I can respond a bit. To the
>> > >>>> high level point of whether Kafka is important - it definitely
>> > >>>> is. Something like 80% of Spark Streaming deployments
>> > >>>> (anecdotally) ingest data from Kafka. Also, good support for
>> > >>>> Kafka is something we generally want in Spark and not a library.
>> > >>>> In some cases IIRC there were user libraries that used unstable
>> > >>>> Kafka API's and we were somewhat waiting on Kafka to stabilize
>> > >>>> them to merge things upstream. Otherwise users wouldn't be able
>> > >>>> to use newer Kakfa versions. This is a high level impression only
>> > >>>> though, I haven't talked to TD about this recently so it's worth
>> revisiting given the developments in Kafka.
>> > >>>>
>> > >>>> Please do bring things up like this on the dev list if there are
>> > >>>> blockers for your usage - thanks for pinging it.
>> > >>>>
>> > >>>> - Patrick
>> > >>>>
>> > >>>> On Thu, Dec 18, 2014 at 7:07 AM, Cody Koeninger
>> > >>>> <co...@koeninger.org>
>> > >>>> wrote:
>> > >>>> > Now that 1.2 is finalized... who are the go-to people to get
>> > >>>> > some long-standing Kafka related issues resolved?
>> > >>>> >
>> > >>>> > The existing api is not sufficiently safe nor flexible for our
>> > >>>> production
>> > >>>> > use. I don't think we're alone in this viewpoint, because I've
>> > >>>> > seen several different patches and libraries to fix the same
>> > >>>> > things we've
>> > >>>> been
>> > >>>> > running into.
>> > >>>> >
>> > >>>> > Regarding flexibility
>> > >>>> >
>> > >>>> > https://issues.apache.org/jira/browse/SPARK-3146
>> > >>>> >
>> > >>>> > has been outstanding since August, and IMHO an equivalent of
>> > >>>> > this is absolutely necessary. We wrote a similar patch
>> > >>>> > ourselves, then found
>> > >>>> that
>> > >>>> > PR and have been running it in production. We wouldn't be able
>> > >>>> > to
>> > get
>> > >>>> our
>> > >>>> > jobs done without it. It also allows users to solve a whole
>> > >>>> > class of problems for themselves (e.g. SPARK-2388, arbitrary
>> > >>>> > delay of
>> > >>>> messages, etc).
>> > >>>> >
>> > >>>> > Regarding safety, I understand the motivation behind
>> > >>>> > WriteAheadLog
>> > as
>> > >>>> a
>> > >>>> > general solution for streaming unreliable sources, but Kafka
>> > >>>> > already
>> > >>>> is a
>> > >>>> > reliable source. I think there's a need for an api that treats
>> > >>>> > it as such. Even aside from the performance issues of
>> > >>>> > duplicating the write-ahead log in kafka into another
>> > >>>> > write-ahead log in hdfs, I
>> > need
>> > >>>> > exactly-once semantics in the face of failure (I've had
>> > >>>> > failures
>> > that
>> > >>>> > prevented reloading a spark streaming checkpoint, for instance).
>> > >>>> >
>> > >>>> > I've got an implementation i've been using
>> > >>>> >
>> > >>>> > https://github.com/koeninger/spark-1/tree/kafkaRdd/external/kaf
>> > >>>> > ka /src/main/scala/org/apache/spark/rdd/kafka
>> > >>>> >
>> > >>>> > Tresata has something similar at
>> > >>>> https://github.com/tresata/spark-kafka,
>> > >>>> > and I know there were earlier attempts based on Storm code.
>> > >>>> >
>> > >>>> > Trying to distribute these kinds of fixes as libraries rather
>> > >>>> > than
>> > >>>> patches
>> > >>>> > to Spark is problematic, because large portions of the
>> > implementation
>> > >>>> are
>> > >>>> > private[spark].
>> > >>>> >
>> > >>>> > I'd like to help, but i need to know whose attention to get.
>> > >>>>
>> > >>>> -----------------------------------------------------------------
>> > >>>> ---- To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org For
>> > >>>> additional commands, e-mail: dev-help@spark.apache.org
>> > >>>>
>> > >>>>
>> > >>>
>> > >
>> >
>>
>

Re: Which committers care about Kafka?

Posted by Dibyendu Bhattacharya <di...@gmail.com>.
Hi,

Thanks to Jerry for mentioning the Kafka Spout for Trident. Storm
Trident achieves its exactly-once guarantee by processing tuples in
batches and assigning the same transaction-id to a given batch. The replay
of a given batch with a transaction-id will have the exact same set of tuples,
and batches are replayed in the exact same order as before the failure.

With this paradigm, if the downstream system processes data for a given batch
with a given transaction-id, and the same batch is emitted again after a
failure, you can check whether that transaction-id has already been processed
and hence guarantee exactly-once semantics.
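
For illustration, that check could be as simple as something like this (the TxStore API below is hypothetical, not part of this consumer):

// Hypothetical downstream store that remembers the last transaction-id it applied.
trait TxStore {
  def lastCommittedTxId(topicPartition: String): Option[Long]
  def commitAtomically(topicPartition: String, txId: Long, batch: Seq[String]): Unit
}

// Apply a replayed batch only if its transaction-id has not been seen yet.
// Because data and transaction-id are written in one atomic operation, a crash
// leaves either both or neither, and replays of the same batch become no-ops.
def applyBatch(store: TxStore, topicPartition: String, txId: Long, batch: Seq[String]): Unit = {
  val alreadyDone = store.lastCommittedTxId(topicPartition).exists(_ >= txId)
  if (!alreadyDone) {
    store.commitAtomically(topicPartition, txId, batch)
  }
}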

And this can only be achieved in Spark if we use the low-level Kafka consumer
API to control the offsets. This low-level Kafka consumer (
https://github.com/dibbhatt/kafka-spark-consumer) implements a Spark
Kafka consumer on top of the Kafka low-level APIs. All of the Kafka-related
logic has been taken from the Storm-Kafka spout, which manages all Kafka
re-balance and fault-tolerance aspects as well as Kafka metadata management.

Presently this consumer guarantees that during a receiver failure it will
re-emit the exact same block with the same set of messages. Every message
carries its partition, offset, and topic details, which can address
SPARK-3146.
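
For illustration, such a per-message wrapper could be as simple as the following (a hypothetical type, not this consumer's actual class):

// Hypothetical wrapper: each message carries its Kafka coordinates, so
// downstream code knows exactly which offsets it has consumed and can
// persist or replay them as needed.
case class KafkaMessageWithOffset[K, V](
    topic: String,
    partition: Int,
    offset: Long,
    key: K,
    value: V)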

As this low-level consumer has complete control over the Kafka offsets, we
can implement a Trident-like feature on top of it, such as assigning a
transaction-id to a given block and re-emitting the same block with the same
set of messages during a driver failure.

Regards,
Dibyendu


On Fri, Dec 19, 2014 at 7:33 AM, Shao, Saisai <sa...@intel.com> wrote:
>
> Hi all,
>
> I agree with Hari that Strong exact-once semantics is very hard to
> guarantee, especially in the failure situation. From my understanding even
> current implementation of ReliableKafkaReceiver cannot fully guarantee the
> exact once semantics once failed, first is the ordering of data replaying
> from last checkpoint, this is hard to guarantee when multiple partitions
> are injected in; second is the design complexity of achieving this, you can
> refer to the Kafka Spout in Trident, we have to dig into the very details
> of Kafka metadata management system to achieve this, not to say rebalance
> and fault-tolerance.
>
> Thanks
> Jerry
>
> -----Original Message-----
> From: Luis Ángel Vicente Sánchez [mailto:langel.groups@gmail.com]
> Sent: Friday, December 19, 2014 5:57 AM
> To: Cody Koeninger
> Cc: Hari Shreedharan; Patrick Wendell; dev@spark.apache.org
> Subject: Re: Which committers care about Kafka?
>
> But idempotency is not that easy t achieve sometimes. A strong only once
> semantic through a proper API would  be superuseful; but I'm not implying
> this is easy to achieve.
> On 18 Dec 2014 21:52, "Cody Koeninger" <co...@koeninger.org> wrote:
>
> > If the downstream store for the output data is idempotent or
> > transactional, and that downstream store also is the system of record
> > for kafka offsets, then you have exactly-once semantics.  Commit
> > offsets with / after the data is stored.  On any failure, restart from
> the last committed offsets.
> >
> > Yes, this approach is biased towards the etl-like use cases rather
> > than near-realtime-analytics use cases.
> >
> > On Thu, Dec 18, 2014 at 3:27 PM, Hari Shreedharan <
> > hshreedharan@cloudera.com
> > > wrote:
> > >
> > > I get what you are saying. But getting exactly once right is an
> > > extremely hard problem - especially in presence of failure. The
> > > issue is failures
> > can
> > > happen in a bunch of places. For example, before the notification of
> > > downstream store being successful reaches the receiver that updates
> > > the offsets, the node fails. The store was successful, but
> > > duplicates came in either way. This is something worth discussing by
> > > itself - but without uuids etc this might not really be solved even
> when you think it is.
> > >
> > > Anyway, I will look at the links. Even I am interested in all of the
> > > features you mentioned - no HDFS WAL for Kafka and once-only
> > > delivery,
> > but
> > > I doubt the latter is really possible to guarantee - though I really
> > would
> > > love to have that!
> > >
> > > Thanks,
> > > Hari
> > >
> > >
> > > On Thu, Dec 18, 2014 at 12:26 PM, Cody Koeninger
> > > <co...@koeninger.org>
> > > wrote:
> > >
> > >> Thanks for the replies.
> > >>
> > >> Regarding skipping WAL, it's not just about optimization.  If you
> > >> actually want exactly-once semantics, you need control of kafka
> > >> offsets
> > as
> > >> well, including the ability to not use zookeeper as the system of
> > >> record for offsets.  Kafka already is a reliable system that has
> > >> strong
> > ordering
> > >> guarantees (within a partition) and does not mandate the use of
> > zookeeper
> > >> to store offsets.  I think there should be a spark api that acts as
> > >> a
> > very
> > >> simple intermediary between Kafka and the user's choice of
> > >> downstream
> > store.
> > >>
> > >> Take a look at the links I posted - if there's already been 2
> > independent
> > >> implementations of the idea, chances are it's something people need.
> > >>
> > >> On Thu, Dec 18, 2014 at 1:44 PM, Hari Shreedharan <
> > >> hshreedharan@cloudera.com> wrote:
> > >>>
> > >>> Hi Cody,
> > >>>
> > >>> I am an absolute +1 on SPARK-3146. I think we can implement
> > >>> something pretty simple and lightweight for that one.
> > >>>
> > >>> For the Kafka DStream skipping the WAL implementation - this is
> > >>> something I discussed with TD a few weeks ago. Though it is a good
> > idea to
> > >>> implement this to avoid unnecessary HDFS writes, it is an
> > optimization. For
> > >>> that reason, we must be careful in implementation. There are a
> > >>> couple
> > of
> > >>> issues that we need to ensure works properly - specifically ordering.
> > To
> > >>> ensure we pull messages from different topics and partitions in
> > >>> the
> > same
> > >>> order after failure, we’d still have to persist the metadata to
> > >>> HDFS
> > (or
> > >>> some other system) - this metadata must contain the order of
> > >>> messages consumed, so we know how to re-read the messages. I am
> > >>> planning to
> > explore
> > >>> this once I have some time (probably in Jan). In addition, we must
> > >>> also ensure bucketing functions work fine as well. I will file a
> > >>> placeholder jira for this one.
> > >>>
> > >>> I also wrote an API to write data back to Kafka a while back -
> > >>> https://github.com/apache/spark/pull/2994 . I am hoping that this
> > >>> will get pulled in soon, as this is something I know people want.
> > >>> I am open
> > to
> > >>> feedback on that - anything that I can do to make it better.
> > >>>
> > >>> Thanks,
> > >>> Hari
> > >>>
> > >>>
> > >>> On Thu, Dec 18, 2014 at 11:14 AM, Patrick Wendell
> > >>> <pw...@gmail.com>
> > >>> wrote:
> > >>>
> > >>>>  Hey Cody,
> > >>>>
> > >>>> Thanks for reaching out with this. The lead on streaming is TD -
> > >>>> he is traveling this week though so I can respond a bit. To the
> > >>>> high level point of whether Kafka is important - it definitely
> > >>>> is. Something like 80% of Spark Streaming deployments
> > >>>> (anecdotally) ingest data from Kafka. Also, good support for
> > >>>> Kafka is something we generally want in Spark and not a library.
> > >>>> In some cases IIRC there were user libraries that used unstable
> > >>>> Kafka API's and we were somewhat waiting on Kafka to stabilize
> > >>>> them to merge things upstream. Otherwise users wouldn't be able
> > >>>> to use newer Kakfa versions. This is a high level impression only
> > >>>> though, I haven't talked to TD about this recently so it's worth
> revisiting given the developments in Kafka.
> > >>>>
> > >>>> Please do bring things up like this on the dev list if there are
> > >>>> blockers for your usage - thanks for pinging it.
> > >>>>
> > >>>> - Patrick
> > >>>>
> > >>>> On Thu, Dec 18, 2014 at 7:07 AM, Cody Koeninger
> > >>>> <co...@koeninger.org>
> > >>>> wrote:
> > >>>> > Now that 1.2 is finalized... who are the go-to people to get
> > >>>> > some long-standing Kafka related issues resolved?
> > >>>> >
> > >>>> > The existing api is not sufficiently safe nor flexible for our
> > >>>> production
> > >>>> > use. I don't think we're alone in this viewpoint, because I've
> > >>>> > seen several different patches and libraries to fix the same
> > >>>> > things we've
> > >>>> been
> > >>>> > running into.
> > >>>> >
> > >>>> > Regarding flexibility
> > >>>> >
> > >>>> > https://issues.apache.org/jira/browse/SPARK-3146
> > >>>> >
> > >>>> > has been outstanding since August, and IMHO an equivalent of
> > >>>> > this is absolutely necessary. We wrote a similar patch
> > >>>> > ourselves, then found
> > >>>> that
> > >>>> > PR and have been running it in production. We wouldn't be able
> > >>>> > to
> > get
> > >>>> our
> > >>>> > jobs done without it. It also allows users to solve a whole
> > >>>> > class of problems for themselves (e.g. SPARK-2388, arbitrary
> > >>>> > delay of
> > >>>> messages, etc).
> > >>>> >
> > >>>> > Regarding safety, I understand the motivation behind
> > >>>> > WriteAheadLog
> > as
> > >>>> a
> > >>>> > general solution for streaming unreliable sources, but Kafka
> > >>>> > already
> > >>>> is a
> > >>>> > reliable source. I think there's a need for an api that treats
> > >>>> > it as such. Even aside from the performance issues of
> > >>>> > duplicating the write-ahead log in kafka into another
> > >>>> > write-ahead log in hdfs, I
> > need
> > >>>> > exactly-once semantics in the face of failure (I've had
> > >>>> > failures
> > that
> > >>>> > prevented reloading a spark streaming checkpoint, for instance).
> > >>>> >
> > >>>> > I've got an implementation i've been using
> > >>>> >
> > >>>> > https://github.com/koeninger/spark-1/tree/kafkaRdd/external/kaf
> > >>>> > ka /src/main/scala/org/apache/spark/rdd/kafka
> > >>>> >
> > >>>> > Tresata has something similar at
> > >>>> https://github.com/tresata/spark-kafka,
> > >>>> > and I know there were earlier attempts based on Storm code.
> > >>>> >
> > >>>> > Trying to distribute these kinds of fixes as libraries rather
> > >>>> > than
> > >>>> patches
> > >>>> > to Spark is problematic, because large portions of the
> > implementation
> > >>>> are
> > >>>> > private[spark].
> > >>>> >
> > >>>> > I'd like to help, but i need to know whose attention to get.
> > >>>>
> > >>>> -----------------------------------------------------------------
> > >>>> ---- To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org For
> > >>>> additional commands, e-mail: dev-help@spark.apache.org
> > >>>>
> > >>>>
> > >>>
> > >
> >
>

RE: Which committers care about Kafka?

Posted by "Shao, Saisai" <sa...@intel.com>.
Hi all,

I agree with Hari that strong exactly-once semantics are very hard to guarantee, especially in failure situations. From my understanding, even the current implementation of ReliableKafkaReceiver cannot fully guarantee exactly-once semantics after a failure. First, there is the ordering of data replayed from the last checkpoint, which is hard to guarantee when multiple partitions are involved. Second, there is the design complexity of achieving this: you can refer to the Kafka Spout in Trident, where we have to dig into the very details of Kafka's metadata management to achieve this, not to mention rebalance and fault-tolerance.

Thanks
Jerry

-----Original Message-----
From: Luis Ángel Vicente Sánchez [mailto:langel.groups@gmail.com] 
Sent: Friday, December 19, 2014 5:57 AM
To: Cody Koeninger
Cc: Hari Shreedharan; Patrick Wendell; dev@spark.apache.org
Subject: Re: Which committers care about Kafka?

But idempotency is not that easy t achieve sometimes. A strong only once semantic through a proper API would  be superuseful; but I'm not implying this is easy to achieve.
On 18 Dec 2014 21:52, "Cody Koeninger" <co...@koeninger.org> wrote:

> If the downstream store for the output data is idempotent or 
> transactional, and that downstream store also is the system of record 
> for kafka offsets, then you have exactly-once semantics.  Commit 
> offsets with / after the data is stored.  On any failure, restart from the last committed offsets.
>
> Yes, this approach is biased towards the etl-like use cases rather 
> than near-realtime-analytics use cases.
>
> On Thu, Dec 18, 2014 at 3:27 PM, Hari Shreedharan < 
> hshreedharan@cloudera.com
> > wrote:
> >
> > I get what you are saying. But getting exactly once right is an 
> > extremely hard problem - especially in presence of failure. The 
> > issue is failures
> can
> > happen in a bunch of places. For example, before the notification of 
> > downstream store being successful reaches the receiver that updates 
> > the offsets, the node fails. The store was successful, but 
> > duplicates came in either way. This is something worth discussing by 
> > itself - but without uuids etc this might not really be solved even when you think it is.
> >
> > Anyway, I will look at the links. Even I am interested in all of the 
> > features you mentioned - no HDFS WAL for Kafka and once-only 
> > delivery,
> but
> > I doubt the latter is really possible to guarantee - though I really
> would
> > love to have that!
> >
> > Thanks,
> > Hari
> >
> >
> > On Thu, Dec 18, 2014 at 12:26 PM, Cody Koeninger 
> > <co...@koeninger.org>
> > wrote:
> >
> >> Thanks for the replies.
> >>
> >> Regarding skipping WAL, it's not just about optimization.  If you 
> >> actually want exactly-once semantics, you need control of kafka 
> >> offsets
> as
> >> well, including the ability to not use zookeeper as the system of 
> >> record for offsets.  Kafka already is a reliable system that has 
> >> strong
> ordering
> >> guarantees (within a partition) and does not mandate the use of
> zookeeper
> >> to store offsets.  I think there should be a spark api that acts as 
> >> a
> very
> >> simple intermediary between Kafka and the user's choice of 
> >> downstream
> store.
> >>
> >> Take a look at the links I posted - if there's already been 2
> independent
> >> implementations of the idea, chances are it's something people need.
> >>
> >> On Thu, Dec 18, 2014 at 1:44 PM, Hari Shreedharan < 
> >> hshreedharan@cloudera.com> wrote:
> >>>
> >>> Hi Cody,
> >>>
> >>> I am an absolute +1 on SPARK-3146. I think we can implement 
> >>> something pretty simple and lightweight for that one.
> >>>
> >>> For the Kafka DStream skipping the WAL implementation - this is 
> >>> something I discussed with TD a few weeks ago. Though it is a good
> idea to
> >>> implement this to avoid unnecessary HDFS writes, it is an
> optimization. For
> >>> that reason, we must be careful in implementation. There are a 
> >>> couple
> of
> >>> issues that we need to ensure works properly - specifically ordering.
> To
> >>> ensure we pull messages from different topics and partitions in 
> >>> the
> same
> >>> order after failure, we’d still have to persist the metadata to 
> >>> HDFS
> (or
> >>> some other system) - this metadata must contain the order of 
> >>> messages consumed, so we know how to re-read the messages. I am 
> >>> planning to
> explore
> >>> this once I have some time (probably in Jan). In addition, we must 
> >>> also ensure bucketing functions work fine as well. I will file a 
> >>> placeholder jira for this one.
> >>>
> >>> I also wrote an API to write data back to Kafka a while back -
> >>> https://github.com/apache/spark/pull/2994 . I am hoping that this 
> >>> will get pulled in soon, as this is something I know people want. 
> >>> I am open
> to
> >>> feedback on that - anything that I can do to make it better.
> >>>
> >>> Thanks,
> >>> Hari
> >>>
> >>>
> >>> On Thu, Dec 18, 2014 at 11:14 AM, Patrick Wendell 
> >>> <pw...@gmail.com>
> >>> wrote:
> >>>
> >>>>  Hey Cody,
> >>>>
> >>>> Thanks for reaching out with this. The lead on streaming is TD - 
> >>>> he is traveling this week though so I can respond a bit. To the 
> >>>> high level point of whether Kafka is important - it definitely 
> >>>> is. Something like 80% of Spark Streaming deployments 
> >>>> (anecdotally) ingest data from Kafka. Also, good support for 
> >>>> Kafka is something we generally want in Spark and not a library. 
> >>>> In some cases IIRC there were user libraries that used unstable 
> >>>> Kafka API's and we were somewhat waiting on Kafka to stabilize 
> >>>> them to merge things upstream. Otherwise users wouldn't be able 
> >>>> to use newer Kakfa versions. This is a high level impression only 
> >>>> though, I haven't talked to TD about this recently so it's worth revisiting given the developments in Kafka.
> >>>>
> >>>> Please do bring things up like this on the dev list if there are 
> >>>> blockers for your usage - thanks for pinging it.
> >>>>
> >>>> - Patrick
> >>>>
> >>>> On Thu, Dec 18, 2014 at 7:07 AM, Cody Koeninger 
> >>>> <co...@koeninger.org>
> >>>> wrote:
> >>>> > Now that 1.2 is finalized... who are the go-to people to get 
> >>>> > some long-standing Kafka related issues resolved?
> >>>> >
> >>>> > The existing api is not sufficiently safe nor flexible for our
> >>>> production
> >>>> > use. I don't think we're alone in this viewpoint, because I've 
> >>>> > seen several different patches and libraries to fix the same 
> >>>> > things we've
> >>>> been
> >>>> > running into.
> >>>> >
> >>>> > Regarding flexibility
> >>>> >
> >>>> > https://issues.apache.org/jira/browse/SPARK-3146
> >>>> >
> >>>> > has been outstanding since August, and IMHO an equivalent of 
> >>>> > this is absolutely necessary. We wrote a similar patch 
> >>>> > ourselves, then found
> >>>> that
> >>>> > PR and have been running it in production. We wouldn't be able 
> >>>> > to
> get
> >>>> our
> >>>> > jobs done without it. It also allows users to solve a whole 
> >>>> > class of problems for themselves (e.g. SPARK-2388, arbitrary 
> >>>> > delay of
> >>>> messages, etc).
> >>>> >
> >>>> > Regarding safety, I understand the motivation behind 
> >>>> > WriteAheadLog
> as
> >>>> a
> >>>> > general solution for streaming unreliable sources, but Kafka 
> >>>> > already
> >>>> is a
> >>>> > reliable source. I think there's a need for an api that treats 
> >>>> > it as such. Even aside from the performance issues of 
> >>>> > duplicating the write-ahead log in kafka into another 
> >>>> > write-ahead log in hdfs, I
> need
> >>>> > exactly-once semantics in the face of failure (I've had 
> >>>> > failures
> that
> >>>> > prevented reloading a spark streaming checkpoint, for instance).
> >>>> >
> >>>> > I've got an implementation i've been using
> >>>> >
> >>>> > https://github.com/koeninger/spark-1/tree/kafkaRdd/external/kaf
> >>>> > ka /src/main/scala/org/apache/spark/rdd/kafka
> >>>> >
> >>>> > Tresata has something similar at
> >>>> https://github.com/tresata/spark-kafka,
> >>>> > and I know there were earlier attempts based on Storm code.
> >>>> >
> >>>> > Trying to distribute these kinds of fixes as libraries rather 
> >>>> > than
> >>>> patches
> >>>> > to Spark is problematic, because large portions of the
> implementation
> >>>> are
> >>>> > private[spark].
> >>>> >
> >>>> > I'd like to help, but i need to know whose attention to get.
> >>>>
> >>>> -----------------------------------------------------------------
> >>>> ---- To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org For 
> >>>> additional commands, e-mail: dev-help@spark.apache.org
> >>>>
> >>>>
> >>>
> >
>

Re: Which committers care about Kafka?

Posted by Luis Ángel Vicente Sánchez <la...@gmail.com>.
But idempotency is not that easy to achieve sometimes. A strong exactly-once
semantic through a proper API would be super useful; but I'm not implying
this is easy to achieve.
On 18 Dec 2014 21:52, "Cody Koeninger" <co...@koeninger.org> wrote:

> If the downstream store for the output data is idempotent or transactional,
> and that downstream store also is the system of record for kafka offsets,
> then you have exactly-once semantics.  Commit offsets with / after the data
> is stored.  On any failure, restart from the last committed offsets.
>
> Yes, this approach is biased towards the etl-like use cases rather than
> near-realtime-analytics use cases.
>
> On Thu, Dec 18, 2014 at 3:27 PM, Hari Shreedharan <
> hshreedharan@cloudera.com
> > wrote:
> >
> > I get what you are saying. But getting exactly once right is an extremely
> > hard problem - especially in presence of failure. The issue is failures
> can
> > happen in a bunch of places. For example, before the notification of
> > downstream store being successful reaches the receiver that updates the
> > offsets, the node fails. The store was successful, but duplicates came in
> > either way. This is something worth discussing by itself - but without
> > uuids etc this might not really be solved even when you think it is.
> >
> > Anyway, I will look at the links. Even I am interested in all of the
> > features you mentioned - no HDFS WAL for Kafka and once-only delivery,
> but
> > I doubt the latter is really possible to guarantee - though I really
> would
> > love to have that!
> >
> > Thanks,
> > Hari
> >
> >
> > On Thu, Dec 18, 2014 at 12:26 PM, Cody Koeninger <co...@koeninger.org>
> > wrote:
> >
> >> Thanks for the replies.
> >>
> >> Regarding skipping WAL, it's not just about optimization.  If you
> >> actually want exactly-once semantics, you need control of kafka offsets
> as
> >> well, including the ability to not use zookeeper as the system of record
> >> for offsets.  Kafka already is a reliable system that has strong
> ordering
> >> guarantees (within a partition) and does not mandate the use of
> zookeeper
> >> to store offsets.  I think there should be a spark api that acts as a
> very
> >> simple intermediary between Kafka and the user's choice of downstream
> store.
> >>
> >> Take a look at the links I posted - if there's already been 2
> independent
> >> implementations of the idea, chances are it's something people need.
> >>
> >> On Thu, Dec 18, 2014 at 1:44 PM, Hari Shreedharan <
> >> hshreedharan@cloudera.com> wrote:
> >>>
> >>> Hi Cody,
> >>>
> >>> I am an absolute +1 on SPARK-3146. I think we can implement something
> >>> pretty simple and lightweight for that one.
> >>>
> >>> For the Kafka DStream skipping the WAL implementation - this is
> >>> something I discussed with TD a few weeks ago. Though it is a good
> idea to
> >>> implement this to avoid unnecessary HDFS writes, it is an
> optimization. For
> >>> that reason, we must be careful in implementation. There are a couple
> of
> >>> issues that we need to ensure works properly - specifically ordering.
> To
> >>> ensure we pull messages from different topics and partitions in the
> same
> >>> order after failure, we’d still have to persist the metadata to HDFS
> (or
> >>> some other system) - this metadata must contain the order of messages
> >>> consumed, so we know how to re-read the messages. I am planning to
> explore
> >>> this once I have some time (probably in Jan). In addition, we must also
> >>> ensure bucketing functions work fine as well. I will file a placeholder
> >>> jira for this one.
> >>>
> >>> I also wrote an API to write data back to Kafka a while back -
> >>> https://github.com/apache/spark/pull/2994 . I am hoping that this will
> >>> get pulled in soon, as this is something I know people want. I am open
> to
> >>> feedback on that - anything that I can do to make it better.
> >>>
> >>> Thanks,
> >>> Hari
> >>>
> >>>
> >>> On Thu, Dec 18, 2014 at 11:14 AM, Patrick Wendell <pw...@gmail.com>
> >>> wrote:
> >>>
> >>>>  Hey Cody,
> >>>>
> >>>> Thanks for reaching out with this. The lead on streaming is TD - he is
> >>>> traveling this week though so I can respond a bit. To the high level
> >>>> point of whether Kafka is important - it definitely is. Something like
> >>>> 80% of Spark Streaming deployments (anecdotally) ingest data from
> >>>> Kafka. Also, good support for Kafka is something we generally want in
> >>>> Spark and not a library. In some cases IIRC there were user libraries
> >>>> that used unstable Kafka API's and we were somewhat waiting on Kafka
> >>>> to stabilize them to merge things upstream. Otherwise users wouldn't
> >>>> be able to use newer Kakfa versions. This is a high level impression
> >>>> only though, I haven't talked to TD about this recently so it's worth
> >>>> revisiting given the developments in Kafka.
> >>>>
> >>>> Please do bring things up like this on the dev list if there are
> >>>> blockers for your usage - thanks for pinging it.
> >>>>
> >>>> - Patrick
> >>>>
> >>>> On Thu, Dec 18, 2014 at 7:07 AM, Cody Koeninger <co...@koeninger.org>
> >>>> wrote:
> >>>> > Now that 1.2 is finalized... who are the go-to people to get some
> >>>> > long-standing Kafka related issues resolved?
> >>>> >
> >>>> > The existing api is not sufficiently safe nor flexible for our
> >>>> production
> >>>> > use. I don't think we're alone in this viewpoint, because I've seen
> >>>> > several different patches and libraries to fix the same things we've
> >>>> been
> >>>> > running into.
> >>>> >
> >>>> > Regarding flexibility
> >>>> >
> >>>> > https://issues.apache.org/jira/browse/SPARK-3146
> >>>> >
> >>>> > has been outstanding since August, and IMHO an equivalent of this is
> >>>> > absolutely necessary. We wrote a similar patch ourselves, then found
> >>>> that
> >>>> > PR and have been running it in production. We wouldn't be able to
> get
> >>>> our
> >>>> > jobs done without it. It also allows users to solve a whole class of
> >>>> > problems for themselves (e.g. SPARK-2388, arbitrary delay of
> >>>> messages, etc).
> >>>> >
> >>>> > Regarding safety, I understand the motivation behind WriteAheadLog
> as
> >>>> a
> >>>> > general solution for streaming unreliable sources, but Kafka already
> >>>> is a
> >>>> > reliable source. I think there's a need for an api that treats it as
> >>>> > such. Even aside from the performance issues of duplicating the
> >>>> > write-ahead log in kafka into another write-ahead log in hdfs, I
> need
> >>>> > exactly-once semantics in the face of failure (I've had failures
> that
> >>>> > prevented reloading a spark streaming checkpoint, for instance).
> >>>> >
> >>>> > I've got an implementation i've been using
> >>>> >
> >>>> > https://github.com/koeninger/spark-1/tree/kafkaRdd/external/kafka
> >>>> > /src/main/scala/org/apache/spark/rdd/kafka
> >>>> >
> >>>> > Tresata has something similar at
> >>>> https://github.com/tresata/spark-kafka,
> >>>> > and I know there were earlier attempts based on Storm code.
> >>>> >
> >>>> > Trying to distribute these kinds of fixes as libraries rather than
> >>>> patches
> >>>> > to Spark is problematic, because large portions of the
> implementation
> >>>> are
> >>>> > private[spark].
> >>>> >
> >>>> > I'd like to help, but i need to know whose attention to get.
> >>>>
> >>>> ---------------------------------------------------------------------
> >>>> To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
> >>>> For additional commands, e-mail: dev-help@spark.apache.org
> >>>>
> >>>>
> >>>
> >
>

Re: Which committers care about Kafka?

Posted by Cody Koeninger <co...@koeninger.org>.
If the downstream store for the output data is idempotent or transactional,
and that downstream store also is the system of record for kafka offsets,
then you have exactly-once semantics.  Commit offsets with / after the data
is stored.  On any failure, restart from the last committed offsets.
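
For illustration, a bare-bones version of that pattern against a hypothetical JDBC store could look like this (the table names and schema are made up; real code would also handle batching and connection pooling):

import java.sql.{Connection, DriverManager}

// Write the batch and its ending offset in one transaction, so the data and
// the offset become visible together or not at all. On restart, read the
// offset back from this same store and resume from there.
def storeWithOffset(jdbcUrl: String,
                    topic: String,
                    partition: Int,
                    untilOffset: Long,
                    rows: Seq[(String, Int)]): Unit = {
  val conn: Connection = DriverManager.getConnection(jdbcUrl)
  try {
    conn.setAutoCommit(false)
    val insert = conn.prepareStatement("insert into results (k, v) values (?, ?)")
    rows.foreach { case (k, v) =>
      insert.setString(1, k)
      insert.setInt(2, v)
      insert.executeUpdate()
    }
    val offsets = conn.prepareStatement(
      "update offsets set until_offset = ? where topic = ? and part = ?")
    offsets.setLong(1, untilOffset)
    offsets.setString(2, topic)
    offsets.setInt(3, partition)
    offsets.executeUpdate()
    conn.commit()
  } catch {
    case e: Exception =>
      conn.rollback()
      throw e
  } finally {
    conn.close()
  }
}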

Yes, this approach is biased towards the ETL-like use cases rather than
near-real-time analytics use cases.

On Thu, Dec 18, 2014 at 3:27 PM, Hari Shreedharan <hshreedharan@cloudera.com
> wrote:
>
> I get what you are saying. But getting exactly once right is an extremely
> hard problem - especially in presence of failure. The issue is failures can
> happen in a bunch of places. For example, before the notification of
> downstream store being successful reaches the receiver that updates the
> offsets, the node fails. The store was successful, but duplicates came in
> either way. This is something worth discussing by itself - but without
> uuids etc this might not really be solved even when you think it is.
>
> Anyway, I will look at the links. Even I am interested in all of the
> features you mentioned - no HDFS WAL for Kafka and once-only delivery, but
> I doubt the latter is really possible to guarantee - though I really would
> love to have that!
>
> Thanks,
> Hari
>
>
> On Thu, Dec 18, 2014 at 12:26 PM, Cody Koeninger <co...@koeninger.org>
> wrote:
>
>> Thanks for the replies.
>>
>> Regarding skipping WAL, it's not just about optimization.  If you
>> actually want exactly-once semantics, you need control of kafka offsets as
>> well, including the ability to not use zookeeper as the system of record
>> for offsets.  Kafka already is a reliable system that has strong ordering
>> guarantees (within a partition) and does not mandate the use of zookeeper
>> to store offsets.  I think there should be a spark api that acts as a very
>> simple intermediary between Kafka and the user's choice of downstream store.
>>
>> Take a look at the links I posted - if there's already been 2 independent
>> implementations of the idea, chances are it's something people need.
>>
>> On Thu, Dec 18, 2014 at 1:44 PM, Hari Shreedharan <
>> hshreedharan@cloudera.com> wrote:
>>>
>>> Hi Cody,
>>>
>>> I am an absolute +1 on SPARK-3146. I think we can implement something
>>> pretty simple and lightweight for that one.
>>>
>>> For the Kafka DStream skipping the WAL implementation - this is
>>> something I discussed with TD a few weeks ago. Though it is a good idea to
>>> implement this to avoid unnecessary HDFS writes, it is an optimization. For
>>> that reason, we must be careful in implementation. There are a couple of
>>> issues that we need to ensure works properly - specifically ordering. To
>>> ensure we pull messages from different topics and partitions in the same
>>> order after failure, we’d still have to persist the metadata to HDFS (or
>>> some other system) - this metadata must contain the order of messages
>>> consumed, so we know how to re-read the messages. I am planning to explore
>>> this once I have some time (probably in Jan). In addition, we must also
>>> ensure bucketing functions work fine as well. I will file a placeholder
>>> jira for this one.
>>>
>>> I also wrote an API to write data back to Kafka a while back -
>>> https://github.com/apache/spark/pull/2994 . I am hoping that this will
>>> get pulled in soon, as this is something I know people want. I am open to
>>> feedback on that - anything that I can do to make it better.
>>>
>>> Thanks,
>>> Hari
>>>
>>>
>>> On Thu, Dec 18, 2014 at 11:14 AM, Patrick Wendell <pw...@gmail.com>
>>> wrote:
>>>
>>>>  Hey Cody,
>>>>
>>>> Thanks for reaching out with this. The lead on streaming is TD - he is
>>>> traveling this week though so I can respond a bit. To the high level
>>>> point of whether Kafka is important - it definitely is. Something like
>>>> 80% of Spark Streaming deployments (anecdotally) ingest data from
>>>> Kafka. Also, good support for Kafka is something we generally want in
>>>> Spark and not a library. In some cases IIRC there were user libraries
>>>> that used unstable Kafka API's and we were somewhat waiting on Kafka
>>>> to stabilize them to merge things upstream. Otherwise users wouldn't
>>>> be able to use newer Kakfa versions. This is a high level impression
>>>> only though, I haven't talked to TD about this recently so it's worth
>>>> revisiting given the developments in Kafka.
>>>>
>>>> Please do bring things up like this on the dev list if there are
>>>> blockers for your usage - thanks for pinging it.
>>>>
>>>> - Patrick
>>>>
>>>> On Thu, Dec 18, 2014 at 7:07 AM, Cody Koeninger <co...@koeninger.org>
>>>> wrote:
>>>> > Now that 1.2 is finalized... who are the go-to people to get some
>>>> > long-standing Kafka related issues resolved?
>>>> >
>>>> > The existing api is not sufficiently safe nor flexible for our
>>>> production
>>>> > use. I don't think we're alone in this viewpoint, because I've seen
>>>> > several different patches and libraries to fix the same things we've
>>>> been
>>>> > running into.
>>>> >
>>>> > Regarding flexibility
>>>> >
>>>> > https://issues.apache.org/jira/browse/SPARK-3146
>>>> >
>>>> > has been outstanding since August, and IMHO an equivalent of this is
>>>> > absolutely necessary. We wrote a similar patch ourselves, then found
>>>> that
>>>> > PR and have been running it in production. We wouldn't be able to get
>>>> our
>>>> > jobs done without it. It also allows users to solve a whole class of
>>>> > problems for themselves (e.g. SPARK-2388, arbitrary delay of
>>>> messages, etc).
>>>> >
>>>> > Regarding safety, I understand the motivation behind WriteAheadLog as
>>>> a
>>>> > general solution for streaming unreliable sources, but Kafka already
>>>> is a
>>>> > reliable source. I think there's a need for an api that treats it as
>>>> > such. Even aside from the performance issues of duplicating the
>>>> > write-ahead log in kafka into another write-ahead log in hdfs, I need
>>>> > exactly-once semantics in the face of failure (I've had failures that
>>>> > prevented reloading a spark streaming checkpoint, for instance).
>>>> >
>>>> > I've got an implementation i've been using
>>>> >
>>>> > https://github.com/koeninger/spark-1/tree/kafkaRdd/external/kafka
>>>> > /src/main/scala/org/apache/spark/rdd/kafka
>>>> >
>>>> > Tresata has something similar at
>>>> https://github.com/tresata/spark-kafka,
>>>> > and I know there were earlier attempts based on Storm code.
>>>> >
>>>> > Trying to distribute these kinds of fixes as libraries rather than
>>>> patches
>>>> > to Spark is problematic, because large portions of the implementation
>>>> are
>>>> > private[spark].
>>>> >
>>>> > I'd like to help, but i need to know whose attention to get.
>>>>
>>>> ---------------------------------------------------------------------
>>>> To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
>>>> For additional commands, e-mail: dev-help@spark.apache.org
>>>>
>>>>
>>>
>

Re: Which committers care about Kafka?

Posted by Hari Shreedharan <hs...@cloudera.com>.
I get what you are saying. But getting exactly-once right is an extremely hard problem - especially in the presence of failure. The issue is failures can happen in a bunch of places. For example, before the notification that the downstream store succeeded reaches the receiver that updates the offsets, the node fails. The store was successful, but duplicates come in either way. This is something worth discussing by itself - but without uuids etc. this might not really be solved even when you think it is.




Anyway, I will look at the links. Even I am interested in all of the features you mentioned - no HDFS WAL for Kafka and once-only delivery, but I doubt the latter is really possible to guarantee - though I really would love to have that!




Thanks, Hari

On Thu, Dec 18, 2014 at 12:26 PM, Cody Koeninger <co...@koeninger.org>
wrote:

> Thanks for the replies.
> Regarding skipping WAL, it's not just about optimization.  If you actually
> want exactly-once semantics, you need control of kafka offsets as well,
> including the ability to not use zookeeper as the system of record for
> offsets.  Kafka already is a reliable system that has strong ordering
> guarantees (within a partition) and does not mandate the use of zookeeper
> to store offsets.  I think there should be a spark api that acts as a very
> simple intermediary between Kafka and the user's choice of downstream store.
> Take a look at the links I posted - if there's already been 2 independent
> implementations of the idea, chances are it's something people need.
> On Thu, Dec 18, 2014 at 1:44 PM, Hari Shreedharan <hshreedharan@cloudera.com
>> wrote:
>>
>> Hi Cody,
>>
>> I am an absolute +1 on SPARK-3146. I think we can implement something
>> pretty simple and lightweight for that one.
>>
>> For the Kafka DStream skipping the WAL implementation - this is something
>> I discussed with TD a few weeks ago. Though it is a good idea to implement
>> this to avoid unnecessary HDFS writes, it is an optimization. For that
>> reason, we must be careful in implementation. There are a couple of issues
>> that we need to ensure work properly - specifically ordering. To ensure we
>> pull messages from different topics and partitions in the same order after
>> failure, we’d still have to persist the metadata to HDFS (or some other
>> system) - this metadata must contain the order of messages consumed, so we
>> know how to re-read the messages. I am planning to explore this once I have
>> some time (probably in Jan). In addition, we must also ensure bucketing
>> functions work fine as well. I will file a placeholder jira for this one.
>>
>> I also wrote an API to write data back to Kafka a while back -
>> https://github.com/apache/spark/pull/2994 . I am hoping that this will
>> get pulled in soon, as this is something I know people want. I am open to
>> feedback on that - anything that I can do to make it better.
>>
>> Thanks,
>> Hari
>>
>>
>> On Thu, Dec 18, 2014 at 11:14 AM, Patrick Wendell <pw...@gmail.com>
>> wrote:
>>
>>> Hey Cody,
>>>
>>> Thanks for reaching out with this. The lead on streaming is TD - he is
>>> traveling this week though so I can respond a bit. To the high level
>>> point of whether Kafka is important - it definitely is. Something like
>>> 80% of Spark Streaming deployments (anecdotally) ingest data from
>>> Kafka. Also, good support for Kafka is something we generally want in
>>> Spark and not a library. In some cases IIRC there were user libraries
>>> that used unstable Kafka API's and we were somewhat waiting on Kafka
>>> to stabilize them to merge things upstream. Otherwise users wouldn't
>>> be able to use newer Kafka versions. This is a high level impression
>>> only though, I haven't talked to TD about this recently so it's worth
>>> revisiting given the developments in Kafka.
>>>
>>> Please do bring things up like this on the dev list if there are
>>> blockers for your usage - thanks for pinging it.
>>>
>>> - Patrick
>>>
>>> On Thu, Dec 18, 2014 at 7:07 AM, Cody Koeninger <co...@koeninger.org>
>>> wrote:
>>> > Now that 1.2 is finalized... who are the go-to people to get some
>>> > long-standing Kafka related issues resolved?
>>> >
>>> > The existing api is not sufficiently safe nor flexible for our
>>> production
>>> > use. I don't think we're alone in this viewpoint, because I've seen
>>> > several different patches and libraries to fix the same things we've
>>> been
>>> > running into.
>>> >
>>> > Regarding flexibility
>>> >
>>> > https://issues.apache.org/jira/browse/SPARK-3146
>>> >
>>> > has been outstanding since August, and IMHO an equivalent of this is
>>> > absolutely necessary. We wrote a similar patch ourselves, then found
>>> that
>>> > PR and have been running it in production. We wouldn't be able to get
>>> our
>>> > jobs done without it. It also allows users to solve a whole class of
>>> > problems for themselves (e.g. SPARK-2388, arbitrary delay of messages,
>>> etc).
>>> >
>>> > Regarding safety, I understand the motivation behind WriteAheadLog as a
>>> > general solution for streaming unreliable sources, but Kafka already is
>>> a
>>> > reliable source. I think there's a need for an api that treats it as
>>> > such. Even aside from the performance issues of duplicating the
>>> > write-ahead log in kafka into another write-ahead log in hdfs, I need
>>> > exactly-once semantics in the face of failure (I've had failures that
>>> > prevented reloading a spark streaming checkpoint, for instance).
>>> >
>>> > I've got an implementation i've been using
>>> >
>>> > https://github.com/koeninger/spark-1/tree/kafkaRdd/external/kafka
>>> > /src/main/scala/org/apache/spark/rdd/kafka
>>> >
>>> > Tresata has something similar at https://github.com/tresata/spark-kafka,
>>>
>>> > and I know there were earlier attempts based on Storm code.
>>> >
>>> > Trying to distribute these kinds of fixes as libraries rather than
>>> patches
>>> > to Spark is problematic, because large portions of the implementation
>>> are
>>> > private[spark].
>>> >
>>> > I'd like to help, but i need to know whose attention to get.
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
>>> For additional commands, e-mail: dev-help@spark.apache.org
>>>
>>>
>>

Re: Which committers care about Kafka?

Posted by Cody Koeninger <co...@koeninger.org>.
Thanks for the replies.

Regarding skipping WAL, it's not just about optimization.  If you actually
want exactly-once semantics, you need control of kafka offsets as well,
including the ability to not use zookeeper as the system of record for
offsets.  Kafka already is a reliable system that has strong ordering
guarantees (within a partition) and does not mandate the use of zookeeper
to store offsets.  I think there should be a spark api that acts as a very
simple intermediary between Kafka and the user's choice of downstream store.

Take a look at the links I posted - if there's already been 2 independent
implementations of the idea, chances are it's something people need.
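
To make the shape of that concrete, here's a rough sketch of the kind of
driver-side usage I have in mind - the names and helpers below are
hypothetical, just to illustrate, not the actual code in my branch:

// Hypothetical sketch only: the downstream store, not zookeeper, is the
// system of record for offsets, and each rdd partition covers exactly one
// explicit offset range.
case class OffsetRange(topic: String, partition: Int,
                       fromOffset: Long, untilOffset: Long)

def runBatch[T](
    nextRanges: () => Seq[OffsetRange],   // derived from offsets committed downstream
    makeRdd: Seq[OffsetRange] => org.apache.spark.rdd.RDD[T],  // e.g. the KafkaRDD linked above
    commit: (Seq[T], Seq[OffsetRange]) => Unit): Unit = {      // results + offsets together
  val ranges  = nextRanges()
  val rdd     = makeRdd(ranges)
  val results = rdd.collect().toSeq  // any computation; collect just keeps the sketch short
  commit(results, ranges)            // atomic commit means replaying a failed batch is safe
}

The point is that the user decides where offsets live and when they count as
committed; spark just needs to hand back an RDD for exactly the range asked
for.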

On Thu, Dec 18, 2014 at 1:44 PM, Hari Shreedharan <hshreedharan@cloudera.com
> wrote:
>
> Hi Cody,
>
> I am an absolute +1 on SPARK-3146. I think we can implement something
> pretty simple and lightweight for that one.
>
> For the Kafka DStream skipping the WAL implementation - this is something
> I discussed with TD a few weeks ago. Though it is a good idea to implement
> this to avoid unnecessary HDFS writes, it is an optimization. For that
> reason, we must be careful in implementation. There are a couple of issues
> that we need to ensure work properly - specifically ordering. To ensure we
> pull messages from different topics and partitions in the same order after
> failure, we’d still have to persist the metadata to HDFS (or some other
> system) - this metadata must contain the order of messages consumed, so we
> know how to re-read the messages. I am planning to explore this once I have
> some time (probably in Jan). In addition, we must also ensure bucketing
> functions work fine as well. I will file a placeholder jira for this one.
>
> I also wrote an API to write data back to Kafka a while back -
> https://github.com/apache/spark/pull/2994 . I am hoping that this will
> get pulled in soon, as this is something I know people want. I am open to
> feedback on that - anything that I can do to make it better.
>
> Thanks,
> Hari
>
>
> On Thu, Dec 18, 2014 at 11:14 AM, Patrick Wendell <pw...@gmail.com>
> wrote:
>
>> Hey Cody,
>>
>> Thanks for reaching out with this. The lead on streaming is TD - he is
>> traveling this week though so I can respond a bit. To the high level
>> point of whether Kafka is important - it definitely is. Something like
>> 80% of Spark Streaming deployments (anecdotally) ingest data from
>> Kafka. Also, good support for Kafka is something we generally want in
>> Spark and not a library. In some cases IIRC there were user libraries
>> that used unstable Kafka API's and we were somewhat waiting on Kafka
>> to stabilize them to merge things upstream. Otherwise users wouldn't
>> be able to use newer Kafka versions. This is a high level impression
>> only though, I haven't talked to TD about this recently so it's worth
>> revisiting given the developments in Kafka.
>>
>> Please do bring things up like this on the dev list if there are
>> blockers for your usage - thanks for pinging it.
>>
>> - Patrick
>>
>> On Thu, Dec 18, 2014 at 7:07 AM, Cody Koeninger <co...@koeninger.org>
>> wrote:
>> > Now that 1.2 is finalized... who are the go-to people to get some
>> > long-standing Kafka related issues resolved?
>> >
>> > The existing api is not sufficiently safe nor flexible for our
>> production
>> > use. I don't think we're alone in this viewpoint, because I've seen
>> > several different patches and libraries to fix the same things we've
>> been
>> > running into.
>> >
>> > Regarding flexibility
>> >
>> > https://issues.apache.org/jira/browse/SPARK-3146
>> >
>> > has been outstanding since August, and IMHO an equivalent of this is
>> > absolutely necessary. We wrote a similar patch ourselves, then found
>> that
>> > PR and have been running it in production. We wouldn't be able to get
>> our
>> > jobs done without it. It also allows users to solve a whole class of
>> > problems for themselves (e.g. SPARK-2388, arbitrary delay of messages,
>> etc).
>> >
>> > Regarding safety, I understand the motivation behind WriteAheadLog as a
>> > general solution for streaming unreliable sources, but Kafka already is
>> a
>> > reliable source. I think there's a need for an api that treats it as
>> > such. Even aside from the performance issues of duplicating the
>> > write-ahead log in kafka into another write-ahead log in hdfs, I need
>> > exactly-once semantics in the face of failure (I've had failures that
>> > prevented reloading a spark streaming checkpoint, for instance).
>> >
>> > I've got an implementation i've been using
>> >
>> > https://github.com/koeninger/spark-1/tree/kafkaRdd/external/kafka
>> > /src/main/scala/org/apache/spark/rdd/kafka
>> >
>> > Tresata has something similar at https://github.com/tresata/spark-kafka,
>>
>> > and I know there were earlier attempts based on Storm code.
>> >
>> > Trying to distribute these kinds of fixes as libraries rather than
>> patches
>> > to Spark is problematic, because large portions of the implementation
>> are
>> > private[spark].
>> >
>> > I'd like to help, but i need to know whose attention to get.
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
>> For additional commands, e-mail: dev-help@spark.apache.org
>>
>>
>

Re: Which committers care about Kafka?

Posted by Hari Shreedharan <hs...@cloudera.com>.
Hi Cody,




I am an absolute +1 on SPARK-3146. I think we can implement something pretty simple and lightweight for that one.
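
For instance, something as small as letting the caller supply a handler over the raw MessageAndMetadata, instead of the stream hardcoding (key, message) pairs, would cover a lot of it - a sketch of the user-facing side, not a committed design:

import kafka.message.MessageAndMetadata

// Sketch only: a per-record handler so topic / partition / offset travel
// with the payload instead of being dropped at the input stage.
val messageHandler = (mmd: MessageAndMetadata[String, String]) =>
  (mmd.topic, mmd.partition, mmd.offset, mmd.message)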




For the Kafka DStream skipping the WAL implementation - this is something I discussed with TD a few weeks ago. Though it is a good idea to implement this to avoid unnecessary HDFS writes, it is an optimization. For that reason, we must be careful in implementation. There are a couple of issues that we need to ensure work properly - specifically ordering. To ensure we pull messages from different topics and partitions in the same order after failure, we’d still have to persist the metadata to HDFS (or some other system) - this metadata must contain the order of messages consumed, so we know how to re-read the messages. I am planning to explore this once I have some time (probably in Jan). In addition, we must also ensure bucketing functions work fine as well. I will file a placeholder jira for this one.
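
Roughly, the per-batch metadata I have in mind would look something like this - just a sketch of the shape, not a settled design:

// Sketch only: what gets persisted to HDFS (or another reliable store) per
// batch instead of a full WAL of message payloads.
case class ConsumedRange(
  topic: String,
  partition: Int,
  fromOffset: Long,    // inclusive
  untilOffset: Long)   // exclusive

case class BatchMetadata(
  batchTime: Long,
  // order matters: on recovery the ranges are re-read in exactly this order,
  // so blocks are rebuilt the same way they were first consumed
  ranges: Seq[ConsumedRange])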




I also wrote an API to write data back to Kafka a while back - https://github.com/apache/spark/pull/2994 . I am hoping that this will get pulled in soon, as this is something I know people want. I am open to feedback on that - anything that I can do to make it better.
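
The usage is meant to be trivial from the user's side; the general pattern (simplified here with the plain Kafka 0.8 producer, not the code in the PR) is one producer per partition per batch:

import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import org.apache.spark.streaming.dstream.DStream

// Simplified sketch of writing a DStream back out to a Kafka topic.
def writeToKafka(stream: DStream[String], topic: String, brokerList: String): Unit = {
  stream.foreachRDD { rdd =>
    rdd.foreachPartition { records =>
      val props = new Properties()
      props.put("metadata.broker.list", brokerList)
      props.put("serializer.class", "kafka.serializer.StringEncoder")
      val producer = new Producer[String, String](new ProducerConfig(props))
      try {
        records.foreach(r => producer.send(new KeyedMessage[String, String](topic, r)))
      } finally {
        producer.close()
      }
    }
  }
}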




Thanks, Hari

On Thu, Dec 18, 2014 at 11:14 AM, Patrick Wendell <pw...@gmail.com>
wrote:

> Hey Cody,
> Thanks for reaching out with this. The lead on streaming is TD - he is
> traveling this week though so I can respond a bit. To the high level
> point of whether Kafka is important - it definitely is. Something like
> 80% of Spark Streaming deployments (anecdotally) ingest data from
> Kafka. Also, good support for Kafka is something we generally want in
> Spark and not a library. In some cases IIRC there were user libraries
> that used unstable Kafka API's and we were somewhat waiting on Kafka
> to stabilize them to merge things upstream. Otherwise users wouldn't
> be able to use newer Kafka versions. This is a high level impression
> only though, I haven't talked to TD about this recently so it's worth
> revisiting given the developments in Kafka.
> Please do bring things up like this on the dev list if there are
> blockers for your usage - thanks for pinging it.
> - Patrick
> On Thu, Dec 18, 2014 at 7:07 AM, Cody Koeninger <co...@koeninger.org> wrote:
>> Now that 1.2 is finalized...  who are the go-to people to get some
>> long-standing Kafka related issues resolved?
>>
>> The existing api is not sufficiently safe nor flexible for our production
>> use.  I don't think we're alone in this viewpoint, because I've seen
>> several different patches and libraries to fix the same things we've been
>> running into.
>>
>> Regarding flexibility
>>
>> https://issues.apache.org/jira/browse/SPARK-3146
>>
>> has been outstanding since August, and IMHO an equivalent of this is
>> absolutely necessary.  We wrote a similar patch ourselves, then found that
>> PR and have been running it in production.  We wouldn't be able to get our
>> jobs done without it.  It also allows users to solve a whole class of
>> problems for themselves (e.g. SPARK-2388, arbitrary delay of messages, etc).
>>
>> Regarding safety, I understand the motivation behind WriteAheadLog as a
>> general solution for streaming unreliable sources, but Kafka already is a
>> reliable source.  I think there's a need for an api that treats it as
>> such.  Even aside from the performance issues of duplicating the
>> write-ahead log in kafka into another write-ahead log in hdfs, I need
>> exactly-once semantics in the face of failure (I've had failures that
>> prevented reloading a spark streaming checkpoint, for instance).
>>
>> I've got an implementation i've been using
>>
>> https://github.com/koeninger/spark-1/tree/kafkaRdd/external/kafka
>> /src/main/scala/org/apache/spark/rdd/kafka
>>
>> Tresata has something similar at https://github.com/tresata/spark-kafka,
>> and I know there were earlier attempts based on Storm code.
>>
>> Trying to distribute these kinds of fixes as libraries rather than patches
>> to Spark is problematic, because large portions of the implementation are
>> private[spark].
>>
>>  I'd like to help, but i need to know whose attention to get.
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
> For additional commands, e-mail: dev-help@spark.apache.org

Re: Which committers care about Kafka?

Posted by Patrick Wendell <pw...@gmail.com>.
Hey Cody,

Thanks for reaching out with this. The lead on streaming is TD - he is
traveling this week though so I can respond a bit. To the high level
point of whether Kafka is important - it definitely is. Something like
80% of Spark Streaming deployments (anecdotally) ingest data from
Kafka. Also, good support for Kafka is something we generally want in
Spark and not a library. In some cases IIRC there were user libraries
that used unstable Kafka API's and we were somewhat waiting on Kafka
to stabilize them to merge things upstream. Otherwise users wouldn't
be able to use newer Kafka versions. This is a high level impression
only though, I haven't talked to TD about this recently so it's worth
revisiting given the developments in Kafka.

Please do bring things up like this on the dev list if there are
blockers for your usage - thanks for pinging it.

- Patrick

On Thu, Dec 18, 2014 at 7:07 AM, Cody Koeninger <co...@koeninger.org> wrote:
> Now that 1.2 is finalized...  who are the go-to people to get some
> long-standing Kafka related issues resolved?
>
> The existing api is not sufficiently safe nor flexible for our production
> use.  I don't think we're alone in this viewpoint, because I've seen
> several different patches and libraries to fix the same things we've been
> running into.
>
> Regarding flexibility
>
> https://issues.apache.org/jira/browse/SPARK-3146
>
> has been outstanding since August, and IMHO an equivalent of this is
> absolutely necessary.  We wrote a similar patch ourselves, then found that
> PR and have been running it in production.  We wouldn't be able to get our
> jobs done without it.  It also allows users to solve a whole class of
> problems for themselves (e.g. SPARK-2388, arbitrary delay of messages, etc).
>
> Regarding safety, I understand the motivation behind WriteAheadLog as a
> general solution for streaming unreliable sources, but Kafka already is a
> reliable source.  I think there's a need for an api that treats it as
> such.  Even aside from the performance issues of duplicating the
> write-ahead log in kafka into another write-ahead log in hdfs, I need
> exactly-once semantics in the face of failure (I've had failures that
> prevented reloading a spark streaming checkpoint, for instance).
>
> I've got an implementation i've been using
>
> https://github.com/koeninger/spark-1/tree/kafkaRdd/external/kafka
> /src/main/scala/org/apache/spark/rdd/kafka
>
> Tresata has something similar at https://github.com/tresata/spark-kafka,
> and I know there were earlier attempts based on Storm code.
>
> Trying to distribute these kinds of fixes as libraries rather than patches
> to Spark is problematic, because large portions of the implementation are
> private[spark].
>
>  I'd like to help, but i need to know whose attention to get.

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
For additional commands, e-mail: dev-help@spark.apache.org