You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by Jan Lukavský <je...@seznam.cz> on 2017/02/07 11:46:04 UTC

Correct prefetching of data to KTable-like structure on application startup

Hi all,

I have a question how to do a correct caching in KTable-like structure 
on application startup. I'm not sure if this belongs to user or dev 
maillist, so sorry if I've chosen the bad one. What is my observation so 
far:

  - if I don't send any data to a kafka partition for a period longer 
then the data retention interval, then all data from the partition is 
wiped out

  - the index file is not cleared (which is obvious, it has to keep 
track of the next offset to assign to a new message)

In my scenario on startup, I want to read all data from a topic (or a 
subset of its partitions), wait until all the old data has been cached 
and then start processing of a different stream (basically I'm doing a 
join of KStream and KTable, but I have implemented it manually due to 
some special behavior). Now, what is the issue here - when the specific 
partition doesn't get any message within the retention period, then I 
end up stuck trying to prefetch data to the "KTable" - this is because I 
get the offset of the last message (plus 1) from the broker, but I don't 
get any data ever (until I send a message to the partition). The problem 
I see here is that kafka tells me what the last offset in a partition 
is, but there is no upper bound on when a first message will arrive, 
even though I reset the offset and start reading from the beginning of a 
partition. My question is, is it a possibility not to clear the whole 
partition, but to always keep at least the last message? That way, the 
client would always get at least the last message, can therefore figure 
out it is at the end of the partition (reading the old data) and start 
processing. I believe that KTable implementation could have a very 
similar issue. Or is there any other way around? I could add a timeout, 
but this seems a little fragile.

Thanks in advance for any suggestions and opinions,

  Jan


Re: Correct prefetching of data to KTable-like structure on application startup

Posted by Jan Lukavský <je...@seznam.cz>.
Hi Matthias,
yes, that's exactly what I was looking for. I wasn't aware of the 
possibility to get the starting offset of a partition. My bad, thanks a lot.
Cheers,
 Jan


---------- Původní zpráva ----------
Od: Matthias J. Sax <ma...@confluent.io>
Komu: dev@kafka.apache.org
Datum: 15. 2. 2017 2:57:54
Předmět: Re: Correct prefetching of data to KTable-like structure on 
application startup

"Jan, 

If I understand you problem correctly, you do something like this on 
startup (I simplify to single partition) 

endOffset = consumer.endOffset(...) 

while (!done) { 
for (ConsumerRecord r : consumer.poll()) { 
// do some processing 
if (r.offset == endOffset) { 
done = true; 
break; 
} 
} 
} 

If your partitions is empty, poll() never returns anything and thus you 
loop forever. 

However, to solve this problem, you can simple check the "start offset" 
of the partitions before the loop. If start and end offset are the same, 
the partitions is empty and you never call poll. 

startOffset = consumer.beginningOffset(...) 
endOffset = consumer.endOffset(...) 

if(startOffset < endOffset) { 
while (!done) { 
for (ConsumerRecord r : consumer.poll()) { 
// do some processing 
if (r.offset == endOffset) { 
done = true; 
break; 
} 
} 
} 
} 


Does this help? 


-Matthias 

On 2/14/17 12:31 AM, Jan Lukavský wrote: 
> Hi Matthias, 
> 
> I understand that the local cache will not be automatically cleared, but 
> that is not an issue for me now. 
> 
> The problem I see is still the same as at the beginning - even caching 
> data to RocksDB in KafkaStreams implementation might (I would say) 
> suffer from this issue. When using time based data retention (for 
> whatever reason, maybe in combination with the log compation, but the 
> the issue is there irrespective to whether the log compation is used or 
> not), it is possible that some partition will report nonzero "next" 
> offset, but will not be able to deliver any message to the KafkaConsumer 
> (because the partition is emptied by the data retention) and therefore 
> the consumer will not be able to finish the materialization of the topic 
> to local store (either RocksDB or any other cache) and therefore will 
> not be able to start processing the KStream. If I understand the problem 
> right, then using timestamp will not help either, because there must be 
> some sort of vector clock with a time dimension for each input 
> partition, and the empty partition will not be able to move the 
> timestamp any further, and therefore the whole system will remain 
> blocked at timestamp 0, because the vector clock usually calculates 
> minimum from all time dimensions. 
> 
> Does that make any sense, or I am doing something fundamentally wrong? :) 
> 
> Thanks again for any thoughts, 
> 
> Jan 
> 
> 
> On 02/13/2017 06:37 PM, Matthias J. Sax wrote: 
>> Jan, 
>> 
>> brokers with version 0.10.1 or higher allow to set both topic cleanup 
>> policies in combination: 
>> 
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-71%3A+Enable+log+
compaction+and+deletion+to+co-exist 
>> 
>> 
>> However, this will only delete data in you changelog topic but not in 
>> your RocksDB -- if you want to get data delete in RocksDB, you would 
>> need to send tombstone messages for those keys. It's kinda tricky to get 
>> this done. 
>> 
>> An "brute force" alternative would be, stop the application, delete the 
>> local state directory, and restart. This will force Streams to recreate 
>> the RocksDB files from the changelog and thus only loading keys that got 
>> not deleted. But this is of course a quite expensive approach and you 
>> should be very careful about using it. 
>> 
>> 
>> -Matthias 
>> 
>> 
>> On 2/13/17 12:25 AM, Jan Lukavský wrote: 
>>> Hi Michael, 
>>> 
>>> sorry for my late answer. Configuring the topic as you suggest is one 
>>> option (and I will configure it that way), but I wanted to combine the 
>>> two data retention mechanisms (if possible). I would like to use log 
>>> compaction, so that I will always get at least the last message for 
>>> given key, but I would also like to use the classical temporal data 
>>> retention, which would function as a sort of TTL for the keys - if a key

>>> doesn't get an update for the configured period of time, if could be 
>>> removed. That way I could ensure that out-dated keys could be removed. 
>>> 
>>> Is there any other option for this? And can kafka be configured this 
>>> way? 
>>> 
>>> Best, 
>>> 
>>> Jan 
>>> 
>>> On 02/09/2017 12:08 PM, Michael Noll wrote: 
>>>> Jan, 
>>>> 
>>>>> - if I don't send any data to a kafka partition for a period longer 
>>>>> then 
>>>> the data retention interval, then all data from the partition is wiped 
>>>> out 
>>>> 
>>>> If I interpret your first and second message in this email thread 
>>>> correctly, then you are talking only about your "state topic" here, 
>>>> i.e. 
>>>> the topic that you read into a KTable. You should configure this 
>>>> topic to 
>>>> use log compaction, which will ensure that the latest value for a 
>>>> given key 
>>>> will never be wiped. So even if you don't send any data to a Kafka 
>>>> partition of this (now log-compacted) "state topic" for a long 
>>>> period of 
>>>> time, you'd always have access to (at least) the latest value for 
>>>> every key. 
>>>> 
>>>> Would that help? 
>>>> 
>>>> -Michael 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> On Thu, Feb 9, 2017 at 10:16 AM, Jan Lukavský <je...@seznam.cz> wrote: 
>>>> 
>>>>> Hi Matthias, 
>>>>> 
>>>>> first of all, thanks for your answer. Sorry if I didn't explain the 
>>>>> problem well, I didn't want to dig too much into detail to focus on 
>>>>> the 
>>>>> important and maybe the result was not clear. 
>>>>> 
>>>>> My fault, I will try to explain in again. I have two KafkaConsumers 
>>>>> in two 
>>>>> separate threads consuming from two topics - let's call the first one 
>>>>> "stream topic" (processed like KStream) 
>>>>> 
>>>>> and the second one "state topic" (processed like KTable). The state 
>>>>> topic 
>>>>> carries a persistent data that I need in order to process the stream 
>>>>> topic, 
>>>>> so I need to cache the state topic 
>>>>> 
>>>>> locally before starting consumption of the stream topic. When the 
>>>>> application is running normally, there seems to be no issue with this,

>>>>> 
>>>>> because the state topic is updated asynchronously and I use internal 
>>>>> locks 
>>>>> to synchronize the processing inside the application. So far, 
>>>>> everything is 
>>>>> fine. 
>>>>> 
>>>>> 
>>>>> The problem might arise when the application starts - then I do the 
>>>>> following: 
>>>>> 
>>>>> - lock processing of the stream topic (because I don't have the 
>>>>> state 
>>>>> topic cached) 
>>>>> 
>>>>> - read the current offset N from the state topic (which gives me 
>>>>> offsets 
>>>>> of a message that should be expected next, that is message that has 
>>>>> not yet 
>>>>> been written) 
>>>>> 
>>>>> - reset offset of the state topic to beginning and read it until I 
>>>>> read 
>>>>> offset N - 1, which tells me that I have cached all the data I need to

>>>>> process the stream topic, so I unlock the stream processing and 
>>>>> continue 
>>>>> 
>>>>> All this works well, except for some very rare situation, when the 
>>>>> following happens (as I understand it, maybe here I am making some 
>>>>> mistake): 
>>>>> 
>>>>> - for a long period of time there is no update to (at least single 
>>>>> partition) of the state topic 
>>>>> 
>>>>> - when I try to cache the state topic during startup as explained 
>>>>> above, 
>>>>> it might never finish, because I will never get a message with offset 
>>>>> N - 1 
>>>>> - that is because I will not get any message at all, because all of 
>>>>> the 
>>>>> data has been wiped out 
>>>>> 
>>>>> - because I don't know if I get all the data from the state 
>>>>> topic, I 
>>>>> cannot start processing the stream topic and the whole application is 
>>>>> stuck, until first message arrives into all partition of the state 
>>>>> topic 
>>>>> (which might even never happen) 
>>>>> 
>>>>> - I might use some sort of timeout to handle this, but this 
>>>>> could be 
>>>>> dangerous, relying on KafkaConsumer.poll() returning empty records 
>>>>> sounds 
>>>>> to me a little fragile too (because this might also indicate that no 
>>>>> records could have been fetched within the timeout, am I right?), what

