Posted to dev@spark.apache.org by Dmitry Naumenko <dm...@gmail.com> on 2017/09/11 12:15:10 UTC

Easy way to get offset metadata with Spark Streaming API

Hi all,

It started as a discussion in
https://stackoverflow.com/questions/46153105/how-to-get-kafka-offsets-with-spark-structured-streaming-api
.

So the problem is that there is no support in the public API for obtaining the
Kafka (or Kinesis) offsets. For example, if you want to save offsets to external
storage in a custom Sink, you have to (see the sketch below):
1) preserve topic, partition and offset across all transform operations on the
Dataset (relying on the hard-coded Kafka source schema)
2) manually group by topic/partition and aggregate the maximum offset
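
For illustration, here is a minimal sketch of that workaround. It assumes the
fixed schema of the built-in Kafka source (key, value, topic, partition, offset,
timestamp, timestampType); the broker address and topic name are placeholders:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.max

val spark = SparkSession.builder().appName("offset-workaround").getOrCreate()
import spark.implicits._

// The Kafka source exposes topic/partition/offset as ordinary columns.
val kafka = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
  .option("subscribe", "events")                       // placeholder topic
  .load()

// 1) Carry topic, partition and offset through every transformation.
val parsed = kafka.selectExpr("CAST(value AS STRING) AS payload",
  "topic", "partition", "offset")

// 2) Re-derive the committed position per partition by grouping on
//    topic/partition and taking the max offset (in practice this would run
//    on each micro-batch inside the custom Sink).
val maxOffsets = parsed.groupBy($"topic", $"partition")
  .agg(max($"offset").as("maxOffset"))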

The Structured Streaming doc says "Every streaming source is assumed to have
offsets", so why isn't this part of the public API? What do you think about
supporting it?

Dmitry

Re: Easy way to get offset metadata with Spark Streaming API

Posted by Dmitry Naumenko <dm...@gmail.com>.
Nice, thanks again, Michael, for helping out.

Dmitry

Re: Easy way to get offset metadata with Spark Streaming API

Posted by Michael Armbrust <mi...@databricks.com>.
Yep, that is correct.  You can also use the query ID, which is a GUID that
is stored in the checkpoint and preserved across restarts, if you want to
distinguish the batches from different streams.

sqlContext.sparkContext.getLocalProperty(StreamExecution.QUERY_ID_KEY)

This was added recently
<https://github.com/apache/spark/commit/2d968a07d211688a9c588deb859667dd8b653b27>
though.
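
As a hedged illustration, here is a sketch of how the query ID and batchId
could be combined in a custom sink. The Sink trait below is Spark's internal
(non-public) API in 2.x, and OffsetStore/commitIfNew are hypothetical names:

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.execution.streaming.{Sink, StreamExecution}

// Hypothetical storage abstraction, e.g. backed by an RDBMS table.
trait OffsetStore {
  // Writes the rows and the (queryId, batchId) marker in one transaction,
  // doing nothing if that marker already exists.
  def commitIfNew(queryId: String, batchId: Long, rows: Seq[Row]): Unit
}

class TransactionalSink(store: OffsetStore) extends Sink {
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    // The query ID distinguishes batches coming from different streaming queries.
    val queryId = data.sqlContext.sparkContext
      .getLocalProperty(StreamExecution.QUERY_ID_KEY)
    store.commitIfNew(queryId, batchId, data.collect().toSeq)
  }
}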

Re: Easy way to get offset metadata with Spark Streaming API

Posted by Dmitry Naumenko <dm...@gmail.com>.
OK. So since I can get repeated batch IDs, I guess I can just store the last
committed batch ID in my storage (in the same transaction as the data) and
initialize the custom sink with the right batch ID when the application
restarts. After that, just ignore a batch if its batchId <= latestBatchId.
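
A minimal sketch of that guard; the Sink trait here is Spark's internal
(non-public) API, and BatchStore/loadLatestBatchId/commit are hypothetical
names:

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.execution.streaming.Sink

// Hypothetical storage interface.
trait BatchStore {
  def loadLatestBatchId(): Long                   // e.g. -1 if nothing committed yet
  def commit(batchId: Long, rows: Seq[Row]): Unit // rows + batch ID in one transaction
}

class GuardedSink(store: BatchStore) extends Sink {
  private var latestBatchId: Long = store.loadLatestBatchId()

  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    if (batchId > latestBatchId) {
      store.commit(batchId, data.collect().toSeq)
      latestBatchId = batchId
    }
    // Otherwise this is a redelivery of an already-committed batch: ignore it.
  }
}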

Dmitry

Re: Easy way to get offset metadata with Spark Streaming API

Posted by Michael Armbrust <mi...@databricks.com>.
I think the right way to look at this is that the batchId is just a proxy for
offsets that is agnostic to what type of source you are reading from (or
how many sources there are).  We might call into a custom sink with the
same batchId more than once, but it will always contain the same data
(there is no race condition, since this is stored in a write-ahead log).
As long as you check/commit the batch ID in the same transaction as the
data, you will get exactly-once.

Re: Easy way to get offset metadata with Spark Streaming API

Posted by Dmitry Naumenko <dm...@gmail.com>.
Thanks, I see.

However, I guess reading from the checkpoint directory might be less efficient
than just preserving the offsets in the Dataset.

I have one more question about operation idempotence (I hope it helps others
get a clear picture).

If I read the offsets from an RDBMS on restart and manually specify the
starting offsets on the Kafka source, is it still possible that, in case of a
failure, a duplicate batch ID will go to a custom Sink?

Previously, with DStreams, you would just read the offsets from storage on
start and write them into the DB in one transaction with the data, and that was
enough for "exactly-once". Please correct me if I made a mistake here. Will the
same strategy work with Structured Streaming?

I guess that in the case of Structured Streaming, Spark commits the batch
offsets to a checkpoint directory, and there can be a race condition where you
commit your data with offsets into the DB but Spark fails to commit the batch
ID, so some kind of automatic retry happens. If this is true, is it possible to
disable this automatic retry, so that I can still use the unified
batch/streaming API with my own retry logic (which is basically: ignore
intermediate data, re-read from Kafka, and retry the processing and load)?

Dmitry


Re: Easy way to get offset metadata with Spark Streaming API

Posted by Michael Armbrust <mi...@databricks.com>.
In the checkpoint directory there is a file /offsets/$batchId that holds
the offsets serialized as JSON.  I would not consider this a public stable
API though.

Really the only important thing to get exactly once is that you must ensure
whatever operation you are doing downstream is idempotent with respect to
the batchId.  For example, if you are writing to an RDBMS you could have a
table that records the batch ID and update that in the same transaction as
you append the results of the batch.  Before trying to append you should
check that batch ID and make sure you have not already committed.
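
As a hedged sketch of that pattern with plain JDBC: the table names
(batch_commits, results), the row shape, and the connection handling below are
illustrative assumptions, not anything prescribed by Spark:

import java.sql.{Connection, DriverManager}

// Idempotent commit: the batch-ID marker and the appended rows go into the same
// transaction, and a batch that was already recorded is skipped.
def commitBatch(jdbcUrl: String, batchId: Long, rows: Seq[(String, Long)]): Unit = {
  val conn: Connection = DriverManager.getConnection(jdbcUrl)
  try {
    conn.setAutoCommit(false)

    // Check whether this batch has already been committed.
    val check = conn.prepareStatement("SELECT 1 FROM batch_commits WHERE batch_id = ?")
    check.setLong(1, batchId)
    val alreadyCommitted = check.executeQuery().next()

    if (!alreadyCommitted) {
      // Append the results of the batch ...
      val insert = conn.prepareStatement(
        "INSERT INTO results(payload, kafka_offset) VALUES (?, ?)")
      rows.foreach { case (payload, offset) =>
        insert.setString(1, payload)
        insert.setLong(2, offset)
        insert.addBatch()
      }
      insert.executeBatch()

      // ... and record the batch ID in the same transaction.
      val marker = conn.prepareStatement("INSERT INTO batch_commits(batch_id) VALUES (?)")
      marker.setLong(1, batchId)
      marker.executeUpdate()
    }

    conn.commit() // marker and data become visible atomically
  } catch {
    case e: Exception =>
      conn.rollback()
      throw e
  } finally {
    conn.close()
  }
}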

Re: Easy way to get offset metadata with Spark Streaming API

Posted by Dmitry Naumenko <dm...@gmail.com>.
Thanks for the response, Michael

>  You should still be able to get exactly once processing by using the batchId
that is passed to the Sink.

Could you explain this in more detail, please? Is there some kind of
offset-manager API that works as a get-offset-by-batch-ID lookup table?

Dmitry

Re: Easy way to get offset metadata with Spark Streaming API

Posted by Michael Armbrust <mi...@databricks.com>.
I think that we are going to have to change the Sink API as part of
SPARK-20928 <https://issues-test.apache.org/jira/browse/SPARK-20928>, which
is why I linked these tickets together.  I'm still targeting an initial
version for Spark 2.3, which should happen sometime towards the end of the
year.

There are some misconceptions in that Stack Overflow answer that I can
correct.  Until we improve the Source API, you should still be able to get
exactly-once processing by using the batchId that is passed to the Sink. We
guarantee that the offsets present at any given batch ID will be the same
across retries by recording this information in the checkpoint's WAL. The
checkpoint does not use Java serialization (as DStreams does) and can be
used even after upgrading Spark.


Re: Easy way to get offset metadata with Spark Streaming API

Posted by Dmitry Naumenko <dm...@gmail.com>.
Thanks, Cody

Unfortunately, it seems there is no active development on it right now.
Maybe I can step in and help with it somehow?

Dmitry

Re: Easy way to get offset metadata with Spark Streaming API

Posted by Cody Koeninger <co...@koeninger.org>.
https://issues-test.apache.org/jira/browse/SPARK-18258
