Posted to users@kafka.apache.org by Jan Filipiak <Ja...@trivago.com> on 2016/09/06 06:52:04 UTC

Re: KIP-33 Opt out from Time Based indexing

Hi Jun,

Sorry for the late reply. Regarding (b), my main concern was just the
complexity of understanding what's going on.
As you can see, it took me some two days or so to fully grasp all
the details of the implementation and what
the impacts are. Usually I prefer to turn off things I don't use, so I
don't have to bother. Log append time will work for me.

Rolling logs was my main concern. The producer can specify the timestamp,
and we use an epoch timestamp inside the message. I'd bet money that
people in the company would have put this epoch value into the producer
record as well => rolling logs, because the broker treats it as milliseconds.
So that would probably have caused us at least one outage if a big
producer had upgraded and done this; IMO a likely mistake.
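
To make this failure mode concrete, here is a minimal sketch. It assumes the
in-message epoch value is in seconds (the mail only says the broker reads it
as millis) and uses the rolling rule as quoted from the KIP-33 wiki further
down in this thread. EpochUnitDemo and shouldRoll are hypothetical names, not
Kafka code.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper, not Kafka code.
class EpochUnitDemo {
    // Rule as quoted in this thread: roll when
    // current time > largest timestamp seen in the segment + log.roll.ms.
    static boolean shouldRoll(long nowMs, long largestTimestampMs, long logRollMs) {
        return nowMs > largestTimestampMs + logRollMs;
    }

    public static void main(String[] args) {
        long logRollMs = Duration.ofDays(7).toMillis();
        Instant sent = Instant.parse("2016-09-06T06:52:04Z");
        long nowMs = sent.toEpochMilli();

        long correct = sent.toEpochMilli();    // milliseconds since the epoch
        long mistaken = sent.getEpochSecond(); // seconds passed where milliseconds are expected

        // Read as milliseconds, the seconds value lands in January 1970.
        System.out.println(Instant.ofEpochMilli(mistaken));
        System.out.println(shouldRoll(nowMs, correct, logRollMs));  // false
        System.out.println(shouldRoll(nowMs, mistaken, logRollMs)); // true
    }
}
```

Under that rule, every append carrying such a timestamp would satisfy the
roll condition, which is the outage scenario described above.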

I'd just hoped for a more obvious kill switch, so I wouldn't need to bother
that much.

Best Jan




On 29.08.2016 19:36, Jun Rao wrote:
> Jan,
>
> For the usefulness of time index, it's ok if you don't plan to use it.
> However, I do think there are other people who will want to use it. Fixing
> an application bug always requires some additional work. Intuitively, being
> able to seek back to a particular point of time for replay is going to be
> much more efficient than always replaying from the very beginning,
> especially when the log is retained for a long period of time. Sure, if you
> want to have more confidence, you want to rewind a bit conservatively. But
> being able to rewind an extra hour makes a big difference from having to
> rewind all the way to 7 days or however long the retention time is.
>
> For the OffsetRequest, I actually agree with you that it's useful. People
> can use that to find the first and the last offset and the offset based on
> a specific point in time. The part that's a bit awkward with OffsetRequest
> is that it's based on the last modified time of the log segment, which
> makes it imprecise (precision is at the segment level, not message level)
> and non-deterministic (last modified time may change). Another awkwardness
> is that it supports returning a list of offsets after a specified
> timestamp. We did that simply because timestamp was only at the segment
> level then. So, our plan is to replace OffsetRequest with a new one. It
> will give you the same functionality: find the first and the last offset
> and the offset based on a specific point in time. It will just be better
> since it's more precise and more deterministic. For your use case, it seems
> that you don't care about message creation time. Then, it's possible for
> you to configure the broker with the log append time. Whether this should
> be default at the Kafka level is debatable, but it won't prevent your use
> case.
>
> For your suggestion on refactoring, I still want to understand how
> necessary it is. Your main concerns so far seem to be:
> (a) Impact on rolling log segments.
> (b) Time-based index is not useful for me.
>
> Item (a) is a good point. Thanks for that. We will fix it. Item (b), I have
> given my view on this above. Are there any other things that you think that
> having a time-based index will hurt?
>
> Thanks,
>
> Jun
>
> On Fri, Aug 26, 2016 at 3:41 PM, Jan Filipiak <Ja...@trivago.com>
> wrote:
>
>> Hi Jun,
>>
>> Thanks for taking the time to answer at such a detailed level. You are
>> right, Log.fetchOffsetByTimestamp works; the comment is just confusing:
>> "// Get all the segments whose largest timestamp is smaller than target
>> timestamp", which is apparently not what takeWhile does (I am more on
>> the Java end of things, so I relied on the comment).
>>
>> Regarding the frequent file rolling, I didn't think of log compaction, but
>> that indeed is a place where **** can hit the fan pretty easily, especially
>> if you don't have many updates in there and you pass the timestamp along in
>> a kafka-streams application. Bootstrapping a new application then indeed
>> could produce quite a few old messages, kicking off this log rolling until a
>> recent message appears. I guess that makes it a practical issue again even
>> with the 7 days. Thanks for pointing that out! I'd like to see the appendTime
>> as default. I am very happy that I have it in my back pocket, so I can
>> sleep tighter and not worry too much about someone accidentally doing
>> something dodgy on a weekend with our clusters.
>>
>> Regarding the usefulness, you will not be able to sell it to me. I don't
>> know how people build applications with this ¯\_(ツ)_/¯ but I don't want to
>> see them.
>> Look at the error recovery with timestamp seek:
>> To fix a bug, a user needs to stop the SP (stream processor) and truncate
>> all his downstream data perfectly based on their time window. Then restart
>> and do the first fetch, again based on the perfect window timeout. From then
>> on, he still has NO clue whatsoever whether messages that arrive later with
>> an earlier timestamp need to go into the previous window or not. (Note that
>> there is >>>absolutely no<<< way to determine this in aggregated downstream
>> windowed stores.) So the user is in .... even though he can seek, he
>> can't rule out his error. IMO it helps them to build the wrong thing, which
>> will just be operational pain *somewhere*.
>>
>> Look at the error recovery without timestamp seek:
>> Start your application from the beginning with a different output
>> (version,key,partition) and wait for it to fully catch up. Drop the time
>> windows in which the error happened + confidence interval from the old
>> version (if your data isn't there anymore, no seek will help). Stop the
>> stream processor, merge the data it created, switch back to the original
>> (version,key,partition) and start the SP again.
>> Done. The bigger you choose the confidence interval, the more correct the
>> result and the less the index helps. Usually you want maximum confidence =>
>> no index usage, get everything that is still there (maybe even redump from
>> Hadoop in extreme cases), ironically causing the log to roll all the time
>> (as you probably publish to a new topic and have the streams application
>> use both) :(
>>
>> As you can see, even though the users can seek, if they want to create
>> proper numbers, e.g. billing information, they are in trouble, and giving
>> them this index will just make them implement the wrong solution! It boils
>> down to: this index is not the Kafka way of doing things. The index can
>> help the second approach, but usually one chooses the confidence interval =
>> as much as one can get.
>>
>> Then the last thing. "OffsetRequest is a legacy request. It's awkward to
>> use and we plan to deprecate it over time". You've got to be kidding me. It
>> was weird to get the byte position back then, but getting the offsets is
>> perfectly reasonable and one of the best things in the world. Want to know
>> how your stream looked at a specific point in time? Get the start and end
>> offset, fetch whenever you like, and you get a perfect snapshot in wall time.
>> This is useful for compacted topics as well as streaming topics. Offsets are
>> a well-known thing in Kafka and in no way awkward, as their monotonically
>> increasing property is just great.
>>
>> For seeking the log based on a confidence interval (the only chance you
>> get when reprocessing non-keyed logs) one can also bisect the log from the
>> client. The case is rare, but it is intensive and causes at least a few
>> hundred seeks for bigger topics. I guess the broker does these extra
>> for the new index file now.
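
A client-side bisect of this kind can be sketched as a binary search over
offsets. This is a minimal sketch, not the tool mentioned in the thread: it
assumes timestamps do not decrease with offset (which the thread itself notes
is not guaranteed, hence only a "probabilistic shot"), and timestampAt is a
hypothetical callback that a real client would implement with one seek and
one fetch per probe.

```java
import java.util.function.LongUnaryOperator;

// Hypothetical sketch, not Kafka client code.
class LogBisectDemo {
    // Returns the first offset in [startOffset, endOffset) whose timestamp is
    // >= target, or endOffset if no such offset exists.
    // Assumes timestamps are non-decreasing in offset order.
    static long firstOffsetAtOrAfter(long startOffset, long endOffset, long target,
                                     LongUnaryOperator timestampAt) {
        long lo = startOffset;
        long hi = endOffset;
        while (lo < hi) {
            long mid = lo + (hi - lo) / 2;
            if (timestampAt.applyAsLong(mid) < target) {
                lo = mid + 1; // everything up to mid is too old
            } else {
                hi = mid;     // mid is a candidate, keep searching to the left
            }
        }
        return lo;
    }

    public static void main(String[] args) {
        // Stand-in for a partition: the array index is the offset.
        long[] timestamps = {10, 20, 20, 35, 50, 80};
        LongUnaryOperator lookup = offset -> timestamps[(int) offset];
        System.out.println(firstOffsetAtOrAfter(0, timestamps.length, 30, lookup)); // 3
    }
}
```

The search itself needs only about log2(n) probes per partition. Subtracting
a confidence interval from the target before searching gives the conservative
rewind point discussed above.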
>>
>> This index, I feel, is just not following the whole "kafka-way". Can you
>> comment on the proposed refactoring? What are the chances of getting it
>> upstream if I could pull it off? (unlikely)
>>
>> Thanks for all the effort you put into listening to my concerns. Highly
>> appreciated!
>>
>> Best Jan
>>
>>
>>
>>
>> On 25.08.2016 23:36, Jun Rao wrote:
>>
>> Jan,
>>
>> Thanks a lot for the feedback. Now I understand your concern better. The
>> following are my comments.
>>
>> The first odd thing that you pointed out could be a real concern.
>> Basically, if a producer publishes messages with really old timestamps, our
>> default log.roll.hours (7 days) will indeed cause the broker to roll a log
>> on every message, which would be bad. Time-based rolling is actually used
>> infrequently. The only use case that I am aware of is that for compacted
>> topics, rolling logs based on time could allow the compaction to happen
>> sooner (since the active segment is never cleaned). One option is to change
>> the default log.roll.hours to infinite and also document the impact on
>> changing log.roll.hours. Jiangjie, what do you think?
>>
>> For the second odd thing, the OffsetRequest is a legacy request. It's
>> awkward to use and we plan to deprecate it over time. That's why we haven't
>> changed the logic in serving OffsetRequest after KIP-33. The plan is to
>> introduce a new OffsetRequest that will be exploiting the time based index.
>> It's possible to have log segments with non-increasing largest timestamp.
>> As you can see in Log.fetchOffsetsByTimestamp(), we simply iterate the
>> segments in offset order and stop when we see the target timestamp.
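
The difference between "get all segments below the target" and "iterate in
offset order and stop" only shows when largest timestamps are not increasing
across segments. The following is a minimal sketch of that difference using
plain Java streams (Java 9+). It does not reproduce the code in Log.scala;
SegmentScanDemo and its method names are hypothetical.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch, not Kafka code.
class SegmentScanDemo {
    // Prefix scan: stops at the first segment whose largest timestamp reaches the target.
    static List<Long> prefixBelow(List<Long> largestTimestamps, long target) {
        return largestTimestamps.stream()
                .takeWhile(ts -> ts < target)
                .collect(Collectors.toList());
    }

    // Literal reading of the comment: every segment below the target, wherever it sits.
    static List<Long> allBelow(List<Long> largestTimestamps, long target) {
        return largestTimestamps.stream()
                .filter(ts -> ts < target)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Largest timestamp per segment, in offset order; the second segment
        // has a larger maximum than the third.
        List<Long> segments = List.of(100L, 300L, 200L, 400L);
        System.out.println(prefixBelow(segments, 250L)); // [100]
        System.out.println(allBelow(segments, 250L));    // [100, 200]
    }
}
```

With the prefix scan, the search starts at the first segment that may contain
the target timestamp, so later segments with smaller maxima are not skipped.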
>>
>> For the third odd thing, one of the original reasons why the time-based
>> index points to an offset instead of the file position is that it makes
>> truncating the time index to an offset easier since the offset is in the
>> index. Looking at the code, we could also store the file position in the
>> time index and do truncation based on position, instead of offset. It
>> probably has a slight advantage of consistency between the two indexes and
>> avoiding another level of indirection when looking up the time index.
>> Jiangjie, have we ever considered that?
>>
>> The idea of log.message.timestamp.difference.max.ms is to prevent the
>> timestamp in the published messages from drifting too far away from the
>> current timestamp. The default value is infinite though.
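
As a rough illustration of such a bound, here is a minimal sketch of a drift
check. It is an assumption about the shape of the check, not the broker's
actual validation code; TimestampDriftDemo and withinAllowedDifference are
hypothetical names, and Long.MAX_VALUE stands in for the "infinite" default.

```java
// Hypothetical sketch, not Kafka code.
class TimestampDriftDemo {
    // Accept a message only if its timestamp is within maxDifferenceMs of the
    // broker's clock, in either direction.
    static boolean withinAllowedDifference(long brokerNowMs, long messageTimestampMs,
                                           long maxDifferenceMs) {
        return Math.abs(brokerNowMs - messageTimestampMs) <= maxDifferenceMs;
    }

    public static void main(String[] args) {
        long nowMs = 1_473_144_724_000L;  // 2016-09-06T06:52:04Z in milliseconds
        long oneHourMs = 3_600_000L;

        System.out.println(withinAllowedDifference(nowMs, nowMs - 60_000, oneHourMs));        // true
        System.out.println(withinAllowedDifference(nowMs, nowMs / 1000, oneHourMs));          // false
        System.out.println(withinAllowedDifference(nowMs, nowMs / 1000, Long.MAX_VALUE));     // true
    }
}
```

With the unbounded default, the last case passes, so a very old timestamp
still reaches the log and the rolling logic.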
>>
>> Lastly, for the usefulness of time-based index, it's actually a feature
>> that the community wanted and voted for, not just for Confluent customers.
>> For example, being able to seek to an offset based on timestamp has been a
>> frequently asked feature. This can be useful for at least the following
>> scenarios: (1) If there is a bug in a consumer application, the user will
>> want to rewind the consumption after fixing the logic. In this case, it's
>> more convenient to rewind the consumption based on a timestamp. (2) In a
>> multi data center setup, it's common for people to mirror the data from one
>> Kafka cluster in one data center to another cluster in a different data
>> center. If one data center fails, people want to be able to resume the
>> consumption in the other data center. Since the offsets are not preserved
>> between the two clusters through mirroring, being able to find a starting
>> offset based on timestamp will allow the consumer to resume the consumption
>> without missing any messages and also not replaying too many messages.
>>
>> Thanks,
>>
>> Jun
>>
>>
>> On Wed, Aug 24, 2016 at 5:05 PM, Jan Filipiak <Ja...@trivago.com>
>> wrote:
>>
>>> Hey Jun,
>>>
>>> I'll try again :), I wrote the first one in quite a stressful
>>> environment. The bottom line is that I, for our use cases, see too small a
>>> use/effort ratio in this time index.
>>> We do not bootstrap new consumers for key-less logs that frequently, and
>>> when we do, they usually want everything (prod deployment) or just start
>>> at the end (during development).
>>> That caused quite some frustration. It would be better if I could just have
>>> turned it off and not bothered any more. Anyhow, in the meantime I had to
>>> dig deeper into the inner workings,
>>> and the impacts are not as dramatic as I initially assumed. But it still
>>> carries along some oddities I want to list here.
>>>
>>> first odd thing:
>>> Quote
>>> ---
>>> Enforce time based log rolling
>>>
>>> Currently time based log rolling is based on the creating time of the log
>>> segment. With this KIP, the time based rolling would be changed to based on
>>> the largest timestamp ever seen in a log segment. A new log segment will be
>>> rolled out if current time is greater than largest timestamp ever seen in
>>> the log segment + log.roll.ms. When message.timestamp.type=CreateTime,
>>> user should set max.message.time.difference.ms appropriately together
>>> with log.roll.ms to avoid frequent log segment roll out.
>>> ---
>>> Imagine a MirrorMaker falls behind and has a delay of
>>> some time > log.roll.ms.
>>> From my understanding, when no one else is producing to this partition
>>> except the MirrorMaker, the broker will start rolling on every append?
>>> Just because you may be under a DoS attack and your application only works
>>> in the remote location (also a good occasion for MM to fall behind).
>>> But checking the default values indicates that it should indeed not
>>> become a problem, as log.roll.ms defaults to ~>7 days.
>>>
>>>
>>> second odd thing:
>>> Quote
>>> ---
>>> A time index entry (*T*, *offset*) means that in this segment any
>>> message whose timestamp is greater than *T* come after *offset.*
>>>
>>> The OffsetRequest behaves almost the same as before. If timestamp *T* is
>>> set in the OffsetRequest, the first offset in the returned offset sequence
>>> means that if user want to consume from *T*, that is the offset to start
>>> with. The guarantee is that any message whose timestamp is greater than T
>>> has a bigger offset. i.e. Any message before this offset has a timestamp <
>>> *T*.
>>> ---
>>>
>>> Given how the index is maintained, with a little bit of bad luck (rolling
>>> upgrade/config change of MirrorMakers for different colocations) one ends up
>>> with segmentN.timeindex.maxtimestamp > segmentN+1.timeindex.maxtimestamp.
>>> If I am not overlooking something here, then the fetch code does not seem to
>>> take that into account.
>>> https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/Log.scala#L604
>>> In this case goal number 1, not to lose any messages, is not
>>> achieved. An easy fix seems to be to sort the segsArray by maxtimestamp, but
>>> I can't wrap my head around it just now.
>>>
>>>
>>> third odd thing:
>>> Regarding the worry of increasing complexity. Looking at the code
>>> https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L193-196
>>> https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L227 & 230
>>> https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L265-266
>>> https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L305-307
>>> https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L408-410
>>> https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L432-435
>>> https://github.com/apache/kafka/blob/05d00b5aca2e1e59ad685a3f051d2ab022f75acc/core/src/main/scala/kafka/log/LogSegment.scala#L104-108
>>> and especially
>>> https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/Log.scala#L717
>>> It feels like the Log & LogSegment having detailed knowledge about the
>>> maintained indexes is not the ideal way to model the problem.
>>> Having the server maintain a Set of Indexes could reduce the code
>>> complexity, while also allowing an easy switch to turn it off. I think both
>>> indexes could point to the physical position; a client would do
>>> fetch(timestamp), and then continue with the offsets as usual. Is there any
>>> specific reason the timestamp index points into the offset index?
>>> For reading one would need to branch earlier, maybe already in ApiHandler,
>>> and decide which indexes to query, but this branching logic is there now
>>> anyhow.
>>>
>>> Further I also can't think of a situation where one wants to have this
>>> log.message.timestamp.difference.max.ms take effect. I think this
>>> defeats goal 1 again.
>>>
>>> ITE having this index in the brokers now feels weird to me. It gives me a
>>> feeling of complexity that I don't need, and I have a hard time figuring out
>>> how much other people can benefit from it. I hope that this feedback is
>>> useful and helps to understand my scepticism regarding this thing. There
>>> were some other oddities that I have a hard time recalling now. So I guess
>>> the index was built for a specific Confluent customer; will there be any
>>> blog post about their use case? Or can you share it?
>>>
>>> Best Jan
>>>
>>>
>>> On 24.08.2016 16:47, Jun Rao wrote:
>>>
>>> Jan,
>>>
>>> Thanks for the reply. I actually wasn't sure what your main concern on
>>> time-based rolling is. Just a couple of clarifications. (1) Time-based
>>> rolling doesn't control how long a segment will be retained for. For
>>> retention, if you use time-based, it will now be based on the timestamp in
>>> the message. If you use size-based, it works the same as before. Is your
>>> concern on time-based retention? If so, you can always configure the
>>> timestamp in all topics to be log append time, which will give you the same
>>> behavior as before. (2) The creation time of the segment is never exposed
>>> to the consumer and therefore is never preserved in MirrorMaker. In
>>> contrast, the timestamp in the message will be preserved in MirrorMaker.
>>> So, not sure what your concern on MirrorMaker is.
>>>
>>> Jun
>>>
>>> On Wed, Aug 24, 2016 at 5:03 AM, Jan Filipiak <Ja...@trivago.com>
>>> wrote:
>>>
>>>> Hi Jun,
>>>>
>>>> I copy-pasted this mail from the archive, as I somehow didn't receive it
>>>> by mail. I will still make some comments inline;
>>>> hopefully you can find them quickly enough, my apologies.
>>>>
>>>> To make things clearer, you should also know that all messages in
>>>> our Kafka setup already have a common way to access their timestamp (it's
>>>> always encoded in the value the same way).
>>>> Sometimes this is a logical time (e.g. the same timestamp across many
>>>> different topics / partitions), say PHP request start time or the like. So
>>>> Kafka's internal timestamps are not really attractive
>>>> for us anyway currently.
>>>>
>>>> I hope I can make a point and not waste your time.
>>>>
>>>> Best Jan,
>>>>
>>>> hopefully everything makes sense
>>>>
>>>> --------
>>>>
>>>> Jan,
>>>>
>>>> Currently, there is no switch to disable the time based index.
>>>>
>>>> There are quite a few use cases of time based index.
>>>>
>>>> 1. From KIP-33's wiki, it allows us to do time-based retention
>>>> accurately.
>>>> Before KIP-33, the time-based retention is based on the last modified
>>>> time
>>>> of each log segment. The main issue is that last modified time can change
>>>> over time. For example, if a broker loses storage and has to re-replicate
>>>> all data, those re-replicated segments will be retained much longer since
>>>> their last modified time is more recent. Having a time-based index allows
>>>> us to retain segments based on the message time, not the last modified
>>>> time. This can also benefit KIP-71, where we want to combine time-based
>>>> retention and compaction.
>>>>
>>>> /If you're short on disk space, one could try to get by with
>>>> retention.bytes/
>>>> or, as we did, ssh into the box and rm it, which worked quite well when
>>>> no one reads it.
>>>> Chuckles a little when it's read, but readers usually do an
>>>> auto.offset.reset
>>>> (they are too slow anyway if they are reading the last segments hrhr).
>>>>
>>>> 2. In KIP-58, we want to delay log compaction based on a configurable
>>>> amount of time. Time-based index allows us to do this more accurately.
>>>>
>>>> /good point, seems reasonable/
>>>>
>>>> 3. We plan to add an api in the consumer to allow seeking to an offset
>>>> based on a timestamp. The time based index allows us to do this more
>>>> accurately and fast.
>>>>
>>>> /Sure, I personally feel that you rarely want to do this. For Camus, we
>>>> used max.pull.historic.days (or similar) successfully quite often. We
>>>> just gave it an extra day and got what we wanted,
>>>> and for debugging my bisect tool works well enough. So these are the 2
>>>> use cases we have experienced already, and we found a decent way around them./
>>>>
>>>> Now for the impact.
>>>>
>>>> a. There is a slight change on how time-based rolling works. Before
>>>> KIP-33,
>>>> rolling was based on the time when a segment was loaded in the broker.
>>>> After KIP-33, rolling is based on the time of the first message of a
>>>> segment. Not sure if this is your concern. In the common case, the two
>>>> behave more or less the same. The latter is actually more deterministic
>>>> since it's not sensitive to broker restarts.
>>>>
>>>> /This is part of my main concern indeed. This is what scares me, and I
>>>> preferred to just opt out instead of reviewing all our pipelines to check
>>>> what's going to happen when we put it live.
>>>> For example the MirrorMakers: if they want to preserve the create time from
>>>> the source cluster and publish the same create time (which they should do,
>>>> if you don't encode your own timestamps and want
>>>> to have proper kafka-streams windowing), then I am quite concerned about
>>>> what happens when we have problems with our cross-ocean links and fall
>>>> behind, say, a day or two.
>>>> Then I can think of a very up-to-date MirrorMaker from
>>>> one colocation and a very laggy MirrorMaker from another colocation. For
>>>> me it's not 100% clear what's going to happen. But I can't think of sane
>>>> defaults there, and sane defaults are what I love Kafka for.
>>>> It is just tricky to be convinced that an upgrade is safe, which was usually
>>>> easy.
>>>> /
>>>> b. Time-based index potentially adds overhead to producing messages and
>>>> loading segments. Our experiments show that the impact to producing is
>>>> insignificant. The time to load segments when restarting a broker can be
>>>> doubled. However, the absolute time is still reasonable. For example,
>>>> loading 10K log segments with time-based index takes about 5 seconds.
>>>> /Loading should be fine, totally agree/
>>>>
>>>> c. Because time-based index is useful in several cases and the impact
>>>> seems
>>>> small, we didn't consider making time based index optional. Finally,
>>>> although it's possible to make the time based index optional, it will add
>>>> more complexity to the code base. So, we probably should only consider it
>>>> if it's truly needed. Thanks,
>>>>
>>>> /I think one can get away with an easier codebase here. The trick is not
>>>> to have the Log implement all the logic,
>>>> but just have the broker maintain a Set of Indexes that gets
>>>> initialized at startup and passed to the Log. One could ask each individual
>>>> index whether that log segment should be rolled, compacted, truncated,
>>>> whatever. One could also give that LogSegment to each index and make it
>>>> rebuild
>>>> the index, for example. I didn't figure out the details. But this
>>>> https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/Log.scala#L715
>>>> might end up as for (Index i : indexes) { i.shouldRoll(segment) }, which
>>>> should already be easier.
>>>> If users don't want time-based indexing, just don't put the time-based
>>>> index in the Set, and everything should work like a charm.
>>>> RPC calls that work on the specific indexes would need to throw an
>>>> exception of some kind.
>>>> Just an idea.
>>>> /
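
The "Set of Indexes" idea can be sketched as follows. This is only an
illustration of the proposal in the mail, not code that exists in Kafka:
Index, Segment, PluggableIndexDemo and the thresholds are all hypothetical,
and a List is used instead of a Set to keep the example short.

```java
import java.util.List;

// Hypothetical sketch of the proposal; none of these types exist in Kafka.
class PluggableIndexDemo {
    // Illustrative stand-in for a log segment.
    static final class Segment {
        final long sizeBytes;
        final long largestTimestampMs;

        Segment(long sizeBytes, long largestTimestampMs) {
            this.sizeBytes = sizeBytes;
            this.largestTimestampMs = largestTimestampMs;
        }
    }

    // Each index answers for itself; the log only iterates over them.
    interface Index {
        boolean shouldRoll(Segment segment, long nowMs);
    }

    // Roll as soon as any configured index asks for it.
    static boolean shouldRoll(List<Index> indexes, Segment segment, long nowMs) {
        for (Index index : indexes) {
            if (index.shouldRoll(segment, nowMs)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Index sizeBased = (s, now) -> s.sizeBytes >= 1_000_000;
        Index timeBased = (s, now) -> now > s.largestTimestampMs + 60_000;

        Segment smallButOld = new Segment(10, 0);
        long nowMs = 3_600_000;

        // With the time-based index configured, the old timestamp forces a roll.
        System.out.println(shouldRoll(List.of(sizeBased, timeBased), smallButOld, nowMs)); // true
        // Opting out means leaving the time-based index out of the collection.
        System.out.println(shouldRoll(List.of(sizeBased), smallButOld, nowMs));            // false
    }
}
```

In this shape the opt-out is a configuration choice at startup rather than a
branch inside the log code, which is the simplification the mail argues for.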
>>>> Jun
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On 22.08.2016 09:24, Jan Filipiak wrote:
>>>>
>>>>> Hello everyone,
>>>>>
>>>>> I stumbled across KIP-33 and the time-based index. While briefly
>>>>> checking the wiki and commits, I failed to find a way to opt out.
>>>>> I saw it having quite some impact on when logs are rolled and was
>>>>> hoping not to have to deal with all of that. Is there a disable switch I
>>>>> overlooked?
>>>>>
>>>>> Does anybody have a good use case where the time-based index comes in
>>>>> handy? I made a custom console consumer for myself
>>>>> that can bisect a log based on time. It's just a quick probabilistic
>>>>> shot into the log but is sometimes quite useful for debugging.
>>>>>
>>>>> Best Jan
>>>>>
>>>>
>>>
>>


Re: KIP-33 Opt out from Time Based indexing

Posted by Jan Filipiak <Ja...@trivago.com>.
Hi Jun,

Thanks a lot for the hint, I'll check it out when I get a free minute!

Best Jan

On 07.09.2016 00:35, Jun Rao wrote:
> Jan,
>
> For the time rolling issue, Jiangjie has committed a fix (
> https://issues.apache.org/jira/browse/KAFKA-4099) to trunk. Perhaps you can
> help test out trunk and see if there are any other issues related to
> time-based index?
>
> Thanks,
>
> Jun
>
> On Mon, Sep 5, 2016 at 11:52 PM, Jan Filipiak <Ja...@trivago.com>
> wrote:
>
>> Hi Jun,
>>
>> sorry for the late reply. Regarding B, my main concern was just complexity
>> of understanding what's going on.
>> As you can see it took me probably some 2 days or so, to fully grab all
>> the details in the implementation and what
>> the impacts are. Usually I prefer to turn things I don't use off, so I
>> don't have to bother. Log Append time will work for me.
>>
>> Rolling logs was my main concern. The producer can specify the timestamp
>> and we use epoch inside the message, I'd bet money,
>> people in the company would have put this epoch also in the produce
>> record. => rollings logs as the broker thinks its millis.
>> So that would probably have caused us at least one outage if a big
>> producer had upgraded and done this, IMO likely mistake.
>>
>> Id just hoped for a more obvious kill-switch, so I didn’t need to bother
>> that much.
>>
>> Best Jan
>>
>>
>>
>>
>>
>> On 29.08.2016 19:36, Jun Rao wrote:
>>
>>> Jan,
>>>
>>> For the usefulness of time index, it's ok if you don't plan to use it.
>>> However, I do think there are other people who will want to use it. Fixing
>>> an application bug always requires some additional work. Intuitively,
>>> being
>>> able to seek back to a particular point of time for replay is going to be
>>> much more efficient than always replaying from the very beginning,
>>> especially when the log is retained for a long period of time. Sure, if
>>> you
>>> want to have more confidence, you want to rewind a bit conservatively. But
>>> being able to rewind an extra hour makes a big difference from having to
>>> rewind all to way to 7 days or however long the retention time is.
>>>
>>> For the OffsetRequest, I actually agree with you that it's useful. People
>>> can use that to find the first and the last offset and the offset based on
>>> a specific point in time. The part that's a bit awkward with OffsetRequest
>>> is that it's based on the last modified time of the log segment, which
>>> makes it imprecise (precision is at the segment level, not message level)
>>> and non-deterministic (last modified time may change). Another awkwardness
>>> is that it supports returning a list of offsets after a specified
>>> timestamp. We did that simply because timestamp was only at the segment
>>> level then. So, our plan is to replace OffsetRequest with a new one. It
>>> will give you the same functionality: find the first and the last offset
>>> and the offset based on a specific point in time. It will just be better
>>> since it's more precise and more deterministic. For your use case, it
>>> seems
>>> that you don't care about message creation time. Then, it's possible for
>>> you to configure the broker with the log append time. Whether this should
>>> be default at the Kafka level is debatable, but it won't prevent your use
>>> case.
>>>
>>> For your suggesting on refactoring, I still want to understand how
>>> necessary it is. Your main concerns so far seem to be.
>>> (a) Impact on rolling log segments.
>>> (b) Time-based index is not useful for me.
>>>
>>> Item (a) is a good point. Thanks for that. We will fix it. Item (b), I
>>> have
>>> given my view on this above. Are there any other things that you think
>>> that
>>> having a time-based index will hurt?
>>>
>>> Thanks,
>>>
>>> Jun
>>>
>>> On Fri, Aug 26, 2016 at 3:41 PM, Jan Filipiak <Ja...@trivago.com>
>>> wrote:
>>>
>>> Hi Jun,
>>>> thanks for taking the time to answer on such a detailed level. You are
>>>> right Log.fetchOffsetByTimestamp works, the comment is just confusing
>>>> "// Get all the segments whose largest timestamp is smaller than target
>>>> timestamp" wich is apparently is not what takeWhile does (I am more on
>>>> the Java end of things, so I relied on the comment).
>>>>
>>>> Regarding the frequent file rolling i didn't think of Logcompaction but
>>>> that indeed is a place where **** can hit the fan pretty easy. especially
>>>> if you don't have many updates in there and you pass the timestamp along
>>>> in
>>>> a kafka-streams application. Bootstrapping a new application then indeed
>>>> could produce quite a few old messages kicking this logrolling of until a
>>>> recent message appears. I guess that makes it a practical issue again
>>>> even
>>>> with the 7 days. Thanks for pointing out! Id like to see the appendTime
>>>> as
>>>> default, I am very happy that I have it in the backpocket for purpose of
>>>> tighter sleep and not to worry to much about someone accidentally doing
>>>> something dodgy on a weekend with our clusters
>>>>
>>>> Regarding the usefulness, you will not be able to sell it for me. I don't
>>>> know how people build applications with this ¯\_(ツ)_/¯ but I don't want
>>>> to
>>>> see them.
>>>> Look at the error recovery with timestamp seek:
>>>> For fixing a bug, a user needs to stop the SP, truncate all his
>>>> downstream
>>>> data perfectly based on their time window.Then restart and do the first
>>>> fetch based
>>>> again on the perfect window timeout. From then on, he still has NO clue
>>>> whatsoever if messages that come later now with an earlier timestamp need
>>>> to go into the
>>>> previous window or not. (Note that there is  >>>absolutly no<<< way to
>>>> determine this in aggregated downstream windowed stores). So the user is
>>>> in
>>>> .... even though he can seek, he
>>>> can't rule out his error. IMO it helps them to build the wrong thing,
>>>> that
>>>> will just be operational pain *somewhere*
>>>>
>>>> Look at the error recovery without timestamp seek:
>>>> start your application from beginning with a different output
>>>> (version,key,partition) wait for it to fully catch up. drop the
>>>> timewindows
>>>> the error happend + confidence interval (if your data isnt there anymore,
>>>> no seek will help) in from the old version. Stop the stream processor,
>>>> merge the data it created, switch back to the original
>>>> (version,key,partition) and start the SP again.
>>>> Done. As bigger you choose the confidence interval, the more correct, the
>>>> less the index helps. usually you want maximum confidence => no index
>>>> usage, get everything that is still there. (Maybe even redump from hadoop
>>>> in extreme cases) ironically causing the log to roll all the time (as you
>>>> probably publish to a new topic and have the streams application use
>>>> both)
>>>> :(
>>>>
>>>> As you can see, even though the users can seek, if they want to create
>>>> proper numbers, e.g. billing information, they are in trouble, and giving
>>>> them this index will just make them implement the wrong solution! It
>>>> boils down to: this index is not the kafka way of doing things. The index
>>>> can help the second approach but usually one chooses the confidence
>>>> interval = as much as one can get.
>>>>
>>>> Then the last thing. "OffsetRequest is a legacy request. It's awkward to
>>>> use and we plan to deprecate it over time". You got to be kidding me. It
>>>> was weird to get the byte position back then, but getting the offsets is
>>>> perfectly reasonable and one of the best things in the world. Want to
>>>> know how your stream looked at a specific point in time? Get start and
>>>> end offset, fetch whenever you like, and you get a perfect snapshot in
>>>> wall time. This is useful for compacted topics as well as streaming
>>>> topics. Offsets are a well known thing in kafka and in no way awkward, as
>>>> their monotonically increasing property is just great.
>>>>
>>>> For seeking the log based on a confidence interval (the only chance you
>>>> get in non-key logs reprocessing) one can also bisect the log from the
>>>> client. The case is rare; it is intensive and causes at least a few
>>>> hundred seeks for bigger topics, but I guess the broker does these extra
>>>> for the new index file now.
>>>>
>>>> This index, I feel, is just not following the whole "kafka-way". Can you
>>>> comment on the proposed refactoring? What are the chances to get it
>>>> upstream if I could pull it off? (unlikely)
>>>>
>>>> Thanks for all the effort you put into listening to my concerns. Highly
>>>> appreciated!
>>>>
>>>> Best Jan
>>>>
>>>>
>>>>
>>>>
>>>> On 25.08.2016 23:36, Jun Rao wrote:
>>>>
>>>> Jan,
>>>>
>>>> Thanks a lot for the feedback. Now I understood your concern better. The
>>>> following are my comments.
>>>>
>>>> The first odd thing that you pointed out could be a real concern.
>>>> Basically, if a producer publishes messages with really old timestamps,
>>>> our default log.roll.hours (7 days) will indeed cause the broker to roll
>>>> a log on every message, which would be bad. Time-based rolling is
>>>> actually used infrequently. The only use case that I am aware of is that
>>>> for compacted topics, rolling logs based on time could allow the
>>>> compaction to happen sooner (since the active segment is never cleaned).
>>>> One option is to change the default log.roll.hours to infinite and also
>>>> document the impact of changing log.roll.hours. Jiangjie, what do you
>>>> think?
>>>>
>>>> For the second odd thing, the OffsetRequest is a legacy request. It's
>>>> awkward to use and we plan to deprecate it over time. That's why we
>>>> haven't changed the logic in serving OffsetRequest after KIP-33. The plan
>>>> is to introduce a new OffsetRequest that will exploit the time-based
>>>> index. It's possible to have log segments with non-increasing largest
>>>> timestamp. As you can see in Log.fetchOffsetsByTimestamp(), we simply
>>>> iterate the segments in offset order and stop when we see the target
>>>> timestamp.
>>>>
>>>> For the third odd thing, one of the original reasons why the time-based
>>>> index points to an offset instead of the file position is that it makes
>>>> truncating the time index to an offset easier since the offset is in the
>>>> index. Looking at the code, we could also store the file position in the
>>>> time index and do truncation based on position, instead of offset. It
>>>> probably has a slight advantage of consistency between the two indexes
>>>> and
>>>> avoiding another level of indirection when looking up the time index.
>>>> Jiangjie, have we ever considered that?
>>>>
>>>> The idea of log.message.timestamp.difference.max.ms is to prevent the
>>>> timestamp in the published messages from drifting too far away from the
>>>> current timestamp. The default value is infinite though.
>>>>
>>>> Lastly, for the usefulness of time-based index, it's actually a feature
>>>> that the community wanted and voted for, not just for Confluent
>>>> customers. For example, being able to seek to an offset based on
>>>> timestamp has been a frequently requested feature. This can be useful for
>>>> at least the following scenarios: (1) If there is a bug in a consumer
>>>> application, the user will want to rewind the consumption after fixing
>>>> the logic. In this case, it's more convenient to rewind the consumption
>>>> based on a timestamp. (2) In a multi data center setup, it's common for
>>>> people to mirror the data from one Kafka cluster in one data center to
>>>> another cluster in a different data center. If one data center fails,
>>>> people want to be able to resume the consumption in the other data
>>>> center. Since the offsets are not preserved between the two clusters
>>>> through mirroring, being able to find a starting offset based on
>>>> timestamp will allow the consumer to resume the consumption without
>>>> missing any messages and also not replaying too many messages.
>>>>
>>>> Thanks,
>>>>
>>>> Jun
>>>>
>>>>
>>>> On Wed, Aug 24, 2016 at 5:05 PM, Jan Filipiak <Ja...@trivago.com>
>>>> wrote:
>>>>
>>>> Hey Jun,
>>>>> I go and try again :), wrote the first one in quite a stressful
>>>>> environment. The bottom line is that I, for our use cases, see a too
>>>>> small use/effort ratio in this time index.
>>>>> We do not bootstrap new consumers for key-less logs so frequently and
>>>>> when we do it, they usually want everything (prod deployment) or just
>>>>> start at the end (during development).
>>>>> That caused quite some frustration. Would be better if I could just have
>>>>> turned it off and not bothered any more. Anyhow, in the meantime I had to
>>>>> dig deeper into the inner workings and the impacts are not as dramatic
>>>>> as I initially assumed. But it still carries along some oddities I want
>>>>> to list here.
>>>>>
>>>>> first odd thing:
>>>>> Quote
>>>>> ---
>>>>> Enforce time based log rolling
>>>>>
>>>>> Currently time based log rolling is based on the creating time of the
>>>>> log
>>>>> segment. With this KIP, the time based rolling would be changed to
>>>>> based on
>>>>> the largest timestamp ever seen in a log segment. A new log segment
>>>>> will be
>>>>> rolled out if current time is greater than largest timestamp ever seen
>>>>> in
>>>>> the log segment + log.roll.ms. When message.timestamp.type=CreateTime,
>>>>> user should set max.message.time.difference.ms appropriately together
>>>>> with log.roll.ms to avoid frequent log segment roll out.
>>>>> ---
>>>>> Imagine a MirrorMaker falls behind and has a delay of some time >
>>>>> log.roll.ms.
>>>>> From my understanding, when no one else is producing to this partition
>>>>> except the MirrorMaker, the broker will start rolling on every append?
>>>>> Just because you may be under a DoS attack and your application only
>>>>> works in the remote location. (also a good occasion for MM to fall
>>>>> behind)
>>>>> But checking the default values indicates that it should indeed not
>>>>> become a problem, as log.roll.ms defaults to ~>7 days.
>>>>>
>>>>>
>>>>> second odd thing:
>>>>> Quote
>>>>> ---
>>>>> A time index entry (*T*, *offset*) means that in this segment any
>>>>> message whose timestamp is greater than *T* come after *offset.*
>>>>>
>>>>> The OffsetRequest behaves almost the same as before. If timestamp *T* is
>>>>> set in the OffsetRequest, the first offset in the returned offset
>>>>> sequence
>>>>> means that if user want to consume from *T*, that is the offset to start
>>>>> with. The guarantee is that any message whose timestamp is greater than
>>>>> T
>>>>> has a bigger offset. i.e. Any message before this offset has a
>>>>> timestamp <
>>>>> *T*.
>>>>> ---
>>>>>
>>>>> Given how the index is maintained, with a little bit of bad luck
>>>>> (rolling upgrade/config change of mirrormakers for different
>>>>> colocations) one ends up with
>>>>> segmentN.timeindex.maxtimestamp > segmentN+1.timeindex.maxtimestamp.
>>>>> If I do not overlook something here, then the fetch code does not seem
>>>>> to take that into account.
>>>>> https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/Log.scala#L604
>>>>> In this case goal number 1, not to lose any messages, is not achieved.
>>>>> An easy fix seems to be to sort the segsArray by maxtimestamp, but I
>>>>> can't wrap my head around it just now.
>>>>>
>>>>>
>>>>> third odd thing:
>>>>> Regarding the worry of increasing complexity. Looking at the code
>>>>> https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L193-196
>>>>> https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L227 & 230
>>>>> https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L265-266
>>>>> https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L305-307
>>>>> https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L408 - 410
>>>>> https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L432 - 435
>>>>> https://github.com/apache/kafka/blob/05d00b5aca2e1e59ad685a3f051d2ab022f75acc/core/src/main/scala/kafka/log/LogSegment.scala#L104-108
>>>>> and especially
>>>>> https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/Log.scala#L717
>>>>> it feels like the Log & LogSegment having detailed knowledge about the
>>>>> maintained indexes is not the ideal way to model the problem.
>>>>> Having the server maintain a Set of Indexes could reduce the code
>>>>> complexity, while also allowing an easy switch to turn it off. I think
>>>>> both indexes could point to the physical position, a client would do
>>>>> fetch(timestamp), and then continue with the offsets as usual. Is there
>>>>> any specific reason the timestamp index points into the offset index?
>>>>> For reading one would need to branch earlier, maybe already in
>>>>> ApiHandler, and decide what indexes to query, but this branching logic
>>>>> is there now anyhow.
>>>>>
>>>>> Further I also can't think of a situation where one wants to have this
>>>>> log.message.timestamp.difference.max.ms take effect. I think this
>>>>> defeats goal 1 again.
>>>>>
>>>>> ITE having this index in the brokers now feels weird to me. It gives me
>>>>> a feeling of complexity that I don't need, and I have a hard time
>>>>> figuring out how much other people can benefit from it. I hope that this
>>>>> feedback is useful and helps to understand my scepticism regarding this
>>>>> thing. There were some other oddities that I have a hard time recalling
>>>>> now. So I guess the index was built for a specific Confluent customer;
>>>>> will there be any blog post about their use case? Or can you share it?
>>>>>
>>>>> Best Jan
>>>>>
>>>>>
>>>>> On 24.08.2016 16:47, Jun Rao wrote:
>>>>>
>>>>> Jan,
>>>>>
>>>>> Thanks for the reply. I actually wasn't sure what your main concern on
>>>>> time-based rolling is. Just a couple of clarifications. (1) Time-based
>>>>> rolling doesn't control how long a segment will be retained for. For
>>>>> retention, if you use time-based, it will now be based on the timestamp
>>>>> in
>>>>> the message. If you use size-based, it works the same as before. Is your
>>>>> concern on time-based retention? If so, you can always configure the
>>>>> timestamp in all topics to be log append time, which will give you the
>>>>> same
>>>>> behavior as before. (2) The creation time of the segment is never
>>>>> exposed
>>>>> to the consumer and therefore is never preserved in MirrorMaker. In
>>>>> contrast, the timestamp in the message will be preserved in MirrorMaker.
>>>>> So, not sure what your concern on MirrorMaker is.
>>>>>
>>>>> Jun
>>>>>
>>>>> On Wed, Aug 24, 2016 at 5:03 AM, Jan Filipiak <Jan.Filipiak@trivago.com
>>>>> wrote:
>>>>>
>>>>> Hi Jun,
>>>>>> I copy-pasted this mail from the archive, as I somehow didn't receive
>>>>>> it per mail. I will still make some comments inline,
>>>>>> hopefully you can find them quick enough, my apologies.
>>>>>>
>>>>>> To make things more clear, you should also know that all messages in
>>>>>> our kafka setup have a common way to access their timestamp already
>>>>>> (it's encoded in the value the same way always).
>>>>>> Sometimes this is a logical time (e.g. same timestamp across many
>>>>>> different topics / partitions), say PHP request start time or the
>>>>>> like. So kafka's internal timestamps are not really attractive
>>>>>> for us anyways currently.
>>>>>>
>>>>>> I hope I can make a point and not waste your time.
>>>>>>
>>>>>> Best Jan,
>>>>>>
>>>>>> hopefully everything makes sense
>>>>>>
>>>>>> --------
>>>>>>
>>>>>> Jan,
>>>>>>
>>>>>> Currently, there is no switch to disable the time based index.
>>>>>>
>>>>>> There are quite a few use cases of time based index.
>>>>>>
>>>>>> 1. From KIP-33's wiki, it allows us to do time-based retention
>>>>>> accurately.
>>>>>> Before KIP-33, the time-based retention is based on the last modified
>>>>>> time
>>>>>> of each log segment. The main issue is that last modified time can
>>>>>> change
>>>>>> over time. For example, if a broker loses storage and has to
>>>>>> re-replicate
>>>>>> all data, those re-replicated segments will be retained much longer
>>>>>> since
>>>>>> their last modified time is more recent. Having a time-based index
>>>>>> allows
>>>>>> us to retain segments based on the message time, not the last modified
>>>>>> time. This can also benefit KIP-71, where we want to combine time-based
>>>>>> retention and compaction.
>>>>>>
>>>>>> /If you're sparse on disk space, one could try to get by that with
>>>>>> retention.bytes/
>>>>>> or, as we did, ssh into the box and rm it, which worked quite well when
>>>>>> no one reads it.
>>>>>> Chuckles a little when it's read but readers usually do an
>>>>>> auto.offset.reset
>>>>>> (they are too slow anyways if they're reading the last segments hrhr).
>>>>>>
>>>>>> 2. In KIP-58, we want to delay log compaction based on a configurable
>>>>>> amount of time. Time-based index allows us to do this more accurately.
>>>>>>
>>>>>> /good point, seems reasonable/
>>>>>>
>>>>>> 3. We plan to add an api in the consumer to allow seeking to an offset
>>>>>> based on a timestamp. The time based index allows us to do this more
>>>>>> accurately and fast.
>>>>>>
>>>>>> /Sure, I personally feel that you rarely want to do this. For Camus, we
>>>>>> used max.pull.historic.days (or similar) successfully quite often. We
>>>>>> just gave it an extra day and got what we wanted,
>>>>>> and for debugging my bisect tool works well enough. So these are the 2
>>>>>> use cases we experienced already and found a decent way around it./
>>>>>>
>>>>>> Now for the impact.
>>>>>>
>>>>>> a. There is a slight change on how time-based rolling works. Before
>>>>>> KIP-33,
>>>>>> rolling was based on the time when a segment was loaded in the broker.
>>>>>> After KIP-33, rolling is based on the time of the first message of a
>>>>>> segment. Not sure if this is your concern. In the common case, the two
>>>>>> behave more or less the same. The latter is actually more deterministic
>>>>>> since it's not sensitive to broker restarts.
>>>>>>
>>>>>> /This is part of my main concern indeed. This is what scares me and I
>>>>>> preferred to just opt out, instead of reviewing all our pipelines to
>>>>>> check what's going to happen when we put it live.
>>>>>> For example the MirrorMakers: if they want to preserve create time from
>>>>>> the source cluster and publish the same create time (which they should
>>>>>> do, if you don't encode your own timestamps and want
>>>>>> to have proper kafka-streams windowing), then I am quite concerned about
>>>>>> when we have problems in our cross-ocean links and they fall behind,
>>>>>> say a day or two.
>>>>>> Then I can think of a very up-to-date MirrorMaker from
>>>>>> one colocation and a very laggy MirrorMaker from another colocation.
>>>>>> For me it's not 100% clear what's going to happen. But I can't think of
>>>>>> sane defaults there, which is what I love kafka for.
>>>>>> Just tricky to be convinced that an upgrade is safe, which was usually
>>>>>> easy.
>>>>>> /
>>>>>> b. Time-based index potentially adds overhead to producing messages and
>>>>>> loading segments. Our experiments show that the impact to producing is
>>>>>> insignificant. The time to load segments when restarting a broker can
>>>>>> be
>>>>>> doubled. However, the absolute time is still reasonable. For example,
>>>>>> loading 10K log segments with time-based index takes about 5 seconds.
>>>>>> /Loading should be fine, totally agree/
>>>>>>
>>>>>> c Because time-based index is useful in several cases and the impact
>>>>>> seems
>>>>>> small, we didn't consider making time based index optional. Finally,
>>>>>> although it's possible to make the time based index optional, it will
>>>>>> add
>>>>>> more complexity to the code base. So, we probably should only consider
>>>>>> it
>>>>>> if it's truly needed. Thanks,
>>>>>>
>>>>>> /I think one can get away with an easier codebase here. The trick is
>>>>>> not to have the LOG implement all the logic,
>>>>>> but just have the broker maintain a Set of Indexes that gets
>>>>>> initialized at startup and passed to the LOG. One could ask each
>>>>>> individual index if that log segment should be rolled, compacted,
>>>>>> truncated, whatever. One could also give that LogSegment to each index
>>>>>> and make it rebuild the index, for example. I didn't figure out the
>>>>>> details. But this
>>>>>> https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/Log.scala#L715
>>>>>> might end up with for(Index i : indexes) { i.shouldRoll(segment) }, which
>>>>>> should already be easier.
>>>>>> If users don't want time based indexing, just don't put the time-based
>>>>>> index in the Set then and everything should work like a charm.
>>>>>> RPC calls that work on the specific indexes would need to throw an
>>>>>> exception of some kind.
>>>>>> Just an idea.
>>>>>> /
>>>>>> Jun
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On 22.08.2016 09:24, Jan Filipiak wrote:
>>>>>>
>>>>>> Hello everyone,
>>>>>>> I stumbled across KIP-33 and the time based index. While briefly
>>>>>>> checking the wiki and commits, I failed to find a way to opt out.
>>>>>>> I saw it having quite some impact on when logs are rolled and was
>>>>>>> hoping not to have to deal with all of that. Is there a disable
>>>>>>> switch I overlooked?
>>>>>>>
>>>>>>> Does anybody have a good use case where the time-based index comes in
>>>>>>> handy? I made a custom console consumer for me,
>>>>>>> that can bisect a log based on time. It's just a quick probabilistic
>>>>>>> shot into the log but is sometimes quite useful for some debugging.
>>>>>>>
>>>>>>> Best Jan
>>>>>>>
>>>>>>>
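
The "Set of Indexes" refactoring idea from the quoted discussion above can be illustrated with a small Java sketch. This is only an illustration under assumptions: SegmentIndexSketch, SegmentStats, OffsetIndexSketch, TimeIndexSketch and LogSketch are hypothetical names, not Kafka classes, and the roll conditions are deliberately simplified. It shows a log asking each configured index whether a segment should roll, so that leaving the time-based index out of the collection removes its influence.

```java
// Hypothetical sketch, not Kafka code: a log that delegates roll decisions
// to whatever indexes it was configured with.
interface SegmentIndexSketch {
    boolean shouldRoll(SegmentStats segment, long nowMs);
}

/** Minimal per-segment facts the indexes look at (assumed fields). */
final class SegmentStats {
    final long sizeBytes;
    final long largestTimestampMs;

    SegmentStats(long sizeBytes, long largestTimestampMs) {
        this.sizeBytes = sizeBytes;
        this.largestTimestampMs = largestTimestampMs;
    }
}

/** Stand-in for the offset index: asks for a roll purely on segment size. */
final class OffsetIndexSketch implements SegmentIndexSketch {
    private final long maxSegmentBytes;

    OffsetIndexSketch(long maxSegmentBytes) {
        this.maxSegmentBytes = maxSegmentBytes;
    }

    @Override
    public boolean shouldRoll(SegmentStats segment, long nowMs) {
        return segment.sizeBytes >= maxSegmentBytes;
    }
}

/** Stand-in for the time index: asks for a roll once the newest message is too old. */
final class TimeIndexSketch implements SegmentIndexSketch {
    private final long rollMs;

    TimeIndexSketch(long rollMs) {
        this.rollMs = rollMs;
    }

    @Override
    public boolean shouldRoll(SegmentStats segment, long nowMs) {
        return nowMs - segment.largestTimestampMs > rollMs;
    }
}

/** The log knows nothing about concrete index types; it only iterates them. */
final class LogSketch {
    private final java.util.List<SegmentIndexSketch> indexes;

    LogSketch(java.util.List<SegmentIndexSketch> indexes) {
        this.indexes = indexes;
    }

    boolean shouldRoll(SegmentStats segment, long nowMs) {
        for (SegmentIndexSketch index : indexes) {
            if (index.shouldRoll(segment, nowMs)) {
                return true;
            }
        }
        return false;
    }
}
```

Constructing LogSketch with only an OffsetIndexSketch would correspond to the opt-out asked for in this thread; whether such a split is practical inside the real broker is exactly the open question of the discussion.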

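The client-side bisect mentioned in the thread can be sketched as a plain binary search over offsets. This is not the author's tool and not a Kafka API: LogBisectSketch and the timestampAt probe are hypothetical, and the probe stands in for "seek to this offset and read one message's timestamp". The sketch assumes timestamps never decrease with the offset; on real logs that does not hold strictly, which is presumably why the thread calls the result a probabilistic shot.

```java
// Hypothetical sketch of a client-side bisect; not a Kafka API.
final class LogBisectSketch {
    /**
     * Returns the smallest offset in [startOffset, endOffset) whose timestamp
     * is >= targetTs, or endOffset if there is none. Assumes timestamps never
     * decrease as the offset grows.
     */
    static long firstOffsetAtOrAfter(long startOffset, long endOffset, long targetTs,
                                     java.util.function.LongUnaryOperator timestampAt) {
        long lo = startOffset;
        long hi = endOffset;
        while (lo < hi) {
            long mid = lo + (hi - lo) / 2; // avoids overflow of lo + hi
            if (timestampAt.applyAsLong(mid) < targetTs) {
                lo = mid + 1; // everything up to mid is too old
            } else {
                hi = mid;     // mid is a candidate, keep searching to the left
            }
        }
        return lo;
    }

    public static void main(String[] args) {
        // In-memory stand-in for a partition: index = offset, value = timestamp.
        long[] timestamps = {10, 20, 20, 35, 50};
        java.util.function.LongUnaryOperator probe = offset -> timestamps[(int) offset];
        System.out.println(firstOffsetAtOrAfter(0, timestamps.length, 20, probe)); // 1
        System.out.println(firstOffsetAtOrAfter(0, timestamps.length, 36, probe)); // 4
    }
}
```

Each loop iteration costs one probe, so the number of seeks per partition grows with the logarithm of the offset range; subtracting a confidence interval from the target timestamp before searching is one way to rewind conservatively.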

Re: KIP-33 Opt out from Time Based indexing

Posted by Jun Rao <ju...@confluent.io>.
Jan,

For the time rolling issue, Jiangjie has committed a fix (
https://issues.apache.org/jira/browse/KAFKA-4099) to trunk. Perhaps you can
help test out trunk and see if there are any other issues related to
time-based index?

Thanks,

Jun

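To make the time rolling issue concrete, here is a minimal sketch of the rule as quoted from the KIP text in this thread (roll when the current time is greater than the largest timestamp seen in the segment + log.roll.ms). TimeRollSketch and shouldRoll are hypothetical names, not Kafka code, and the sketch does not model the behavior after the KAFKA-4099 fix. It only shows why a producer that sends old or wrongly scaled timestamps would trigger a roll on every append under that rule.

```java
// Hypothetical sketch of the roll rule quoted from KIP-33; not Kafka's implementation.
final class TimeRollSketch {
    /**
     * True when the current time is more than logRollMs past the largest
     * timestamp seen in the segment. Written as a subtraction so that a very
     * large logRollMs ("infinite") cannot overflow for non-negative timestamps.
     */
    static boolean shouldRoll(long nowMs, long largestTimestampMs, long logRollMs) {
        return nowMs - largestTimestampMs > logRollMs;
    }

    public static void main(String[] args) {
        long sevenDaysMs = 7L * 24 * 60 * 60 * 1000;
        long nowMs = 1_473_000_000_000L; // arbitrary wall-clock time in epoch millis

        // A message stamped one second ago does not force a roll.
        System.out.println(shouldRoll(nowMs, nowMs - 1_000, sevenDaysMs)); // false

        // A producer that writes epoch seconds: read as millis this is January 1970,
        // so a segment holding only such messages is always "too old".
        System.out.println(shouldRoll(nowMs, nowMs / 1_000, sevenDaysMs)); // true

        // With an effectively infinite roll time the same message is harmless.
        System.out.println(shouldRoll(nowMs, nowMs / 1_000, Long.MAX_VALUE)); // false
    }
}
```

The second case mirrors the epoch-seconds mistake described in the quoted reply below; a MirrorMaker lagging by more than log.roll.ms would hit the same branch.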
On Mon, Sep 5, 2016 at 11:52 PM, Jan Filipiak <Ja...@trivago.com>
wrote:

> Hi Jun,
>
> sorry for the late reply. Regarding B, my main concern was just the
> complexity of understanding what's going on.
> As you can see, it took me probably some 2 days or so to fully grasp all
> the details in the implementation and what
> the impacts are. Usually I prefer to turn things I don't use off, so I
> don't have to bother. Log Append time will work for me.
>
> Rolling logs was my main concern. The producer can specify the timestamp
> and we use epoch inside the message; I'd bet money
> people in the company would have put this epoch also in the produce
> record. => rolling logs, as the broker thinks it's millis.
> So that would probably have caused us at least one outage if a big
> producer had upgraded and done this, IMO a likely mistake.
>
> I'd just hoped for a more obvious kill-switch, so I didn’t need to bother
> that much.
>
> Best Jan
>
>
>
>
>
> On 29.08.2016 19:36, Jun Rao wrote:
>
>> Jan,
>>
>> For the usefulness of time index, it's ok if you don't plan to use it.
>> However, I do think there are other people who will want to use it. Fixing
>> an application bug always requires some additional work. Intuitively,
>> being
>> able to seek back to a particular point of time for replay is going to be
>> much more efficient than always replaying from the very beginning,
>> especially when the log is retained for a long period of time. Sure, if
>> you want to have more confidence, you want to rewind a bit conservatively.
>> But being able to rewind an extra hour makes a big difference from having
>> to rewind all the way to 7 days or however long the retention time is.
>>
>> For the OffsetRequest, I actually agree with you that it's useful. People
>> can use that to find the first and the last offset and the offset based on
>> a specific point in time. The part that's a bit awkward with OffsetRequest
>> is that it's based on the last modified time of the log segment, which
>> makes it imprecise (precision is at the segment level, not message level)
>> and non-deterministic (last modified time may change). Another awkwardness
>> is that it supports returning a list of offsets after a specified
>> timestamp. We did that simply because timestamp was only at the segment
>> level then. So, our plan is to replace OffsetRequest with a new one. It
>> will give you the same functionality: find the first and the last offset
>> and the offset based on a specific point in time. It will just be better
>> since it's more precise and more deterministic. For your use case, it
>> seems
>> that you don't care about message creation time. Then, it's possible for
>> you to configure the broker with the log append time. Whether this should
>> be default at the Kafka level is debatable, but it won't prevent your use
>> case.
>>
>> For your suggestion on refactoring, I still want to understand how
>> necessary it is. Your main concerns so far seem to be:
>> (a) Impact on rolling log segments.
>> (b) Time-based index is not useful for me.
>>
>> Item (a) is a good point. Thanks for that. We will fix it. Item (b), I
>> have
>> given my view on this above. Are there any other things that you think
>> that
>> having a time-based index will hurt?
>>
>> Thanks,
>>
>> Jun
>>
>> On Fri, Aug 26, 2016 at 3:41 PM, Jan Filipiak <Ja...@trivago.com>
>> wrote:
>>
>> Hi Jun,
>>>
>>> thanks for taking the time to answer on such a detailed level. You are
>>> right, Log.fetchOffsetByTimestamp works; the comment is just confusing:
>>> "// Get all the segments whose largest timestamp is smaller than target
>>> timestamp", which is apparently not what takeWhile does (I am more on
>>> the Java end of things, so I relied on the comment).
>>>
>>> Regarding the frequent file rolling, I didn't think of log compaction, but
>>> that indeed is a place where **** can hit the fan pretty easily, especially
>>> if you don't have many updates in there and you pass the timestamp along
>>> in a kafka-streams application. Bootstrapping a new application then
>>> indeed could produce quite a few old messages, kicking this log rolling
>>> off until a recent message appears. I guess that makes it a practical
>>> issue again even with the 7 days. Thanks for pointing that out! I'd like
>>> to see the appendTime as default. I am very happy that I have it in the
>>> back pocket for the purpose of tighter sleep and not having to worry too
>>> much about someone accidentally doing something dodgy on a weekend with
>>> our clusters.
>>>
>>> Regarding the usefulness, you will not be able to sell it for me. I don't
>>> know how people build applications with this ¯\_(ツ)_/¯ but I don't want
>>> to
>>> see them.
>>> Look at the error recovery with timestamp seek:
>>> For fixing a bug, a user needs to stop the SP, truncate all his
>>> downstream data perfectly based on their time window. Then restart and
>>> do the first fetch based again on the perfect window timeout. From then
>>> on, he still has NO clue whatsoever if messages that come later now with
>>> an earlier timestamp need to go into the previous window or not. (Note
>>> that there is >>>absolutely no<<< way to determine this in aggregated
>>> downstream windowed stores). So the user is in .... even though he can
>>> seek, he can't rule out his error. IMO it helps them to build the wrong
>>> thing, that will just be operational pain *somewhere*
>>>
>>> Look at the error recovery without timestamp seek:
>>> Start your application from the beginning with a different output
>>> (version,key,partition) and wait for it to fully catch up. Drop the time
>>> windows in which the error happened + confidence interval (if your data
>>> isn't there anymore, no seek will help) from the old version. Stop the
>>> stream processor, merge the data it created, switch back to the original
>>> (version,key,partition) and start the SP again.
>>> Done. The bigger you choose the confidence interval, the more correct the
>>> result and the less the index helps. Usually you want maximum confidence
>>> => no index usage, get everything that is still there. (Maybe even redump
>>> from hadoop in extreme cases) ironically causing the log to roll all the
>>> time (as you probably publish to a new topic and have the streams
>>> application use both)
>>> :(
>>>
>>> As you can see, even though the users can seek, if they want to create
>>> proper numbers, e.g. billing information, they are in trouble, and giving
>>> them this index will just make them implement the wrong solution! It
>>> boils down to: this index is not the kafka way of doing things. The index
>>> can help the second approach but usually one chooses the confidence
>>> interval = as much as one can get.
>>>
>>> Then the last thing. "OffsetRequest is a legacy request. It's awkward to
>>> use and we plan to deprecate it over time". You got to be kidding me. It
>>> was weird to get the byte position back then, but getting the offsets is
>>> perfectly reasonable and one of the best things in the world. Want to
>>> know how your stream looked at a specific point in time? Get start and
>>> end offset, fetch whenever you like, and you get a perfect snapshot in
>>> wall time. This is useful for compacted topics as well as streaming
>>> topics. Offsets are a well known thing in kafka and in no way awkward, as
>>> their monotonically increasing property is just great.
>>>
>>> For seeking the log based on a confidence interval (the only chance you
>>> get in non-key logs reprocessing) one can also bisect the log from the
>>> client. The case is rare; it is intensive and causes at least a few
>>> hundred seeks for bigger topics, but I guess the broker does these extra
>>> for the new index file now.
>>>
>>> This index, I feel, is just not following the whole "kafka-way". Can you
>>> comment on the proposed refactoring? What are the chances to get it
>>> upstream if I could pull it off? (unlikely)
>>>
>>> Thanks for all the effort you put into listening to my concerns. Highly
>>> appreciated!
>>>
>>> Best Jan
>>>
>>>
>>>
>>>
>>> On 25.08.2016 23:36, Jun Rao wrote:
>>>
>>> Jan,
>>>
>>> Thanks a lot for the feedback. Now I understood your concern better. The
>>> following are my comments.
>>>
>>> The first odd thing that you pointed out could be a real concern.
>>> Basically, if a producer publishes messages with really old timestamps,
>>> our default log.roll.hours (7 days) will indeed cause the broker to roll
>>> a log on every message, which would be bad. Time-based rolling is
>>> actually used infrequently. The only use case that I am aware of is that
>>> for compacted topics, rolling logs based on time could allow the
>>> compaction to happen sooner (since the active segment is never cleaned).
>>> One option is to change the default log.roll.hours to infinite and also
>>> document the impact of changing log.roll.hours. Jiangjie, what do you
>>> think?
>>>
>>> For the second odd thing, the OffsetRequest is a legacy request. It's
>>> awkward to use and we plan to deprecate it over time. That's why we
>>> haven't changed the logic in serving OffsetRequest after KIP-33. The plan
>>> is to introduce a new OffsetRequest that will exploit the time-based
>>> index. It's possible to have log segments with non-increasing largest
>>> timestamp. As you can see in Log.fetchOffsetsByTimestamp(), we simply
>>> iterate the segments in offset order and stop when we see the target
>>> timestamp.
>>>
>>> For the third odd thing, one of the original reasons why the time-based
>>> index points to an offset instead of the file position is that it makes
>>> truncating the time index to an offset easier since the offset is in the
>>> index. Looking at the code, we could also store the file position in the
>>> time index and do truncation based on position, instead of offset. It
>>> probably has a slight advantage of consistency between the two indexes
>>> and
>>> avoiding another level of indirection when looking up the time index.
>>> Jiangjie, have we ever considered that?
>>>
>>> The idea of log.message.timestamp.difference.max.ms is to prevent the
>>> timestamp in the published messages from drifting too far away from the
>>> current timestamp. The default value is infinite though.
>>>
>>> Lastly, for the usefulness of time-based index, it's actually a feature
>>> that the community wanted and voted for, not just for Confluent
>>> customers. For example, being able to seek to an offset based on
>>> timestamp has been a frequently requested feature. This can be useful for
>>> at least the following scenarios: (1) If there is a bug in a consumer
>>> application, the user will want to rewind the consumption after fixing
>>> the logic. In this case, it's more convenient to rewind the consumption
>>> based on a timestamp. (2) In a multi data center setup, it's common for
>>> people to mirror the data from one Kafka cluster in one data center to
>>> another cluster in a different data center. If one data center fails,
>>> people want to be able to resume the consumption in the other data
>>> center. Since the offsets are not preserved between the two clusters
>>> through mirroring, being able to find a starting offset based on
>>> timestamp will allow the consumer to resume the consumption without
>>> missing any messages and also not replaying too many messages.
>>>
>>> Thanks,
>>>
>>> Jun
>>>
>>>
>>> On Wed, Aug 24, 2016 at 5:05 PM, Jan Filipiak <Ja...@trivago.com>
>>> wrote:
>>>
>>> Hey Jun,
>>>>
>>>> I go and try again :), wrote the first one in quite a stressful
>>>> environment. The bottom line is that I, for our use cases, see a too
>>>> small use/effort ratio in this time index.
>>>> We do not bootstrap new consumers for key-less logs so frequently and
>>>> when we do it, they usually want everything (prod deployment) or just
>>>> start at the end (during development).
>>>> That caused quite some frustration. Would be better if I could just have
>>>> turned it off and not bothered any more. Anyhow, in the meantime I had to
>>>> dig deeper into the inner workings and the impacts are not as dramatic
>>>> as I initially assumed. But it still carries along some oddities I want
>>>> to list here.
>>>>
>>>> first odd thing:
>>>> Quote
>>>> ---
>>>> Enforce time based log rolling
>>>>
>>>> Currently time based log rolling is based on the creating time of the
>>>> log segment. With this KIP, the time based rolling would be changed to
>>>> based on the largest timestamp ever seen in a log segment. A new log
>>>> segment will be rolled out if current time is greater than largest
>>>> timestamp ever seen in the log segment + log.roll.ms. When
>>>> message.timestamp.type=CreateTime, user should set
>>>> max.message.time.difference.ms appropriately together with log.roll.ms
>>>> to avoid frequent log segment roll out.
>>>> ---
>>>> Imagine a MirrorMaker falls behind and has a delay of some
>>>> time > log.roll.ms.
>>>> From my understanding, when no one else is producing to this partition
>>>> except the MirrorMaker, the broker will start rolling on every append?
>>>> Just because you may be under a DoS attack and your application only
>>>> works in the remote location (also a good occasion for MM to fall behind).
>>>> But checking the default values indicates that it should indeed not
>>>> become a problem, as log.roll.ms defaults to roughly 7 days.
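
The rolling rule quoted above can be sketched in a few lines of Java. This is only a simplified model of the KIP text, not the broker's code: the class RollSketch, its methods, and the assumption that an empty segment is never rolled are all made up for illustration.

```java
/** Simplified model of the quoted rolling rule; not the broker's actual code. */
class RollSketch {
    /** Roll when now > largest timestamp seen in the active segment + log.roll.ms. */
    static boolean shouldRoll(long nowMs, long largestTimestampMs, long rollMs) {
        return nowMs > largestTimestampMs + rollMs;
    }

    /**
     * Counts how many segments are rolled while appending messages one by one.
     * Each message is {arrival time on the broker, CreateTime timestamp}.
     * Assumption: a segment that has not seen any message yet is never rolled.
     */
    static int countRolls(long[][] messages, long rollMs) {
        int rolls = 0;
        boolean empty = true;
        long largest = Long.MIN_VALUE;
        for (long[] m : messages) {
            long nowMs = m[0];
            long timestampMs = m[1];
            if (!empty && shouldRoll(nowMs, largest, rollMs)) {
                rolls++;        // start a fresh segment before appending
                empty = true;
                largest = Long.MIN_VALUE;
            }
            largest = empty ? timestampMs : Math.max(largest, timestampMs);
            empty = false;
        }
        return rolls;
    }
}
```

Under these assumptions, a producer whose timestamps lag by more than log.roll.ms triggers a roll before every append after the first one, while the same lag against a 7 day interval triggers none.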
>>>>
>>>>
>>>> second odd thing:
>>>> Quote
>>>> ---
>>>> A time index entry (*T*, *offset*) means that in this segment any
>>>> message whose timestamp is greater than *T* come after *offset.*
>>>>
>>>> The OffsetRequest behaves almost the same as before. If timestamp *T* is
>>>> set in the OffsetRequest, the first offset in the returned offset
>>>> sequence means that if user want to consume from *T*, that is the offset
>>>> to start with. The guarantee is that any message whose timestamp is
>>>> greater than T has a bigger offset. i.e. Any message before this offset
>>>> has a timestamp < *T*.
>>>> ---
>>>>
>>>> Given how the index is maintained, with a little bit of bad luck (rolling
>>>> upgrade/config change of MirrorMakers for different colocations) one ends
>>>> up with
>>>> segmentN.timeindex.maxtimestamp > segmentN+1.timeindex.maxtimestamp.
>>>> If I am not overlooking something here, the fetch code does not seem to
>>>> take that into account.
>>>> https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/Log.scala#L604
>>>> In this case the listed goal number 1, not to lose any messages, is not
>>>> achieved. An easy fix seems to be to sort the segsArray by maxtimestamp,
>>>> but I can't wrap my head around it just now.
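
To make the concern concrete, here is a small Java sketch of two ways to pick the starting segment for a timestamp. It uses hypothetical per-segment metadata (base offset and largest timestamp) and does not reproduce the linked Log.scala code. The first lookup assumes the largest timestamps never decrease from one segment to the next. The second is a conservative forward scan, which is a different approach from the sorting idea mentioned above. Both return a segment base offset only, or -1 if no segment qualifies.

```java
import java.util.List;

/** Illustration only: hypothetical segment metadata, not Kafka's Log/LogSegment classes. */
class SegmentLookupSketch {
    static final class Segment {
        final long baseOffset;
        final long maxTimestamp;

        Segment(long baseOffset, long maxTimestamp) {
            this.baseOffset = baseOffset;
            this.maxTimestamp = maxTimestamp;
        }
    }

    /**
     * Walks back from the newest segment while the previous segment still
     * reaches the target. Only safe if maxTimestamp never decreases.
     */
    static long startAssumingSorted(List<Segment> segs, long target) {
        int i = segs.size() - 1;
        if (i < 0 || segs.get(i).maxTimestamp < target) {
            return -1L;
        }
        while (i > 0 && segs.get(i - 1).maxTimestamp >= target) {
            i--;
        }
        return segs.get(i).baseOffset;
    }

    /**
     * Scans in offset order and starts at the first segment that can contain
     * a message with timestamp >= target. Needs no ordering of maxTimestamp.
     */
    static long startConservative(List<Segment> segs, long target) {
        for (Segment s : segs) {
            if (s.maxTimestamp >= target) {
                return s.baseOffset;
            }
        }
        return -1L;
    }
}
```

With segments whose largest timestamps are 100, 50 and 120, a lookup for timestamp 80 under the sorted assumption starts at the third segment and skips the messages between 80 and 100 in the first one. The forward scan starts at the first segment, at the cost of replaying more.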
>>>>
>>>>
>>>> third odd thing:
>>>> Regarding the worry of increasing complexity. Looking at the code
>>>> https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L193 -196
>>>> https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L227 & 230
>>>> https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L265 -266
>>>> https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L305 -307
>>>> https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L408 - 410
>>>> https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L432 - 435
>>>> https://github.com/apache/kafka/blob/05d00b5aca2e1e59ad685a3f051d2ab022f75acc/core/src/main/scala/kafka/log/LogSegment.scala#L104 -108
>>>> and especially
>>>> https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/Log.scala#L717
>>>> it feels like having the Log & LogSegment hold detailed knowledge about
>>>> the maintained indexes is not the ideal way to model the problem.
>>>> Having the server maintain a Set of Indexes could reduce the code
>>>> complexity, while also allowing an easy switch to turn it off. I think
>>>> both indexes could point to the physical position; a client would do
>>>> fetch(timestamp), and then continue with the offsets as usual. Is there
>>>> any specific reason the timestamp index points into the offset index?
>>>> For reading, one would need to branch earlier, maybe already in
>>>> ApiHandler, and decide what indexes to query, but this branching logic is
>>>> there now anyhow.
>>>>
>>>> Further I also can't think of a situation where one wants to have this
>>>> log.message.timestamp.difference.max.ms take effect. I think this
>>>> defeats goal 1 again.
>>>>
>>>> ITE having this index in the brokers now feels weird to me. It gives me a
>>>> feeling of complexity that I don't need, and I have a hard time figuring
>>>> out how much other people can benefit from it. I hope that this feedback
>>>> is useful and helps to understand my scepticism regarding this thing.
>>>> There were some other oddities that I have a hard time recalling now. So
>>>> I guess the index was built for a specific Confluent customer; will there
>>>> be any blog post about their use case? Or can you share it?
>>>>
>>>> Best Jan
>>>>
>>>>
>>>> On 24.08.2016 16:47, Jun Rao wrote:
>>>>
>>>> Jan,
>>>>
>>>> Thanks for the reply. I actually wasn't sure what your main concern on
>>>> time-based rolling is. Just a couple of clarifications. (1) Time-based
>>>> rolling doesn't control how long a segment will be retained for. For
>>>> retention, if you use time-based, it will now be based on the timestamp
>>>> in the message. If you use size-based, it works the same as before. Is
>>>> your concern on time-based retention? If so, you can always configure the
>>>> timestamp in all topics to be log append time, which will give you the
>>>> same behavior as before. (2) The creation time of the segment is never
>>>> exposed to the consumer and therefore is never preserved in MirrorMaker.
>>>> In contrast, the timestamp in the message will be preserved in
>>>> MirrorMaker. So, not sure what your concern on MirrorMaker is.
>>>>
>>>> Jun
>>>>
>>>> On Wed, Aug 24, 2016 at 5:03 AM, Jan Filipiak <Jan.Filipiak@trivago.com>
>>>> wrote:
>>>>
>>>> Hi Jun,
>>>>>
>>>>> I copy-pasted this mail from the archive, as I somehow didn't receive
>>>>> it per mail. I will still make some comments inline;
>>>>> hopefully you can find them quickly enough, my apologies.
>>>>>
>>>>> To make things clearer, you should also know that all messages in
>>>>> our Kafka setup already have a common way to access their timestamp
>>>>> (it's always encoded in the value the same way).
>>>>> Sometimes this is a logical time (e.g. the same timestamp across many
>>>>> different topics / partitions), say PHP request start time or the
>>>>> like. So Kafka's internal timestamps are not really attractive
>>>>> for us anyway currently.
>>>>>
>>>>> I hope I can make a point and not waste your time.
>>>>>
>>>>> Best Jan,
>>>>>
>>>>> hopefully everything makes sense
>>>>>
>>>>> --------
>>>>>
>>>>> Jan,
>>>>>
>>>>> Currently, there is no switch to disable the time based index.
>>>>>
>>>>> There are quite a few use cases of time based index.
>>>>>
>>>>> 1. From KIP-33's wiki, it allows us to do time-based retention
>>>>> accurately. Before KIP-33, the time-based retention is based on the last
>>>>> modified time of each log segment. The main issue is that last modified
>>>>> time can change over time. For example, if a broker loses storage and
>>>>> has to re-replicate all data, those re-replicated segments will be
>>>>> retained much longer since their last modified time is more recent.
>>>>> Having a time-based index allows us to retain segments based on the
>>>>> message time, not the last modified time. This can also benefit KIP-71,
>>>>> where we want to combine time-based retention and compaction.
>>>>>
>>>>> /If you're short on disk space, one could try to get by with
>>>>> retention.bytes/
>>>>> or, as we did, ssh into the box and rm it, which worked quite well when
>>>>> no one reads it.
>>>>> Chuckles a little when it's read, but readers usually do an
>>>>> auto.offset.reset
>>>>> (they are too slow anyway if they're reading the last segments hrhr).
>>>>>
>>>>> 2. In KIP-58, we want to delay log compaction based on a configurable
>>>>> amount of time. Time-based index allows us to do this more accurately.
>>>>>
>>>>> /good point, seems reasonable/
>>>>>
>>>>> 3. We plan to add an api in the consumer to allow seeking to an offset
>>>>> based on a timestamp. The time based index allows us to do this more
>>>>> accurately and fast.
>>>>>
>>>>> /Sure, I personally feel that you rarely want to do this. For Camus, we
>>>>> used max.pull.historic.days (or similar) successfully quite often. We
>>>>> just gave it an extra day and got what we wanted,
>>>>> and for debugging my bisect tool works well enough. So these are the 2
>>>>> use cases we experienced already and found a decent way around./
>>>>>
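
The re-replication example from use case 1 above can be sketched as follows. This is a minimal illustration with made-up names (RetentionSketch and its two methods), assuming retention compares one timestamp per segment against the retention period. It is not the broker's retention code.

```java
/** Illustration only: two ways to decide whether a segment is past retention. */
class RetentionSketch {
    /** Pre-KIP-33 style: uses the file's last modified time, which re-replication resets. */
    static boolean expiredByLastModified(long lastModifiedMs, long nowMs, long retentionMs) {
        return nowMs - lastModifiedMs > retentionMs;
    }

    /** Message-time style: uses the largest message timestamp in the segment (assumed here). */
    static boolean expiredByMessageTime(long largestTimestampMs, long nowMs, long retentionMs) {
        return nowMs - largestTimestampMs > retentionMs;
    }
}
```

For a segment whose newest message is 10 days old but whose file was re-replicated an hour ago, a 7 day retention keeps it under the first rule and deletes it under the second.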
>>>>> Now for the impact.
>>>>>
>>>>> a. There is a slight change on how time-based rolling works. Before
>>>>> KIP-33, rolling was based on the time when a segment was loaded in the
>>>>> broker. After KIP-33, rolling is based on the time of the first message
>>>>> of a segment. Not sure if this is your concern. In the common case, the
>>>>> two behave more or less the same. The latter is actually more
>>>>> deterministic since it's not sensitive to broker restarts.
>>>>>
>>>>> /This is part of my main concern indeed. This is what scares me, and I
>>>>> preferred to just opt out instead of reviewing all our pipelines to
>>>>> check what's gonna happen when we put it live.
>>>>> For example the MirrorMakers: they may want to preserve the create time
>>>>> from the source cluster and publish the same create time (which they
>>>>> should do, if you don't encode your own timestamps and want
>>>>> to have proper kafka-streams windowing). Then I am quite concerned about
>>>>> what happens when we have problems with our cross-ocean links and fall
>>>>> behind, say a day or two.
>>>>> Then I can think of a very up-to-date MirrorMaker from
>>>>> one colocation and a very laggy MirrorMaker from another colocation.
>>>>> For me it's not 100% clear what's gonna happen. But I can't think of
>>>>> sane defaults there, which is what I love Kafka for.
>>>>> It is just tricky to be convinced that an upgrade is safe, which was
>>>>> usually easy.
>>>>> /
>>>>> b. Time-based index potentially adds overhead to producing messages and
>>>>> loading segments. Our experiments show that the impact to producing is
>>>>> insignificant. The time to load segments when restarting a broker can be
>>>>> doubled. However, the absolute time is still reasonable. For example,
>>>>> loading 10K log segments with time-based index takes about 5 seconds.
>>>>>
>>>>> /Loading should be fine, totally agree/
>>>>>
>>>>> c. Because time-based index is useful in several cases and the impact
>>>>> seems small, we didn't consider making time based index optional.
>>>>> Finally, although it's possible to make the time based index optional,
>>>>> it will add more complexity to the code base. So, we probably should only
>>>>> consider it if it's truly needed. Thanks,
>>>>>
>>>>> /I think one can get away with an easier codebase here. The trick is
>>>>> not to have the Log implement all the logic,
>>>>> but just have the broker maintain a Set of Indexes that gets
>>>>> initialized at startup and passed to the Log. One could ask each
>>>>> individual index whether that log segment should be rolled, compacted,
>>>>> truncated, whatever. One could also give that LogSegment to each index
>>>>> and make it rebuild
>>>>> the index, for example. I didn't figure out the details. But this
>>>>> https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/Log.scala#L715
>>>>> might end up as for (Index i : indexes) { i.shouldRoll(segment) }, which
>>>>> should already be easier.
>>>>> If users don't want time based indexing, just don't put the time-based
>>>>> index in the Set, and everything should work like a charm.
>>>>> RPC calls that work on the specific indexes would need to throw an
>>>>> exception of some kind.
>>>>> Just an idea.
>>>>> /
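
A rough Java sketch of the "Set of Indexes" idea described above. Every name here (IndexSetSketch, SegmentInfo, SegmentIndex, SizeRule, TimeRule) is hypothetical and none of it is taken from the Kafka code base. It only shows the shape of the proposal: the log asks each configured index whether to roll, and opting out of time-based behaviour means leaving that index out of the collection.

```java
import java.util.List;

/** Illustration of the proposal only; all types here are hypothetical. */
class IndexSetSketch {
    /** The few facts about a segment that the rules below need. */
    static final class SegmentInfo {
        final long sizeBytes;
        final long largestTimestampMs;

        SegmentInfo(long sizeBytes, long largestTimestampMs) {
            this.sizeBytes = sizeBytes;
            this.largestTimestampMs = largestTimestampMs;
        }
    }

    /** Pluggable index: the log asks it instead of knowing its internals. */
    interface SegmentIndex {
        boolean shouldRoll(SegmentInfo segment, long nowMs);
    }

    /** Rolls once the segment reaches a size limit. */
    static final class SizeRule implements SegmentIndex {
        private final long maxBytes;

        SizeRule(long maxBytes) {
            this.maxBytes = maxBytes;
        }

        public boolean shouldRoll(SegmentInfo segment, long nowMs) {
            return segment.sizeBytes >= maxBytes;
        }
    }

    /** Rolls once the largest timestamp is older than the roll interval. */
    static final class TimeRule implements SegmentIndex {
        private final long rollMs;

        TimeRule(long rollMs) {
            this.rollMs = rollMs;
        }

        public boolean shouldRoll(SegmentInfo segment, long nowMs) {
            return nowMs > segment.largestTimestampMs + rollMs;
        }
    }

    /** The log rolls if any configured index asks for it. */
    static boolean shouldRoll(List<SegmentIndex> indexes, SegmentInfo segment, long nowMs) {
        for (SegmentIndex index : indexes) {
            if (index.shouldRoll(segment, nowMs)) {
                return true;
            }
        }
        return false;
    }
}
```

Leaving TimeRule out of the list is the whole opt-out in this sketch. Requests that need the missing index would still have to fail explicitly, as noted above.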
>>>>> Jun
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On 22.08.2016 09:24, Jan Filipiak wrote:
>>>>>
>>>>> Hello everyone,
>>>>>>
>>>>>> I stumbled across KIP-33 and the time based index. While briefly
>>>>>> checking the wiki and commits, I failed to find a way to opt out.
>>>>>> I saw it having quite some impact on when logs are rolled and was
>>>>>> hoping not to have to deal with all of that. Is there a disable
>>>>>> switch I overlooked?
>>>>>>
>>>>>> Does anybody have a good use case where the time-based index comes in
>>>>>> handy? I made a custom console consumer for myself
>>>>>> that can bisect a log based on time. It's just a quick probabilistic
>>>>>> shot into the log, but it is sometimes quite useful for some debugging.
>>>>>>
>>>>>> Best Jan
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>