Posted to user@beam.apache.org by Juan Carlos Garcia <jc...@gmail.com> on 2018/10/02 05:25:09 UTC

Ingesting from KafkaIO into HDFS, problems with checkpoint and data lost on top of yarn + spark

Hi folks, we are running a pipeline with which, as the subject says, we
are having issues with data loss.

Using KafkaIO (2.0.4 due to the version of our brokers) with
commitOnFinalize, we would like to understand how this finalization works
together with FileIO.

I studied KafkaIO and saw that the records are committed to Kafka inside
the consumerPollLoop method only when a checkpoint is produced. But when
is this checkpoint produced, and how does it interact with windowed data
and a FileIO that produces files?
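
For reference, this is roughly how our read side is wired up (a simplified
sketch rather than our exact code; the broker and topic names are
placeholders, and in recent Beam releases the commit-on-finalize switch is
the commitOffsetsInFinalize() method on KafkaIO.Read):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaToHdfsSketch {
  public static void main(String[] args) {
    Pipeline pipeline =
        Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    PCollection<KV<String, String>> records =
        pipeline.apply(
            KafkaIO.<String, String>read()
                .withBootstrapServers("broker1:9092")        // placeholder
                .withTopic("events")                         // placeholder
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                // Offsets are committed back to Kafka only when the runner
                // finalizes the checkpoint that contains these records.
                .commitOffsetsInFinalize()
                .withoutMetadata());

    // ... windowing and the FileIO write, see the sketch further down ...

    pipeline.run().waitUntilFinish();
  }
}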

When running with Spark our batchInterval is 30 seconds, and the pipeline
has a fixed window of 1 hour for FileIO to write to HDFS. We are
constantly restarting the pipeline (1 to 3 times a day, or YARN reaches
its maximum restart attempts and kills it completely due to network
interruptions), and we have detected that data is missing on HDFS.
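
The write side looks roughly like this (again only a sketch; the HDFS path
and shard count are placeholders, and it assumes the Hadoop file-system
module is on the classpath so that hdfs:// paths resolve):

// Continuing the read sketch above: take the Kafka values, window them
// into fixed 1-hour windows, and write one set of files per window.
// Additional imports: org.apache.beam.sdk.io.FileIO,
// org.apache.beam.sdk.io.TextIO, org.apache.beam.sdk.transforms.Values,
// org.apache.beam.sdk.transforms.windowing.FixedWindows,
// org.apache.beam.sdk.transforms.windowing.Window, org.joda.time.Duration.
records
    .apply(Values.<String>create())
    .apply(Window.<String>into(FixedWindows.of(Duration.standardHours(1))))
    .apply(FileIO.<String>write()
        .via(TextIO.sink())
        .to("hdfs://namenode/data/events")  // placeholder output directory
        .withNumShards(4));                 // explicit sharding for streaming input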

Initially we were running without specifying a checkpoint directory
(SparkRunner), and we found that on each deployment a random directory was
generated under /tmp. Recently we started to use a fixed directory for
checkpoints (via --checkpointDir on the Spark runner), but we still have
doubts that this will completely solve our data loss problems when
restarting the pipeline multiple times a day (or is our assumption
incorrect?).
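
Concretely, pinning the checkpoint location looks like this (a sketch; the
HDFS path is a placeholder, and checkpointDir is the option exposed by the
Spark runner's SparkPipelineOptions):

import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

SparkPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(SparkPipelineOptions.class);
// Fixed location instead of the runner's default under /tmp (placeholder path).
options.setCheckpointDir("hdfs://namenode/beam/checkpoints/kafka-to-hdfs");
// Equivalent command-line flags:
//   --runner=SparkRunner --checkpointDir=hdfs://namenode/beam/checkpoints/kafka-to-hdfs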

Please advise whether this use case (data ingestion to HDFS) is something
Beam can achieve without losing data from KafkaIO.

Thanks
JC

Re: Ingesting from KafkaIO into HDFS, problems with checkpoint and data lost on top of yarn + spark

Posted by Lukasz Cwik <lc...@google.com>.
There has been some discussion in the past about adding a "drain" feature
to Apache Beam which would allow this intermediate data to be output so it
isn't lost. The caveat is that you'll be outputting partial results.

The design doc was shared here:
https://docs.google.com/document/d/1NExwHlj-2q2WUGhSO4jTu8XGhDPmm3cllSN8IMmWci8/edit

Re: Ingesting from KafkaIO into HDFS, problems with checkpoint and data lost on top of yarn + spark

Posted by Raghu Angadi <ra...@google.com>.
> I am curious about what you mentioned (30 min of a 1 hr window would be
> lost); just a noob question, why?

Say you have 1-hour windowing in your pipeline. The aggregation is emitted
at the end of the window. 30 minutes into the window, there might be many
incoming messages processed already. Where should the information about
this partially processed window be stored? Managing this kind of state is
an important part of the runner. It is checkpointed to some persistent
storage. If you restart the pipeline at that time, the new job may not
have access to the state from the previous job, so you would lose 30
minutes' worth of messages.

The reliability has to be provided by the runner. I don't know much about
the details of the SparkRunner, but you can certainly try another runner
like Flink or Dataflow.

Raghu.

Re: Ingesting from KafkaIO into HDFS, problems with checkpoint and data lost on top of yarn + spark

Posted by Juan Carlos Garcia <jc...@gmail.com>.
Thanks for your input on this matter. By data loss I meant data that is in
Kafka but was not written to HDFS because the pipeline (SparkRunner) was
restarted, or because it failed due to connectivity issues and was killed
by YARN, and when we restarted the pipeline those records were skipped.

I am curious about what you mentioned (30 min of a 1 hr window would be
lost); just a noob question, why?

> read from any source and write to any supported sink.

I have no doubt about it.

With a multi-stage pipeline (where we sort, manipulate, and group the
data), my purpose is to reliably sink data to HDFS regardless of any
interruption of the pipeline, like other ingestion libraries do in "batch"
mode (Camus / Gobblin from LinkedIn).

During this period we are using HDFS as the sink with Spark, with a window
to avoid hitting HDFS too hard. Do you recommend using Flink instead (at
least for this requirement)?

Thanks in advance

Re: Ingesting from KafkaIO into HDFS, problems with checkpoint and data lost on top of yarn + spark

Posted by Raghu Angadi <ra...@google.com>.
This is mostly a question about the SparkRunner and, to a certain extent,
FileIO. You might want to elaborate a bit more on what you mean by data
loss. In most cases, restarting a pipeline from scratch loses the
checkpointed state from the previous job (e.g. the first 30 minutes of a
1-hour window would be lost), unless you have a way to restart from a
'snapshot' of the pipeline (i.e. starting from a 'savepoint' in Flink or
'updating' a pipeline in Dataflow).
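
For example, with the FlinkRunner, resuming from a snapshot would look
roughly like this (only a rough sketch; it assumes a Beam version whose
FlinkPipelineOptions exposes a savepointPath option, and the savepoint
path is a placeholder):

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

FlinkPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(FlinkPipelineOptions.class);
options.setRunner(FlinkRunner.class);
// Assumption: savepointPath is available on FlinkPipelineOptions in your
// Beam version; it points the new job at the snapshot taken from the
// previous job (placeholder path).
options.setSavepointPath("hdfs://namenode/flink/savepoints/savepoint-abc123");

Pipeline pipeline = Pipeline.create(options);
// ... same transforms as in the original pipeline ...
pipeline.run();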

Regarding 'commitOnFinalize()' in KafkaIO, it runs 'soon after' the
corresponding messages are processed/checkpointed. In the case of Spark
and Dataflow, that would be after the messages pass through the first
stage of the pipeline.

> Please advise whether this use case (data ingestion to HDFS) is
> something Beam can achieve without losing data from KafkaIO.
Yes, reading from any supported source and writing to any supported sink is
supported. Otherwise, it would be a bug.
