Posted to users@kafka.apache.org by Ray Ruvinskiy <ra...@arcticwolf.com> on 2017/11/21 21:39:48 UTC

Time-Based Index for Consuming Messages Up to Certain Timestamp

I’ve been reading https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index and trying to determine whether I can use the time-based index as an efficient way to sort a stream of messages into timestamp (CreateTime) order.

I am dealing with a number of sources emitting messages that are then processed in a distributed fashion and written to a Kafka topic. During this processing, the original order of the messages is not strictly maintained. Each message has an embedded timestamp. I’d like to be able to sort these messages back into timestamp order, allowing for a certain lateness interval, before processing them further. For example, supposing the lateness interval is 5 minutes, at time T I’d like to consume from the topic all messages with timestamp up to (T - 5 minutes), in timestamp order. The assumption is that a message should be no more than 5 minutes late; if it is more than 5 minutes late, it can be discarded. Is this something that can be done with the time-based index?

Thanks,

Ray

Re: Time-Based Index for Consuming Messages Up to Certain Timestamp

Posted by Ray Ruvinskiy <ra...@arcticwolf.com>.
Thanks for the detailed explanation!

On 2017-11-25, 1:07 AM, "Matthias J. Sax" <ma...@confluent.io> wrote:

    Yes and no :)
    
    1) if you use the index to find the "end-offset" of your scan ("consume
    all messages with a smaller offset") you would compare the offset of
    each message with this "end offset" -- thus, this is the same thing as
    consuming the topic from beginning and just compare the record timestamp
    directly (if it's larger you can stop the scan). What I am trying to say
    is, that the index doesn't help you to build a more efficient solution :)
    
    The index in only useful, if you want to use the offset to *start
    reading* from.
    
    
    2) If you stop reading at the "end-offset", you do not encounter for
    late arriving records. Both the time-index as well as the manual linear
    scan would stop at the first record with a timestamp larger than your
    "end timestamp". However, if you have late arriving data, you would miss
    those. And as you claim you want to sort records in timestamp order, you
    obviously have late data (otherwise, there would not be any need to
    reorder records). Thus, you need to keep scanning for "some more data"
    to also find late records -- how much more you want to scan is something
    you need to define by yourself. For example, you could say, I read until
    I see a message with timestamp "end timestamp + 5minutes", because I
    know data is max 5 minutes late.
    
    
    3) Your original problem was to sort data by timestamp -- thus, if you
    scan your data, you also need to buffer data in main memory, reorder
    out-of-order records, and write back to the output topic. Thus, you
    still need a lot of custom code (but I agree, it might be less than if
    you use Kafka Streams).
    
    
    
    -Matthias
    


Re: Time-Based Index for Consuming Messages Up to Certain Timestamp

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Yes and no :)

1) If you use the index to find the "end offset" of your scan ("consume
all messages with a smaller offset"), you would compare the offset of
each message with this "end offset" -- thus, this is the same thing as
consuming the topic from the beginning and just comparing the record
timestamp directly (if it's larger, you can stop the scan). What I am
trying to say is that the index doesn't help you to build a more
efficient solution :)

The index is only useful if you want to find the offset to *start
reading* from.


2) If you stop reading at the "end offset", you do not account for
late-arriving records. Both the time index and the manual linear
scan would stop at the first record with a timestamp larger than your
"end timestamp". However, if you have late-arriving data, you would miss
it. And since you say you want to sort records into timestamp order, you
obviously have late data (otherwise, there would not be any need to
reorder records). Thus, you need to keep scanning for "some more data"
to also find late records -- how much more you want to scan is something
you need to define yourself. For example, you could say: I read until
I see a message with timestamp "end timestamp + 5 minutes", because I
know data is at most 5 minutes late.
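
As a rough sketch of the extended scan in point 2 (plain Java, no Kafka
client involved; the class `BoundedScan`, the method `collectUpTo`, and the
bare array of timestamps in offset order are hypothetical stand-ins for a
real consumer loop), one could keep reading past the "end timestamp" until
a record shows up that is newer than the end timestamp plus the allowed
lateness, keeping only records at or before the end timestamp:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper, not part of Kafka. */
final class BoundedScan {
    /**
     * Scans timestamps in offset order and returns, sorted, every timestamp
     * that is <= endTs. The scan stops at the first record newer than
     * endTs + maxLatenessMs, on the assumption that nothing at or before
     * endTs can still arrive after that point.
     */
    static List<Long> collectUpTo(long[] timestampsInOffsetOrder, long endTs, long maxLatenessMs) {
        List<Long> selected = new ArrayList<>();
        for (long ts : timestampsInOffsetOrder) {
            if (ts > endTs + maxLatenessMs) {
                break; // far enough past the end timestamp: stop scanning
            }
            if (ts <= endTs) {
                selected.add(ts); // in range, possibly out of order
            }
        }
        Collections.sort(selected); // reorder what was collected
        return selected;
    }
}
```

For timestamps {1, 4, 2, 7, 3, 12, 5} in offset order, an end timestamp of
5, and a lateness of 5, this returns [1, 2, 3, 4]. Stopping at the first
record newer than 5 (the 7) would have missed the late 3; the trailing 5
arrives after the scan has stopped and counts as too late.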


3) Your original problem was to sort data by timestamp -- thus, if you
scan your data, you also need to buffer data in main memory, reorder
out-of-order records, and write back to the output topic. Thus, you
still need a lot of custom code (but I agree, it might be less than if
you use Kafka Streams).



-Matthias

On 11/24/17 2:26 PM, Ray Ruvinskiy wrote:
> I see, thanks. I suppose I was trying to avoid so much custom logic, which is why I initially was looking at the time-based index functionality. Based on what you’ve said so far, I take it using the time-based index to find an offset corresponding to a timestamp and then consuming all messages with a smaller offset is not a viable solution?
> 
> Ray
> 


Re: Time-Based Index for Consuming Messages Up to Certain Timestamp

Posted by Ray Ruvinskiy <ra...@arcticwolf.com>.
I see, thanks. I suppose I was trying to avoid so much custom logic, which is why I initially was looking at the time-based index functionality. Based on what you’ve said so far, I take it using the time-based index to find an offset corresponding to a timestamp and then consuming all messages with a smaller offset is not a viable solution?

Ray

On 2017-11-22, 12:12 AM, "Matthias J. Sax" <ma...@confluent.io> wrote:

    Using Kafka Streams, it seems reasonable to implement this using
    low-level Processor API with a custom state store.
    
    Thus, you use the `StateStore` interface to implement you state store --
    this allows you to spill to disk if you need to to handle state larger
    than main memory.
    
    If you want to browse some state store examples, you can check out
    RocksDBStore class that implement Kafka Streams' default `StateStore`.
    
    Within your custom `Processor` you can access the state accordingly to
    maintain the window etc.
    
    It's a quite special use case and thus, there is not much out-of-the-box
    support. You can check out some basic examples here:
    https://github.com/confluentinc/kafka-streams-examples
    
    One example implements a custom state store (but only in-memory):
    https://github.com/confluentinc/kafka-streams-examples/blob/3.3.0-post/src/test/scala/io/confluent/examples/streams/ProbabilisticCountingScalaIntegrationTest.scala
    
    Hope this helps.
    
    
    -Matthias
    


Re: Time-Based Index for Consuming Messages Up to Certain Timestamp

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Using Kafka Streams, it seems reasonable to implement this using the
low-level Processor API with a custom state store.

Thus, you use the `StateStore` interface to implement your state store --
this allows you to spill to disk if you need to handle state larger
than main memory.

If you want to browse some state store examples, you can check out the
`RocksDBStore` class, which implements Kafka Streams' default `StateStore`.

Within your custom `Processor` you can access the state accordingly to
maintain the window etc.
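
One possible way to keep such a store sorted by timestamp, sketched in
plain Java without any Kafka Streams classes: encode each key as the
big-endian timestamp followed by the big-endian offset, so that a store
which iterates keys in unsigned byte-wise order returns records in
timestamp order. The byte-wise ordering is an assumption about the store
(check the comparator of whichever store you use), and `TimestampKey` and
its methods are hypothetical names.

```java
import java.nio.ByteBuffer;

/** Hypothetical key layout for a byte-ordered store; not a Kafka Streams class. */
final class TimestampKey {
    /** 16-byte key: timestamp, then offset, both big-endian and assumed non-negative. */
    static byte[] encode(long timestampMs, long offset) {
        // ByteBuffer writes big-endian by default, so larger values sort later byte-wise.
        return ByteBuffer.allocate(16).putLong(timestampMs).putLong(offset).array();
    }

    /** Reads the timestamp back out of an encoded key. */
    static long timestampOf(byte[] key) {
        return ByteBuffer.wrap(key).getLong(0);
    }

    /** Unsigned lexicographic comparison, the ordering the store is assumed to use. */
    static int compareUnsigned(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xff, b[i] & 0xff);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }
}
```

The offset in the key keeps two records with the same timestamp from
overwriting each other and breaks ties in arrival order.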

It's quite a special use case, and thus there is not much out-of-the-box
support. You can check out some basic examples here:
https://github.com/confluentinc/kafka-streams-examples

One example implements a custom state store (but only in-memory):
https://github.com/confluentinc/kafka-streams-examples/blob/3.3.0-post/src/test/scala/io/confluent/examples/streams/ProbabilisticCountingScalaIntegrationTest.scala

Hope this helps.


-Matthias

On 11/21/17 5:53 PM, Ray Ruvinskiy wrote:
> Thanks for your reply! I am quite inexperienced when it comes to Kafka and Kafka Streams and so would appreciate a little more guidance. How would one keep messages within a sliding window sorted by timestamp? Would the sort operation be done all in memory? I would be dealing potentially with hundreds of thousands of messages per partition within every 5 minute interval and so was looking for solutions that were not necessary limited by the amount of RAM.
> 
> Ray
> 


Re: Time-Based Index for Consuming Messages Up to Certain Timestamp

Posted by Ray Ruvinskiy <ra...@arcticwolf.com>.
Thanks for your reply! I am quite inexperienced when it comes to Kafka and Kafka Streams and so would appreciate a little more guidance. How would one keep messages within a sliding window sorted by timestamp? Would the sort operation be done all in memory? I would be dealing potentially with hundreds of thousands of messages per partition within every 5-minute interval and so was looking for solutions that were not necessarily limited by the amount of RAM.

Ray

On 2017-11-21, 5:57 PM, "Matthias J. Sax" <ma...@confluent.io> wrote:

    This is possible, but I think you don't need the time-based index for it :)
    
    You will just buffer up all messages for a 5 minute sliding-window and
    maintain all message sorted by timestamp in this window. Each time the
    window "moves" you write the oldest records that "drop out" of the
    window to the topic. If you get a record with an older timestamp that
    allowed, you don't insert in into the window but drop it.
    
    The timestamp index is useful if you want to seek to a specific offset
    base on timestamp. But I don't think you need this for your use case.
    
    
    
    -Matthias
    


Re: Time-Based Index for Consuming Messages Up to Certain Timestamp

Posted by Ted Yu <yu...@gmail.com>.
bq. an older timestamp that allowed

I guess you meant 'than allowed'

Cheers

On Tue, Nov 21, 2017 at 2:57 PM, Matthias J. Sax <ma...@confluent.io>
wrote:

> This is possible, but I think you don't need the time-based index for it :)
>
> You will just buffer up all messages for a 5 minute sliding-window and
> maintain all message sorted by timestamp in this window. Each time the
> window "moves" you write the oldest records that "drop out" of the
> window to the topic. If you get a record with an older timestamp that
> allowed, you don't insert in into the window but drop it.
>
> The timestamp index is useful if you want to seek to a specific offset
> base on timestamp. But I don't think you need this for your use case.
>
>
>
> -Matthias
>

Re: Time-Based Index for Consuming Messages Up to Certain Timestamp

Posted by "Matthias J. Sax" <ma...@confluent.io>.
This is possible, but I think you don't need the time-based index for it :)

You will just buffer up all messages for a 5-minute sliding window and
keep all messages in this window sorted by timestamp. Each time the
window "moves", you write the oldest records that "drop out" of the
window to the topic. If you get a record with an older timestamp than
allowed, you don't insert it into the window but drop it.
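
A minimal in-memory sketch of the sliding-window buffering described
above, in plain Java with no Kafka dependency. The class `ReorderBuffer`
and its `add` method are hypothetical names, and the sketch assumes the
window "moves" whenever a record with a new highest timestamp arrives:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical helper, not part of Kafka: re-sorts records that are at most maxLatenessMs late. */
final class ReorderBuffer<V> {
    private final long maxLatenessMs;
    // Buffered values grouped by timestamp; the TreeMap keeps timestamps sorted.
    private final TreeMap<Long, List<V>> window = new TreeMap<>();
    private long maxSeenTs = Long.MIN_VALUE;

    ReorderBuffer(long maxLatenessMs) {
        this.maxLatenessMs = maxLatenessMs;
    }

    /** Adds one record and returns the values that dropped out of the window, in timestamp order. */
    List<V> add(long timestampMs, V value) {
        List<V> emitted = new ArrayList<>();
        if (maxSeenTs != Long.MIN_VALUE && timestampMs < maxSeenTs - maxLatenessMs) {
            return emitted; // older than allowed: drop instead of inserting
        }
        window.computeIfAbsent(timestampMs, t -> new ArrayList<>()).add(value);
        maxSeenTs = Math.max(maxSeenTs, timestampMs);
        long windowStart = maxSeenTs - maxLatenessMs;
        // Anything older than the window start can no longer be overtaken by an accepted record.
        while (!window.isEmpty() && window.firstKey() < windowStart) {
            emitted.addAll(window.pollFirstEntry().getValue());
        }
        return emitted;
    }
}
```

With a lateness of 5, adding (10, "a"), (8, "b"), (3, "late"), and then
(16, "c") emits nothing for the first three calls and [b, a] for the
fourth; the record with timestamp 3 is dropped because it is more than 5
behind the highest timestamp seen so far (10).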

The timestamp index is useful if you want to seek to a specific offset
based on a timestamp. But I don't think you need this for your use case.
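
To illustrate what a timestamp-based seek returns: in the Java consumer
the lookup is exposed as `KafkaConsumer.offsetsForTimes`, which yields the
earliest offset whose timestamp is greater than or equal to the target.
The sketch below only imitates that result on an in-memory array (a
linear scan stands in for the broker's index lookup); `TimeLookup` and
`firstOffsetAtOrAfter` are hypothetical names.

```java
/** Hypothetical stand-in for a timestamp-to-offset lookup; not a Kafka class. */
final class TimeLookup {
    /** Returns the first offset whose timestamp is >= targetTs, or -1 if there is none. */
    static int firstOffsetAtOrAfter(long[] timestampsByOffset, long targetTs) {
        for (int offset = 0; offset < timestampsByOffset.length; offset++) {
            if (timestampsByOffset[offset] >= targetTs) {
                return offset;
            }
        }
        return -1;
    }
}
```

For timestamps {100, 105, 103, 110, 104} and a target of 104, the result
is offset 1. Note that offset 2 (timestamp 103) lies after the returned
offset yet is older than the target, so the lookup gives a place to start
reading, not a cut-off that separates older records from newer ones.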



-Matthias

On 11/21/17 1:39 PM, Ray Ruvinskiy wrote:
> I’ve been reading https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index and trying to determine whether I can use the time-based index as an efficient way to sort a stream of messages into timestamp (CreateTime) order.
> 
> I am dealing with a number of sources emitting messages that are then processed in a distributed fashion and written to a Kafka topic. During this processing, the original order of the messages is not strictly maintained. Each message has an embedded timestamp. I’d like to be able to sort these messages back into timestamp order, allowing for a certain lateness interval, before processing them further. For example, supposing the lateness interval is 5 minutes, at time T I’d like to consume from the topic all messages with timestamp up to (T - 5 minutes), in timestamp order. The assumption is that a message should be no more than 5 minutes late; if it is more than 5 minutes late, it can be discarded. Is this something that can be done with the time-based index?
> 
> Thanks,
> 
> Ray
>