>>>>> would totally solve my issue would be that during data retention, the 
>>>>> last 
>>>>> message would always be kept, and therefore I will always get the 
>>>>> message 
>>>>> with offset N - 1, and the whole issue would vanish. 
>>>>> 
>>>>> The situation when a partition on the state topic gets no updates 
>>>>> during 
>>>>> long time happens mostly in development environment (where there is 
>>>>> little 
>>>>> to no traffic), but I sense that this could be an issue in production 
>>>>> too, 
>>>>> for example due to some repartitioning of topics. 
>>>>> 
>>>>> Does that make any sense to you now? 
>>>>> 
>>>>> Thanks again for your response, 
>>>>> 
>>>>> Jan 
>>>>> 
>>>>> 
>>>>> 
>>>>> On 02/09/2017 08:00 AM, Matthias J. Sax wrote: 
>>>>> 
>>>>>> Jan, 
>>>>>> 
>>>>>> you scenario is quite complex and I am not sure if I understood every

>>>>>> part of it. I try to break it down: 
>>>>>> 
>>>>>> In my scenario on startup, I want to read all data from a topic (or a

>>>>>>> subset of its partitions), 
>>>>>>> wait until all the old data has been cached and then start 
>>>>>>> processing of 
>>>>>>> a different stream 
>>>>>>> 
>>>>>> That is hard to accomplish in general. Kafka Streams internally uses 
>>>>>> KafkaConsumer (one instance per StreamThread) and thus, does rely on 
>>>>>> the 
>>>>>> consumer's behavior with regard to poll(). Hence, Streams cannot 
>>>>>> control 
>>>>>> in detail, what data will be fetched from the brokers. 
>>>>>> 
>>>>>> Furthermore, Streams follow its own internal strategy to pick a 
>>>>>> record 
>>>>>> (from the available ones returned from poll()) and you cannot 
>>>>>> control in 
>>>>>> your code (at least not directly) what record will be picked. 
>>>>>> 
>>>>>> Basically, Streams tried to process records in "timestamp order", ie,

>>>>>> based an the timestamp returned from TimestampExtractor. So you can 
>>>>>> "influence" the processing order by record timestamps (as far as you 
>>>>>> can 
>>>>>> influence them) and/or by providing a custom TimestampExtractor. 
>>>>>> 
>>>>>> In your example, you might want the records you want to process first

>>>>>> (KTable), to have smaller timestamps (ie, be earlier) than the 
>>>>>> records 
>>>>>> from your KStream. But even this will only give you "best effort" 
>>>>>> behavior, and it can happen that a KStream record is processed before

>>>>>> all KTable records to processed. It's a know issues but hard to 
>>>>>> resolve. 
>>>>>> 
>>>>>> when the specific partition doesn't get any message within the 
>>>>>> retention 
>>>>>>> period, 
>>>>>>> then I end up stuck trying to prefetch data to the "KTable" - 
>>>>>>> this is 
>>>>>>> because I get 
>>>>>>> the offset of the last message (plus 1) from the broker, but I 
>>>>>>> don't get 
>>>>>>> any data 
>>>>>>> ever (until I send a message to the partition) 
>>>>>>> 
>>>>>> Cannot follow here: if there is no data, than you can of course not 
>>>>>> process any data -- so why do you end up being stuck? 
>>>>>> 
>>>>>> The problem I see here is that kafka tells me what the last offset 
>>>>>> in a 
>>>>>>> partition is, 
>>>>>>> but there is no upper bound on when a first message will arrive, 
>>>>>>> 
>>>>>> In general, the latency between data append at the broker and data 
>>>>>> receive at a consumer is rather small. So even if there is 
>>>>>> strictly no 
>>>>>> upper bound until a message gets delivered, this should not be an 
>>>>>> issue 
>>>>>> in practice. Or do I miss understand something? 
>>>>>> 
>>>>>> even though I reset the offset and start reading from the beginning 
>>>>>> of a 
>>>>>>> partition. 
>>>>>>> 
>>>>>> How does this relate? Cannot follow. 
>>>>>> 
>>>>>> My question is, is it a possibility not to clear the whole 
>>>>>> partition, but 
>>>>>>> to always keep at least the last message? 
>>>>>>> 
>>>>>> Not with regular retention policy -- not sure if log compaction can 
>>>>>> help 
>>>>>> here. 
>>>>>> 
>>>>>> That way, the client would always get at least the last message, can 
>>>>>>> therefore figure out 
>>>>>>> it is at the end of the partition (reading the old data) and start 
>>>>>>> processing. 
>>>>>>> 
>>>>>> Why is this required? If the client's offset is the same as 
>>>>>> "endOfLog" 
>>>>>> for each partition, you can figure out that there is nothing to 
>>>>>> read. So 
>>>>>> why would you need the last old message to figure this out? 
>>>>>> 
>>>>>> 
>>>>>> -Matthias 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> On 2/7/17 3:46 AM, Jan Lukavský wrote: 
>>>>>> 
>>>>>>> Hi all, 
>>>>>>> 
>>>>>>> I have a question how to do a correct caching in KTable-like 
>>>>>>> structure 
>>>>>>> on application startup. I'm not sure if this belongs to user or dev 
>>>>>>> maillist, so sorry if I've chosen the bad one. What is my 
>>>>>>> observation so 
>>>>>>> far: 
>>>>>>> 
>>>>>>> - if I don't send any data to a kafka partition for a period 
>>>>>>> longer 
>>>>>>> then the data retention interval, then all data from the 
>>>>>>> partition is 
>>>>>>> wiped out 
>>>>>>> 
>>>>>>> - the index file is not cleared (which is obvious, it has to 
>>>>>>> keep track 
>>>>>>> of the next offset to assign to a new message) 
>>>>>>> 
>>>>>>> In my scenario on startup, I want to read all data from a topic 
>>>>>>> (or a 
>>>>>>> subset of its partitions), wait until all the old data has been 
>>>>>>> cached 
>>>>>>> and then start processing of a different stream (basically I'm 
>>>>>>> doing a 
>>>>>>> join of KStream and KTable, but I have implemented it manually 
>>>>>>> due to 
>>>>>>> some special behavior). Now, what is the issue here - when the 
>>>>>>> specific 
>>>>>>> partition doesn't get any message within the retention period, 
>>>>>>> then I 
>>>>>>> end up stuck trying to prefetch data to the "KTable" - this is 
>>>>>>> because I 
>>>>>>> get the offset of the last message (plus 1) from the broker, but I 
>>>>>>> don't 
>>>>>>> get any data ever (until I send a message to the partition). The 
>>>>>>> problem 
>>>>>>> I see here is that kafka tells me what the last offset in a 
>>>>>>> partition 
>>>>>>> is, but there is no upper bound on when a first message will arrive,

>>>>>>> even though I reset the offset and start reading from the beginning 
>>>>>>> of a 
>>>>>>> partition. My question is, is it a possibility not to clear the 
>>>>>>> whole 
>>>>>>> partition, but to always keep at least the last message? That 
>>>>>>> way, the 
>>>>>>> client would always get at least the last message, can therefore 
>>>>>>> figure 
>>>>>>> out it is at the end of the partition (reading the old data) and 
>>>>>>> start 
>>>>>>> processing. I believe that KTable implementation could have a very 
>>>>>>> similar issue. Or is there any other way around? I could add a 
>>>>>>> timeout, 
>>>>>>> but this seems a little fragile. 
>>>>>>> 
>>>>>>> Thanks in advance for any suggestions and opinions, 
>>>>>>> 
>>>>>>> Jan 
>>>>>>> 
>>>>>>> 
> 

"

Re: Correct prefetching of data to KTable-like structure on application startup

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Jan,

If I understand you problem correctly, you do something like this on
startup (I simplify to single partition)

endOffset = consumer.endOffset(...)

while (!done) {
  for (ConsumerRecord r : consumer.poll()) {
    // do some processing
    if (r.offset == endOffset) {
      done = true;
      break;
    }
  }
}

If your partitions is empty, poll() never returns anything and thus you
loop forever.

However, to solve this problem, you can simple check the "start offset"
of the partitions before the loop. If start and end offset are the same,
the partitions is empty and you never call poll.

startOffset = consumer.beginningOffset(...)
endOffset = consumer.endOffset(...)

if(startOffset < endOffset) {
  while (!done) {
    for (ConsumerRecord r : consumer.poll()) {
      // do some processing
      if (r.offset == endOffset) {
        done = true;
        break;
      }
    }
  }
}


Does this help?


-Matthias

