Posted to users@kafka.apache.org by Eric Dain <er...@gmail.com> on 2017/01/28 21:07:05 UTC

"End of Batch" event

Hi,

I'm pretty new to Kafka Streams. I am using Kafka Streams to ingest a large
CSV file. I need to run some clean-up code after all records in the file
are processed. Is there a way to send an "End of Batch" event that is
guaranteed to be processed after all records? If not, is there an
alternative solution?

Thanks,
Eric

Re: "End of Batch" event

Posted by "Matthias J. Sax" <ma...@confluent.io>.
But you can't delete them from the local store like this... you need to
process tombstones to get them deleted from there. The idea behind the
design is to compute those tombstones and inject them into the source topics.
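
As a side note, a tombstone is just a record with a null value written to the
source topic; with a plain producer that is roughly (topic name and value type
are placeholders):

  producer.send(new ProducerRecord<String, Item>("source", keyToDelete, null));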

-Matthias

On 2/1/17 3:34 PM, Gwen Shapira wrote:
> I'm wondering why it has to be so complex... Kafka can be configured
> to delete items older than 24h in a topic. So if you want to get rid
> of records that did not arrive in the last 24h, just configure the
> topic accordingly?
> 
> On Wed, Feb 1, 2017 at 2:37 PM, Matthias J. Sax <ma...@confluent.io> wrote:
>> Understood now.
>>
>> It's a tricky problem you have, and the only solution I can come up with
>> is quite complex -- maybe anybody else has a better idea?
>> Honestly, I am not sure if this will work:
>>
>> For my proposal, the source ID must be part of the key of your records
>> to distinguish records from different sources.
>>
>> You can read all sources as a single KStream. Second, you will need two
>> KTables, one additional "side topic", and use IQ in your main() method
>> in combination with an additional producer that does write to both input
>> topic.
>>
>> 1. You put all new records into the first (auxiliary) KTable
>> 2. You apply all new record to update you second (result) KTable
>> 3. You query for result KTable to get all keys store in it
>> 4. You query your auxiliary KTable to get all keys stored in it
>> 5. You compute the key set of non-updated keys
>> 6. You write tombstone messages for those keys into the input topic,
>>    using the additional producer (this will delete all non-updated keys
>> from you result KTable)
>> 7. you write tombstones for all updated keys into your "side" topic,
>>    using the additional producer (this will clear you auxiliary KTable
>> in order to have it empty for the next batch of record to arrive)
>>
>> The "side topic" must be a second input topic for your auxiliary KTable.
>>
>> The dataflow would be like this:
>>
>> input topic ---+----------> result KTable
>>                |
>>                +---+
>>                    +---> auxiliary KTable
>> side topic --------+
>>
>>
>> You need to read both topics as KStream, duplicate the first KStream and
>> merge one duplicate with the side topic.
>>
>> Something like this:
>>
>> KStream input = builder.stream("source");
>> KStream side = builder.stream("side");
>> KStream merged = builder.merge(input, side);
>>
>> KTable result = input.groupByKey().aggregate(...);
>> KTable auxiliary = merged.groupedByKey().aggregate(...);
>>
>>
>> For doing the querying step, you need to monitor the apps progress when
>> in processes CSV input. If it is done processing, you start your
>> querying to compute the key sets for updated and non-updated keys, write
>> all tombstones, and than wait for the next batch of regular input to
>> arrive, and start over the whole process.
>>
>> Step 1 to 3 will be covered by you Streams app and this will run
>> continuously. Step 4 to 7 is additional user code that you need to put
>> outside the streaming part of you app, and trigger after processing a
>> batch of CSV inputs finished.
>>
>>
>> Hope this does make sense...
>>
>>
>> -Matthias
>>
>>
>> On 1/31/17 4:53 PM, Eric Dain wrote:
>>> Sorry for the confusion, I stopped the example before processing the file
>>> from S2.
>>>
>>> So in day 2, if we get
>>> S2=[D,E, Z], we will have to remove F and add Z; K = [A,B,D,E,Z]
>>>
>>> To elaborate more, A, B and C belong to S1 ( items have field to state
>>> their source). Processing files from S1 should never delete or modify items
>>> belong to S2.
>>>
>>> Thanks for the feedback that I should not use Interactive Queries in
>>> SourceTask.
>>>
>>> Currently, I'm representing all CSVs records in one KStream (adding source
>>> to each record). But I can represent them as separate KStreams if needed.
>>> Are you suggesting windowing these KStreams with 24 hours window and then
>>> merging them?
>>>
>>>
>>>
>>>
>>> On Tue, Jan 31, 2017 at 4:31 PM, Matthias J. Sax <ma...@confluent.io>
>>> wrote:
>>>
>>>> Thanks for the update.
>>>>
>>>> What is not clear to me: why do you only need to remove C, but not
>>>> D,E,F, too, as source2 does not deliver any data on day 2?
>>>>
>>>> Furhtermore, IQ is designed to be use outside of you Streams code, and
>>>> thus, you should no use it in SourceTask (not sure if this would even be
>>>> possible in the first place).
>>>>
>>>> However, you might be able to exploit joins... Your CSV input is
>>>> KStream, right?
>>>>
>>>>
>>>> -Matthias
>>>>
>>>> On 1/31/17 3:10 PM, Eric Dain wrote:
>>>>> Sorry for not being clear. Let me explain by example. Let's say I have
>>>> two
>>>>> sources S1 and S2. The application that I need to write will load the
>>>> files
>>>>> from these sources every 24 hours. The results will be KTable K.
>>>>>
>>>>> For day 1:
>>>>> S1=[A, B, C]   =>  the result K = [A,B,C]
>>>>>
>>>>> S2=[D,E,F] =>   K will be [A,B,C,D,E,F]
>>>>>
>>>>> For day 2:
>>>>>
>>>>> S1=[A,B] because C is missing I have to remove it from K;  K= [A,B,D,E,F]
>>>>> On the other hand, I will process A and B again in case of updates.
>>>>>
>>>>> In other words, I know how to process existent and new items, I'm not
>>>> sure
>>>>> how to remove items missing from the latest CSV file.
>>>>>
>>>>> If I can use Interactive Queries from inside the SourceTask to get a
>>>>> snapshot of what currently in K for a specific source S, then I can send
>>>>> delete message for the missing items by subtracting latest items in the
>>>> CSV
>>>>> from the items of that source in K.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> On Tue, Jan 31, 2017 at 1:54 PM, Matthias J. Sax <ma...@confluent.io>
>>>>> wrote:
>>>>>
>>>>>> I am not sure if I understand the complete scenario yet.
>>>>>>
>>>>>>> I need to delete all items from that source that
>>>>>>> doesn't exist in the latest CSV file.
>>>>>>
>>>>>> Cannot follow here. I thought your CSV files provide the data you want
>>>>>> to process. But it seems you also have a second source?
>>>>>>
>>>>>> How does your Streams app compute the items you want to delete? If you
>>>>>> have this items in a KTable, you can access them from outside your
>>>>>> application using Interactive Queries.
>>>>>>
>>>>>> Thus, you can monitor the app progress by observing committed offsets,
>>>>>> and if finished, you query your KTable to extract the items you want to
>>>>>> delete and do the cleanup.
>>>>>>
>>>>>> Does this make sense?
>>>>>>
>>>>>> For Interactive Queries see the docs and blog post:
>>>>>>
>>>>>> http://docs.confluent.io/current/streams/developer-
>>>>>> guide.html#interactive-queries
>>>>>>
>>>>>> https://www.confluent.io/blog/unifying-stream-processing-
>>>>>> and-interactive-queries-in-apache-kafka/
>>>>>>
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>>
>>>>>> On 1/30/17 9:10 PM, Eric Dain wrote:
>>>>>>> Thanks Matthias for your reply.
>>>>>>>
>>>>>>> I'm not trying to stop the application. I'm importing inventory from
>>>> CSV
>>>>>>> files coming from 3rd party sources. The CSVs are snapshots for each
>>>>>>> source's inventory. I need to delete all items from that source that
>>>>>>> doesn't exist in the latest CSV file.
>>>>>>>
>>>>>>> I was thinking of using "End of Batch" message to initiate that
>>>> process.
>>>>>> I
>>>>>>> might need to do the clean-up as part of the Connect code instead, or
>>>>>> there
>>>>>>> is a better way of doing that?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Eric
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Sun, Jan 29, 2017 at 4:37 PM, Matthias J. Sax <
>>>> matthias@confluent.io>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> currently, a Kafka Streams application is designed to "run forever"
>>>> and
>>>>>>>> there is no notion of "End of Batch" -- we have plans to add this
>>>>>>>> though... (cf.
>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>>>>>>> 95%3A+Incremental+Batch+Processing+for+Kafka+Streams)
>>>>>>>>
>>>>>>>> Thus, right now you need to stop your application manually. You would
>>>>>>>> need to observe the application's committed offsets (and lag) using
>>>>>>>> bin/kafka-consumer-groups.sh (the application ID is user as group ID)
>>>> to
>>>>>>>> monitor the app's progress to see when it is done.
>>>>>>>>
>>>>>>>> Cf.
>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/
>>>>>>>> Kafka+Streams+Data+%28Re%29Processing+Scenarios
>>>>>>>>
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>>
>>>>>>>> On 1/28/17 1:07 PM, Eric Dain wrote:
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I'm pretty new to Kafka Streams. I am using Kafka Streams to ingest
>>>>>> large
>>>>>>>>> csv file. I need to run some clean-up code after all records in the
>>>>>> file
>>>>>>>>> are processed. Is there a way to send "End of Batch" event that is
>>>>>>>>> guaranteed to be processed after all records? If not is there
>>>>>> alternative
>>>>>>>>> solution?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Eric
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>
> 
> 
> 


Re: "End of Batch" event

Posted by Gwen Shapira <gw...@confluent.io>.
I'm wondering why it has to be so complex... Kafka can be configured
to delete items older than 24h in a topic. So if you want to get rid
of records that did not arrive in the last 24h, just configure the
topic accordingly?

On Wed, Feb 1, 2017 at 2:37 PM, Matthias J. Sax <ma...@confluent.io> wrote:
> Understood now.
>
> It's a tricky problem you have, and the only solution I can come up with
> is quite complex -- maybe anybody else has a better idea?
> Honestly, I am not sure if this will work:
>
> For my proposal, the source ID must be part of the key of your records
> to distinguish records from different sources.
>
> You can read all sources as a single KStream. Second, you will need two
> KTables, one additional "side topic", and use IQ in your main() method
> in combination with an additional producer that does write to both input
> topic.
>
> 1. You put all new records into the first (auxiliary) KTable
> 2. You apply all new record to update you second (result) KTable
> 3. You query for result KTable to get all keys store in it
> 4. You query your auxiliary KTable to get all keys stored in it
> 5. You compute the key set of non-updated keys
> 6. You write tombstone messages for those keys into the input topic,
>    using the additional producer (this will delete all non-updated keys
> from you result KTable)
> 7. you write tombstones for all updated keys into your "side" topic,
>    using the additional producer (this will clear you auxiliary KTable
> in order to have it empty for the next batch of record to arrive)
>
> The "side topic" must be a second input topic for your auxiliary KTable.
>
> The dataflow would be like this:
>
> input topic ---+----------> result KTable
>                |
>                +---+
>                    +---> auxiliary KTable
> side topic --------+
>
>
> You need to read both topics as KStream, duplicate the first KStream and
> merge one duplicate with the side topic.
>
> Something like this:
>
> KStream input = builder.stream("source");
> KStream side = builder.stream("side");
> KStream merged = builder.merge(input, side);
>
> KTable result = input.groupByKey().aggregate(...);
> KTable auxiliary = merged.groupedByKey().aggregate(...);
>
>
> For doing the querying step, you need to monitor the apps progress when
> in processes CSV input. If it is done processing, you start your
> querying to compute the key sets for updated and non-updated keys, write
> all tombstones, and than wait for the next batch of regular input to
> arrive, and start over the whole process.
>
> Step 1 to 3 will be covered by you Streams app and this will run
> continuously. Step 4 to 7 is additional user code that you need to put
> outside the streaming part of you app, and trigger after processing a
> batch of CSV inputs finished.
>
>
> Hope this does make sense...
>
>
> -Matthias
>
>
> On 1/31/17 4:53 PM, Eric Dain wrote:
>> Sorry for the confusion, I stopped the example before processing the file
>> from S2.
>>
>> So in day 2, if we get
>> S2=[D,E, Z], we will have to remove F and add Z; K = [A,B,D,E,Z]
>>
>> To elaborate more, A, B and C belong to S1 ( items have field to state
>> their source). Processing files from S1 should never delete or modify items
>> belong to S2.
>>
>> Thanks for the feedback that I should not use Interactive Queries in
>> SourceTask.
>>
>> Currently, I'm representing all CSVs records in one KStream (adding source
>> to each record). But I can represent them as separate KStreams if needed.
>> Are you suggesting windowing these KStreams with 24 hours window and then
>> merging them?
>>
>>
>>
>>
>> On Tue, Jan 31, 2017 at 4:31 PM, Matthias J. Sax <ma...@confluent.io>
>> wrote:
>>
>>> Thanks for the update.
>>>
>>> What is not clear to me: why do you only need to remove C, but not
>>> D,E,F, too, as source2 does not deliver any data on day 2?
>>>
>>> Furhtermore, IQ is designed to be use outside of you Streams code, and
>>> thus, you should no use it in SourceTask (not sure if this would even be
>>> possible in the first place).
>>>
>>> However, you might be able to exploit joins... Your CSV input is
>>> KStream, right?
>>>
>>>
>>> -Matthias
>>>
>>> On 1/31/17 3:10 PM, Eric Dain wrote:
>>>> Sorry for not being clear. Let me explain by example. Let's say I have
>>> two
>>>> sources S1 and S2. The application that I need to write will load the
>>> files
>>>> from these sources every 24 hours. The results will be KTable K.
>>>>
>>>> For day 1:
>>>> S1=[A, B, C]   =>  the result K = [A,B,C]
>>>>
>>>> S2=[D,E,F] =>   K will be [A,B,C,D,E,F]
>>>>
>>>> For day 2:
>>>>
>>>> S1=[A,B] because C is missing I have to remove it from K;  K= [A,B,D,E,F]
>>>> On the other hand, I will process A and B again in case of updates.
>>>>
>>>> In other words, I know how to process existent and new items, I'm not
>>> sure
>>>> how to remove items missing from the latest CSV file.
>>>>
>>>> If I can use Interactive Queries from inside the SourceTask to get a
>>>> snapshot of what currently in K for a specific source S, then I can send
>>>> delete message for the missing items by subtracting latest items in the
>>> CSV
>>>> from the items of that source in K.
>>>>
>>>> Thanks,
>>>>
>>>> On Tue, Jan 31, 2017 at 1:54 PM, Matthias J. Sax <ma...@confluent.io>
>>>> wrote:
>>>>
>>>>> I am not sure if I understand the complete scenario yet.
>>>>>
>>>>>> I need to delete all items from that source that
>>>>>> doesn't exist in the latest CSV file.
>>>>>
>>>>> Cannot follow here. I thought your CSV files provide the data you want
>>>>> to process. But it seems you also have a second source?
>>>>>
>>>>> How does your Streams app compute the items you want to delete? If you
>>>>> have this items in a KTable, you can access them from outside your
>>>>> application using Interactive Queries.
>>>>>
>>>>> Thus, you can monitor the app progress by observing committed offsets,
>>>>> and if finished, you query your KTable to extract the items you want to
>>>>> delete and do the cleanup.
>>>>>
>>>>> Does this make sense?
>>>>>
>>>>> For Interactive Queries see the docs and blog post:
>>>>>
>>>>> http://docs.confluent.io/current/streams/developer-
>>>>> guide.html#interactive-queries
>>>>>
>>>>> https://www.confluent.io/blog/unifying-stream-processing-
>>>>> and-interactive-queries-in-apache-kafka/
>>>>>
>>>>>
>>>>>
>>>>> -Matthias
>>>>>
>>>>>
>>>>> On 1/30/17 9:10 PM, Eric Dain wrote:
>>>>>> Thanks Matthias for your reply.
>>>>>>
>>>>>> I'm not trying to stop the application. I'm importing inventory from
>>> CSV
>>>>>> files coming from 3rd party sources. The CSVs are snapshots for each
>>>>>> source's inventory. I need to delete all items from that source that
>>>>>> doesn't exist in the latest CSV file.
>>>>>>
>>>>>> I was thinking of using "End of Batch" message to initiate that
>>> process.
>>>>> I
>>>>>> might need to do the clean-up as part of the Connect code instead, or
>>>>> there
>>>>>> is a better way of doing that?
>>>>>>
>>>>>> Thanks,
>>>>>> Eric
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Sun, Jan 29, 2017 at 4:37 PM, Matthias J. Sax <
>>> matthias@confluent.io>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> currently, a Kafka Streams application is designed to "run forever"
>>> and
>>>>>>> there is no notion of "End of Batch" -- we have plans to add this
>>>>>>> though... (cf.
>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>>>>>> 95%3A+Incremental+Batch+Processing+for+Kafka+Streams)
>>>>>>>
>>>>>>> Thus, right now you need to stop your application manually. You would
>>>>>>> need to observe the application's committed offsets (and lag) using
>>>>>>> bin/kafka-consumer-groups.sh (the application ID is user as group ID)
>>> to
>>>>>>> monitor the app's progress to see when it is done.
>>>>>>>
>>>>>>> Cf.
>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/
>>>>>>> Kafka+Streams+Data+%28Re%29Processing+Scenarios
>>>>>>>
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>>
>>>>>>> On 1/28/17 1:07 PM, Eric Dain wrote:
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I'm pretty new to Kafka Streams. I am using Kafka Streams to ingest
>>>>> large
>>>>>>>> csv file. I need to run some clean-up code after all records in the
>>>>> file
>>>>>>>> are processed. Is there a way to send "End of Batch" event that is
>>>>>>>> guaranteed to be processed after all records? If not is there
>>>>> alternative
>>>>>>>> solution?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Eric
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
>>
>



-- 
Gwen Shapira
Product Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter | blog

Re: "End of Batch" event

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Understood now.

It's a tricky problem you have, and the only solution I can come up with
is quite complex -- maybe anybody else has a better idea?
Honestly, I am not sure if this will work:

For my proposal, the source ID must be part of the key of your records
to distinguish records from different sources.

First, you read all sources as a single KStream. Second, you will need two
KTables, one additional "side topic", and IQ in your main() method, in
combination with an additional producer that writes to both input
topics.

1. You put all new records into the first (auxiliary) KTable
2. You apply all new records to update your second (result) KTable
3. You query the result KTable to get all keys stored in it
4. You query your auxiliary KTable to get all keys stored in it
5. You compute the key set of non-updated keys
6. You write tombstone messages for those keys into the input topic,
   using the additional producer (this will delete all non-updated keys
   from your result KTable)
7. You write tombstones for all updated keys into your "side" topic,
   using the additional producer (this will clear your auxiliary KTable
   so it is empty for the next batch of records to arrive)

The "side topic" must be a second input topic for your auxiliary KTable.

The dataflow would be like this:

input topic ---+----------> result KTable
               |
               +---+
                   +---> auxiliary KTable
side topic --------+


You need to read both topics as KStreams, duplicate the first KStream, and
merge one duplicate with the side topic.

Something like this:

KStream input = builder.stream("source");    // CSV records from all sources
KStream side = builder.stream("side");       // tombstones written back by the external producer
KStream merged = builder.merge(input, side);

KTable result = input.groupByKey().aggregate(...);     // survives across batches
KTable auxiliary = merged.groupByKey().aggregate(...); // cleared after every batch
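
If you want to query both KTables via IQ afterwards, the aggregations need
named (and thus queryable) state stores. A minimal sketch, assuming the
aggregate(initializer, aggregator, serde, storeName) overload and placeholder
types/names (Item, itemAggregator, itemSerde):

KTable<String, Item> result =
    input.groupByKey().aggregate(Item::new, itemAggregator, itemSerde, "result-store");
KTable<String, Item> auxiliary =
    merged.groupByKey().aggregate(Item::new, itemAggregator, itemSerde, "auxiliary-store");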


For the querying step, you need to monitor the app's progress while it
processes the CSV input. Once it is done processing, you start your
queries to compute the key sets for updated and non-updated keys, write
all tombstones, and then wait for the next batch of regular input to
arrive and start the whole process over.

Steps 1 to 3 will be covered by your Streams app, and this will run
continuously. Steps 4 to 7 are additional user code that you need to put
outside the streaming part of your app and trigger after processing of a
batch of CSV input has finished.
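
To make steps 3 to 7 a bit more concrete, here is a rough sketch of that
outside-the-topology code. It assumes the stores are named "result-store" and
"auxiliary-store" as above, that streams is the running KafkaStreams instance,
and that producer is a separate KafkaProducer<String, Item> writing to the two
topics -- all names are placeholders, untested:

// Step 4: all keys that arrived in the current batch.
ReadOnlyKeyValueStore<String, Item> auxiliaryStore =
    streams.store("auxiliary-store", QueryableStoreTypes.<String, Item>keyValueStore());
Set<String> updatedKeys = new HashSet<>();
try (KeyValueIterator<String, Item> it = auxiliaryStore.all()) {
    while (it.hasNext()) {
        updatedKeys.add(it.next().key);
    }
}

// Steps 3 and 5: keys in the result that did not arrive in the current batch.
ReadOnlyKeyValueStore<String, Item> resultStore =
    streams.store("result-store", QueryableStoreTypes.<String, Item>keyValueStore());
Set<String> staleKeys = new HashSet<>();
try (KeyValueIterator<String, Item> it = resultStore.all()) {
    while (it.hasNext()) {
        String key = it.next().key;
        if (!updatedKeys.contains(key)) {
            staleKeys.add(key);
        }
    }
}

// Step 6: tombstones into the input topic remove the stale keys from the result KTable.
for (String key : staleKeys) {
    producer.send(new ProducerRecord<String, Item>("source", key, null));
}
// Step 7: tombstones into the side topic empty the auxiliary KTable for the next batch.
for (String key : updatedKeys) {
    producer.send(new ProducerRecord<String, Item>("side", key, null));
}
producer.flush();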


Hope this does make sense...


-Matthias


On 1/31/17 4:53 PM, Eric Dain wrote:
> Sorry for the confusion, I stopped the example before processing the file
> from S2.
> 
> So in day 2, if we get
> S2=[D,E, Z], we will have to remove F and add Z; K = [A,B,D,E,Z]
> 
> To elaborate more, A, B and C belong to S1 ( items have field to state
> their source). Processing files from S1 should never delete or modify items
> belong to S2.
> 
> Thanks for the feedback that I should not use Interactive Queries in
> SourceTask.
> 
> Currently, I'm representing all CSVs records in one KStream (adding source
> to each record). But I can represent them as separate KStreams if needed.
> Are you suggesting windowing these KStreams with 24 hours window and then
> merging them?
> 
> 
> 
> 
> On Tue, Jan 31, 2017 at 4:31 PM, Matthias J. Sax <ma...@confluent.io>
> wrote:
> 
>> Thanks for the update.
>>
>> What is not clear to me: why do you only need to remove C, but not
>> D,E,F, too, as source2 does not deliver any data on day 2?
>>
>> Furhtermore, IQ is designed to be use outside of you Streams code, and
>> thus, you should no use it in SourceTask (not sure if this would even be
>> possible in the first place).
>>
>> However, you might be able to exploit joins... Your CSV input is
>> KStream, right?
>>
>>
>> -Matthias
>>
>> On 1/31/17 3:10 PM, Eric Dain wrote:
>>> Sorry for not being clear. Let me explain by example. Let's say I have
>> two
>>> sources S1 and S2. The application that I need to write will load the
>> files
>>> from these sources every 24 hours. The results will be KTable K.
>>>
>>> For day 1:
>>> S1=[A, B, C]   =>  the result K = [A,B,C]
>>>
>>> S2=[D,E,F] =>   K will be [A,B,C,D,E,F]
>>>
>>> For day 2:
>>>
>>> S1=[A,B] because C is missing I have to remove it from K;  K= [A,B,D,E,F]
>>> On the other hand, I will process A and B again in case of updates.
>>>
>>> In other words, I know how to process existent and new items, I'm not
>> sure
>>> how to remove items missing from the latest CSV file.
>>>
>>> If I can use Interactive Queries from inside the SourceTask to get a
>>> snapshot of what currently in K for a specific source S, then I can send
>>> delete message for the missing items by subtracting latest items in the
>> CSV
>>> from the items of that source in K.
>>>
>>> Thanks,
>>>
>>> On Tue, Jan 31, 2017 at 1:54 PM, Matthias J. Sax <ma...@confluent.io>
>>> wrote:
>>>
>>>> I am not sure if I understand the complete scenario yet.
>>>>
>>>>> I need to delete all items from that source that
>>>>> doesn't exist in the latest CSV file.
>>>>
>>>> Cannot follow here. I thought your CSV files provide the data you want
>>>> to process. But it seems you also have a second source?
>>>>
>>>> How does your Streams app compute the items you want to delete? If you
>>>> have this items in a KTable, you can access them from outside your
>>>> application using Interactive Queries.
>>>>
>>>> Thus, you can monitor the app progress by observing committed offsets,
>>>> and if finished, you query your KTable to extract the items you want to
>>>> delete and do the cleanup.
>>>>
>>>> Does this make sense?
>>>>
>>>> For Interactive Queries see the docs and blog post:
>>>>
>>>> http://docs.confluent.io/current/streams/developer-
>>>> guide.html#interactive-queries
>>>>
>>>> https://www.confluent.io/blog/unifying-stream-processing-
>>>> and-interactive-queries-in-apache-kafka/
>>>>
>>>>
>>>>
>>>> -Matthias
>>>>
>>>>
>>>> On 1/30/17 9:10 PM, Eric Dain wrote:
>>>>> Thanks Matthias for your reply.
>>>>>
>>>>> I'm not trying to stop the application. I'm importing inventory from
>> CSV
>>>>> files coming from 3rd party sources. The CSVs are snapshots for each
>>>>> source's inventory. I need to delete all items from that source that
>>>>> doesn't exist in the latest CSV file.
>>>>>
>>>>> I was thinking of using "End of Batch" message to initiate that
>> process.
>>>> I
>>>>> might need to do the clean-up as part of the Connect code instead, or
>>>> there
>>>>> is a better way of doing that?
>>>>>
>>>>> Thanks,
>>>>> Eric
>>>>>
>>>>>
>>>>>
>>>>> On Sun, Jan 29, 2017 at 4:37 PM, Matthias J. Sax <
>> matthias@confluent.io>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> currently, a Kafka Streams application is designed to "run forever"
>> and
>>>>>> there is no notion of "End of Batch" -- we have plans to add this
>>>>>> though... (cf.
>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>>>>> 95%3A+Incremental+Batch+Processing+for+Kafka+Streams)
>>>>>>
>>>>>> Thus, right now you need to stop your application manually. You would
>>>>>> need to observe the application's committed offsets (and lag) using
>>>>>> bin/kafka-consumer-groups.sh (the application ID is user as group ID)
>> to
>>>>>> monitor the app's progress to see when it is done.
>>>>>>
>>>>>> Cf.
>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/
>>>>>> Kafka+Streams+Data+%28Re%29Processing+Scenarios
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>>
>>>>>> On 1/28/17 1:07 PM, Eric Dain wrote:
>>>>>>> Hi,
>>>>>>>
>>>>>>> I'm pretty new to Kafka Streams. I am using Kafka Streams to ingest
>>>> large
>>>>>>> csv file. I need to run some clean-up code after all records in the
>>>> file
>>>>>>> are processed. Is there a way to send "End of Batch" event that is
>>>>>>> guaranteed to be processed after all records? If not is there
>>>> alternative
>>>>>>> solution?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Eric
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>>
> 


Re: "End of Batch" event

Posted by Eric Dain <er...@gmail.com>.
Sorry for the confusion, I stopped the example before processing the file
from S2.

So on day 2, if we get
S2=[D,E,Z], we will have to remove F and add Z; K = [A,B,D,E,Z]

To elaborate more, A, B, and C belong to S1 (items have a field to state
their source). Processing files from S1 should never delete or modify items
belonging to S2.

Thanks for the feedback that I should not use Interactive Queries in
SourceTask.

Currently, I'm representing all CSV records in one KStream (adding the source
to each record). But I can represent them as separate KStreams if needed.
Are you suggesting windowing these KStreams with a 24-hour window and then
merging them?




On Tue, Jan 31, 2017 at 4:31 PM, Matthias J. Sax <ma...@confluent.io>
wrote:

> Thanks for the update.
>
> What is not clear to me: why do you only need to remove C, but not
> D,E,F, too, as source2 does not deliver any data on day 2?
>
> Furhtermore, IQ is designed to be use outside of you Streams code, and
> thus, you should no use it in SourceTask (not sure if this would even be
> possible in the first place).
>
> However, you might be able to exploit joins... Your CSV input is
> KStream, right?
>
>
> -Matthias
>
> On 1/31/17 3:10 PM, Eric Dain wrote:
> > Sorry for not being clear. Let me explain by example. Let's say I have
> two
> > sources S1 and S2. The application that I need to write will load the
> files
> > from these sources every 24 hours. The results will be KTable K.
> >
> > For day 1:
> > S1=[A, B, C]   =>  the result K = [A,B,C]
> >
> > S2=[D,E,F] =>   K will be [A,B,C,D,E,F]
> >
> > For day 2:
> >
> > S1=[A,B] because C is missing I have to remove it from K;  K= [A,B,D,E,F]
> > On the other hand, I will process A and B again in case of updates.
> >
> > In other words, I know how to process existent and new items, I'm not
> sure
> > how to remove items missing from the latest CSV file.
> >
> > If I can use Interactive Queries from inside the SourceTask to get a
> > snapshot of what currently in K for a specific source S, then I can send
> > delete message for the missing items by subtracting latest items in the
> CSV
> > from the items of that source in K.
> >
> > Thanks,
> >
> > On Tue, Jan 31, 2017 at 1:54 PM, Matthias J. Sax <ma...@confluent.io>
> > wrote:
> >
> >> I am not sure if I understand the complete scenario yet.
> >>
> >>> I need to delete all items from that source that
> >>> doesn't exist in the latest CSV file.
> >>
> >> Cannot follow here. I thought your CSV files provide the data you want
> >> to process. But it seems you also have a second source?
> >>
> >> How does your Streams app compute the items you want to delete? If you
> >> have this items in a KTable, you can access them from outside your
> >> application using Interactive Queries.
> >>
> >> Thus, you can monitor the app progress by observing committed offsets,
> >> and if finished, you query your KTable to extract the items you want to
> >> delete and do the cleanup.
> >>
> >> Does this make sense?
> >>
> >> For Interactive Queries see the docs and blog post:
> >>
> >> http://docs.confluent.io/current/streams/developer-
> >> guide.html#interactive-queries
> >>
> >> https://www.confluent.io/blog/unifying-stream-processing-
> >> and-interactive-queries-in-apache-kafka/
> >>
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 1/30/17 9:10 PM, Eric Dain wrote:
> >>> Thanks Matthias for your reply.
> >>>
> >>> I'm not trying to stop the application. I'm importing inventory from
> CSV
> >>> files coming from 3rd party sources. The CSVs are snapshots for each
> >>> source's inventory. I need to delete all items from that source that
> >>> doesn't exist in the latest CSV file.
> >>>
> >>> I was thinking of using "End of Batch" message to initiate that
> process.
> >> I
> >>> might need to do the clean-up as part of the Connect code instead, or
> >> there
> >>> is a better way of doing that?
> >>>
> >>> Thanks,
> >>> Eric
> >>>
> >>>
> >>>
> >>> On Sun, Jan 29, 2017 at 4:37 PM, Matthias J. Sax <
> matthias@confluent.io>
> >>> wrote:
> >>>
> >>>> Hi,
> >>>>
> >>>> currently, a Kafka Streams application is designed to "run forever"
> and
> >>>> there is no notion of "End of Batch" -- we have plans to add this
> >>>> though... (cf.
> >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >>>> 95%3A+Incremental+Batch+Processing+for+Kafka+Streams)
> >>>>
> >>>> Thus, right now you need to stop your application manually. You would
> >>>> need to observe the application's committed offsets (and lag) using
> >>>> bin/kafka-consumer-groups.sh (the application ID is user as group ID)
> to
> >>>> monitor the app's progress to see when it is done.
> >>>>
> >>>> Cf.
> >>>> https://cwiki.apache.org/confluence/display/KAFKA/
> >>>> Kafka+Streams+Data+%28Re%29Processing+Scenarios
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>>
> >>>> On 1/28/17 1:07 PM, Eric Dain wrote:
> >>>>> Hi,
> >>>>>
> >>>>> I'm pretty new to Kafka Streams. I am using Kafka Streams to ingest
> >> large
> >>>>> csv file. I need to run some clean-up code after all records in the
> >> file
> >>>>> are processed. Is there a way to send "End of Batch" event that is
> >>>>> guaranteed to be processed after all records? If not is there
> >> alternative
> >>>>> solution?
> >>>>>
> >>>>> Thanks,
> >>>>> Eric
> >>>>>
> >>>>
> >>>>
> >>>
> >>
> >>
> >
>
>

Re: "End of Batch" event

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Thanks for the update.

What is not clear to me: why do you only need to remove C, but not
D, E, and F, too, since S2 does not deliver any data on day 2?

Furthermore, IQ is designed to be used outside of your Streams code, and
thus you should not use it in a SourceTask (not sure if this would even be
possible in the first place).

However, you might be able to exploit joins... Your CSV input is a
KStream, right?


-Matthias

On 1/31/17 3:10 PM, Eric Dain wrote:
> Sorry for not being clear. Let me explain by example. Let's say I have two
> sources S1 and S2. The application that I need to write will load the files
> from these sources every 24 hours. The results will be KTable K.
> 
> For day 1:
> S1=[A, B, C]   =>  the result K = [A,B,C]
> 
> S2=[D,E,F] =>   K will be [A,B,C,D,E,F]
> 
> For day 2:
> 
> S1=[A,B] because C is missing I have to remove it from K;  K= [A,B,D,E,F]
> On the other hand, I will process A and B again in case of updates.
> 
> In other words, I know how to process existent and new items, I'm not sure
> how to remove items missing from the latest CSV file.
> 
> If I can use Interactive Queries from inside the SourceTask to get a
> snapshot of what currently in K for a specific source S, then I can send
> delete message for the missing items by subtracting latest items in the CSV
> from the items of that source in K.
> 
> Thanks,
> 
> On Tue, Jan 31, 2017 at 1:54 PM, Matthias J. Sax <ma...@confluent.io>
> wrote:
> 
>> I am not sure if I understand the complete scenario yet.
>>
>>> I need to delete all items from that source that
>>> doesn't exist in the latest CSV file.
>>
>> Cannot follow here. I thought your CSV files provide the data you want
>> to process. But it seems you also have a second source?
>>
>> How does your Streams app compute the items you want to delete? If you
>> have this items in a KTable, you can access them from outside your
>> application using Interactive Queries.
>>
>> Thus, you can monitor the app progress by observing committed offsets,
>> and if finished, you query your KTable to extract the items you want to
>> delete and do the cleanup.
>>
>> Does this make sense?
>>
>> For Interactive Queries see the docs and blog post:
>>
>> http://docs.confluent.io/current/streams/developer-
>> guide.html#interactive-queries
>>
>> https://www.confluent.io/blog/unifying-stream-processing-
>> and-interactive-queries-in-apache-kafka/
>>
>>
>>
>> -Matthias
>>
>>
>> On 1/30/17 9:10 PM, Eric Dain wrote:
>>> Thanks Matthias for your reply.
>>>
>>> I'm not trying to stop the application. I'm importing inventory from CSV
>>> files coming from 3rd party sources. The CSVs are snapshots for each
>>> source's inventory. I need to delete all items from that source that
>>> doesn't exist in the latest CSV file.
>>>
>>> I was thinking of using "End of Batch" message to initiate that process.
>> I
>>> might need to do the clean-up as part of the Connect code instead, or
>> there
>>> is a better way of doing that?
>>>
>>> Thanks,
>>> Eric
>>>
>>>
>>>
>>> On Sun, Jan 29, 2017 at 4:37 PM, Matthias J. Sax <ma...@confluent.io>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> currently, a Kafka Streams application is designed to "run forever" and
>>>> there is no notion of "End of Batch" -- we have plans to add this
>>>> though... (cf.
>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>>> 95%3A+Incremental+Batch+Processing+for+Kafka+Streams)
>>>>
>>>> Thus, right now you need to stop your application manually. You would
>>>> need to observe the application's committed offsets (and lag) using
>>>> bin/kafka-consumer-groups.sh (the application ID is user as group ID) to
>>>> monitor the app's progress to see when it is done.
>>>>
>>>> Cf.
>>>> https://cwiki.apache.org/confluence/display/KAFKA/
>>>> Kafka+Streams+Data+%28Re%29Processing+Scenarios
>>>>
>>>>
>>>> -Matthias
>>>>
>>>>
>>>> On 1/28/17 1:07 PM, Eric Dain wrote:
>>>>> Hi,
>>>>>
>>>>> I'm pretty new to Kafka Streams. I am using Kafka Streams to ingest
>> large
>>>>> csv file. I need to run some clean-up code after all records in the
>> file
>>>>> are processed. Is there a way to send "End of Batch" event that is
>>>>> guaranteed to be processed after all records? If not is there
>> alternative
>>>>> solution?
>>>>>
>>>>> Thanks,
>>>>> Eric
>>>>>
>>>>
>>>>
>>>
>>
>>
> 


Re: "End of Batch" event

Posted by Eric Dain <er...@gmail.com>.
Sorry for not being clear. Let me explain by example. Let's say I have two
sources S1 and S2. The application that I need to write will load the files
from these sources every 24 hours. The result will be a KTable K.

For day 1:
S1=[A, B, C]   =>  the result K = [A,B,C]

S2=[D,E,F] =>   K will be [A,B,C,D,E,F]

For day 2:

S1=[A,B]; because C is missing, I have to remove it from K:  K = [A,B,D,E,F]
On the other hand, I will process A and B again in case of updates.

In other words, I know how to process existing and new items; I'm not sure
how to remove items missing from the latest CSV file.

If I can use Interactive Queries from inside the SourceTask to get a
snapshot of what is currently in K for a specific source S, then I can send
delete messages for the missing items by subtracting the latest items in the
CSV from the items of that source in K.

Thanks,

On Tue, Jan 31, 2017 at 1:54 PM, Matthias J. Sax <ma...@confluent.io>
wrote:

> I am not sure if I understand the complete scenario yet.
>
> > I need to delete all items from that source that
> > doesn't exist in the latest CSV file.
>
> Cannot follow here. I thought your CSV files provide the data you want
> to process. But it seems you also have a second source?
>
> How does your Streams app compute the items you want to delete? If you
> have this items in a KTable, you can access them from outside your
> application using Interactive Queries.
>
> Thus, you can monitor the app progress by observing committed offsets,
> and if finished, you query your KTable to extract the items you want to
> delete and do the cleanup.
>
> Does this make sense?
>
> For Interactive Queries see the docs and blog post:
>
> http://docs.confluent.io/current/streams/developer-
> guide.html#interactive-queries
>
> https://www.confluent.io/blog/unifying-stream-processing-
> and-interactive-queries-in-apache-kafka/
>
>
>
> -Matthias
>
>
> On 1/30/17 9:10 PM, Eric Dain wrote:
> > Thanks Matthias for your reply.
> >
> > I'm not trying to stop the application. I'm importing inventory from CSV
> > files coming from 3rd party sources. The CSVs are snapshots for each
> > source's inventory. I need to delete all items from that source that
> > doesn't exist in the latest CSV file.
> >
> > I was thinking of using "End of Batch" message to initiate that process.
> I
> > might need to do the clean-up as part of the Connect code instead, or
> there
> > is a better way of doing that?
> >
> > Thanks,
> > Eric
> >
> >
> >
> > On Sun, Jan 29, 2017 at 4:37 PM, Matthias J. Sax <ma...@confluent.io>
> > wrote:
> >
> >> Hi,
> >>
> >> currently, a Kafka Streams application is designed to "run forever" and
> >> there is no notion of "End of Batch" -- we have plans to add this
> >> though... (cf.
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >> 95%3A+Incremental+Batch+Processing+for+Kafka+Streams)
> >>
> >> Thus, right now you need to stop your application manually. You would
> >> need to observe the application's committed offsets (and lag) using
> >> bin/kafka-consumer-groups.sh (the application ID is user as group ID) to
> >> monitor the app's progress to see when it is done.
> >>
> >> Cf.
> >> https://cwiki.apache.org/confluence/display/KAFKA/
> >> Kafka+Streams+Data+%28Re%29Processing+Scenarios
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 1/28/17 1:07 PM, Eric Dain wrote:
> >>> Hi,
> >>>
> >>> I'm pretty new to Kafka Streams. I am using Kafka Streams to ingest
> large
> >>> csv file. I need to run some clean-up code after all records in the
> file
> >>> are processed. Is there a way to send "End of Batch" event that is
> >>> guaranteed to be processed after all records? If not is there
> alternative
> >>> solution?
> >>>
> >>> Thanks,
> >>> Eric
> >>>
> >>
> >>
> >
>
>

Re: "End of Batch" event

Posted by "Matthias J. Sax" <ma...@confluent.io>.
I am not sure if I understand the complete scenario yet.

> I need to delete all items from that source that
> doesn't exist in the latest CSV file.

Cannot follow here. I thought your CSV files provide the data you want
to process. But it seems you also have a second source?

How does your Streams app compute the items you want to delete? If you
have these items in a KTable, you can access them from outside your
application using Interactive Queries.

Thus, you can monitor the app's progress by observing committed offsets,
and once it is finished, query your KTable to extract the items you want to
delete and do the cleanup.

Does this make sense?

For Interactive Queries see the docs and blog post:

http://docs.confluent.io/current/streams/developer-guide.html#interactive-queries

https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/



-Matthias


On 1/30/17 9:10 PM, Eric Dain wrote:
> Thanks Matthias for your reply.
> 
> I'm not trying to stop the application. I'm importing inventory from CSV
> files coming from 3rd party sources. The CSVs are snapshots for each
> source's inventory. I need to delete all items from that source that
> doesn't exist in the latest CSV file.
> 
> I was thinking of using "End of Batch" message to initiate that process. I
> might need to do the clean-up as part of the Connect code instead, or there
> is a better way of doing that?
> 
> Thanks,
> Eric
> 
> 
> 
> On Sun, Jan 29, 2017 at 4:37 PM, Matthias J. Sax <ma...@confluent.io>
> wrote:
> 
>> Hi,
>>
>> currently, a Kafka Streams application is designed to "run forever" and
>> there is no notion of "End of Batch" -- we have plans to add this
>> though... (cf.
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> 95%3A+Incremental+Batch+Processing+for+Kafka+Streams)
>>
>> Thus, right now you need to stop your application manually. You would
>> need to observe the application's committed offsets (and lag) using
>> bin/kafka-consumer-groups.sh (the application ID is user as group ID) to
>> monitor the app's progress to see when it is done.
>>
>> Cf.
>> https://cwiki.apache.org/confluence/display/KAFKA/
>> Kafka+Streams+Data+%28Re%29Processing+Scenarios
>>
>>
>> -Matthias
>>
>>
>> On 1/28/17 1:07 PM, Eric Dain wrote:
>>> Hi,
>>>
>>> I'm pretty new to Kafka Streams. I am using Kafka Streams to ingest large
>>> csv file. I need to run some clean-up code after all records in the file
>>> are processed. Is there a way to send "End of Batch" event that is
>>> guaranteed to be processed after all records? If not is there alternative
>>> solution?
>>>
>>> Thanks,
>>> Eric
>>>
>>
>>
> 


Re: "End of Batch" event

Posted by Eric Dain <er...@gmail.com>.
Thanks Matthias for your reply.

I'm not trying to stop the application. I'm importing inventory from CSV
files coming from 3rd-party sources. The CSVs are snapshots of each
source's inventory. I need to delete all items from that source that
don't exist in the latest CSV file.

I was thinking of using an "End of Batch" message to initiate that process.
I might need to do the clean-up as part of the Connect code instead, or is
there a better way of doing that?

Thanks,
Eric



On Sun, Jan 29, 2017 at 4:37 PM, Matthias J. Sax <ma...@confluent.io>
wrote:

> Hi,
>
> currently, a Kafka Streams application is designed to "run forever" and
> there is no notion of "End of Batch" -- we have plans to add this
> though... (cf.
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 95%3A+Incremental+Batch+Processing+for+Kafka+Streams)
>
> Thus, right now you need to stop your application manually. You would
> need to observe the application's committed offsets (and lag) using
> bin/kafka-consumer-groups.sh (the application ID is user as group ID) to
> monitor the app's progress to see when it is done.
>
> Cf.
> https://cwiki.apache.org/confluence/display/KAFKA/
> Kafka+Streams+Data+%28Re%29Processing+Scenarios
>
>
> -Matthias
>
>
> On 1/28/17 1:07 PM, Eric Dain wrote:
> > Hi,
> >
> > I'm pretty new to Kafka Streams. I am using Kafka Streams to ingest large
> > csv file. I need to run some clean-up code after all records in the file
> > are processed. Is there a way to send "End of Batch" event that is
> > guaranteed to be processed after all records? If not is there alternative
> > solution?
> >
> > Thanks,
> > Eric
> >
>
>

Re: "End of Batch" event

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Hi,

currently, a Kafka Streams application is designed to "run forever" and
there is no notion of "End of Batch" -- we have plans to add this
though... (cf.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-95%3A+Incremental+Batch+Processing+for+Kafka+Streams)

Thus, right now you need to stop your application manually. You would
need to observe the application's committed offsets (and lag) using
bin/kafka-consumer-groups.sh (the application ID is used as the group ID) to
monitor the app's progress and see when it is done.

Cf.
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Data+%28Re%29Processing+Scenarios


-Matthias


On 1/28/17 1:07 PM, Eric Dain wrote:
> Hi,
> 
> I'm pretty new to Kafka Streams. I am using Kafka Streams to ingest large
> csv file. I need to run some clean-up code after all records in the file
> are processed. Is there a way to send "End of Batch" event that is
> guaranteed to be processed after all records? If not is there alternative
> solution?
> 
> Thanks,
> Eric
>