Posted to user@flink.apache.org by Prabhu V <vp...@gmail.com> on 2016/08/03 12:07:33 UTC

Kafka batches

I understand Flink does streaming, but I feel my requirement is more batch
oriented.

Read from a Kafka cluster
Do a little data massaging
Bucket data into Hadoop files that are at least one HDFS block in size
Our environment is YARN and kerberized (Kafka and Hadoop; I am currently
allowed to pass the keytab to the containers)
Signal downstream processing based on timestamps in the data and Kafka offsets
The process has to run forever

I used a Flink streaming job with checkpoints and windowing (a custom
trigger that fires on either a count of events or elapsed time, inspired by
https://gist.github.com/shikhar/2cb9f1b792be31b7c16e/9f08f8964b3f177fe48f6f8fc3916932fdfdcc71
)
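
For readers unfamiliar with count-or-time triggering, here is a minimal
sketch of the logic only. It is not the Flink Trigger API and not the code
from the gist; the class name CountOrTimeBuffer and its parameters are
hypothetical, and time is passed in explicitly to keep the example
deterministic.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class CountOrTimeBuffer:
    """Hypothetical buffer that flushes on max_count events or max_age_ms elapsed."""
    max_count: int
    max_age_ms: int
    _events: List[str] = field(default_factory=list, init=False)
    _first_ts: Optional[int] = field(default=None, init=False)

    def add(self, event: str, now_ms: int) -> Optional[List[str]]:
        """Buffer an event; return the batch if the count limit is reached."""
        if self._first_ts is None:
            self._first_ts = now_ms  # remember when this batch started
        self._events.append(event)
        if len(self._events) >= self.max_count:
            return self._flush()
        return None

    def on_timer(self, now_ms: int) -> Optional[List[str]]:
        """Periodic check; return the batch if the oldest event is too old."""
        if self._first_ts is not None and now_ms - self._first_ts >= self.max_age_ms:
            return self._flush()
        return None

    def _flush(self) -> List[str]:
        # Hand back the current batch and start a fresh, empty one.
        batch, self._events, self._first_ts = self._events, [], None
        return batch
```

Whichever condition is met first produces the batch, so a quiet topic still
rolls a (small) file after max_age_ms, while a busy one rolls on count.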


Observations with streaming:

1) Long-running Kerberos authentication fails after 7 days (the data held in
the window buffer is lost, and the restart results in event loss)
2) I hold on to the resources/containers in the cluster at all times,
irrespective of the volume of events
3) The committed offsets in Kafka do not reflect the last written offsets
in HDFS (Kafka offsets may be committed/checkpointed while the window is yet
to trigger)
4) Windowing is similar to batch in a way: the files are not
available/rolled until the file is closed
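
The gap described in observation 3 can be shown with a toy model. This is
only an illustration of why an offset committed at read time can run ahead
of what has been flushed; it does not model Flink's checkpointing, and the
class WindowedWriter and its methods are hypothetical.

```python
class WindowedWriter:
    """Toy consumer: buffers offsets and 'writes' them once a window fills."""

    def __init__(self, window_size):
        self.window_size = window_size
        self.buffer = []        # offsets read but not yet written out
        self.last_read = -1     # highest offset pulled from the log
        self.last_written = -1  # highest offset flushed to the file
        self.committed = -1     # offset reported back to the log

    def read(self, offset):
        self.buffer.append(offset)
        self.last_read = offset
        if len(self.buffer) >= self.window_size:
            # Window fires: everything buffered is now durable.
            self.last_written = self.buffer[-1]
            self.buffer.clear()

    def commit_on_read(self):
        # Commits what was read, regardless of what was written.
        self.committed = self.last_read

    def commit_on_write(self):
        # Commits only what has actually been flushed.
        self.committed = self.last_written


w = WindowedWriter(window_size=5)
for offset in range(8):
    w.read(offset)

w.commit_on_read()
gap = w.committed - w.last_written  # offsets committed but still only in the buffer
```

With a window of 5 and 8 offsets read, offsets 5 to 7 sit in the buffer, so a
commit-on-read reports progress that the output files do not yet contain.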

Is there a way for the Kafka connector to take start and stop values for
offsets? That would be ideal for my scenario. The design in this scenario
would be to:

1) Run a scheduled job that calculates the offset ranges to be consumed
2) Base the number and size of the containers on the number of messages
to be consumed
3) Commit the offsets after the job is successful
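
A rough sketch of the planning part of these three steps, in plain Python
with no Kafka client involved. All names (OffsetRange, plan_ranges,
containers_needed, offsets_to_commit) are hypothetical. It assumes the usual
convention that a committed offset is the next offset to read and that the
range end is exclusive; starting uncommitted partitions at 0 is also an
assumption and may not match the earliest retained offset.

```python
import math
from typing import Dict, List, NamedTuple


class OffsetRange(NamedTuple):
    partition: int
    start: int  # inclusive
    end: int    # exclusive


def plan_ranges(committed: Dict[int, int], latest: Dict[int, int]) -> List[OffsetRange]:
    """Step 1: one range per partition that has unread messages."""
    ranges = []
    for partition in sorted(latest):
        start = committed.get(partition, 0)  # assumption: start at 0 if never committed
        end = latest[partition]
        if end > start:
            ranges.append(OffsetRange(partition, start, end))
    return ranges


def containers_needed(ranges: List[OffsetRange],
                      messages_per_container: int,
                      max_containers: int) -> int:
    """Step 2: size the job from the total number of messages to consume."""
    total = sum(r.end - r.start for r in ranges)
    if total == 0:
        return 0
    return min(max_containers, math.ceil(total / messages_per_container))


def offsets_to_commit(ranges: List[OffsetRange]) -> Dict[int, int]:
    """Step 3: offsets to commit, only after the job has succeeded."""
    return {r.partition: r.end for r in ranges}


ranges = plan_ranges(committed={0: 100, 1: 50}, latest={0: 250, 1: 50, 2: 30})
n = containers_needed(ranges, messages_per_container=100, max_containers=10)
to_commit = offsets_to_commit(ranges)
```

Because the commit happens only after the files are written, the committed
offsets would match what is in HDFS, which is the property asked for above.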

Please let me know if there is a better way.

Thanks,
Prabhu

Re: Kafka batches

Posted by Prabhu V <vp...@gmail.com>.
Thanks for the reply.

Yes, I mean a manual commit in #3. In that case the committed offsets
would accurately reflect the number of messages processed.

My understanding is that the current checkpointing process commits the
state of all the operators separately: the Kafka connector will commit
offset X while the window operator will checkpoint the messages with offset
range (X - N up to X). If the job fails now (the YARN application failed due
to some network issue, someone accidentally killed my YARN job, etc.),
restarting the job will start processing from offset X, and the messages
that were checkpointed by the window operator are lost.

The reason we need the accuracy is that the downstream processes are
batch oriented (they typically process a 15-minute bucket of data) and are
triggered based on message timestamps exceeding a certain watermark. We
have a separate store that maintains the relation between partition-offset
and message-timestamp (these are ever-increasing values). A check happens
against this store to see whether the offsets processed by the Flink job
have crossed a certain timestamp.
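
To make the check concrete, here is a small in-memory sketch of such a
store. It is not our actual store; the class OffsetTimestampIndex and its
methods are hypothetical. It relies on offsets and timestamps both being
non-decreasing per partition, and it treats a processed offset as "next
offset to read".

```python
import bisect
from typing import Dict, List, Optional


class OffsetTimestampIndex:
    """Hypothetical per-partition mapping of offset -> message timestamp."""

    def __init__(self) -> None:
        self._offsets: Dict[int, List[int]] = {}
        self._timestamps: Dict[int, List[int]] = {}

    def record(self, partition: int, offset: int, timestamp_ms: int) -> None:
        # Entries must arrive in increasing offset/timestamp order.
        self._offsets.setdefault(partition, []).append(offset)
        self._timestamps.setdefault(partition, []).append(timestamp_ms)

    def first_offset_at_or_after(self, partition: int, timestamp_ms: int) -> Optional[int]:
        """First recorded offset whose timestamp is >= timestamp_ms, if any."""
        ts = self._timestamps.get(partition, [])
        i = bisect.bisect_left(ts, timestamp_ms)
        return self._offsets[partition][i] if i < len(ts) else None

    def bucket_complete(self, processed: Dict[int, int], bucket_end_ms: int) -> bool:
        """True if every partition has been processed up to the bucket boundary."""
        for partition in self._offsets:
            boundary = self.first_offset_at_or_after(partition, bucket_end_ms)
            # No message past the boundary yet: the bucket may still grow.
            if boundary is None or processed.get(partition, 0) < boundary:
                return False
        return True


idx = OffsetTimestampIndex()
idx.record(0, 0, 1000)
idx.record(0, 10, 2000)
idx.record(0, 20, 3000)
idx.record(1, 0, 1500)
idx.record(1, 5, 2500)
```

The downstream batch for a bucket would be signalled only when
bucket_complete returns True for that bucket's end timestamp, which is why
the processed offsets have to match what was actually written.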



On Wed, Aug 3, 2016 at 6:19 AM, Ufuk Celebi <uc...@apache.org> wrote:

> On Wed, Aug 3, 2016 at 2:07 PM, Prabhu V <vp...@gmail.com> wrote:
> > Observations with Streaming.
> >
> > 1) Long running kerberos fails in 7 days (the data that is held in the
> > window buffer is lost and restart results in event loss)
>
> I think this is a known issue. Looping in Max, who knows the details.
>
> > 2) I hold on to the resources/containers in the cluster irrespective of
> > volume of events for all time
>
> Correct. There are plans for Flink 1.2 to make this dynamic.
>
> > Is there a way the kafkaconnector can take a start and stop values for
> > offsets that would be ideal for my scenario. The design in this scenario
> > would be to...
>
> This is not possible at the moment. What do you mean by "3) commit
> the offsets after job is successful"? Do you want to do this manually?
>

Re: Kafka batches

Posted by Stephan Ewen <se...@apache.org>.
There is a pull request for Kerberos Keytab-based authentication. That way,
streaming jobs can run longer than 7 days.

https://github.com/apache/flink/pull/2275


Re: Kafka batches

Posted by Ufuk Celebi <uc...@apache.org>.
On Wed, Aug 3, 2016 at 2:07 PM, Prabhu V <vp...@gmail.com> wrote:
> Observations with Streaming.
>
> 1) Long running kerberos fails in 7 days (the data that is held in the
> window buffer is lost and restart results in event loss)

I think this is a known issue. Looping in Max, who knows the details.

> 2) I hold on to the resources/containers in the cluster irrespective of volume
> of events for all time

Correct. There are plans for Flink 1.2 to make this dynamic.

> Is there a way the kafkaconnector can take a start and stop values for
> offsets that would be ideal for my scenario. The design in this scenario
> would be to...

This is not possible at the moment. What do you mean by "3) commit
the offsets after job is successful"? Do you want to do this manually?