On 2/14/17 12:31 AM, Jan Lukavský wrote:
> Hi Matthias,
> 
> I understand that the local cache will not be automatically cleared, but
> that is not an issue for me now.
> 
> The problem I see is still the same as at the beginning - even caching
> data to RocksDB in KafkaStreams implementation might (I would say)
> suffer from this issue. When using time based data retention (for
> whatever reason, maybe in combination with the log compation, but the
> the issue is there irrespective to whether the log compation is used or
> not), it is possible that some partition will report nonzero "next"
> offset, but will not be able to deliver any message to the KafkaConsumer
> (because the partition is emptied by the data retention) and therefore
> the consumer will not be able to finish the materialization of the topic
> to local store (either RocksDB or any other cache) and therefore will
> not be able to start processing the KStream. If I understand the problem
> right, then using timestamp will not help either, because there must be
> some sort of vector clock with a time dimension for each input
> partition, and the empty partition will not be able to move the
> timestamp any further, and therefore the whole system will remain
> blocked at timestamp 0, because the vector clock usually calculates
> minimum from all time dimensions.
> 
> Does that make any sense, or I am doing something fundamentally wrong? :)
> 
> Thanks again for any thoughts,
> 
>  Jan
> 
> 
> On 02/13/2017 06:37 PM, Matthias J. Sax wrote:
>> Jan,
>>
>> brokers with version 0.10.1 or higher allow to set both topic cleanup
>> policies in combination:
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-71%3A+Enable+log+compaction+and+deletion+to+co-exist
>>
>>
>> However, this will only delete data in you changelog topic but not in
>> your RocksDB -- if you want to get data delete in RocksDB, you would
>> need to send tombstone messages for those keys. It's kinda tricky to get
>> this done.
>>
>> An "brute force" alternative would be, stop the application, delete the
>> local state directory, and restart. This will force Streams to recreate
>> the RocksDB files from the changelog and thus only loading keys that got
>> not deleted. But this is of course a quite expensive approach and you
>> should be very careful about using it.
>>
>>
>> -Matthias
>>
>>
>> On 2/13/17 12:25 AM, Jan Lukavský wrote:
>>> Hi Michael,
>>>
>>> sorry for my late answer. Configuring the topic as you suggest is one
>>> option (and I will configure it that way), but I wanted to combine the
>>> two data retention mechanisms (if possible). I would like to use log
>>> compaction, so that I will always get at least the last message for
>>> given key, but I would also like to use the classical temporal data
>>> retention, which would function as a sort of TTL for the keys - if a key
>>> doesn't get an update for the configured period of time, if could be
>>> removed. That way I could ensure that out-dated keys could be removed.
>>>
>>> Is there any other option for this? And can kafka be configured this
>>> way?
>>>
>>> Best,
>>>
>>>   Jan
>>>
>>> On 02/09/2017 12:08 PM, Michael Noll wrote:
>>>> Jan,
>>>>
>>>>>    - if I don't send any data to a kafka partition for a period longer
>>>>> then
>>>> the data retention interval, then all data from the partition is wiped
>>>> out
>>>>
>>>> If I interpret your first and second message in this email thread
>>>> correctly, then you are talking only about your "state topic" here,
>>>> i.e.
>>>> the topic that you read into a KTable.  You should configure this
>>>> topic to
>>>> use log compaction, which will ensure that the latest value for a
>>>> given key
>>>> will never be wiped.  So even if you don't send any data to a Kafka
>>>> partition of this (now log-compacted) "state topic" for a long
>>>> period of
>>>> time, you'd always have access to (at least) the latest value for
>>>> every key.
>>>>
>>>> Would that help?
>>>>
>>>> -Michael
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Thu, Feb 9, 2017 at 10:16 AM, Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>>> Hi Matthias,
>>>>>
>>>>> first of all, thanks for your answer. Sorry if I didn't explain the
>>>>> problem well, I didn't want to dig too much into detail to focus on
>>>>> the
>>>>> important and maybe the result was not clear.
>>>>>
>>>>> My fault, I will try to explain in again. I have two KafkaConsumers
>>>>> in two
>>>>> separate threads consuming from two topics - let's call the first one
>>>>> "stream topic" (processed like KStream)
>>>>>
>>>>> and the second one "state topic" (processed like KTable). The state
>>>>> topic
>>>>> carries a persistent data that I need in order to process the stream
>>>>> topic,
>>>>> so I need to cache the state topic
>>>>>
>>>>> locally before starting consumption of the stream topic. When the
>>>>> application is running normally, there seems to be no issue with this,
>>>>>
>>>>> because the state topic is updated asynchronously and I use internal
>>>>> locks
>>>>> to synchronize the processing inside the application. So far,
>>>>> everything is
>>>>> fine.
>>>>>
>>>>>
>>>>> The problem might arise when the application starts - then I do the
>>>>> following:
>>>>>
>>>>>    - lock processing of the stream topic (because I don't have the
>>>>> state
>>>>> topic cached)
>>>>>
>>>>>    - read the current offset N from the state topic (which gives me
>>>>> offsets
>>>>> of a message that should be expected next, that is message that has
>>>>> not yet
>>>>> been written)
>>>>>
>>>>>    - reset offset of the state topic to beginning and read it until I
>>>>> read
>>>>> offset N - 1, which tells me that I have cached all the data I need to
>>>>> process the stream topic, so I unlock the stream processing and
>>>>> continue
>>>>>
>>>>> All this works well, except for some very rare situation, when the
>>>>> following happens (as I understand it, maybe here I am making some
>>>>> mistake):
>>>>>
>>>>>    - for a long period of time there is no update to (at least single
>>>>> partition) of the state topic
>>>>>
>>>>>    - when I try to cache the state topic during startup as explained
>>>>> above,
>>>>> it might never finish, because I will never get a message with offset
>>>>> N - 1
>>>>> - that is because I will not get any message at all, because all of
>>>>> the
>>>>> data has been wiped out
>>>>>
>>>>>    - because I don't know if I get all the data from the state
>>>>> topic, I
>>>>> cannot start processing the stream topic and the whole application is
>>>>> stuck, until first message arrives into all partition of the state
>>>>> topic
>>>>> (which might even never happen)
>>>>>
>>>>>    - I might use some sort of timeout to handle this, but this
>>>>> could be
>>>>> dangerous, relying on KafkaConsumer.poll() returning empty records
>>>>> sounds
>>>>> to me a little fragile too (because this might also indicate that no
>>>>> records could have been fetched within the timeout, am I right?), what
>>>>> would totally solve my issue would be that during data retention, the
>>>>> last
>>>>> message would always be kept, and therefore I will always get the
>>>>> message
>>>>> with offset N - 1, and the whole issue would vanish.
>>>>>
>>>>> The situation when a partition on the state topic gets no updates
>>>>> during
>>>>> long time happens mostly in development environment (where there is
>>>>> little
>>>>> to no traffic), but I sense that this could be an issue in production
>>>>> too,
>>>>> for example due to some repartitioning of topics.
>>>>>
>>>>> Does that make any sense to you now?
>>>>>
>>>>> Thanks again for your response,
>>>>>
>>>>>    Jan
>>>>>
>>>>>
>>>>>
>>>>> On 02/09/2017 08:00 AM, Matthias J. Sax wrote:
>>>>>
>>>>>> Jan,
>>>>>>
>>>>>> you scenario is quite complex and I am not sure if I understood every
>>>>>> part of it. I try to break it down:
>>>>>>
>>>>>> In my scenario on startup, I want to read all data from a topic (or a
>>>>>>> subset of its partitions),
>>>>>>> wait until all the old data has been cached and then start
>>>>>>> processing of
>>>>>>> a different stream
>>>>>>>
>>>>>> That is hard to accomplish in general. Kafka Streams internally uses
>>>>>> KafkaConsumer (one instance per StreamThread) and thus, does rely on
>>>>>> the
>>>>>> consumer's behavior with regard to poll(). Hence, Streams cannot
>>>>>> control
>>>>>> in detail, what data will be fetched from the brokers.
>>>>>>
>>>>>> Furthermore, Streams follow its own internal strategy to pick a
>>>>>> record
>>>>>> (from the available ones returned from poll()) and you cannot
>>>>>> control in
>>>>>> your code (at least not directly) what record will be picked.
>>>>>>
>>>>>> Basically, Streams tried to process records in "timestamp order", ie,
>>>>>> based an the timestamp returned from TimestampExtractor. So you can
>>>>>> "influence" the processing order by record timestamps (as far as you
>>>>>> can
>>>>>> influence them) and/or by providing a custom TimestampExtractor.
>>>>>>
>>>>>> In your example, you might want the records you want to process first
>>>>>> (KTable), to have smaller timestamps (ie, be earlier) than the
>>>>>> records
>>>>>> from your KStream. But even this will only give you "best effort"
>>>>>> behavior, and it can happen that a KStream record is processed before
>>>>>> all KTable records to processed. It's a know issues but hard to
>>>>>> resolve.
>>>>>>
>>>>>> when the specific partition doesn't get any message within the
>>>>>> retention
>>>>>>> period,
>>>>>>> then I end up stuck trying to prefetch data to the "KTable" -
>>>>>>> this is
>>>>>>> because I get
>>>>>>> the offset of the last message (plus 1) from the broker, but I
>>>>>>> don't get
>>>>>>> any data
>>>>>>> ever (until I send a message to the partition)
>>>>>>>
>>>>>> Cannot follow here: if there is no data, than you can of course not
>>>>>> process any data -- so why do you end up being stuck?
>>>>>>
>>>>>> The problem I see here is that kafka tells me what the last offset
>>>>>> in a
>>>>>>> partition is,
>>>>>>> but there is no upper bound on when a first message will arrive,
>>>>>>>
>>>>>> In general, the latency between data append at the broker and data
>>>>>> receive at a consumer is rather small. So even if there is
>>>>>> strictly no
>>>>>> upper bound until a message gets delivered, this should not be an
>>>>>> issue
>>>>>> in practice. Or do I miss understand something?
>>>>>>
>>>>>> even though I reset the offset and start reading from the beginning
>>>>>> of a
>>>>>>> partition.
>>>>>>>
>>>>>> How does this relate? Cannot follow.
>>>>>>
>>>>>> My question is, is it a possibility not to clear the whole
>>>>>> partition, but
>>>>>>> to always keep at least the last message?
>>>>>>>
>>>>>> Not with regular retention policy -- not sure if log compaction can
>>>>>> help
>>>>>> here.
>>>>>>
>>>>>> That way, the client would always get at least the last message, can
>>>>>>> therefore figure out
>>>>>>> it is at the end of the partition (reading the old data) and start
>>>>>>> processing.
>>>>>>>
>>>>>> Why is this required? If the client's offset is the same as
>>>>>> "endOfLog"
>>>>>> for each partition, you can figure out that there is nothing to
>>>>>> read. So
>>>>>> why would you need the last old message to figure this out?
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>>
>>>>>>
>>>>>> On 2/7/17 3:46 AM, Jan Lukavský wrote:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> I have a question how to do a correct caching in KTable-like
>>>>>>> structure
>>>>>>> on application startup. I'm not sure if this belongs to user or dev
>>>>>>> maillist, so sorry if I've chosen the bad one. What is my
>>>>>>> observation so
>>>>>>> far:
>>>>>>>
>>>>>>>     - if I don't send any data to a kafka partition for a period
>>>>>>> longer
>>>>>>> then the data retention interval, then all data from the
>>>>>>> partition is
>>>>>>> wiped out
>>>>>>>
>>>>>>>     - the index file is not cleared (which is obvious, it has to
>>>>>>> keep track
>>>>>>> of the next offset to assign to a new message)
>>>>>>>
>>>>>>> In my scenario on startup, I want to read all data from a topic
>>>>>>> (or a
>>>>>>> subset of its partitions), wait until all the old data has been
>>>>>>> cached
>>>>>>> and then start processing of a different stream (basically I'm
>>>>>>> doing a
>>>>>>> join of KStream and KTable, but I have implemented it manually
>>>>>>> due to
>>>>>>> some special behavior). Now, what is the issue here - when the
>>>>>>> specific
>>>>>>> partition doesn't get any message within the retention period,
>>>>>>> then I
>>>>>>> end up stuck trying to prefetch data to the "KTable" - this is
>>>>>>> because I
>>>>>>> get the offset of the last message (plus 1) from the broker, but I
>>>>>>> don't
>>>>>>> get any data ever (until I send a message to the partition). The
>>>>>>> problem
>>>>>>> I see here is that kafka tells me what the last offset in a
>>>>>>> partition
>>>>>>> is, but there is no upper bound on when a first message will arrive,
>>>>>>> even though I reset the offset and start reading from the beginning
>>>>>>> of a
>>>>>>> partition. My question is, is it a possibility not to clear the
>>>>>>> whole
>>>>>>> partition, but to always keep at least the last message? That
>>>>>>> way, the
>>>>>>> client would always get at least the last message, can therefore
>>>>>>> figure
>>>>>>> out it is at the end of the partition (reading the old data) and
>>>>>>> start
>>>>>>> processing. I believe that KTable implementation could have a very
>>>>>>> similar issue. Or is there any other way around? I could add a
>>>>>>> timeout,
>>>>>>> but this seems a little fragile.
>>>>>>>
>>>>>>> Thanks in advance for any suggestions and opinions,
>>>>>>>
>>>>>>>     Jan
>>>>>>>
>>>>>>>
> 


Re: Correct prefetching of data to KTable-like structure on application startup

Posted by Jan Lukavský <je...@seznam.cz>.
Hi Matthias,

I understand that the local cache will not be automatically cleared, but 
that is not an issue for me now.

The problem I see is still the same as at the beginning - even caching 
data to RocksDB in KafkaStreams implementation might (I would say) 
suffer from this issue. When using time based data retention (for 
whatever reason, maybe in combination with the log compation, but the 
the issue is there irrespective to whether the log compation is used or 
not), it is possible that some partition will report nonzero "next" 
offset, but will not be able to deliver any message to the KafkaConsumer 
(because the partition is emptied by the data retention) and therefore 
the consumer will not be able to finish the materialization of the topic 
to local store (either RocksDB or any other cache) and therefore will 
not be able to start processing the KStream. If I understand the problem 
right, then using timestamp will not help either, because there must be 
some sort of vector clock with a time dimension for each input 
partition, and the empty partition will not be able to move the 
timestamp any further, and therefore the whole system will remain 
blocked at timestamp 0, because the vector clock usually calculates 
minimum from all time dimensions.

Does that make any sense, or I am doing something fundamentally wrong? :)

Thanks again for any thoughts,

  Jan


On 02/13/2017 06:37 PM, Matthias J. Sax wrote:
> Jan,
>
> brokers with version 0.10.1 or higher allow to set both topic cleanup
> policies in combination:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-71%3A+Enable+log+compaction+and+deletion+to+co-exist
>
> However, this will only delete data in you changelog topic but not in
> your RocksDB -- if you want to get data delete in RocksDB, you would
> need to send tombstone messages for those keys. It's kinda tricky to get
> this done.
>
> An "brute force" alternative would be, stop the application, delete the
> local state directory, and restart. This will force Streams to recreate
> the RocksDB files from the changelog and thus only loading keys that got
> not deleted. But this is of course a quite expensive approach and you
> should be very careful about using it.
>
>
> -Matthias
>
>
> On 2/13/17 12:25 AM, Jan Lukavsk� wrote:
>> Hi Michael,
>>
>> sorry for my late answer. Configuring the topic as you suggest is one
>> option (and I will configure it that way), but I wanted to combine the
>> two data retention mechanisms (if possible). I would like to use log
>> compaction, so that I will always get at least the last message for
>> given key, but I would also like to use the classical temporal data
>> retention, which would function as a sort of TTL for the keys - if a key
>> doesn't get an update for the configured period of time, if could be
>> removed. That way I could ensure that out-dated keys could be removed.
>>
>> Is there any other option for this? And can kafka be configured this way?
>>
>> Best,
>>
>>   Jan
>>
>> On 02/09/2017 12:08 PM, Michael Noll wrote:
>>> Jan,
>>>
>>>>    - if I don't send any data to a kafka partition for a period longer
>>>> then
>>> the data retention interval, then all data from the partition is wiped
>>> out
>>>
>>> If I interpret your first and second message in this email thread
>>> correctly, then you are talking only about your "state topic" here, i.e.
>>> the topic that you read into a KTable.  You should configure this
>>> topic to
>>> use log compaction, which will ensure that the latest value for a
>>> given key
>>> will never be wiped.  So even if you don't send any data to a Kafka
>>> partition of this (now log-compacted) "state topic" for a long period of
>>> time, you'd always have access to (at least) the latest value for
>>> every key.
>>>
>>> Would that help?
>>>
>>> -Michael
>>>
>>>
>>>
>>>
>>>
>>> On Thu, Feb 9, 2017 at 10:16 AM, Jan Lukavsk� <je...@seznam.cz> wrote:
>>>
>>>> Hi Matthias,
>>>>
>>>> first of all, thanks for your answer. Sorry if I didn't explain the
>>>> problem well, I didn't want to dig too much into detail to focus on the
>>>> important and maybe the result was not clear.
>>>>
>>>> My fault, I will try to explain in again. I have two KafkaConsumers
>>>> in two
>>>> separate threads consuming from two topics - let's call the first one
>>>> "stream topic" (processed like KStream)
>>>>
>>>> and the second one "state topic" (processed like KTable). The state
>>>> topic
>>>> carries a persistent data that I need in order to process the stream
>>>> topic,
>>>> so I need to cache the state topic
>>>>
>>>> locally before starting consumption of the stream topic. When the
>>>> application is running normally, there seems to be no issue with this,
>>>>
>>>> because the state topic is updated asynchronously and I use internal
>>>> locks
>>>> to synchronize the processing inside the application. So far,
>>>> everything is
>>>> fine.
>>>>
>>>>
>>>> The problem might arise when the application starts - then I do the
>>>> following:
>>>>
>>>>    - lock processing of the stream topic (because I don't have the state
>>>> topic cached)
>>>>
>>>>    - read the current offset N from the state topic (which gives me
>>>> offsets
>>>> of a message that should be expected next, that is message that has
>>>> not yet
>>>> been written)
>>>>
>>>>    - reset offset of the state topic to beginning and read it until I
>>>> read
>>>> offset N - 1, which tells me that I have cached all the data I need to
>>>> process the stream topic, so I unlock the stream processing and continue
>>>>
>>>> All this works well, except for some very rare situation, when the
>>>> following happens (as I understand it, maybe here I am making some
>>>> mistake):
>>>>
>>>>    - for a long period of time there is no update to (at least single
>>>> partition) of the state topic
>>>>
>>>>    - when I try to cache the state topic during startup as explained
>>>> above,
>>>> it might never finish, because I will never get a message with offset
>>>> N - 1
>>>> - that is because I will not get any message at all, because all of the
>>>> data has been wiped out
>>>>
>>>>    - because I don't know if I get all the data from the state topic, I
>>>> cannot start processing the stream topic and the whole application is
>>>> stuck, until first message arrives into all partition of the state topic
>>>> (which might even never happen)
>>>>
>>>>    - I might use some sort of timeout to handle this, but this could be
>>>> dangerous, relying on KafkaConsumer.poll() returning empty records
>>>> sounds
>>>> to me a little fragile too (because this might also indicate that no
>>>> records could have been fetched within the timeout, am I right?), what
>>>> would totally solve my issue would be that during data retention, the
>>>> last
>>>> message would always be kept, and therefore I will always get the
>>>> message
>>>> with offset N - 1, and the whole issue would vanish.
>>>>
>>>> The situation when a partition on the state topic gets no updates during
>>>> long time happens mostly in development environment (where there is
>>>> little
>>>> to no traffic), but I sense that this could be an issue in production
>>>> too,
>>>> for example due to some repartitioning of topics.
>>>>
>>>> Does that make any sense to you now?
>>>>
>>>> Thanks again for your response,
>>>>
>>>>    Jan
>>>>
>>>>
>>>>
>>>> On 02/09/2017 08:00 AM, Matthias J. Sax wrote:
>>>>
>>>>> Jan,
>>>>>
>>>>> you scenario is quite complex and I am not sure if I understood every
>>>>> part of it. I try to break it down:
>>>>>
>>>>> In my scenario on startup, I want to read all data from a topic (or a
>>>>>> subset of its partitions),
>>>>>> wait until all the old data has been cached and then start
>>>>>> processing of
>>>>>> a different stream
>>>>>>
>>>>> That is hard to accomplish in general. Kafka Streams internally uses
>>>>> KafkaConsumer (one instance per StreamThread) and thus, does rely on
>>>>> the
>>>>> consumer's behavior with regard to poll(). Hence, Streams cannot
>>>>> control
>>>>> in detail, what data will be fetched from the brokers.
>>>>>
>>>>> Furthermore, Streams follow its own internal strategy to pick a record
>>>>> (from the available ones returned from poll()) and you cannot
>>>>> control in
>>>>> your code (at least not directly) what record will be picked.
>>>>>
>>>>> Basically, Streams tried to process records in "timestamp order", ie,
>>>>> based an the timestamp returned from TimestampExtractor. So you can
>>>>> "influence" the processing order by record timestamps (as far as you
>>>>> can
>>>>> influence them) and/or by providing a custom TimestampExtractor.
>>>>>
>>>>> In your example, you might want the records you want to process first
>>>>> (KTable), to have smaller timestamps (ie, be earlier) than the records
>>>>> from your KStream. But even this will only give you "best effort"
>>>>> behavior, and it can happen that a KStream record is processed before
>>>>> all KTable records to processed. It's a know issues but hard to
>>>>> resolve.
>>>>>
>>>>> when the specific partition doesn't get any message within the
>>>>> retention
>>>>>> period,
>>>>>> then I end up stuck trying to prefetch data to the "KTable" - this is
>>>>>> because I get
>>>>>> the offset of the last message (plus 1) from the broker, but I
>>>>>> don't get
>>>>>> any data
>>>>>> ever (until I send a message to the partition)
>>>>>>
>>>>> Cannot follow here: if there is no data, than you can of course not
>>>>> process any data -- so why do you end up being stuck?
>>>>>
>>>>> The problem I see here is that kafka tells me what the last offset in a
>>>>>> partition is,
>>>>>> but there is no upper bound on when a first message will arrive,
>>>>>>
>>>>> In general, the latency between data append at the broker and data
>>>>> receive at a consumer is rather small. So even if there is strictly no
>>>>> upper bound until a message gets delivered, this should not be an issue
>>>>> in practice. Or do I miss understand something?
>>>>>
>>>>> even though I reset the offset and start reading from the beginning
>>>>> of a
>>>>>> partition.
>>>>>>
>>>>> How does this relate? Cannot follow.
>>>>>
>>>>> My question is, is it a possibility not to clear the whole
>>>>> partition, but
>>>>>> to always keep at least the last message?
>>>>>>
>>>>> Not with regular retention policy -- not sure if log compaction can
>>>>> help
>>>>> here.
>>>>>
>>>>> That way, the client would always get at least the last message, can
>>>>>> therefore figure out
>>>>>> it is at the end of the partition (reading the old data) and start
>>>>>> processing.
>>>>>>
>>>>> Why is this required? If the client's offset is the same as "endOfLog"
>>>>> for each partition, you can figure out that there is nothing to
>>>>> read. So
>>>>> why would you need the last old message to figure this out?
>>>>>
>>>>>
>>>>> -Matthias
>>>>>
>>>>>
>>>>>
>>>>> On 2/7/17 3:46 AM, Jan Lukavsk� wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I have a question how to do a correct caching in KTable-like structure
>>>>>> on application startup. I'm not sure if this belongs to user or dev
>>>>>> maillist, so sorry if I've chosen the bad one. What is my
>>>>>> observation so
>>>>>> far:
>>>>>>
>>>>>>     - if I don't send any data to a kafka partition for a period longer
>>>>>> then the data retention interval, then all data from the partition is
>>>>>> wiped out
>>>>>>
>>>>>>     - the index file is not cleared (which is obvious, it has to
>>>>>> keep track
>>>>>> of the next offset to assign to a new message)
>>>>>>
>>>>>> In my scenario on startup, I want to read all data from a topic (or a
>>>>>> subset of its partitions), wait until all the old data has been cached
>>>>>> and then start processing of a different stream (basically I'm doing a
>>>>>> join of KStream and KTable, but I have implemented it manually due to
>>>>>> some special behavior). Now, what is the issue here - when the
>>>>>> specific
>>>>>> partition doesn't get any message within the retention period, then I
>>>>>> end up stuck trying to prefetch data to the "KTable" - this is
>>>>>> because I
>>>>>> get the offset of the last message (plus 1) from the broker, but I
>>>>>> don't
>>>>>> get any data ever (until I send a message to the partition). The
>>>>>> problem
>>>>>> I see here is that kafka tells me what the last offset in a partition
>>>>>> is, but there is no upper bound on when a first message will arrive,
>>>>>> even though I reset the offset and start reading from the beginning
>>>>>> of a
>>>>>> partition. My question is, is it a possibility not to clear the whole
>>>>>> partition, but to always keep at least the last message? That way, the
>>>>>> client would always get at least the last message, can therefore
>>>>>> figure
>>>>>> out it is at the end of the partition (reading the old data) and start
>>>>>> processing. I believe that KTable implementation could have a very
>>>>>> similar issue. Or is there any other way around? I could add a
>>>>>> timeout,
>>>>>> but this seems a little fragile.
>>>>>>
>>>>>> Thanks in advance for any suggestions and opinions,
>>>>>>
>>>>>>     Jan
>>>>>>
>>>>>>


Re: Correct prefetching of data to KTable-like structure on application startup

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Jan,

brokers with version 0.10.1 or higher allow to set both topic cleanup
policies in combination:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-71%3A+Enable+log+compaction+and+deletion+to+co-exist

However, this will only delete data in you changelog topic but not in
your RocksDB -- if you want to get data delete in RocksDB, you would
need to send tombstone messages for those keys. It's kinda tricky to get
this done.

An "brute force" alternative would be, stop the application, delete the
local state directory, and restart. This will force Streams to recreate
the RocksDB files from the changelog and thus only loading keys that got
not deleted. But this is of course a quite expensive approach and you
should be very careful about using it.


-Matthias


On 2/13/17 12:25 AM, Jan Lukavský wrote:
> Hi Michael,
> 
> sorry for my late answer. Configuring the topic as you suggest is one
> option (and I will configure it that way), but I wanted to combine the
> two data retention mechanisms (if possible). I would like to use log
> compaction, so that I will always get at least the last message for
> given key, but I would also like to use the classical temporal data
> retention, which would function as a sort of TTL for the keys - if a key
> doesn't get an update for the configured period of time, if could be
> removed. That way I could ensure that out-dated keys could be removed.
> 
> Is there any other option for this? And can kafka be configured this way?
> 
> Best,
> 
>  Jan
> 
> On 02/09/2017 12:08 PM, Michael Noll wrote:
>> Jan,
>>
>>>   - if I don't send any data to a kafka partition for a period longer
>>> then
>> the data retention interval, then all data from the partition is wiped
>> out
>>
>> If I interpret your first and second message in this email thread
>> correctly, then you are talking only about your "state topic" here, i.e.
>> the topic that you read into a KTable.  You should configure this
>> topic to
>> use log compaction, which will ensure that the latest value for a
>> given key
>> will never be wiped.  So even if you don't send any data to a Kafka
>> partition of this (now log-compacted) "state topic" for a long period of
>> time, you'd always have access to (at least) the latest value for
>> every key.
>>
>> Would that help?
>>
>> -Michael
>>
>>
>>
>>
>>
>> On Thu, Feb 9, 2017 at 10:16 AM, Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> Hi Matthias,
>>>
>>> first of all, thanks for your answer. Sorry if I didn't explain the
>>> problem well, I didn't want to dig too much into detail to focus on the
>>> important and maybe the result was not clear.
>>>
>>> My fault, I will try to explain in again. I have two KafkaConsumers
>>> in two
>>> separate threads consuming from two topics - let's call the first one
>>> "stream topic" (processed like KStream)
>>>
>>> and the second one "state topic" (processed like KTable). The state
>>> topic
>>> carries a persistent data that I need in order to process the stream
>>> topic,
>>> so I need to cache the state topic
>>>
>>> locally before starting consumption of the stream topic. When the
>>> application is running normally, there seems to be no issue with this,
>>>
>>> because the state topic is updated asynchronously and I use internal
>>> locks
>>> to synchronize the processing inside the application. So far,
>>> everything is
>>> fine.
>>>
>>>
>>> The problem might arise when the application starts - then I do the
>>> following:
>>>
>>>   - lock processing of the stream topic (because I don't have the state
>>> topic cached)
>>>
>>>   - read the current offset N from the state topic (which gives me
>>> offsets
>>> of a message that should be expected next, that is message that has
>>> not yet
>>> been written)
>>>
>>>   - reset offset of the state topic to beginning and read it until I
>>> read
>>> offset N - 1, which tells me that I have cached all the data I need to
>>> process the stream topic, so I unlock the stream processing and continue
>>>
>>> All this works well, except for some very rare situation, when the
>>> following happens (as I understand it, maybe here I am making some
>>> mistake):
>>>
>>>   - for a long period of time there is no update to (at least single
>>> partition) of the state topic
>>>
>>>   - when I try to cache the state topic during startup as explained
>>> above,
>>> it might never finish, because I will never get a message with offset
>>> N - 1
>>> - that is because I will not get any message at all, because all of the
>>> data has been wiped out
>>>
>>>   - because I don't know if I get all the data from the state topic, I
>>> cannot start processing the stream topic and the whole application is
>>> stuck, until first message arrives into all partition of the state topic
>>> (which might even never happen)
>>>
>>>   - I might use some sort of timeout to handle this, but this could be
>>> dangerous, relying on KafkaConsumer.poll() returning empty records
>>> sounds
>>> to me a little fragile too (because this might also indicate that no
>>> records could have been fetched within the timeout, am I right?), what
>>> would totally solve my issue would be that during data retention, the
>>> last
>>> message would always be kept, and therefore I will always get the
>>> message
>>> with offset N - 1, and the whole issue would vanish.
>>>
>>> The situation when a partition on the state topic gets no updates during
>>> long time happens mostly in development environment (where there is
>>> little
>>> to no traffic), but I sense that this could be an issue in production
>>> too,
>>> for example due to some repartitioning of topics.
>>>
>>> Does that make any sense to you now?
>>>
>>> Thanks again for your response,
>>>
>>>   Jan
>>>
>>>
>>>
>>> On 02/09/2017 08:00 AM, Matthias J. Sax wrote:
>>>
>>>> Jan,
>>>>
>>>> you scenario is quite complex and I am not sure if I understood every
>>>> part of it. I try to break it down:
>>>>
>>>> In my scenario on startup, I want to read all data from a topic (or a
>>>>> subset of its partitions),
>>>>> wait until all the old data has been cached and then start
>>>>> processing of
>>>>> a different stream
>>>>>
>>>> That is hard to accomplish in general. Kafka Streams internally uses
>>>> KafkaConsumer (one instance per StreamThread) and thus, does rely on
>>>> the
>>>> consumer's behavior with regard to poll(). Hence, Streams cannot
>>>> control
>>>> in detail, what data will be fetched from the brokers.
>>>>
>>>> Furthermore, Streams follow its own internal strategy to pick a record
>>>> (from the available ones returned from poll()) and you cannot
>>>> control in
>>>> your code (at least not directly) what record will be picked.
>>>>
>>>> Basically, Streams tried to process records in "timestamp order", ie,
>>>> based an the timestamp returned from TimestampExtractor. So you can
>>>> "influence" the processing order by record timestamps (as far as you
>>>> can
>>>> influence them) and/or by providing a custom TimestampExtractor.
>>>>
>>>> In your example, you might want the records you want to process first
>>>> (KTable), to have smaller timestamps (ie, be earlier) than the records
>>>> from your KStream. But even this will only give you "best effort"
>>>> behavior, and it can happen that a KStream record is processed before
>>>> all KTable records to processed. It's a know issues but hard to
>>>> resolve.
>>>>
>>>> when the specific partition doesn't get any message within the
>>>> retention
>>>>> period,
>>>>> then I end up stuck trying to prefetch data to the "KTable" - this is
>>>>> because I get
>>>>> the offset of the last message (plus 1) from the broker, but I
>>>>> don't get
>>>>> any data
>>>>> ever (until I send a message to the partition)
>>>>>
>>>> Cannot follow here: if there is no data, than you can of course not
>>>> process any data -- so why do you end up being stuck?
>>>>
>>>> The problem I see here is that kafka tells me what the last offset in a
>>>>> partition is,
>>>>> but there is no upper bound on when a first message will arrive,
>>>>>
>>>> In general, the latency between data append at the broker and data
>>>> receive at a consumer is rather small. So even if there is strictly no
>>>> upper bound until a message gets delivered, this should not be an issue
>>>> in practice. Or do I miss understand something?
>>>>
>>>> even though I reset the offset and start reading from the beginning
>>>> of a
>>>>> partition.
>>>>>
>>>> How does this relate? Cannot follow.
>>>>
>>>> My question is, is it a possibility not to clear the whole
>>>> partition, but
>>>>> to always keep at least the last message?
>>>>>
>>>> Not with regular retention policy -- not sure if log compaction can
>>>> help
>>>> here.
>>>>
>>>> That way, the client would always get at least the last message, can
>>>>> therefore figure out
>>>>> it is at the end of the partition (reading the old data) and start
>>>>> processing.
>>>>>
>>>> Why is this required? If the client's offset is the same as "endOfLog"
>>>> for each partition, you can figure out that there is nothing to
>>>> read. So
>>>> why would you need the last old message to figure this out?
>>>>
>>>>
>>>> -Matthias
>>>>
>>>>
>>>>
>>>> On 2/7/17 3:46 AM, Jan Lukavský wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I have a question how to do a correct caching in KTable-like structure
>>>>> on application startup. I'm not sure if this belongs to user or dev
>>>>> maillist, so sorry if I've chosen the bad one. What is my
>>>>> observation so
>>>>> far:
>>>>>
>>>>>    - if I don't send any data to a kafka partition for a period longer
>>>>> then the data retention interval, then all data from the partition is
>>>>> wiped out
>>>>>
>>>>>    - the index file is not cleared (which is obvious, it has to
>>>>> keep track
>>>>> of the next offset to assign to a new message)
>>>>>
>>>>> In my scenario on startup, I want to read all data from a topic (or a
>>>>> subset of its partitions), wait until all the old data has been cached
>>>>> and then start processing of a different stream (basically I'm doing a
>>>>> join of KStream and KTable, but I have implemented it manually due to
>>>>> some special behavior). Now, what is the issue here - when the
>>>>> specific
>>>>> partition doesn't get any message within the retention period, then I
>>>>> end up stuck trying to prefetch data to the "KTable" - this is
>>>>> because I
>>>>> get the offset of the last message (plus 1) from the broker, but I
>>>>> don't
>>>>> get any data ever (until I send a message to the partition). The
>>>>> problem
>>>>> I see here is that kafka tells me what the last offset in a partition
>>>>> is, but there is no upper bound on when a first message will arrive,
>>>>> even though I reset the offset and start reading from the beginning
>>>>> of a
>>>>> partition. My question is, is it a possibility not to clear the whole
>>>>> partition, but to always keep at least the last message? That way, the
>>>>> client would always get at least the last message, can therefore
>>>>> figure
>>>>> out it is at the end of the partition (reading the old data) and start
>>>>> processing. I believe that KTable implementation could have a very
>>>>> similar issue. Or is there any other way around? I could add a
>>>>> timeout,
>>>>> but this seems a little fragile.
>>>>>
>>>>> Thanks in advance for any suggestions and opinions,
>>>>>
>>>>>    Jan
>>>>>
>>>>>
> 


Re: Correct prefetching of data to KTable-like structure on application startup

Posted by Jan Lukavský <je...@seznam.cz>.
Hi Michael,

sorry for my late answer. Configuring the topic as you suggest is one 
option (and I will configure it that way), but I wanted to combine the 
two data retention mechanisms (if possible). I would like to use log 
compaction, so that I will always get at least the last message for 
given key, but I would also like to use the classical temporal data 
retention, which would function as a sort of TTL for the keys - if a key 
doesn't get an update for the configured period of time, if could be 
removed. That way I could ensure that out-dated keys could be removed.

Is there any other option for this? And can kafka be configured this way?

Best,

  Jan

On 02/09/2017 12:08 PM, Michael Noll wrote:
> Jan,
>
>>   - if I don't send any data to a kafka partition for a period longer then
> the data retention interval, then all data from the partition is wiped out
>
> If I interpret your first and second message in this email thread
> correctly, then you are talking only about your "state topic" here, i.e.
> the topic that you read into a KTable.  You should configure this topic to
> use log compaction, which will ensure that the latest value for a given key
> will never be wiped.  So even if you don't send any data to a Kafka
> partition of this (now log-compacted) "state topic" for a long period of
> time, you'd always have access to (at least) the latest value for every key.
>
> Would that help?
>
> -Michael
>
>
>
>
>
> On Thu, Feb 9, 2017 at 10:16 AM, Jan Lukavsk� <je...@seznam.cz> wrote:
>
>> Hi Matthias,
>>
>> first of all, thanks for your answer. Sorry if I didn't explain the
>> problem well, I didn't want to dig too much into detail to focus on the
>> important and maybe the result was not clear.
>>
>> My fault, I will try to explain in again. I have two KafkaConsumers in two
>> separate threads consuming from two topics - let's call the first one
>> "stream topic" (processed like KStream)
>>
>> and the second one "state topic" (processed like KTable). The state topic
>> carries a persistent data that I need in order to process the stream topic,
>> so I need to cache the state topic
>>
>> locally before starting consumption of the stream topic. When the
>> application is running normally, there seems to be no issue with this,
>>
>> because the state topic is updated asynchronously and I use internal locks
>> to synchronize the processing inside the application. So far, everything is
>> fine.
>>
>>
>> The problem might arise when the application starts - then I do the
>> following:
>>
>>   - lock processing of the stream topic (because I don't have the state
>> topic cached)
>>
>>   - read the current offset N from the state topic (which gives me offsets
>> of a message that should be expected next, that is message that has not yet
>> been written)
>>
>>   - reset offset of the state topic to beginning and read it until I read
>> offset N - 1, which tells me that I have cached all the data I need to
>> process the stream topic, so I unlock the stream processing and continue
>>
>> All this works well, except for some very rare situation, when the
>> following happens (as I understand it, maybe here I am making some mistake):
>>
>>   - for a long period of time there is no update to (at least single
>> partition) of the state topic
>>
>>   - when I try to cache the state topic during startup as explained above,
>> it might never finish, because I will never get a message with offset N - 1
>> - that is because I will not get any message at all, because all of the
>> data has been wiped out
>>
>>   - because I don't know if I get all the data from the state topic, I
>> cannot start processing the stream topic and the whole application is
>> stuck, until first message arrives into all partition of the state topic
>> (which might even never happen)
>>
>>   - I might use some sort of timeout to handle this, but this could be
>> dangerous, relying on KafkaConsumer.poll() returning empty records sounds
>> to me a little fragile too (because this might also indicate that no
>> records could have been fetched within the timeout, am I right?), what
>> would totally solve my issue would be that during data retention, the last
>> message would always be kept, and therefore I will always get the message
>> with offset N - 1, and the whole issue would vanish.
>>
>> The situation when a partition on the state topic gets no updates during
>> long time happens mostly in development environment (where there is little
>> to no traffic), but I sense that this could be an issue in production too,
>> for example due to some repartitioning of topics.
>>
>> Does that make any sense to you now?
>>
>> Thanks again for your response,
>>
>>   Jan
>>
>>
>>
>> On 02/09/2017 08:00 AM, Matthias J. Sax wrote:
>>
>>> Jan,
>>>
>>> you scenario is quite complex and I am not sure if I understood every
>>> part of it. I try to break it down:
>>>
>>> In my scenario on startup, I want to read all data from a topic (or a
>>>> subset of its partitions),
>>>> wait until all the old data has been cached and then start processing of
>>>> a different stream
>>>>
>>> That is hard to accomplish in general. Kafka Streams internally uses
>>> KafkaConsumer (one instance per StreamThread) and thus, does rely on the
>>> consumer's behavior with regard to poll(). Hence, Streams cannot control
>>> in detail, what data will be fetched from the brokers.
>>>
>>> Furthermore, Streams follow its own internal strategy to pick a record
>>> (from the available ones returned from poll()) and you cannot control in
>>> your code (at least not directly) what record will be picked.
>>>
>>> Basically, Streams tried to process records in "timestamp order", ie,
>>> based an the timestamp returned from TimestampExtractor. So you can
>>> "influence" the processing order by record timestamps (as far as you can
>>> influence them) and/or by providing a custom TimestampExtractor.
>>>
>>> In your example, you might want the records you want to process first
>>> (KTable), to have smaller timestamps (ie, be earlier) than the records
>>> from your KStream. But even this will only give you "best effort"
>>> behavior, and it can happen that a KStream record is processed before
>>> all KTable records to processed. It's a know issues but hard to resolve.
>>>
>>> when the specific partition doesn't get any message within the retention
>>>> period,
>>>> then I end up stuck trying to prefetch data to the "KTable" - this is
>>>> because I get
>>>> the offset of the last message (plus 1) from the broker, but I don't get
>>>> any data
>>>> ever (until I send a message to the partition)
>>>>
>>> Cannot follow here: if there is no data, than you can of course not
>>> process any data -- so why do you end up being stuck?
>>>
>>> The problem I see here is that kafka tells me what the last offset in a
>>>> partition is,
>>>> but there is no upper bound on when a first message will arrive,
>>>>
>>> In general, the latency between data append at the broker and data
>>> receive at a consumer is rather small. So even if there is strictly no
>>> upper bound until a message gets delivered, this should not be an issue
>>> in practice. Or do I miss understand something?
>>>
>>> even though I reset the offset and start reading from the beginning of a
>>>> partition.
>>>>
>>> How does this relate? Cannot follow.
>>>
>>> My question is, is it a possibility not to clear the whole partition, but
>>>> to always keep at least the last message?
>>>>
>>> Not with regular retention policy -- not sure if log compaction can help
>>> here.
>>>
>>> That way, the client would always get at least the last message, can
>>>> therefore figure out
>>>> it is at the end of the partition (reading the old data) and start
>>>> processing.
>>>>
>>> Why is this required? If the client's offset is the same as "endOfLog"
>>> for each partition, you can figure out that there is nothing to read. So
>>> why would you need the last old message to figure this out?
>>>
>>>
>>> -Matthias
>>>
>>>
>>>
>>> On 2/7/17 3:46 AM, Jan Lukavsk� wrote:
>>>
>>>> Hi all,
>>>>
>>>> I have a question how to do a correct caching in KTable-like structure
>>>> on application startup. I'm not sure if this belongs to user or dev
>>>> maillist, so sorry if I've chosen the bad one. What is my observation so
>>>> far:
>>>>
>>>>    - if I don't send any data to a kafka partition for a period longer
>>>> then the data retention interval, then all data from the partition is
>>>> wiped out
>>>>
>>>>    - the index file is not cleared (which is obvious, it has to keep track
>>>> of the next offset to assign to a new message)
>>>>
>>>> In my scenario on startup, I want to read all data from a topic (or a
>>>> subset of its partitions), wait until all the old data has been cached
>>>> and then start processing of a different stream (basically I'm doing a
>>>> join of KStream and KTable, but I have implemented it manually due to
>>>> some special behavior). Now, what is the issue here - when the specific
>>>> partition doesn't get any message within the retention period, then I
>>>> end up stuck trying to prefetch data to the "KTable" - this is because I
>>>> get the offset of the last message (plus 1) from the broker, but I don't
>>>> get any data ever (until I send a message to the partition). The problem
>>>> I see here is that kafka tells me what the last offset in a partition
>>>> is, but there is no upper bound on when a first message will arrive,
>>>> even though I reset the offset and start reading from the beginning of a
>>>> partition. My question is, is it a possibility not to clear the whole
>>>> partition, but to always keep at least the last message? That way, the
>>>> client would always get at least the last message, can therefore figure
>>>> out it is at the end of the partition (reading the old data) and start
>>>> processing. I believe that KTable implementation could have a very
>>>> similar issue. Or is there any other way around? I could add a timeout,
>>>> but this seems a little fragile.
>>>>
>>>> Thanks in advance for any suggestions and opinions,
>>>>
>>>>    Jan
>>>>
>>>>


Re: Correct prefetching of data to KTable-like structure on application startup

Posted by Michael Noll <mi...@confluent.io>.
Jan,

>  - if I don't send any data to a kafka partition for a period longer then
the data retention interval, then all data from the partition is wiped out

If I interpret your first and second message in this email thread
correctly, then you are talking only about your "state topic" here, i.e.
the topic that you read into a KTable.  You should configure this topic to
use log compaction, which will ensure that the latest value for a given key
will never be wiped.  So even if you don't send any data to a Kafka
partition of this (now log-compacted) "state topic" for a long period of
time, you'd always have access to (at least) the latest value for every key.

Would that help?

-Michael





On Thu, Feb 9, 2017 at 10:16 AM, Jan Lukavský <je...@seznam.cz> wrote:

> Hi Matthias,
>
> first of all, thanks for your answer. Sorry if I didn't explain the
> problem well, I didn't want to dig too much into detail to focus on the
> important and maybe the result was not clear.
>
> My fault, I will try to explain in again. I have two KafkaConsumers in two
> separate threads consuming from two topics - let's call the first one
> "stream topic" (processed like KStream)
>
> and the second one "state topic" (processed like KTable). The state topic
> carries a persistent data that I need in order to process the stream topic,
> so I need to cache the state topic
>
> locally before starting consumption of the stream topic. When the
> application is running normally, there seems to be no issue with this,
>
> because the state topic is updated asynchronously and I use internal locks
> to synchronize the processing inside the application. So far, everything is
> fine.
>
>
> The problem might arise when the application starts - then I do the
> following:
>
>  - lock processing of the stream topic (because I don't have the state
> topic cached)
>
>  - read the current offset N from the state topic (which gives me offsets
> of a message that should be expected next, that is message that has not yet
> been written)
>
>  - reset offset of the state topic to beginning and read it until I read
> offset N - 1, which tells me that I have cached all the data I need to
> process the stream topic, so I unlock the stream processing and continue
>
> All this works well, except for some very rare situation, when the
> following happens (as I understand it, maybe here I am making some mistake):
>
>  - for a long period of time there is no update to (at least single
> partition) of the state topic
>
>  - when I try to cache the state topic during startup as explained above,
> it might never finish, because I will never get a message with offset N - 1
> - that is because I will not get any message at all, because all of the
> data has been wiped out
>
>  - because I don't know if I get all the data from the state topic, I
> cannot start processing the stream topic and the whole application is
> stuck, until first message arrives into all partition of the state topic
> (which might even never happen)
>
>  - I might use some sort of timeout to handle this, but this could be
> dangerous, relying on KafkaConsumer.poll() returning empty records sounds
> to me a little fragile too (because this might also indicate that no
> records could have been fetched within the timeout, am I right?), what
> would totally solve my issue would be that during data retention, the last
> message would always be kept, and therefore I will always get the message
> with offset N - 1, and the whole issue would vanish.
>
> The situation when a partition on the state topic gets no updates during
> long time happens mostly in development environment (where there is little
> to no traffic), but I sense that this could be an issue in production too,
> for example due to some repartitioning of topics.
>
> Does that make any sense to you now?
>
> Thanks again for your response,
>
>  Jan
>
>
>
> On 02/09/2017 08:00 AM, Matthias J. Sax wrote:
>
>> Jan,
>>
>> you scenario is quite complex and I am not sure if I understood every
>> part of it. I try to break it down:
>>
>> In my scenario on startup, I want to read all data from a topic (or a
>>> subset of its partitions),
>>> wait until all the old data has been cached and then start processing of
>>> a different stream
>>>
>> That is hard to accomplish in general. Kafka Streams internally uses
>> KafkaConsumer (one instance per StreamThread) and thus, does rely on the
>> consumer's behavior with regard to poll(). Hence, Streams cannot control
>> in detail, what data will be fetched from the brokers.
>>
>> Furthermore, Streams follow its own internal strategy to pick a record
>> (from the available ones returned from poll()) and you cannot control in
>> your code (at least not directly) what record will be picked.
>>
>> Basically, Streams tried to process records in "timestamp order", ie,
>> based an the timestamp returned from TimestampExtractor. So you can
>> "influence" the processing order by record timestamps (as far as you can
>> influence them) and/or by providing a custom TimestampExtractor.
>>
>> In your example, you might want the records you want to process first
>> (KTable), to have smaller timestamps (ie, be earlier) than the records
>> from your KStream. But even this will only give you "best effort"
>> behavior, and it can happen that a KStream record is processed before
>> all KTable records to processed. It's a know issues but hard to resolve.
>>
>> when the specific partition doesn't get any message within the retention
>>> period,
>>> then I end up stuck trying to prefetch data to the "KTable" - this is
>>> because I get
>>> the offset of the last message (plus 1) from the broker, but I don't get
>>> any data
>>> ever (until I send a message to the partition)
>>>
>> Cannot follow here: if there is no data, than you can of course not
>> process any data -- so why do you end up being stuck?
>>
>> The problem I see here is that kafka tells me what the last offset in a
>>> partition is,
>>> but there is no upper bound on when a first message will arrive,
>>>
>> In general, the latency between data append at the broker and data
>> receive at a consumer is rather small. So even if there is strictly no
>> upper bound until a message gets delivered, this should not be an issue
>> in practice. Or do I miss understand something?
>>
>> even though I reset the offset and start reading from the beginning of a
>>> partition.
>>>
>> How does this relate? Cannot follow.
>>
>> My question is, is it a possibility not to clear the whole partition, but
>>> to always keep at least the last message?
>>>
>> Not with regular retention policy -- not sure if log compaction can help
>> here.
>>
>> That way, the client would always get at least the last message, can
>>> therefore figure out
>>> it is at the end of the partition (reading the old data) and start
>>> processing.
>>>
>> Why is this required? If the client's offset is the same as "endOfLog"
>> for each partition, you can figure out that there is nothing to read. So
>> why would you need the last old message to figure this out?
>>
>>
>> -Matthias
>>
>>
>>
>> On 2/7/17 3:46 AM, Jan Lukavský wrote:
>>
>>> Hi all,
>>>
>>> I have a question how to do a correct caching in KTable-like structure
>>> on application startup. I'm not sure if this belongs to user or dev
>>> maillist, so sorry if I've chosen the bad one. What is my observation so
>>> far:
>>>
>>>   - if I don't send any data to a kafka partition for a period longer
>>> then the data retention interval, then all data from the partition is
>>> wiped out
>>>
>>>   - the index file is not cleared (which is obvious, it has to keep track
>>> of the next offset to assign to a new message)
>>>
>>> In my scenario on startup, I want to read all data from a topic (or a
>>> subset of its partitions), wait until all the old data has been cached
>>> and then start processing of a different stream (basically I'm doing a
>>> join of KStream and KTable, but I have implemented it manually due to
>>> some special behavior). Now, what is the issue here - when the specific
>>> partition doesn't get any message within the retention period, then I
>>> end up stuck trying to prefetch data to the "KTable" - this is because I
>>> get the offset of the last message (plus 1) from the broker, but I don't
>>> get any data ever (until I send a message to the partition). The problem
>>> I see here is that kafka tells me what the last offset in a partition
>>> is, but there is no upper bound on when a first message will arrive,
>>> even though I reset the offset and start reading from the beginning of a
>>> partition. My question is, is it a possibility not to clear the whole
>>> partition, but to always keep at least the last message? That way, the
>>> client would always get at least the last message, can therefore figure
>>> out it is at the end of the partition (reading the old data) and start
>>> processing. I believe that KTable implementation could have a very
>>> similar issue. Or is there any other way around? I could add a timeout,
>>> but this seems a little fragile.
>>>
>>> Thanks in advance for any suggestions and opinions,
>>>
>>>   Jan
>>>
>>>
>

Re: Correct prefetching of data to KTable-like structure on application startup

Posted by Jan Lukavský <je...@seznam.cz>.
Hi Matthias,

first of all, thanks for your answer. Sorry if I didn't explain the 
problem well, I didn't want to dig too much into detail to focus on the 
important and maybe the result was not clear.

My fault, I will try to explain in again. I have two KafkaConsumers in 
two separate threads consuming from two topics - let's call the first 
one "stream topic" (processed like KStream)

and the second one "state topic" (processed like KTable). The state 
topic carries a persistent data that I need in order to process the 
stream topic, so I need to cache the state topic

locally before starting consumption of the stream topic. When the 
application is running normally, there seems to be no issue with this,

because the state topic is updated asynchronously and I use internal 
locks to synchronize the processing inside the application. So far, 
everything is fine.


The problem might arise when the application starts - then I do the 
following:

  - lock processing of the stream topic (because I don't have the state 
topic cached)

  - read the current offset N from the state topic (which gives me 
offsets of a message that should be expected next, that is message that 
has not yet been written)

  - reset offset of the state topic to beginning and read it until I 
read offset N - 1, which tells me that I have cached all the data I need 
to process the stream topic, so I unlock the stream processing and continue

All this works well, except for some very rare situation, when the 
following happens (as I understand it, maybe here I am making some mistake):

  - for a long period of time there is no update to (at least single 
partition) of the state topic

  - when I try to cache the state topic during startup as explained 
above, it might never finish, because I will never get a message with 
offset N - 1 - that is because I will not get any message at all, 
because all of the data has been wiped out

  - because I don't know if I get all the data from the state topic, I 
cannot start processing the stream topic and the whole application is 
stuck, until first message arrives into all partition of the state topic 
(which might even never happen)

  - I might use some sort of timeout to handle this, but this could be 
dangerous, relying on KafkaConsumer.poll() returning empty records 
sounds to me a little fragile too (because this might also indicate that 
no records could have been fetched within the timeout, am I right?), 
what would totally solve my issue would be that during data retention, 
the last message would always be kept, and therefore I will always get 
the message with offset N - 1, and the whole issue would vanish.

The situation when a partition on the state topic gets no updates during 
long time happens mostly in development environment (where there is 
little to no traffic), but I sense that this could be an issue in 
production too, for example due to some repartitioning of topics.

Does that make any sense to you now?

Thanks again for your response,

  Jan


On 02/09/2017 08:00 AM, Matthias J. Sax wrote:
> Jan,
>
> you scenario is quite complex and I am not sure if I understood every
> part of it. I try to break it down:
>
>> In my scenario on startup, I want to read all data from a topic (or a subset of its partitions),
>> wait until all the old data has been cached and then start processing of a different stream
> That is hard to accomplish in general. Kafka Streams internally uses
> KafkaConsumer (one instance per StreamThread) and thus, does rely on the
> consumer's behavior with regard to poll(). Hence, Streams cannot control
> in detail, what data will be fetched from the brokers.
>
> Furthermore, Streams follow its own internal strategy to pick a record
> (from the available ones returned from poll()) and you cannot control in
> your code (at least not directly) what record will be picked.
>
> Basically, Streams tried to process records in "timestamp order", ie,
> based an the timestamp returned from TimestampExtractor. So you can
> "influence" the processing order by record timestamps (as far as you can
> influence them) and/or by providing a custom TimestampExtractor.
>
> In your example, you might want the records you want to process first
> (KTable), to have smaller timestamps (ie, be earlier) than the records
> from your KStream. But even this will only give you "best effort"
> behavior, and it can happen that a KStream record is processed before
> all KTable records to processed. It's a know issues but hard to resolve.
>
>> when the specific partition doesn't get any message within the retention period,
>> then I end up stuck trying to prefetch data to the "KTable" - this is because I get
>> the offset of the last message (plus 1) from the broker, but I don't get any data
>> ever (until I send a message to the partition)
> Cannot follow here: if there is no data, than you can of course not
> process any data -- so why do you end up being stuck?
>
>> The problem I see here is that kafka tells me what the last offset in a partition is,
>> but there is no upper bound on when a first message will arrive,
> In general, the latency between data append at the broker and data
> receive at a consumer is rather small. So even if there is strictly no
> upper bound until a message gets delivered, this should not be an issue
> in practice. Or do I miss understand something?
>
>> even though I reset the offset and start reading from the beginning of a partition.
> How does this relate? Cannot follow.
>
>> My question is, is it a possibility not to clear the whole partition, but to always keep at least the last message?
> Not with regular retention policy -- not sure if log compaction can help
> here.
>
>> That way, the client would always get at least the last message, can therefore figure out
>> it is at the end of the partition (reading the old data) and start processing.
> Why is this required? If the client's offset is the same as "endOfLog"
> for each partition, you can figure out that there is nothing to read. So
> why would you need the last old message to figure this out?
>
>
> -Matthias
>
>
>
> On 2/7/17 3:46 AM, Jan Lukavsk� wrote:
>> Hi all,
>>
>> I have a question how to do a correct caching in KTable-like structure
>> on application startup. I'm not sure if this belongs to user or dev
>> maillist, so sorry if I've chosen the bad one. What is my observation so
>> far:
>>
>>   - if I don't send any data to a kafka partition for a period longer
>> then the data retention interval, then all data from the partition is
>> wiped out
>>
>>   - the index file is not cleared (which is obvious, it has to keep track
>> of the next offset to assign to a new message)
>>
>> In my scenario on startup, I want to read all data from a topic (or a
>> subset of its partitions), wait until all the old data has been cached
>> and then start processing of a different stream (basically I'm doing a
>> join of KStream and KTable, but I have implemented it manually due to
>> some special behavior). Now, what is the issue here - when the specific
>> partition doesn't get any message within the retention period, then I
>> end up stuck trying to prefetch data to the "KTable" - this is because I
>> get the offset of the last message (plus 1) from the broker, but I don't
>> get any data ever (until I send a message to the partition). The problem
>> I see here is that kafka tells me what the last offset in a partition
>> is, but there is no upper bound on when a first message will arrive,
>> even though I reset the offset and start reading from the beginning of a
>> partition. My question is, is it a possibility not to clear the whole
>> partition, but to always keep at least the last message? That way, the
>> client would always get at least the last message, can therefore figure
>> out it is at the end of the partition (reading the old data) and start
>> processing. I believe that KTable implementation could have a very
>> similar issue. Or is there any other way around? I could add a timeout,
>> but this seems a little fragile.
>>
>> Thanks in advance for any suggestions and opinions,
>>
>>   Jan
>>


Re: Correct prefetching of data to KTable-like structure on application startup

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Jan,

you scenario is quite complex and I am not sure if I understood every
part of it. I try to break it down:

> In my scenario on startup, I want to read all data from a topic (or a subset of its partitions),
> wait until all the old data has been cached and then start processing of a different stream

That is hard to accomplish in general. Kafka Streams internally uses
KafkaConsumer (one instance per StreamThread) and thus, does rely on the
consumer's behavior with regard to poll(). Hence, Streams cannot control
in detail, what data will be fetched from the brokers.

Furthermore, Streams follow its own internal strategy to pick a record
(from the available ones returned from poll()) and you cannot control in
your code (at least not directly) what record will be picked.

Basically, Streams tried to process records in "timestamp order", ie,
based an the timestamp returned from TimestampExtractor. So you can
"influence" the processing order by record timestamps (as far as you can
influence them) and/or by providing a custom TimestampExtractor.

In your example, you might want the records you want to process first
(KTable), to have smaller timestamps (ie, be earlier) than the records
from your KStream. But even this will only give you "best effort"
behavior, and it can happen that a KStream record is processed before
all KTable records to processed. It's a know issues but hard to resolve.

> when the specific partition doesn't get any message within the retention period,
> then I end up stuck trying to prefetch data to the "KTable" - this is because I get
> the offset of the last message (plus 1) from the broker, but I don't get any data
> ever (until I send a message to the partition)

Cannot follow here: if there is no data, than you can of course not
process any data -- so why do you end up being stuck?

> The problem I see here is that kafka tells me what the last offset in a partition is,
> but there is no upper bound on when a first message will arrive,

In general, the latency between data append at the broker and data
receive at a consumer is rather small. So even if there is strictly no
upper bound until a message gets delivered, this should not be an issue
in practice. Or do I miss understand something?

> even though I reset the offset and start reading from the beginning of a partition.

How does this relate? Cannot follow.

> My question is, is it a possibility not to clear the whole partition, but to always keep at least the last message?

Not with regular retention policy -- not sure if log compaction can help
here.

> That way, the client would always get at least the last message, can therefore figure out
> it is at the end of the partition (reading the old data) and start processing.

Why is this required? If the client's offset is the same as "endOfLog"
for each partition, you can figure out that there is nothing to read. So
why would you need the last old message to figure this out?


-Matthias



On 2/7/17 3:46 AM, Jan Lukavský wrote:
> Hi all,
> 
> I have a question how to do a correct caching in KTable-like structure
> on application startup. I'm not sure if this belongs to user or dev
> maillist, so sorry if I've chosen the bad one. What is my observation so
> far:
> 
>  - if I don't send any data to a kafka partition for a period longer
> then the data retention interval, then all data from the partition is
> wiped out
> 
>  - the index file is not cleared (which is obvious, it has to keep track
> of the next offset to assign to a new message)
> 
> In my scenario on startup, I want to read all data from a topic (or a
> subset of its partitions), wait until all the old data has been cached
> and then start processing of a different stream (basically I'm doing a
> join of KStream and KTable, but I have implemented it manually due to
> some special behavior). Now, what is the issue here - when the specific
> partition doesn't get any message within the retention period, then I
> end up stuck trying to prefetch data to the "KTable" - this is because I
> get the offset of the last message (plus 1) from the broker, but I don't
> get any data ever (until I send a message to the partition). The problem
> I see here is that kafka tells me what the last offset in a partition
> is, but there is no upper bound on when a first message will arrive,
> even though I reset the offset and start reading from the beginning of a
> partition. My question is, is it a possibility not to clear the whole
> partition, but to always keep at least the last message? That way, the
> client would always get at least the last message, can therefore figure
> out it is at the end of the partition (reading the old data) and start
> processing. I believe that KTable implementation could have a very
> similar issue. Or is there any other way around? I could add a timeout,
> but this seems a little fragile.
> 
> Thanks in advance for any suggestions and opinions,
> 
>  Jan
>