Posted to dev@kafka.apache.org by Colt McNealy <co...@littlehorse.io> on 2023/09/11 04:37:49 UTC

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

Howdy folks,

First I wanted to say fantastic work and thank you to Nick. I built your
branch (https://github.com/nicktelford/kafka/tree/KIP-892-3.5.0) and did
some testing on our Streams app with Kafka 3.5.0, your `kip-892-3.5.0`
branch, and your `kip-892-3.5.0` branch built with Speedb OSS 2.3.0.1. And
it worked! Including the global store (we don't have any segmented stores,
unfortunately).

The test I ran involved running 3,000 workflows with 100 tasks each, and
roughly 650MB state total.

With Streams 3.5.0, I indeed verified that unclean shutdown caused a fresh
restore from scratch. I also benchmarked my application at:
- Running the benchmark took 211 seconds
- 1,421 tasks per second on one partition
- 8 seconds to restore the state (650MB or so)

With KIP 892, I verified that unclean shutdown does not cause a fresh
restore (!!!!). I got the following benchmark results:
- Benchmark took 216 seconds
- 1,401 tasks per second on one partition
- 11 seconds to restore the state

I ran the restorations many times to ensure that there was no rounding
error or noise; the results were remarkably consistent. Additionally, I ran
the restorations with KIP-892 built with Speedb OSS. The restoration time
consistently came out as 10 seconds, which was an improvement from the 11
seconds observed with RocksDB + KIP-892.

My application is bottlenecked mostly by serialization and deserialization,
so improving the performance of the state store doesn't really impact our
throughput that much. And the processing performance figures (benchmark time,
tasks/second) are pretty close in KIP-892 vs Streams 3.5.0. However, at
larger state store sizes, RocksDB performance begins to degrade, so that
might not be true once we pass 20GB per partition.

-- QUESTION: Because we observed a significant (30% or so) and reproducible
slowdown during restoration, it seems like KIP-892 uses the checkpointing
behavior during restoration as well? If so, I would argue that this might
not be necessary, because everything we write is already committed, so we
don't need to change the behavior during restoration or standby tasks.
Perhaps we could write the offsets to RocksDB on every batch (or even every
5 seconds or so).

-- Note: This was a very small-scale test, with <1GB of state (as I didn't
have time to spend hours building up state). In the past I have noted that
RocksDB performance degrades significantly after 25GB of state in one
store. Future work involves determining the performance impact of KIP-892
relative to trunk at larger scale, since it's possible that the relative
behaviors are far different (i.e. relative to trunk, 892's processing and
restoration throughput might be much better or much worse).

-- Note: For those who want to replicate the tests, you can find the branch
of our streams app here:
https://github.com/littlehorse-enterprises/littlehorse/tree/minor/testing-streams-forks
. The example I ran was `examples/hundred-tasks`, and I ran the server with
`./local-dev/do-server.sh one-partition`. The `STREAMS_TESTS.md` file has a
detailed breakdown of the testing.

Anyways, I'm super excited about this KIP and if a bit more future testing
goes well, we plan to ship our product with a build of KIP-892, Speedb OSS,
and potentially a few other minor tweaks that we are thinking about.

Thanks Nick!

Ride well,
Colt McNealy

*Founder, LittleHorse.dev*


On Thu, Aug 24, 2023 at 3:19 AM Nick Telford <ni...@gmail.com> wrote:

> Hi Bruno,
>
> Thanks for taking the time to review the KIP. I'm back from leave now and
> intend to move this forwards as quickly as I can.
>
> Addressing your points:
>
> 1.
> Because flush() is part of the StateStore API, it's exposed to custom
> Processors, which might be making calls to flush(). This was actually the
> case in a few integration tests.
> To maintain as much compatibility as possible, I'd prefer not to make this
> an UnsupportedOperationException, as it will cause previously working
> Processors to start throwing exceptions at runtime.
> I agree that it doesn't make sense for it to proxy commit(), though, as
> that would cause it to violate the "StateStores commit only when the Task
> commits" rule.
> Instead, I think we should make this a no-op. That way, existing user
> Processors will continue to work as-before, without violation of store
> consistency that would be caused by premature flush/commit of StateStore
> data to disk.
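>
> For concreteness, a rough sketch of what I have in mind (the parameter
> type of commit() is paraphrased from the KIP, so treat it as
> illustrative):
>
>     import java.util.Map;
>     import org.apache.kafka.common.TopicPartition;
>
>     public interface StateStore {
>         // New in KIP-892: atomically commits buffered writes along with
>         // the given changelog offsets.
>         void commit(final Map<TopicPartition, Long> changelogOffsets);
>
>         // Presumably deprecated in favour of commit(); becomes a no-op so
>         // existing Processors that still call it keep working without
>         // prematurely flushing uncommitted StateStore data to disk.
>         @Deprecated
>         default void flush() {
>             // intentionally does nothing
>         }
>     }
>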
> What do you think?
>
> 2.
> As stated in the JavaDoc, when a StateStore implementation is
> transactional, but is unable to estimate the uncommitted memory usage, the
> method will return -1.
> The intention here is to permit third-party implementations that may not be
> able to estimate memory usage.
>
> Yes, it will be 0 when nothing has been written to the store yet. I thought
> that was implied by "This method will return an approximation of the memory
> that would be freed by the next call to {@link #commit(Map)}" and "@return The
> approximate size of all records awaiting {@link #commit(Map)}", however, I
> can add it explicitly to the JavaDoc if you think this is unclear?
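>
> For reference, this is roughly how I read that contract (an illustrative
> sketch, not the exact JavaDoc from the KIP; the long return type is
> assumed):
>
>     public interface StateStore {
>         /**
>          * @return the approximate size, in bytes, of records awaiting
>          *         {@link #commit(Map)}; 0 if nothing has been written
>          *         since the last commit, or -1 if the store is
>          *         transactional but cannot estimate its uncommitted
>          *         memory usage.
>          */
>         long approximateNumUncommittedBytes();
>     }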
>
> 3.
> I realise this is probably the most contentious point in my design, and I'm
> open to changing it if I'm unable to convince you of the benefits.
> Nevertheless, here's my argument:
> The Interactive Query (IQ) API(s) provide users with StateStores directly to
> query, and it may be important for users to programmatically know which
> mode the StateStore is operating under. If we simply provide an
> "eosEnabled" boolean (as used throughout the internal streams engine), or
> similar, then users will need to understand the operation and consequences
> of each available processing mode and how it pertains to their StateStore.
>
> Interactive Query users aren't the only people that care about the
> processing.mode/IsolationLevel of a StateStore: implementers of custom
> StateStores also need to understand the behaviour expected of their
> implementation. KIP-892 introduces some assumptions into the Streams Engine
> about how StateStores operate under each processing mode, and it's
> important that custom implementations adhere to those assumptions in order
> to maintain the consistency guarantees.
>
> IsolationLevels provide a high-level contract on the behaviour of the
> StateStore: a user knows that under READ_COMMITTED, they will see writes
> only after the Task has committed, and under READ_UNCOMMITTED they will see
> writes immediately. No understanding of the details of each processing.mode
> is required, either for IQ users or StateStore implementers.
>
> An argument can be made that these contractual guarantees can simply be
> documented for the processing.mode (i.e. that exactly-once and
> exactly-once-v2 behave like READ_COMMITTED and at-least-once behaves like
> READ_UNCOMMITTED), but there are several small issues with this I'd prefer
> to avoid:
>
>    - Where would we document these contracts, in a way that is difficult
>    for users/implementers to miss/ignore?
>    - It's not clear to users that the processing mode is communicating
>    an expectation of read isolation, unless they read the documentation.
>    Users rarely consult documentation unless they feel they need to, so
>    it's likely this detail would get missed by many users.
>    - It tightly couples processing modes to read isolation. Adding new
>    processing modes, or changing the read isolation of existing processing
>    modes would be difficult/impossible.
>
> Ultimately, the cost of introducing IsolationLevels is just a single
> method, since we re-use the existing IsolationLevel enum from Kafka. This
> gives us a clear place to document the contractual guarantees expected
> of/provided by StateStores, that is accessible both by the StateStore
> itself, and by IQ users.
>
> (Writing this I've just realised that the StateStore and IQ APIs actually
> don't provide access to StateStoreContext that IQ users would have direct
> access to... Perhaps StateStore should expose isolationLevel() itself too?)
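>
> To illustrate the kind of code I'd like implementers and IQ users to be
> able to write against the proposed accessor (only the IsolationLevel
> enum and the proposed isolationLevel() method are real; the rest is
> hypothetical):
>
>     import org.apache.kafka.common.IsolationLevel;
>
>     // inside a custom StateStore's init(context, root):
>     if (context.isolationLevel() == IsolationLevel.READ_COMMITTED) {
>         // buffer writes; readers must not see them until the Task commits
>     } else {
>         // READ_UNCOMMITTED: write straight through; readers may observe
>         // records immediately
>     }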
>
> 4.
> Yeah, I'm not comfortable renaming the metrics in-place either, as it's a
> backwards incompatible change. My concern is that, if we leave the existing
> "flush" metrics in place, they will be confusing to users. Right now,
> "flush" metrics record explicit flushes to disk, but under KIP-892, even a
> commit() will not explicitly flush data to disk - RocksDB will decide on
> when to flush memtables to disk itself.
>
> If we keep the existing "flush" metrics, we'd have two options, which both
> seem pretty bad to me:
>
>    1. Have them record calls to commit(), which would be misleading, as
>    data is no longer explicitly "flushed" to disk by this call.
>    2. Have them record nothing at all, which is equivalent to removing the
>    metrics, except that users will see the metric still exists and so
>    assume that the metric is correct, and that there's a problem with
>    their system when there isn't.
>
> I agree that removing them is also a bad solution, and I'd like some
> guidance on the best path forward here.
>
> 5.
> Position files are updated on every write to a StateStore. Since our writes
> are now buffered until commit(), we can't update the Position file until
> commit() has been called, otherwise it would be inconsistent with the data
> in the event of a rollback. Consequently, we need to manage these offsets
> the same way we manage the checkpoint offsets, and ensure they're only
> written on commit().
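>
> Roughly, the store would buffer Position updates alongside the
> uncommitted records and only fold them in on commit(), something like
> this (field and method names are illustrative, not the implementation):
>
>     import org.apache.kafka.streams.query.Position;
>
>     private Position committed = Position.emptyPosition();
>     private Position uncommitted = Position.emptyPosition();
>
>     // called on every write, alongside adding the record to the
>     // transaction buffer
>     void trackPosition(final String topic, final int partition, final long offset) {
>         uncommitted = uncommitted.withComponent(topic, partition, offset);
>     }
>
>     // called from commit(); a rollback simply discards 'uncommitted'
>     void commitPosition() {
>         committed = committed.merge(uncommitted);
>         uncommitted = Position.emptyPosition();
>     }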
>
> 6.
> Agreed, although I'm not exactly sure yet what tests to write. How explicit
> do we need to be here in the KIP?
>
> As for upgrade/downgrade: upgrade is designed to be seamless, and we should
> definitely add some tests around that. Downgrade, it transpires, isn't
> currently possible, as the extra column family for offset storage is
> incompatible with the pre-KIP-892 implementation: when you open a RocksDB
> database, you must open all available column families or receive an error.
> What currently happens on downgrade is that it attempts to open the store,
> throws an error about the offsets column family not being opened, which
> triggers a wipe and rebuild of the Task. Given that downgrades should be
> uncommon, I think this is acceptable behaviour, as the end-state is
> consistent, even if it results in an undesirable state restore.
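>
> For anyone unfamiliar with the RocksDB constraint, this is essentially
> what the pre-KIP-892 code runs into (the path and column family handling
> here are illustrative, not the actual Streams code):
>
>     import java.util.ArrayList;
>     import java.util.Collections;
>     import java.util.List;
>     import org.rocksdb.*;
>
>     static RocksDB openLikePre892(final String path) throws RocksDBException {
>         // the store on disk now contains "default" plus the KIP-892
>         // offsets column family...
>         final List<byte[]> existing =
>             RocksDB.listColumnFamilies(new Options(), path);
>
>         // ...but pre-KIP-892 code only asks for the default column
>         // family, so open() throws a RocksDBException complaining that
>         // not all column families were opened. Streams treats this as a
>         // corrupt store and wipes/rebuilds the Task.
>         final List<ColumnFamilyDescriptor> onlyDefault = Collections.singletonList(
>             new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY));
>         final List<ColumnFamilyHandle> handles = new ArrayList<>();
>         return RocksDB.open(new DBOptions(), path, onlyDefault, handles);
>     }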
>
> Should I document the upgrade/downgrade behaviour explicitly in the KIP?
>
> --
>
> Regards,
> Nick
>
>
> On Mon, 14 Aug 2023 at 22:31, Bruno Cadonna <ca...@apache.org> wrote:
>
> > Hi Nick!
> >
> > Thanks for the updates!
> >
> > 1.
> > Why does StateStore#flush() default to
> > StateStore#commit(Collections.emptyMap())?
> > Since calls to flush() will not exist anymore after this KIP is
> > released, I would rather throw an unsupported operation exception by
> > default.
> >
> >
> > 2.
> > When would a state store return -1 from
> > StateStore#approximateNumUncommittedBytes() while being transactional?
> >
> > Wouldn't StateStore#approximateNumUncommittedBytes() also return 0 if
> > the state store is transactional but nothing has been written to the
> > state store yet?
> >
> >
> > 3.
> > Sorry for bringing this up again. Does this KIP really need to introduce
> > StateStoreContext#isolationLevel()? StateStoreContext has already
> > appConfigs() which basically exposes the same information, i.e., if EOS
> > is enabled or not.
> > In one of your previous e-mails you wrote:
> >
> > "My idea was to try to keep the StateStore interface as loosely coupled
> > from the Streams engine as possible, to give implementers more freedom,
> > and reduce the amount of internal knowledge required."
> >
> > While I understand the intent, I doubt that it decreases the coupling of
> > a StateStore interface and the Streams engine. READ_COMMITTED only
> > applies to IQ but not to reads by processors. Thus, implementers need to
> > understand how Streams accesses the state stores.
> >
> > I would like to hear what others think about this.
> >
> >
> > 4.
> > Great exposing new metrics for transactional state stores! However, I
> > would prefer to add new metrics and deprecate (in the docs) the old
> > ones. You can find examples of deprecated metrics here:
> > https://kafka.apache.org/documentation/#selector_monitoring
> >
> >
> > 5.
> > Why does the KIP mention position files? I do not think they are related
> > to transactions or flushes.
> >
> >
> > 6.
> > I think we will also need to adapt/add integration tests besides unit
> > tests. Additionally, we probably need integration or system tests to
> > verify that upgrades and downgrades between transactional and
> > non-transactional state stores work as expected.
> >
> >
> > Best,
> > Bruno
> >
> >
> >
> >
> >
> > On 7/21/23 10:34 PM, Nick Telford wrote:
> > > One more thing: I noted John's suggestion in the KIP, under "Rejected
> > > Alternatives". I still think it's an idea worth pursuing, but I believe
> > > that it's out of the scope of this KIP, because it solves a different
> set
> > > of problems to this KIP, and the scope of this one has already grown
> > quite
> > > large!
> > >
> > > On Fri, 21 Jul 2023 at 21:33, Nick Telford <ni...@gmail.com>
> > wrote:
> > >
> > >> Hi everyone,
> > >>
> > >> I've updated the KIP (
> > >>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores
> > )
> > >> with the latest changes; mostly bringing back "Atomic Checkpointing"
> > (for
> > >> what feels like the 10th time!). I think the one thing missing is some
> > >> changes to metrics (notably the store "flush" metrics will need to be
> > >> renamed to "commit").
> > >>
> > >> The reason I brought back Atomic Checkpointing was to decouple store
> > flush
> > >> from store commit. This is important, because with Transactional
> > >> StateStores, we now need to call "flush" on *every* Task commit, and
> not
> > >> just when the StateStore is closing, otherwise our transaction buffer
> > will
> > >> never be written and persisted, instead growing unbounded! I
> > experimented
> > >> with some simple solutions, like forcing a store flush whenever the
> > >> transaction buffer was likely to exceed its configured size, but this
> > was
> > >> brittle: it prevented the transaction buffer from being configured to
> be
> > >> unbounded, and it still would have required explicit flushes of
> RocksDB,
> > >> yielding sub-optimal performance and memory utilization.
> > >>
> > >> I deemed Atomic Checkpointing to be the "right" way to resolve this
> > >> problem. By ensuring that the changelog offsets that correspond to the
> > most
> > >> recently written records are always atomically written to the
> StateStore
> > >> (by writing them to the same transaction buffer), we can avoid
> forcibly
> > >> flushing the RocksDB memtables to disk, letting RocksDB flush them
> only
> > >> when necessary, without losing any of our consistency guarantees. See
> > the
> > >> updated KIP for more info.
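> > >>
> > >> In code terms, the idea is roughly the following (an illustrative
> > >> RocksDB fragment, not the actual implementation; the column family
> > >> handles and key/value variables are assumed to exist):
> > >>
> > >>     try (final WriteBatch batch = new WriteBatch();
> > >>          final WriteOptions opts = new WriteOptions()) {
> > >>         // the buffered records...
> > >>         batch.put(dataColumnFamily, recordKey, recordValue);
> > >>         // ...and the changelog offsets they correspond to go into the
> > >>         // SAME batch, so a single write() makes both durable together,
> > >>         // with no explicit memtable flush needed for consistency
> > >>         batch.put(offsetsColumnFamily, changelogPartitionKey, serializedOffset);
> > >>         db.write(opts, batch);
> > >>     }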
> > >>
> > >> I have fully implemented these changes, although I'm still not
> entirely
> > >> happy with the implementation for segmented StateStores, so I plan to
> > >> refactor that. Despite that, all tests pass. If you'd like to try out
> or
> > >> review this highly experimental and incomplete branch, it's available
> > here:
> > >> https://github.com/nicktelford/kafka/tree/KIP-892-3.5.0. Note: it's
> > built
> > >> against Kafka 3.5.0 so that I had a stable base to build and test it
> on,
> > >> and to enable easy apples-to-apples comparisons in a live
> environment. I
> > >> plan to rebase it against trunk once it's nearer completion and has
> been
> > >> proven on our main application.
> > >>
> > >> I would really appreciate help in reviewing and testing:
> > >> - Segmented (Versioned, Session and Window) stores
> > >> - Global stores
> > >>
> > >> As I do not currently use either of these, my primary test
> > >> environment doesn't test these areas.
> > >>
> > >> I'm going on Parental Leave starting next week for a few weeks, so
> will
> > >> not have time to move this forward until late August. That said, your
> > >> feedback is welcome and appreciated, I just won't be able to respond
> as
> > >> quickly as usual.
> > >>
> > >> Regards,
> > >> Nick
> > >>
> > >> On Mon, 3 Jul 2023 at 16:23, Nick Telford <ni...@gmail.com>
> > wrote:
> > >>
> > >>> Hi Bruno
> > >>>
> > >>> Yes, that's correct, although the impact on IQ is not something I had
> > >>> considered.
> > >>>
> > >>> What about atomically updating the state store from the transaction
> > >>>> buffer every commit interval and writing the checkpoint (thus,
> > flushing
> > >>>> the memtable) every configured amount of data and/or number of
> commit
> > >>>> intervals?
> > >>>>
> > >>>
> > >>> I'm not quite sure I follow. Are you suggesting that we add an
> > additional
> > >>> config for the max number of commit intervals between checkpoints?
> That
> > >>> way, we would checkpoint *either* when the transaction buffers are
> > nearly
> > >>> full, *OR* whenever a certain number of commit intervals have
> elapsed,
> > >>> whichever comes first?
> > >>>
> > >>> That certainly seems reasonable, although this re-ignites an earlier
> > >>> debate about whether a config should be measured in "number of commit
> > >>> intervals", instead of just an absolute time.
> > >>>
> > >>> FWIW, I realised that this issue is the reason I was pursuing the
> > Atomic
> > >>> Checkpoints, as it de-couples memtable flush from checkpointing,
> which
> > >>> enables us to just checkpoint on every commit without any performance
> > >>> impact. Atomic Checkpointing is definitely the "best" solution, but
> > I'm not
> > >>> sure if this is enough to bring it back into this KIP.
> > >>>
> > >>> I'm currently working on moving all the transactional logic directly
> > into
> > >>> RocksDBStore itself, which does away with the
> StateStore#newTransaction
> > >>> method, and reduces the number of new classes introduced,
> significantly
> > >>> reducing the complexity. If it works, and the complexity is
> drastically
> > >>> reduced, I may try bringing back Atomic Checkpoints into this KIP.
> > >>>
> > >>> Regards,
> > >>> Nick
> > >>>
> > >>> On Mon, 3 Jul 2023 at 15:27, Bruno Cadonna <ca...@apache.org>
> wrote:
> > >>>
> > >>>> Hi Nick,
> > >>>>
> > >>>> Thanks for the insights! Very interesting!
> > >>>>
> > >>>> As far as I understand, you want to atomically update the state
> store
> > >>>> from the transaction buffer, flush the memtable of a state store and
> > >>>> write the checkpoint not after the commit time elapsed but after the
> > >>>> transaction buffer reached a size that would lead to exceeding
> > >>>> statestore.transaction.buffer.max.bytes before the next commit
> > interval
> > >>>> ends.
> > >>>> That means, the Kafka transaction would commit every commit interval
> > but
> > >>>> the state store will only be atomically updated roughly every
> > >>>> statestore.transaction.buffer.max.bytes of data. Also IQ would then
> > only
> > >>>> see new data roughly every statestore.transaction.buffer.max.bytes.
> > >>>> After a failure the state store needs to restore up to
> > >>>> statestore.transaction.buffer.max.bytes.
> > >>>>
> > >>>> Is this correct?
> > >>>>
> > >>>> What about atomically updating the state store from the transaction
> > >>>> buffer every commit interval and writing the checkpoint (thus,
> > flushing
> > >>>> the memtable) every configured amount of data and/or number of
> commit
> > >>>> intervals? In such a way, we would have the same delay for records
> > >>>> appearing in output topics and IQ because both would appear when the
> > >>>> Kafka transaction is committed. However, after a failure the state
> > store
> > >>>> still needs to restore up to statestore.transaction.buffer.max.bytes
> > and
> > >>>> it might restore data that is already in the state store because the
> > >>>> checkpoint lags behind the last stable offset (i.e. the last
> committed
> > >>>> offset) of the changelog topics. Restoring data that is already in
> the
> > >>>> state store is idempotent, so EOS should not be violated.
> > >>>> This solution needs at least one new config to specify when a
> > checkpoint
> > >>>> should be written.
> > >>>>
> > >>>>
> > >>>>
> > >>>> A small correction to your previous e-mail that does not change
> > anything
> > >>>> you said: Under alos the default commit interval is 30 seconds, not
> > five
> > >>>> seconds.
> > >>>>
> > >>>>
> > >>>> Best,
> > >>>> Bruno
> > >>>>
> > >>>>
> > >>>> On 01.07.23 12:37, Nick Telford wrote:
> > >>>>> Hi everyone,
> > >>>>>
> > >>>>> I've begun performance testing my branch on our staging
> environment,
> > >>>>> putting it through its paces in our non-trivial application. I'm
> > >>>> already
> > >>>>> observing the same increased flush rate that we saw the last time
> we
> > >>>>> attempted to use a version of this KIP, but this time, I think I
> know
> > >>>> why.
> > >>>>>
> > >>>>> Pre-KIP-892, StreamTask#postCommit, which is called at the end of
> the
> > >>>> Task
> > >>>>> commit process, has the following behaviour:
> > >>>>>
> > >>>>>      - Under ALOS: checkpoint the state stores. This includes
> > >>>>>      flushing memtables in RocksDB. This is acceptable because the
> > >>>> default
> > >>>>>      commit.interval.ms is 5 seconds, so forcibly flushing
> memtables
> > >>>> every 5
> > >>>>>      seconds is acceptable for most applications.
> > >>>>>      - Under EOS: checkpointing is not done, *unless* it's being
> > >>>> forced, due
> > >>>>>      to e.g. the Task closing or being revoked. This means that
> under
> > >>>> normal
> > >>>>>      processing conditions, the state stores will not be
> > checkpointed,
> > >>>> and will
> > >>>>>      not have memtables flushed at all, unless RocksDB decides to
> > >>>> flush them on
> > >>>>>      its own. Checkpointing stores and force-flushing their
> memtables
> > >>>> is only
> > >>>>>      done when a Task is being closed.
> > >>>>>
> > >>>>> Under EOS, KIP-892 needs to checkpoint stores on at least *some*
> > normal
> > >>>>> Task commits, in order to write the RocksDB transaction buffers to
> > the
> > >>>>> state stores, and to ensure the offsets are synced to disk to
> prevent
> > >>>>> restores from getting out of hand. Consequently, my current
> > >>>> implementation
> > >>>>> calls maybeCheckpoint on *every* Task commit, which is far too
> > >>>> frequent.
> > >>>>> This causes checkpoints every 10,000 records, which is a change in
> > >>>> flush
> > >>>>> behaviour, potentially causing performance problems for some
> > >>>> applications.
> > >>>>>
> > >>>>> I'm looking into possible solutions, and I'm currently leaning
> > towards
> > >>>>> using the statestore.transaction.buffer.max.bytes configuration to
> > >>>>> checkpoint Tasks once we are likely to exceed it. This would
> > >>>> complement the
> > >>>>> existing "early Task commit" functionality that this configuration
> > >>>>> provides, in the following way:
> > >>>>>
> > >>>>>      - Currently, we use statestore.transaction.buffer.max.bytes to
> > >>>> force an
> > >>>>>      early Task commit if processing more records would cause our
> > state
> > >>>> store
> > >>>>>      transactions to exceed the memory assigned to them.
> > >>>>>      - New functionality: when a Task *does* commit, we will not
> > >>>> checkpoint
> > >>>>>      the stores (and hence flush the transaction buffers) unless we
> > >>>> expect to
> > >>>>>      cross the statestore.transaction.buffer.max.bytes threshold
> > before
> > >>>> the next
> > >>>>>      commit
> > >>>>>
> > >>>>> I'm also open to suggestions.
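> > >>>>>
> > >>>>> For concreteness, the check I'm picturing on Task commit is roughly
> > >>>>> this (all names are illustrative):
> > >>>>>
> > >>>>>     boolean shouldCheckpoint(final long uncommittedBytes,
> > >>>>>                              final long bytesPerCommitInterval,
> > >>>>>                              final long maxBufferBytes) {
> > >>>>>         // only checkpoint (and hence flush the transaction buffers)
> > >>>>>         // if the next commit interval is likely to push us over
> > >>>>>         // statestore.transaction.buffer.max.bytes
> > >>>>>         return uncommittedBytes + bytesPerCommitInterval >= maxBufferBytes;
> > >>>>>     }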
> > >>>>>
> > >>>>> Regards,
> > >>>>> Nick
> > >>>>>
> > >>>>> On Thu, 22 Jun 2023 at 14:06, Nick Telford <nick.telford@gmail.com
> >
> > >>>> wrote:
> > >>>>>
> > >>>>>> Hi Bruno!
> > >>>>>>
> > >>>>>> 3.
> > >>>>>> By "less predictable for users", I meant in terms of understanding
> > the
> > >>>>>> performance profile under various circumstances. The more complex
> > the
> > >>>>>> solution, the more difficult it would be for users to understand
> the
> > >>>>>> performance they see. For example, spilling records to disk when
> the
> > >>>>>> transaction buffer reaches a threshold would, I expect, reduce
> write
> > >>>>>> throughput. This reduction in write throughput could be
> unexpected,
> > >>>> and
> > >>>>>> potentially difficult to diagnose/understand for users.
> > >>>>>> At the moment, I think the "early commit" concept is relatively
> > >>>>>> straightforward; it's easy to document, and conceptually fairly
> > >>>> obvious to
> > >>>>>> users. We could probably add a metric to make it easier to
> > understand
> > >>>> when
> > >>>>>> it happens though.
> > >>>>>>
> > >>>>>> 3. (the second one)
> > >>>>>> The IsolationLevel is *essentially* an indirect way of telling
> > >>>> StateStores
> > >>>>>> whether they should be transactional. READ_COMMITTED essentially
> > >>>> requires
> > >>>>>> transactions, because it dictates that two threads calling
> > >>>>>> `newTransaction()` should not see writes from the other
> transaction
> > >>>> until
> > >>>>>> they have been committed. With READ_UNCOMMITTED, all bets are off,
> > and
> > >>>>>> stores can allow threads to observe written records at any time,
> > >>>> which is
> > >>>>>> essentially "no transactions". That said, StateStores are free to
> > >>>> implement
> > >>>>>> these guarantees however they can, which is a bit more relaxed
> than
> > >>>>>> dictating "you must use transactions". For example, with RocksDB
> we
> > >>>> would
> > >>>>>> implement these as READ_COMMITTED == WBWI-based "transactions",
> > >>>>>> READ_UNCOMMITTED == direct writes to the database. But with other
> > >>>> storage
> > >>>>>> engines, it might be preferable to *always* use transactions, even
> > >>>> when
> > >>>>>> unnecessary; or there may be storage engines that don't provide
> > >>>>>> transactions, but the isolation guarantees can be met using a
> > >>>> different
> > >>>>>> technique.
> > >>>>>> My idea was to try to keep the StateStore interface as loosely
> > coupled
> > >>>>>> from the Streams engine as possible, to give implementers more
> > >>>> freedom, and
> > >>>>>> reduce the amount of internal knowledge required.
> > >>>>>> That said, I understand that "IsolationLevel" might not be the
> right
> > >>>>>> abstraction, and we can always make it much more explicit if
> > >>>> required, e.g.
> > >>>>>> boolean transactional()
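> > >>>>>>
> > >>>>>> For reference, the WBWI-based approach looks roughly like this (an
> > >>>>>> illustrative fragment against the RocksDB Java API, not the actual
> > >>>>>> branch; db, key and value are assumed to exist):
> > >>>>>>
> > >>>>>>     final WriteBatchWithIndex batch = new WriteBatchWithIndex(true);
> > >>>>>>     batch.put(key, value);  // uncommitted write, not yet in the DB
> > >>>>>>     // reads overlay the batch on the database, so the writer sees
> > >>>>>>     // its own uncommitted records
> > >>>>>>     final byte[] read = batch.getFromBatchAndDB(db, new ReadOptions(), key);
> > >>>>>>     // on Task commit, the whole batch is applied atomically
> > >>>>>>     db.write(new WriteOptions(), batch);
> > >>>>>>     batch.clear();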
> > >>>>>>
> > >>>>>> 7-8.
> > >>>>>> I can make these changes either later today or tomorrow.
> > >>>>>>
> > >>>>>> Small update:
> > >>>>>> I've rebased my branch on trunk and fixed a bunch of issues that
> > >>>> needed
> > >>>>>> addressing. Currently, all the tests pass, which is promising, but
> > it
> > >>>> will
> > >>>>>> need to undergo some performance testing. I haven't (yet) worked
> on
> > >>>>>> removing the `newTransaction()` stuff, but I would expect that,
> > >>>>>> behaviourally, it should make no difference. The branch is
> available
> > >>>> at
> > >>>>>> https://github.com/nicktelford/kafka/tree/KIP-892-c if anyone is
> > >>>>>> interested in taking an early look.
> > >>>>>>
> > >>>>>> Regards,
> > >>>>>> Nick
> > >>>>>>
> > >>>>>> On Thu, 22 Jun 2023 at 11:59, Bruno Cadonna <ca...@apache.org>
> > >>>> wrote:
> > >>>>>>
> > >>>>>>> Hi Nick,
> > >>>>>>>
> > >>>>>>> 1.
> > >>>>>>> Yeah, I agree with you. That was actually also my point. I
> > understood
> > >>>>>>> that John was proposing the ingestion path as a way to avoid the
> > >>>> early
> > >>>>>>> commits. Probably, I misinterpreted the intent.
> > >>>>>>>
> > >>>>>>> 2.
> > >>>>>>> I agree with John here, that actually it is public API. My
> question
> > >>>> is
> > >>>>>>> how this usage pattern affects normal processing.
> > >>>>>>>
> > >>>>>>> 3.
> > >>>>>>> My concern is that checking for the size of the transaction
> buffer
> > >>>> and
> > >>>>>>> maybe triggering an early commit affects the whole processing of
> > >>>> Kafka
> > >>>>>>> Streams. The transactionality of a state store is not confined to
> > the
> > >>>>>>> state store itself, but spills over and changes the behavior of
> > other
> > >>>>>>> parts of the system. I agree with you that it is a decent
> > >>>> compromise. I
> > >>>>>>> just wanted to analyse the downsides and list the options to
> > overcome
> > >>>>>>> them. I also agree with you that all options seem quite heavy
> > >>>> compared
> > >>>>>>> with your KIP. I do not understand what you mean with "less
> > >>>> predictable
> > >>>>>>> for users", though.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> I found the discussions about the alternatives really
> interesting.
> > >>>> But I
> > >>>>>>> also think that your plan sounds good and we should continue with
> > it!
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> Some comments on your reply to my e-mail on June 20th:
> > >>>>>>>
> > >>>>>>> 3.
> > >>>>>>> Ah, now, I understand the reasoning behind putting isolation
> level
> > in
> > >>>>>>> the state store context. Thanks! Should that also be a way to
> give
> > >>>> the
> > >>>>>>> the state store the opportunity to decide whether to turn on
> > >>>>>>> transactions or not?
> > >>>>>>> With my comment, I was more concerned about how do you know if a
> > >>>>>>> checkpoint file needs to be written under EOS, if you do not
> have a
> > >>>> way
> > >>>>>>> to know if the state store is transactional or not. If a state
> > store
> > >>>> is
> > >>>>>>> transactional, the checkpoint file can be written during normal
> > >>>>>>> processing under EOS. If the state store is not transactional,
> the
> > >>>>>>> checkpoint file must not be written under EOS.
> > >>>>>>>
> > >>>>>>> 7.
> > >>>>>>> My point was about not only considering the bytes in memory in
> > config
> > >>>>>>> statestore.uncommitted.max.bytes, but also bytes that might be
> > >>>> spilled
> > >>>>>>> on disk. Basically, I was wondering whether you should remove the
> > >>>>>>> "memory" in "Maximum number of memory bytes to be used to
> > >>>>>>> buffer uncommitted state-store records." My thinking was that
> even
> > >>>> if a
> > >>>>>>> state store spills uncommitted bytes to disk, limiting the
> overall
> > >>>> bytes
> > >>>>>>> might make sense. Thinking about it again and considering the
> > recent
> > >>>>>>> discussions, it does not make too much sense anymore.
> > >>>>>>> I like the name statestore.transaction.buffer.max.bytes that you
> > >>>> proposed.
> > >>>>>>>
> > >>>>>>> 8.
> > >>>>>>> A high-level description (without implementation details) of how
> > >>>> Kafka
> > >>>>>>> Streams will manage the commit of changelog transactions, state
> > store
> > >>>>>>> transactions and checkpointing would be great. Would be great if
> > you
> > >>>>>>> could also add some sentences about the behavior in case of a
> > >>>> failure.
> > >>>>>>> For instance how does a transactional state store recover after a
> > >>>>>>> failure or what happens with the transaction buffer, etc. (that
> is
> > >>>> what
> > >>>>>>> I meant by "fail-over" in point 9.)
> > >>>>>>>
> > >>>>>>> Best,
> > >>>>>>> Bruno
> > >>>>>>>
> > >>>>>>> On 21.06.23 18:50, Nick Telford wrote:
> > >>>>>>>> Hi Bruno,
> > >>>>>>>>
> > >>>>>>>> 1.
> > >>>>>>>> Isn't this exactly the same issue that WriteBatchWithIndex
> > >>>> transactions
> > >>>>>>>> have, whereby exceeding (or likely to exceed) configured memory
> > >>>> needs to
> > >>>>>>>> trigger an early commit?
> > >>>>>>>>
> > >>>>>>>> 2.
> > >>>>>>>> This is one of my big concerns. Ultimately, any approach based
> on
> > >>>>>>> cracking
> > >>>>>>>> open RocksDB internals and using it in ways it's not really
> > designed
> > >>>>>>> for is
> > >>>>>>>> likely to have some unforseen performance or consistency issues.
> > >>>>>>>>
> > >>>>>>>> 3.
> > >>>>>>>> What's your motivation for removing these early commits? While
> not
> > >>>>>>> ideal, I
> > >>>>>>>> think they're a decent compromise to ensure consistency whilst
> > >>>>>>> maintaining
> > >>>>>>>> good and predictable performance.
> > >>>>>>>> All 3 of your suggested ideas seem *very* complicated, and might
> > >>>>>>> actually
> > >>>>>>>> make behaviour less predictable for users as a consequence.
> > >>>>>>>>
> > >>>>>>>> I'm a bit concerned that the scope of this KIP is growing a bit
> > out
> > >>>> of
> > >>>>>>>> control. While it's good to discuss ideas for future
> > improvements, I
> > >>>>>>> think
> > >>>>>>>> it's important to narrow the scope down to a design that
> achieves
> > >>>> the
> > >>>>>>> most
> > >>>>>>>> pressing objectives (constant sized restorations during dirty
> > >>>>>>>> close/unexpected errors). Any design that this KIP produces can
> > >>>>>>> ultimately
> > >>>>>>>> be changed in the future, especially if the bulk of it is
> internal
> > >>>>>>>> behaviour.
> > >>>>>>>>
> > >>>>>>>> I'm going to spend some time next week trying to re-work the
> > >>>> original
> > >>>>>>>> WriteBatchWithIndex design to remove the newTransaction()
> method,
> > >>>> such
> > >>>>>>> that
> > >>>>>>>> it's just an implementation detail of RocksDBStore. That way, if
> > we
> > >>>>>>> want to
> > >>>>>>>> replace WBWI with something in the future, like the SST file
> > >>>> management
> > >>>>>>>> outlined by John, then we can do so with little/no API changes.
> > >>>>>>>>
> > >>>>>>>> Regards,
> > >>>>>>>>
> > >>>>>>>> Nick
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>
> > >
> >
>

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

Posted by Colt McNealy <co...@littlehorse.io>.
Nick,

Thanks for the response.

>Can you clarify how much state was restored in those 11 seconds?
That was a full restoration of ~650MB of state after I wiped the state
directory. The restoration after a crash with your branch is nearly
instantaneous, whereas with plain Kafka 3.5.0 a crash triggers a full
restoration (8 seconds).

Additionally, I pulled, rebuilt, and re-tested your changes and now the
restoration time with your branch is the same as with vanilla Streams!
Fantastic work!

I plan to do some more testing with larger state stores over the next
couple weeks, both with RocksDB and Speedb OSS. And perhaps I might even
try enabling some of the experimental Speedb OSS features, such as the
[Improved Write Flow](https://docs.speedb.io/speedb-features/write-flow).
As far as I understand, this isn't possible to do through the standard
RocksDBConfigSetter since some of the config options are Speedb-specific.
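
To illustrate: the standard hook only hands us the RocksDB Options object,
so plain RocksDB tuning works, but a Speedb-only switch like the write flow
isn't reachable from it (the class name below is just an example):

    import java.util.Map;
    import org.apache.kafka.streams.state.RocksDBConfigSetter;
    import org.rocksdb.Options;

    public class MyRocksDBConfig implements RocksDBConfigSetter {
        @Override
        public void setConfig(final String storeName, final Options options,
                              final Map<String, Object> configs) {
            // ordinary RocksDB settings are fine...
            options.setMaxWriteBufferNumber(4);
            // ...but there's no Options method for Speedb-specific features
        }

        @Override
        public void close(final String storeName, final Options options) { }
    }

(Registered via the rocksdb.config.setter Streams config, as usual.)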

Cheers,
Colt McNealy

*Founder, LittleHorse.dev*


On Mon, Sep 11, 2023 at 4:29 AM Nick Telford <ni...@gmail.com> wrote:

> Hi Colt,
>
> Thanks for taking the time to run your benchmarks on this, that's
> incredibly helpful.
>
> > With KIP 892, I verified that unclean shutdown does not cause a fresh
> > restore (!!!!). I got the following benchmark results:
> > - Benchmark took 216 seconds
> > - 1,401 tasks per second on one partition
> > - 11 seconds to restore the state
>
> Can you clarify how much state was restored in those 11 seconds? Was this
> the time to do the full restore regardless, or was it the time to only
> restore a small fraction of the state (e.g. the last aborted transaction)?
>
> > -- QUESTION: Because we observed a significant (30% or so) and
> reproducible
> > slowdown during restoration, it seems like KIP-892 uses the checkpointing
> > behavior during restoration as well? If so, I would argue that this might
> > not be necessary, because everything we write is already committed, so we
> > don't need to change the behavior during restoration or standby tasks.
> > Perhaps we could write the offsets to RocksDB on every batch (or even
> every
> > 5 seconds or so).
>
> Restore has always used a completely separate code-path to regular writes,
> and continues to do so. I had a quick pass over the code and I suspect I
> know what's causing the performance degradation: for every restored record,
> I was adding the changelog offset of that record to the batch along with
> the record. This is different to the regular write-path, which only adds
> the current offsets once, on-commit. This writeOffset method is fairly
> expensive, since it has to serialize the TopicPartition and offset that's
> being written to the database.
>
> Assuming this is the cause, I've already pushed a fix to my branch that
> will only call writeOffset once per-batch, and also adds some caching to
> the serialization in writeOffset, that should also enhance performance of
> state commit in the normal write-path.
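>
> In other words, the restore path would now look roughly like this (names
> are illustrative, not the actual code in the branch):
>
>     // for each batch of restored changelog records:
>     long lastOffset = -1L;
>     for (final ConsumerRecord<byte[], byte[]> record : records) {
>         addToBatch(record.key(), record.value());
>         lastOffset = record.offset();
>     }
>     if (lastOffset >= 0) {
>         // previously writeOffset() was called once per record; serializing
>         // the TopicPartition and offset each time made it expensive
>         writeOffset(changelogPartition, lastOffset);
>     }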
>
> Please let me know if this addresses the issue!
>
> Regards,
> Nick
>
>
> On Mon, 11 Sept 2023 at 05:38, Colt McNealy <co...@littlehorse.io> wrote:
>
> > Howdy folks,
> >
> > First I wanted to say fantastic work and thank you to Nick. I built your
> > branch (https://github.com/nicktelford/kafka/tree/KIP-892-3.5.0) and did
> > some testing on our Streams app with Kafka 3.5.0, your `kip-892-3.5.0`
> > branch, and your `kip-892-3.5.0` branch built with Speedb OSS 2.3.0.1.
> And
> > it worked! Including the global store (we don't have any segmented
> stores,
> > unfortunately).
> >
> > The test I ran involved running 3,000 workflows with 100 tasks each, and
> > roughly 650MB state total.
> >
> > With Streams 3.5.0, I indeed verified that unclean shutdown caused a
> fresh
> > restore from scratch. I also benchmarked my application at:
> > - Running the benchmark took 211 seconds
> > - 1,421 tasks per second on one partition
> > - 8 seconds to restore the state (650MB or so)
> >
> > With KIP 892, I verified that unclean shutdown does not cause a fresh
> > restore (!!!!). I got the following benchmark results:
> > - Benchmark took 216 seconds
> > - 1,401 tasks per second on one partition
> > - 11 seconds to restore the state
> >
> > I ran the restorations many times to ensure that there was no rounding
> > error or noise; the results were remarkably consistent. Additionally, I
> ran
> > the restorations with KIP-892 built with Speedb OSS. The restoration time
> > consistently came out as 10 seconds, which was an improvement from the 11
> > seconds observed with RocksDB + KIP-892.
> >
> > My application is bottlenecked mostly by serialization and
> deserialization,
> > so improving the performance of the state store doesn't really impact our
> > throughput that much. And the processing performance (benchmark time,
> > tasks/second) are pretty close in KIP-892 vs Streams 3.5.0. However, at
> > larger state store sizes, RocksDB performance begins to degrade, so that
> > might not be true once we pass 20GB per partition.
> >
> > -- QUESTION: Because we observed a significant (30% or so) and
> reproducible
> > slowdown during restoration, it seems like KIP-892 uses the checkpointing
> > behavior during restoration as well? If so, I would argue that this might
> > not be necessary, because everything we write is already committed, so we
> > don't need to change the behavior during restoration or standby tasks.
> > Perhaps we could write the offsets to RocksDB on every batch (or even
> every
> > 5 seconds or so).
> >
> > -- Note: This was a very small-scale test, with <1GB of state (as I
> didn't
> > have time to spend hours building up state). In the past I have noted
> that
> > RocksDB performance degrades significantly after 25GB of state in one
> > store. Future work involves determining the performance impact of KIP-892
> > relative to trunk at larger scale, since it's possible that the relative
> > behaviors are far different (i.e. relative to trunk, 892's processing and
> > restoration throughput might be much better or much worse).
> >
> > -- Note: For those who want to replicate the tests, you can find the
> branch
> > of our streams app here:
> >
> >
> https://github.com/littlehorse-enterprises/littlehorse/tree/minor/testing-streams-forks
> > . The example I ran was `examples/hundred-tasks`, and I ran the server
> with
> > `./local-dev/do-server.sh one-partition`. The `STREAMS_TESTS.md` file
> has a
> > detailed breakdown of the testing.
> >
> > Anyways, I'm super excited about this KIP and if a bit more future
> testing
> > goes well, we plan to ship our product with a build of KIP-892, Speedb
> OSS,
> > and potentially a few other minor tweaks that we are thinking about.
> >
> > Thanks Nick!
> >
> > Ride well,
> > Colt McNealy
> >
> > *Founder, LittleHorse.dev*
> >
> >
> > On Thu, Aug 24, 2023 at 3:19 AM Nick Telford <ni...@gmail.com>
> > wrote:
> >
> > > Hi Bruno,
> > >
> > > Thanks for taking the time to review the KIP. I'm back from leave now
> and
> > > intend to move this forwards as quickly as I can.
> > >
> > > Addressing your points:
> > >
> > > 1.
> > > Because flush() is part of the StateStore API, it's exposed to custom
> > > Processors, which might be making calls to flush(). This was actually
> the
> > > case in a few integration tests.
> > > To maintain as much compatibility as possible, I'd prefer not to make
> > this
> > > an UnsupportedOperationException, as it will cause previously working
> > > Processors to start throwing exceptions at runtime.
> > > I agree that it doesn't make sense for it to proxy commit(), though, as
> > > that would cause it to violate the "StateStores commit only when the
> Task
> > > commits" rule.
> > > Instead, I think we should make this a no-op. That way, existing user
> > > Processors will continue to work as-before, without violation of store
> > > consistency that would be caused by premature flush/commit of
> StateStore
> > > data to disk.
> > > What do you think?
> > >
> > > 2.
> > > As stated in the JavaDoc, when a StateStore implementation is
> > > transactional, but is unable to estimate the uncommitted memory usage,
> > the
> > > method will return -1.
> > > The intention here is to permit third-party implementations that may
> not
> > be
> > > able to estimate memory usage.
> > >
> > > Yes, it will be 0 when nothing has been written to the store yet. I
> > thought
> > > that was implied by "This method will return an approximation of the
> > memory
> > > would be freed by the next call to {@link #commit(Map)}" and "@return
> The
> > > approximate size of all records awaiting {@link #commit(Map)}",
> however,
> > I
> > > can add it explicitly to the JavaDoc if you think this is unclear?
> > >
> > > 3.
> > > I realise this is probably the most contentious point in my design, and
> > I'm
> > > open to changing it if I'm unable to convince you of the benefits.
> > > Nevertheless, here's my argument:
> > > The Interactive Query (IQ) API(s) are directly provided StateStores to
> > > query, and it may be important for users to programmatically know which
> > > mode the StateStore is operating under. If we simply provide an
> > > "eosEnabled" boolean (as used throughout the internal streams engine),
> or
> > > similar, then users will need to understand the operation and
> > consequences
> > > of each available processing mode and how it pertains to their
> > StateStore.
> > >
> > > Interactive Query users aren't the only people that care about the
> > > processing.mode/IsolationLevel of a StateStore: implementers of custom
> > > StateStores also need to understand the behaviour expected of their
> > > implementation. KIP-892 introduces some assumptions into the Streams
> > Engine
> > > about how StateStores operate under each processing mode, and it's
> > > important that custom implementations adhere to those assumptions in
> > order
> > > to maintain the consistency guarantees.
> > >
> > > IsolationLevels provide a high-level contract on the behaviour of the
> > > StateStore: a user knows that under READ_COMMITTED, they will see
> writes
> > > only after the Task has committed, and under READ_UNCOMMITTED they will
> > see
> > > writes immediately. No understanding of the details of each
> > processing.mode
> > > is required, either for IQ users or StateStore implementers.
> > >
> > > An argument can be made that these contractual guarantees can simply be
> > > documented for the processing.mode (i.e. that exactly-once and
> > > exactly-once-v2 behave like READ_COMMITTED and at-least-once behaves
> like
> > > READ_UNCOMMITTED), but there are several small issues with this I'd
> > prefer
> > > to avoid:
> > >
> > >    - Where would we document these contracts, in a way that is
> difficult
> > >    for users/implementers to miss/ignore?
> > >    - It's not clear to users that the processing mode is communicating
> > >    an expectation of read isolation, unless they read the
> documentation.
> > > Users
> > >    rarely consult documentation unless they feel they need to, so it's
> > > likely
> > >    this detail would get missed by many users.
> > >    - It tightly couples processing modes to read isolation. Adding new
> > >    processing modes, or changing the read isolation of existing
> > processing
> > >    modes would be difficult/impossible.
> > >
> > > Ultimately, the cost of introducing IsolationLevels is just a single
> > > method, since we re-use the existing IsolationLevel enum from Kafka.
> This
> > > gives us a clear place to document the contractual guarantees expected
> > > of/provided by StateStores, that is accessible both by the StateStore
> > > itself, and by IQ users.
> > >
> > > (Writing this I've just realised that the StateStore and IQ APIs
> actually
> > > don't provide access to StateStoreContext that IQ users would have
> direct
> > > access to... Perhaps StateStore should expose isolationLevel() itself
> > too?)
> > >
> > > 4.
> > > Yeah, I'm not comfortable renaming the metrics in-place either, as
> it's a
> > > backwards incompatible change. My concern is that, if we leave the
> > existing
> > > "flush" metrics in place, they will be confusing to users. Right now,
> > > "flush" metrics record explicit flushes to disk, but under KIP-892,
> even
> > a
> > > commit() will not explicitly flush data to disk - RocksDB will decide
> on
> > > when to flush memtables to disk itself.
> > >
> > > If we keep the existing "flush" metrics, we'd have two options, which
> > both
> > > seem pretty bad to me:
> > >
> > >    1. Have them record calls to commit(), which would be misleading, as
> > >    data is no longer explicitly "flushed" to disk by this call.
> > >    2. Have them record nothing at all, which is equivalent to removing
> > the
> > >    metrics, except that users will see the metric still exists and so
> > > assume
> > >    that the metric is correct, and that there's a problem with their
> > system
> > >    when there isn't.
> > >
> > > I agree that removing them is also a bad solution, and I'd like some
> > > guidance on the best path forward here.
> > >
> > > 5.
> > > Position files are updated on every write to a StateStore. Since our
> > writes
> > > are now buffered until commit(), we can't update the Position file
> until
> > > commit() has been called, otherwise it would be inconsistent with the
> > data
> > > in the event of a rollback. Consequently, we need to manage these
> offsets
> > > the same way we manage the checkpoint offsets, and ensure they're only
> > > written on commit().
> > >
> > > 6.
> > > Agreed, although I'm not exactly sure yet what tests to write. How
> > explicit
> > > do we need to be here in the KIP?
> > >
> > > As for upgrade/downgrade: upgrade is designed to be seamless, and we
> > should
> > > definitely add some tests around that. Downgrade, it transpires, isn't
> > > currently possible, as the extra column family for offset storage is
> > > incompatible with the pre-KIP-892 implementation: when you open a
> RocksDB
> > > database, you must open all available column families or receive an
> > error.
> > > What currently happens on downgrade is that it attempts to open the
> > store,
> > > throws an error about the offsets column family not being opened, which
> > > triggers a wipe and rebuild of the Task. Given that downgrades should
> be
> > > uncommon, I think this is acceptable behaviour, as the end-state is
> > > consistent, even if it results in an undesirable state restore.
> > >
> > > Should I document the upgrade/downgrade behaviour explicitly in the
> KIP?
> > >
> > > --
> > >
> > > Regards,
> > > Nick
> > >
> > >
> > > On Mon, 14 Aug 2023 at 22:31, Bruno Cadonna <ca...@apache.org>
> wrote:
> > >
> > > > Hi Nick!
> > > >
> > > > Thanks for the updates!
> > > >
> > > > 1.
> > > > Why does StateStore#flush() default to
> > > > StateStore#commit(Collections.emptyMap())?
> > > > Since calls to flush() will not exist anymore after this KIP is
> > > > released, I would rather throw an unsupported operation exception by
> > > > default.
> > > >
> > > >
> > > > 2.
> > > > When would a state store return -1 from
> > > > StateStore#approximateNumUncommittedBytes() while being
> transactional?
> > > >
> > > > Wouldn't StateStore#approximateNumUncommittedBytes() also return 0 if
> > > > the state store is transactional but nothing has been written to the
> > > > state store yet?
> > > >
> > > >
> > > > 3.
> > > > Sorry for bringing this up again. Does this KIP really need to
> > introduce
> > > > StateStoreContext#isolationLevel()? StateStoreContext has already
> > > > appConfigs() which basically exposes the same information, i.e., if
> EOS
> > > > is enabled or not.
> > > > In one of your previous e-mails you wrote:
> > > >
> > > > "My idea was to try to keep the StateStore interface as loosely
> coupled
> > > > from the Streams engine as possible, to give implementers more
> freedom,
> > > > and reduce the amount of internal knowledge required."
> > > >
> > > > While I understand the intent, I doubt that it decreases the coupling
> > of
> > > > a StateStore interface and the Streams engine. READ_COMMITTED only
> > > > applies to IQ but not to reads by processors. Thus, implementers need
> > to
> > > > understand how Streams accesses the state stores.
> > > >
> > > > I would like to hear what others think about this.
> > > >
> > > >
> > > > 4.
> > > > Great exposing new metrics for transactional state stores! However, I
> > > > would prefer to add new metrics and deprecate (in the docs) the old
> > > > ones. You can find examples of deprecated metrics here:
> > > > https://kafka.apache.org/documentation/#selector_monitoring
> > > >
> > > >
> > > > 5.
> > > > Why does the KIP mention position files? I do not think they are
> > related
> > > > to transactions or flushes.
> > > >
> > > >
> > > > 6.
> > > > I think we will also need to adapt/add integration tests besides unit
> > > > tests. Additionally, we probably need integration or system tests to
> > > > verify that upgrades and downgrades between transactional and
> > > > non-transactional state stores work as expected.
> > > >
> > > >
> > > > Best,
> > > > Bruno
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > On 7/21/23 10:34 PM, Nick Telford wrote:
> > > > > One more thing: I noted John's suggestion in the KIP, under
> "Rejected
> > > > > Alternatives". I still think it's an idea worth pursuing, but I
> > believe
> > > > > that it's out of the scope of this KIP, because it solves a
> different
> > > set
> > > > > of problems to this KIP, and the scope of this one has already
> grown
> > > > quite
> > > > > large!
> > > > >
> > > > > On Fri, 21 Jul 2023 at 21:33, Nick Telford <nick.telford@gmail.com
> >
> > > > wrote:
> > > > >
> > > > >> Hi everyone,
> > > > >>
> > > > >> I've updated the KIP (
> > > > >>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores
> > > > )
> > > > >> with the latest changes; mostly bringing back "Atomic
> Checkpointing"
> > > > (for
> > > > >> what feels like the 10th time!). I think the one thing missing is
> > some
> > > > >> changes to metrics (notably the store "flush" metrics will need to
> > be
> > > > >> renamed to "commit").
> > > > >>
> > > > >> The reason I brought back Atomic Checkpointing was to decouple
> store
> > > > flush
> > > > >> from store commit. This is important, because with Transactional
> > > > >> StateStores, we now need to call "flush" on *every* Task commit,
> and
> > > not
> > > > >> just when the StateStore is closing, otherwise our transaction
> > buffer
> > > > will
> > > > >> never be written and persisted, instead growing unbounded! I
> > > > experimented
> > > > >> with some simple solutions, like forcing a store flush whenever
> the
> > > > >> transaction buffer was likely to exceed its configured size, but
> > this
> > > > was
> > > > >> brittle: it prevented the transaction buffer from being configured
> > to
> > > be
> > > > >> unbounded, and it still would have required explicit flushes of
> > > RocksDB,
> > > > >> yielding sub-optimal performance and memory utilization.
> > > > >>
> > > > >> I deemed Atomic Checkpointing to be the "right" way to resolve
> this
> > > > >> problem. By ensuring that the changelog offsets that correspond to
> > the
> > > > most
> > > > >> recently written records are always atomically written to the
> > > StateStore
> > > > >> (by writing them to the same transaction buffer), we can avoid
> > > forcibly
> > > > >> flushing the RocksDB memtables to disk, letting RocksDB flush them
> > > only
> > > > >> when necessary, without losing any of our consistency guarantees.
> > See
> > > > the
> > > > >> updated KIP for more info.
> > > > >>
> > > > >> I have fully implemented these changes, although I'm still not
> > > entirely
> > > > >> happy with the implementation for segmented StateStores, so I plan
> > to
> > > > >> refactor that. Despite that, all tests pass. If you'd like to try
> > out
> > > or
> > > > >> review this highly experimental and incomplete branch, it's
> > available
> > > > here:
> > > > >> https://github.com/nicktelford/kafka/tree/KIP-892-3.5.0. Note:
> it's
> > > > built
> > > > >> against Kafka 3.5.0 so that I had a stable base to build and test
> it
> > > on,
> > > > >> and to enable easy apples-to-apples comparisons in a live
> > > environment. I
> > > > >> plan to rebase it against trunk once it's nearer completion and
> has
> > > been
> > > > >> proven on our main application.
> > > > >>
> > > > >> I would really appreciate help in reviewing and testing:
> > > > >> - Segmented (Versioned, Session and Window) stores
> > > > >> - Global stores
> > > > >>
> > > > >> As I do not currently use either of these, so my primary test
> > > > environment
> > > > >> doesn't test these areas.
> > > > >>
> > > > >> I'm going on Parental Leave starting next week for a few weeks, so
> > > will
> > > > >> not have time to move this forward until late August. That said,
> > your
> > > > >> feedback is welcome and appreciated, I just won't be able to
> respond
> > > as
> > > > >> quickly as usual.
> > > > >>
> > > > >> Regards,
> > > > >> Nick
> > > > >>
> > > > >> On Mon, 3 Jul 2023 at 16:23, Nick Telford <nick.telford@gmail.com
> >
> > > > wrote:
> > > > >>
> > > > >>> Hi Bruno
> > > > >>>
> > > > >>> Yes, that's correct, although the impact on IQ is not something I
> > had
> > > > >>> considered.
> > > > >>>
> > > > >>> What about atomically updating the state store from the
> transaction
> > > > >>>> buffer every commit interval and writing the checkpoint (thus,
> > > > flushing
> > > > >>>> the memtable) every configured amount of data and/or number of
> > > commit
> > > > >>>> intervals?
> > > > >>>>
> > > > >>>
> > > > >>> I'm not quite sure I follow. Are you suggesting that we add an
> > > > additional
> > > > >>> config for the max number of commit intervals between
> checkpoints?
> > > That
> > > > >>> way, we would checkpoint *either* when the transaction buffers
> are
> > > > nearly
> > > > >>> full, *OR* whenever a certain number of commit intervals have
> > > elapsed,
> > > > >>> whichever comes first?
> > > > >>>
> > > > >>> That certainly seems reasonable, although this re-ignites an
> > earlier
> > > > >>> debate about whether a config should be measured in "number of
> > commit
> > > > >>> intervals", instead of just an absolute time.
> > > > >>>
> > > > >>> FWIW, I realised that this issue is the reason I was pursuing the
> > > > Atomic
> > > > >>> Checkpoints, as it de-couples memtable flush from checkpointing,
> > > which
> > > > >>> enables us to just checkpoint on every commit without any
> > performance
> > > > >>> impact. Atomic Checkpointing is definitely the "best" solution,
> but
> > > > I'm not
> > > > >>> sure if this is enough to bring it back into this KIP.
> > > > >>>
> > > > >>> I'm currently working on moving all the transactional logic
> > directly
> > > > into
> > > > >>> RocksDBStore itself, which does away with the
> > > StateStore#newTransaction
> > > > >>> method, and reduces the number of new classes introduced,
> > > significantly
> > > > >>> reducing the complexity. If it works, and the complexity is
> > > drastically
> > > > >>> reduced, I may try bringing back Atomic Checkpoints into this
> KIP.
> > > > >>>
> > > > >>> Regards,
> > > > >>> Nick
> > > > >>>
> > > > >>> On Mon, 3 Jul 2023 at 15:27, Bruno Cadonna <ca...@apache.org>
> > > wrote:
> > > > >>>
> > > > >>>> Hi Nick,
> > > > >>>>
> > > > >>>> Thanks for the insights! Very interesting!
> > > > >>>>
> > > > >>>> As far as I understand, you want to atomically update the state
> > > store
> > > > >>>> from the transaction buffer, flush the memtable of a state store
> > and
> > > > >>>> write the checkpoint not after the commit time elapsed but after
> > the
> > > > >>>> transaction buffer reached a size that would lead to exceeding
> > > > >>>> statestore.transaction.buffer.max.bytes before the next commit
> > > > interval
> > > > >>>> ends.
> > > > >>>> That means, the Kafka transaction would commit every commit
> > interval
> > > > but
> > > > >>>> the state store will only be atomically updated roughly every
> > > > >>>> statestore.transaction.buffer.max.bytes of data. Also IQ would
> > then
> > > > only
> > > > >>>> see new data roughly every
> > statestore.transaction.buffer.max.bytes.
> > > > >>>> After a failure the state store needs to restore up to
> > > > >>>> statestore.transaction.buffer.max.bytes.
> > > > >>>>
> > > > >>>> Is this correct?
> > > > >>>>
> > > > >>>> What about atomically updating the state store from the
> > transaction
> > > > >>>> buffer every commit interval and writing the checkpoint (thus,
> > > > flushing
> > > > >>>> the memtable) every configured amount of data and/or number of
> > > commit
> > > > >>>> intervals? In such a way, we would have the same delay for
> records
> > > > >>>> appearing in output topics and IQ because both would appear when
> > the
> > > > >>>> Kafka transaction is committed. However, after a failure the
> state
> > > > store
> > > > >>>> still needs to restore up to
> > statestore.transaction.buffer.max.bytes
> > > > and
> > > > >>>> it might restore data that is already in the state store because
> > the
> > > > >>>> checkpoint lags behind the last stable offset (i.e. the last
> > > committed
> > > > >>>> offset) of the changelog topics. Restoring data that is already
> in
> > > the
> > > > >>>> state store is idempotent, so eos should not be violated.
> > > > >>>> This solution needs at least one new config to specify when a
> > > > checkpoint
> > > > >>>> should be written.
> > > > >>>>
> > > > >>>>
> > > > >>>>
> > > > >>>> A small correction to your previous e-mail that does not change
> > > > anything
> > > > >>>> you said: Under alos the default commit interval is 30 seconds,
> > not
> > > > five
> > > > >>>> seconds.
> > > > >>>>
> > > > >>>>
> > > > >>>> Best,
> > > > >>>> Bruno
> > > > >>>>
> > > > >>>>
> > > > >>>> On 01.07.23 12:37, Nick Telford wrote:
> > > > >>>>> Hi everyone,
> > > > >>>>>
> > > > >>>>> I've begun performance testing my branch on our staging
> > > environment,
> > > > >>>>> putting it through its paces in our non-trivial application.
> I'm
> > > > >>>> already
> > > > >>>>> observing the same increased flush rate that we saw the last
> time
> > > we
> > > > >>>>> attempted to use a version of this KIP, but this time, I think
> I
> > > know
> > > > >>>> why.
> > > > >>>>>
> > > > >>>>> Pre-KIP-892, StreamTask#postCommit, which is called at the end
> of
> > > the
> > > > >>>> Task
> > > > >>>>> commit process, has the following behaviour:
> > > > >>>>>
> > > > >>>>>      - Under ALOS: checkpoint the state stores. This includes
> > > > >>>>>      flushing memtables in RocksDB. This is acceptable because
> > the
> > > > >>>> default
> > > > >>>>>      commit.interval.ms is 5 seconds, so forcibly flushing
> > > memtables
> > > > >>>> every 5
> > > > >>>>>      seconds is acceptable for most applications.
> > > > >>>>>      - Under EOS: checkpointing is not done, *unless* it's
> being
> > > > >>>> forced, due
> > > > >>>>>      to e.g. the Task closing or being revoked. This means that
> > > under
> > > > >>>> normal
> > > > >>>>>      processing conditions, the state stores will not be
> > > > checkpointed,
> > > > >>>> and will
> > > > >>>>>      not have memtables flushed at all, unless RocksDB decides
> > to
> > > > >>>> flush them on
> > > > >>>>>      its own. Checkpointing stores and force-flushing their
> > > memtables
> > > > >>>> is only
> > > > >>>>>      done when a Task is being closed.
> > > > >>>>>
> > > > >>>>> Under EOS, KIP-892 needs to checkpoint stores on at least
> *some*
> > > > normal
> > > > >>>>> Task commits, in order to write the RocksDB transaction buffers
> > to
> > > > the
> > > > >>>>> state stores, and to ensure the offsets are synced to disk to
> > > prevent
> > > > >>>>> restores from getting out of hand. Consequently, my current
> > > > >>>> implementation
> > > > >>>>> calls maybeCheckpoint on *every* Task commit, which is far too
> > > > >>>> frequent.
> > > > >>>>> This causes checkpoints every 10,000 records, which is a change
> > in
> > > > >>>> flush
> > > > >>>>> behaviour, potentially causing performance problems for some
> > > > >>>> applications.
> > > > >>>>>
> > > > >>>>> I'm looking into possible solutions, and I'm currently leaning
> > > > towards
> > > > >>>>> using the statestore.transaction.buffer.max.bytes configuration
> > to
> > > > >>>>> checkpoint Tasks once we are likely to exceed it. This would
> > > > >>>> complement the
> > > > >>>>> existing "early Task commit" functionality that this
> > configuration
> > > > >>>>> provides, in the following way:
> > > > >>>>>
> > > > >>>>>      - Currently, we use
> statestore.transaction.buffer.max.bytes
> > to
> > > > >>>> force an
> > > > >>>>>      early Task commit if processing more records would cause
> our
> > > > state
> > > > >>>> store
> > > > >>>>>      transactions to exceed the memory assigned to them.
> > > > >>>>>      - New functionality: when a Task *does* commit, we will
> not
> > > > >>>> checkpoint
> > > > >>>>>      the stores (and hence flush the transaction buffers)
> unless
> > we
> > > > >>>> expect to
> > > > >>>>>      cross the statestore.transaction.buffer.max.bytes
> threshold
> > > > before
> > > > >>>> the next
> > > > >>>>>      commit
> > > > >>>>>
> > > > >>>>> I'm also open to suggestions.
> > > > >>>>>
> > > > >>>>> Regards,
> > > > >>>>> Nick
> > > > >>>>>
> > > > >>>>> On Thu, 22 Jun 2023 at 14:06, Nick Telford <
> > nick.telford@gmail.com
> > > >
> > > > >>>> wrote:
> > > > >>>>>
> > > > >>>>>> Hi Bruno!
> > > > >>>>>>
> > > > >>>>>> 3.
> > > > >>>>>> By "less predictable for users", I meant in terms of
> > understanding
> > > > the
> > > > >>>>>> performance profile under various circumstances. The more
> > complex
> > > > the
> > > > >>>>>> solution, the more difficult it would be for users to
> understand
> > > the
> > > > >>>>>> performance they see. For example, spilling records to disk
> when
> > > the
> > > > >>>>>> transaction buffer reaches a threshold would, I expect, reduce
> > > write
> > > > >>>>>> throughput. This reduction in write throughput could be
> > > unexpected,
> > > > >>>> and
> > > > >>>>>> potentially difficult to diagnose/understand for users.
> > > > >>>>>> At the moment, I think the "early commit" concept is
> relatively
> > > > >>>>>> straightforward; it's easy to document, and conceptually
> fairly
> > > > >>>> obvious to
> > > > >>>>>> users. We could probably add a metric to make it easier to
> > > > understand
> > > > >>>> when
> > > > >>>>>> it happens though.
> > > > >>>>>>
> > > > >>>>>> 3. (the second one)
> > > > >>>>>> The IsolationLevel is *essentially* an indirect way of telling
> > > > >>>> StateStores
> > > > >>>>>> whether they should be transactional. READ_COMMITTED
> essentially
> > > > >>>> requires
> > > > >>>>>> transactions, because it dictates that two threads calling
> > > > >>>>>> `newTransaction()` should not see writes from the other
> > > transaction
> > > > >>>> until
> > > > >>>>>> they have been committed. With READ_UNCOMMITTED, all bets are
> > off,
> > > > and
> > > > >>>>>> stores can allow threads to observe written records at any
> time,
> > > > >>>> which is
> > > > >>>>>> essentially "no transactions". That said, StateStores are free
> > to
> > > > >>>> implement
> > > > >>>>>> these guarantees however they can, which is a bit more relaxed
> > > than
> > > > >>>>>> dictating "you must use transactions". For example, with
> RocksDB
> > > we
> > > > >>>> would
> > > > >>>>>> implement these as READ_COMMITTED == WBWI-based
> "transactions",
> > > > >>>>>> READ_UNCOMMITTED == direct writes to the database. But with
> > other
> > > > >>>> storage
> > > > >>>>>> engines, it might be preferable to *always* use transactions,
> > even
> > > > >>>> when
> > > > >>>>>> unnecessary; or there may be storage engines that don't
> provide
> > > > >>>>>> transactions, but the isolation guarantees can be met using a
> > > > >>>> different
> > > > >>>>>> technique.
> > > > >>>>>> My idea was to try to keep the StateStore interface as loosely
> > > > coupled
> > > > >>>>>> from the Streams engine as possible, to give implementers more
> > > > >>>> freedom, and
> > > > >>>>>> reduce the amount of internal knowledge required.
> > > > >>>>>> That said, I understand that "IsolationLevel" might not be the
> > > right
> > > > >>>>>> abstraction, and we can always make it much more explicit if
> > > > >>>> required, e.g.
> > > > >>>>>> boolean transactional()
> > > > >>>>>>
> > > > >>>>>> 7-8.
> > > > >>>>>> I can make these changes either later today or tomorrow.
> > > > >>>>>>
> > > > >>>>>> Small update:
> > > > >>>>>> I've rebased my branch on trunk and fixed a bunch of issues
> that
> > > > >>>> needed
> > > > >>>>>> addressing. Currently, all the tests pass, which is promising,
> > but
> > > > it
> > > > >>>> will
> > > > >>>>>> need to undergo some performance testing. I haven't (yet)
> worked
> > > on
> > > > >>>>>> removing the `newTransaction()` stuff, but I would expect
> that,
> > > > >>>>>> behaviourally, it should make no difference. The branch is
> > > available
> > > > >>>> at
> > > > >>>>>> https://github.com/nicktelford/kafka/tree/KIP-892-c if anyone
> > is
> > > > >>>>>> interested in taking an early look.
> > > > >>>>>>
> > > > >>>>>> Regards,
> > > > >>>>>> Nick
> > > > >>>>>>
> > > > >>>>>> On Thu, 22 Jun 2023 at 11:59, Bruno Cadonna <
> cadonna@apache.org
> > >
> > > > >>>> wrote:
> > > > >>>>>>
> > > > >>>>>>> Hi Nick,
> > > > >>>>>>>
> > > > >>>>>>> 1.
> > > > >>>>>>> Yeah, I agree with you. That was actually also my point. I
> > > > understood
> > > > >>>>>>> that John was proposing the ingestion path as a way to avoid
> > the
> > > > >>>> early
> > > > >>>>>>> commits. Probably, I misinterpreted the intent.
> > > > >>>>>>>
> > > > >>>>>>> 2.
> > > > >>>>>>> I agree with John here, that actually it is public API. My
> > > question
> > > > >>>> is
> > > > >>>>>>> how this usage pattern affects normal processing.
> > > > >>>>>>>
> > > > >>>>>>> 3.
> > > > >>>>>>> My concern is that checking for the size of the transaction
> > > buffer
> > > > >>>> and
> > > > >>>>>>> maybe triggering an early commit affects the whole processing
> > of
> > > > >>>> Kafka
> > > > >>>>>>> Streams. The transactionality of a state store is not
> confined
> > to
> > > > the
> > > > >>>>>>> state store itself, but spills over and changes the behavior
> of
> > > > other
> > > > >>>>>>> parts of the system. I agree with you that it is a decent
> > > > >>>> compromise. I
> > > > >>>>>>> just wanted to analyse the downsides and list the options to
> > > > overcome
> > > > >>>>>>> them. I also agree with you that all options seem quite heavy
> > > > >>>> compared
> > > > >>>>>>> with your KIP. I do not understand what you mean with "less
> > > > >>>> predictable
> > > > >>>>>>> for users", though.
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>> I found the discussions about the alternatives really
> > > interesting.
> > > > >>>> But I
> > > > >>>>>>> also think that your plan sounds good and we should continue
> > with
> > > > it!
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>> Some comments on your reply to my e-mail on June 20th:
> > > > >>>>>>>
> > > > >>>>>>> 3.
> > > > >>>>>>> Ah, now, I understand the reasoning behind putting isolation
> > > level
> > > > in
> > > > >>>>>>> the state store context. Thanks! Should that also be a way to
> > > give
> > > > >>>>>>> the state store the opportunity to decide whether to turn on
> > > > >>>>>>> transactions or not?
> > > > >>>>>>> With my comment, I was more concerned about how do you know
> if
> > a
> > > > >>>>>>> checkpoint file needs to be written under EOS, if you do not
> > > have a
> > > > >>>> way
> > > > >>>>>>> to know if the state store is transactional or not. If a
> state
> > > > store
> > > > >>>> is
> > > > >>>>>>> transactional, the checkpoint file can be written during
> normal
> > > > >>>>>>> processing under EOS. If the state store is not
> transactional,
> > > the
> > > > >>>>>>> checkpoint file must not be written under EOS.
> > > > >>>>>>>
> > > > >>>>>>> 7.
> > > > >>>>>>> My point was about not only considering the bytes in memory
> in
> > > > config
> > > > >>>>>>> statestore.uncommitted.max.bytes, but also bytes that might
> be
> > > > >>>> spilled
> > > > >>>>>>> on disk. Basically, I was wondering whether you should remove
> > the
> > > > >>>>>>> "memory" in "Maximum number of memory bytes to be used to
> > > > >>>>>>> buffer uncommitted state-store records." My thinking was that
> > > even
> > > > >>>> if a
> > > > >>>>>>> state store spills uncommitted bytes to disk, limiting the
> > > overall
> > > > >>>> bytes
> > > > >>>>>>> might make sense. Thinking about it again and considering the
> > > > recent
> > > > >>>>>>> discussions, it does not make too much sense anymore.
> > > > >>>>>>> I like the name statestore.transaction.buffer.max.bytes that
> > you
> > > > >>>> proposed.
> > > > >>>>>>>
> > > > >>>>>>> 8.
> > > > >>>>>>> A high-level description (without implementation details) of
> > how
> > > > >>>> Kafka
> > > > >>>>>>> Streams will manage the commit of changelog transactions,
> state
> > > > store
> > > > >>>>>>> transactions and checkpointing would be great. Would be great
> > if
> > > > you
> > > > >>>>>>> could also add some sentences about the behavior in case of a
> > > > >>>> failure.
> > > > >>>>>>> For instance how does a transactional state store recover
> > after a
> > > > >>>>>>> failure or what happens with the transaction buffer, etc.
> (that
> > > is
> > > > >>>> what
> > > > >>>>>>> I meant by "fail-over" in point 9.)
> > > > >>>>>>>
> > > > >>>>>>> Best,
> > > > >>>>>>> Bruno
> > > > >>>>>>>
> > > > >>>>>>> On 21.06.23 18:50, Nick Telford wrote:
> > > > >>>>>>>> Hi Bruno,
> > > > >>>>>>>>
> > > > >>>>>>>> 1.
> > > > >>>>>>>> Isn't this exactly the same issue that WriteBatchWithIndex
> > > > >>>> transactions
> > > > >>>>>>>> have, whereby exceeding (or likely to exceed) configured
> > memory
> > > > >>>> needs to
> > > > >>>>>>>> trigger an early commit?
> > > > >>>>>>>>
> > > > >>>>>>>> 2.
> > > > >>>>>>>> This is one of my big concerns. Ultimately, any approach
> based
> > > on
> > > > >>>>>>> cracking
> > > > >>>>>>>> open RocksDB internals and using it in ways it's not really
> > > > designed
> > > > >>>>>>> for is
> > > > >>>>>>>> likely to have some unforeseen performance or consistency
> > issues.
> > > > >>>>>>>>
> > > > >>>>>>>> 3.
> > > > >>>>>>>> What's your motivation for removing these early commits?
> While
> > > not
> > > > >>>>>>> ideal, I
> > > > >>>>>>>> think they're a decent compromise to ensure consistency
> whilst
> > > > >>>>>>> maintaining
> > > > >>>>>>>> good and predictable performance.
> > > > >>>>>>>> All 3 of your suggested ideas seem *very* complicated, and
> > might
> > > > >>>>>>> actually
> > > > >>>>>>>> make behaviour less predictable for users as a consequence.
> > > > >>>>>>>>
> > > > >>>>>>>> I'm a bit concerned that the scope of this KIP is growing a
> > bit
> > > > out
> > > > >>>> of
> > > > >>>>>>>> control. While it's good to discuss ideas for future
> > > > improvements, I
> > > > >>>>>>> think
> > > > >>>>>>>> it's important to narrow the scope down to a design that
> > > achieves
> > > > >>>> the
> > > > >>>>>>> most
> > > > >>>>>>>> pressing objectives (constant sized restorations during
> dirty
> > > > >>>>>>>> close/unexpected errors). Any design that this KIP produces
> > can
> > > > >>>>>>> ultimately
> > > > >>>>>>>> be changed in the future, especially if the bulk of it is
> > > internal
> > > > >>>>>>>> behaviour.
> > > > >>>>>>>>
> > > > >>>>>>>> I'm going to spend some time next week trying to re-work the
> > > > >>>> original
> > > > >>>>>>>> WriteBatchWithIndex design to remove the newTransaction()
> > > method,
> > > > >>>> such
> > > > >>>>>>> that
> > > > >>>>>>>> it's just an implementation detail of RocksDBStore. That
> way,
> > if
> > > > we
> > > > >>>>>>> want to
> > > > >>>>>>>> replace WBWI with something in the future, like the SST file
> > > > >>>> management
> > > > >>>>>>>> outlined by John, then we can do so with little/no API
> > changes.
> > > > >>>>>>>>
> > > > >>>>>>>> Regards,
> > > > >>>>>>>>
> > > > >>>>>>>> Nick
> > > > >>>>>>>>
> > > > >>>>>>>
> > > > >>>>>>
> > > > >>>>>
> > > > >>>>
> > > > >>>
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

Posted by Nick Telford <ni...@gmail.com>.
Hi Colt,

Thanks for taking the time to run your benchmarks on this, that's
incredibly helpful.

> With KIP 892, I verified that unclean shutdown does not cause a fresh
> restore (!!!!). I got the following benchmark results:
> - Benchmark took 216 seconds
> - 1,401 tasks per second on one partition
> - 11 seconds to restore the state

Can you clarify how much state was restored in those 11 seconds? Was this
the time to do the full restore regardless, or was it the time to only
restore a small fraction of the state (e.g. the last aborted transaction)?

> -- QUESTION: Because we observed a significant (30% or so) and
reproducible
> slowdown during restoration, it seems like KIP-892 uses the checkpointing
> behavior during restoration as well? If so, I would argue that this might
> not be necessary, because everything we write is already committed, so we
> don't need to change the behavior during restoration or standby tasks.
> Perhaps we could write the offsets to RocksDB on every batch (or even
every
> 5 seconds or so).

Restore has always used a completely separate code path from regular writes,
and continues to do so. I took a quick pass over the code and I suspect I
know what's causing the performance degradation: for every restored record,
I was adding the changelog offset of that record to the batch along with
the record itself. This differs from the regular write path, which adds the
current offsets only once, on commit. This writeOffset method is fairly
expensive, since it has to serialize the TopicPartition and offset being
written to the database.
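
To make this concrete, here's a rough sketch of the two paths. It's not the
actual code on the branch: names like restoreBatch, writeOffset and
offsetsColumnFamily are simplified stand-ins, and the RocksDB plumbing is
cut down to the bare minimum.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

class OffsetWriteSketch {
    private final RocksDB db;
    private final WriteOptions writeOptions;
    private final ColumnFamilyHandle offsetsColumnFamily;
    private final TopicPartition changelogPartition;

    OffsetWriteSketch(final RocksDB db,
                      final WriteOptions writeOptions,
                      final ColumnFamilyHandle offsetsColumnFamily,
                      final TopicPartition changelogPartition) {
        this.db = db;
        this.writeOptions = writeOptions;
        this.offsetsColumnFamily = offsetsColumnFamily;
        this.changelogPartition = changelogPartition;
    }

    // Restore path, pre-fix: writeOffset is called for every restored
    // record, so the TopicPartition is re-serialized once per record.
    void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records)
            throws RocksDBException {
        try (final WriteBatch batch = new WriteBatch()) {
            for (final ConsumerRecord<byte[], byte[]> record : records) {
                batch.put(record.key(), record.value());
                writeOffset(batch, changelogPartition, record.offset());
            }
            db.write(writeOptions, batch);
        }
    }

    // Regular write path: buffered records are written on commit, and the
    // offsets are written only once per partition at that point.
    void commit(final WriteBatch transactionBatch,
                final Map<TopicPartition, Long> offsets)
            throws RocksDBException {
        for (final Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
            writeOffset(transactionBatch, entry.getKey(), entry.getValue());
        }
        db.write(writeOptions, transactionBatch);
    }

    // The expensive part: the TopicPartition and the offset are serialized
    // on every single call.
    private void writeOffset(final WriteBatch batch,
                             final TopicPartition partition,
                             final long offset) throws RocksDBException {
        final byte[] key =
            partition.toString().getBytes(StandardCharsets.UTF_8);
        final byte[] value =
            ByteBuffer.allocate(Long.BYTES).putLong(offset).array();
        batch.put(offsetsColumnFamily, key, value);
    }
}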

Assuming this is the cause, I've already pushed a fix to my branch that
calls writeOffset only once per batch, and adds some caching to the
serialization in writeOffset, which should also improve the performance of
state commits in the normal write path.
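
Building on the sketch above, the fix looks roughly like this (again,
hypothetical names, not the actual code): the offset is written once per
restored batch instead of once per record, and the serialized partition key
is cached so repeated writeOffset calls don't re-encode the TopicPartition.

    // Cache of serialized TopicPartition keys, so writeOffset doesn't
    // re-encode the partition on every call (needs java.util.HashMap).
    private final Map<TopicPartition, byte[]> partitionKeyCache =
        new HashMap<>();

    // Restore path, post-fix: only the offset of the last record in the
    // batch is written, once per batch.
    void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records)
            throws RocksDBException {
        if (records.isEmpty()) {
            return;
        }
        long lastOffset = -1L;
        try (final WriteBatch batch = new WriteBatch()) {
            for (final ConsumerRecord<byte[], byte[]> record : records) {
                batch.put(record.key(), record.value());
                lastOffset = record.offset();
            }
            writeOffset(batch, changelogPartition, lastOffset);
            db.write(writeOptions, batch);
        }
    }

    private void writeOffset(final WriteBatch batch,
                             final TopicPartition partition,
                             final long offset) throws RocksDBException {
        final byte[] key = partitionKeyCache.computeIfAbsent(
            partition, p -> p.toString().getBytes(StandardCharsets.UTF_8));
        final byte[] value =
            ByteBuffer.allocate(Long.BYTES).putLong(offset).array();
        batch.put(offsetsColumnFamily, key, value);
    }

Writing only the last offset per batch should be safe here because, as you
point out, everything written during restore is already committed in the
changelog.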

Please let me know if this addresses the issue!

Regards,
Nick


On Mon, 11 Sept 2023 at 05:38, Colt McNealy <co...@littlehorse.io> wrote:

> Howdy folks,
>
> First I wanted to say fantastic work and thank you to Nick. I built your
> branch (https://github.com/nicktelford/kafka/tree/KIP-892-3.5.0) and did
> some testing on our Streams app with Kafka 3.5.0, your `kip-892-3.5.0`
> branch, and your `kip-892-3.5.0` branch built with Speedb OSS 2.3.0.1. And
> it worked! Including the global store (we don't have any segmented stores,
> unfortunately).
>
> The test I ran involved running 3,000 workflows with 100 tasks each, and
> roughly 650MB state total.
>
> With Streams 3.5.0, I indeed verified that unclean shutdown caused a fresh
> restore from scratch. I also benchmarked my application at:
> - Running the benchmark took 211 seconds
> - 1,421 tasks per second on one partition
> - 8 seconds to restore the state (650MB or so)
>
> With KIP 892, I verified that unclean shutdown does not cause a fresh
> restore (!!!!). I got the following benchmark results:
> - Benchmark took 216 seconds
> - 1,401 tasks per second on one partition
> - 11 seconds to restore the state
>
> I ran the restorations many times to ensure that there was no rounding
> error or noise; the results were remarkably consistent. Additionally, I ran
> the restorations with KIP-892 built with Speedb OSS. The restoration time
> consistently came out as 10 seconds, which was an improvement from the 11
> seconds observed with RocksDB + KIP-892.
>
> My application is bottlenecked mostly by serialization and deserialization,
> so improving the performance of the state store doesn't really impact our
> throughput that much. And the processing performance (benchmark time,
> tasks/second) are pretty close in KIP-892 vs Streams 3.5.0. However, at
> larger state store sizes, RocksDB performance begins to degrade, so that
> might not be true once we pass 20GB per partition.
>
> -- QUESTION: Because we observed a significant (30% or so) and reproducible
> slowdown during restoration, it seems like KIP-892 uses the checkpointing
> behavior during restoration as well? If so, I would argue that this might
> not be necessary, because everything we write is already committed, so we
> don't need to change the behavior during restoration or standby tasks.
> Perhaps we could write the offsets to RocksDB on every batch (or even every
> 5 seconds or so).
>
> -- Note: This was a very small-scale test, with <1GB of state (as I didn't
> have time to spend hours building up state). In the past I have noted that
> RocksDB performance degrades significantly after 25GB of state in one
> store. Future work involves determining the performance impact of KIP-892
> relative to trunk at larger scale, since it's possible that the relative
> behaviors are far different (i.e. relative to trunk, 892's processing and
> restoration throughput might be much better or much worse).
>
> -- Note: For those who want to replicate the tests, you can find the branch
> of our streams app here:
>
> https://github.com/littlehorse-enterprises/littlehorse/tree/minor/testing-streams-forks
> . The example I ran was `examples/hundred-tasks`, and I ran the server with
> `./local-dev/do-server.sh one-partition`. The `STREAMS_TESTS.md` file has a
> detailed breakdown of the testing.
>
> Anyways, I'm super excited about this KIP and if a bit more future testing
> goes well, we plan to ship our product with a build of KIP-892, Speedb OSS,
> and potentially a few other minor tweaks that we are thinking about.
>
> Thanks Nick!
>
> Ride well,
> Colt McNealy
>
> *Founder, LittleHorse.dev*
>
>
> On Thu, Aug 24, 2023 at 3:19 AM Nick Telford <ni...@gmail.com>
> wrote:
>
> > Hi Bruno,
> >
> > Thanks for taking the time to review the KIP. I'm back from leave now and
> > intend to move this forwards as quickly as I can.
> >
> > Addressing your points:
> >
> > 1.
> > Because flush() is part of the StateStore API, it's exposed to custom
> > Processors, which might be making calls to flush(). This was actually the
> > case in a few integration tests.
> > To maintain as much compatibility as possible, I'd prefer not to make
> this
> > an UnsupportedOperationException, as it will cause previously working
> > Processors to start throwing exceptions at runtime.
> > I agree that it doesn't make sense for it to proxy commit(), though, as
> > that would cause it to violate the "StateStores commit only when the Task
> > commits" rule.
> > Instead, I think we should make this a no-op. That way, existing user
> > Processors will continue to work as-before, without violation of store
> > consistency that would be caused by premature flush/commit of StateStore
> > data to disk.
> > What do you think?
> >
> > 2.
> > As stated in the JavaDoc, when a StateStore implementation is
> > transactional, but is unable to estimate the uncommitted memory usage,
> the
> > method will return -1.
> > The intention here is to permit third-party implementations that may not
> be
> > able to estimate memory usage.
> >
> > Yes, it will be 0 when nothing has been written to the store yet. I
> thought
> > that was implied by "This method will return an approximation of the
> memory
> > would be freed by the next call to {@link #commit(Map)}" and "@return The
> > approximate size of all records awaiting {@link #commit(Map)}", however,
> I
> > can add it explicitly to the JavaDoc if you think this is unclear?
> >
> > 3.
> > I realise this is probably the most contentious point in my design, and
> I'm
> > open to changing it if I'm unable to convince you of the benefits.
> > Nevertheless, here's my argument:
> > The Interactive Query (IQ) API(s) are directly provided StateStores to
> > query, and it may be important for users to programmatically know which
> > mode the StateStore is operating under. If we simply provide an
> > "eosEnabled" boolean (as used throughout the internal streams engine), or
> > similar, then users will need to understand the operation and
> consequences
> > of each available processing mode and how it pertains to their
> StateStore.
> >
> > Interactive Query users aren't the only people that care about the
> > processing.mode/IsolationLevel of a StateStore: implementers of custom
> > StateStores also need to understand the behaviour expected of their
> > implementation. KIP-892 introduces some assumptions into the Streams
> Engine
> > about how StateStores operate under each processing mode, and it's
> > important that custom implementations adhere to those assumptions in
> order
> > to maintain the consistency guarantees.
> >
> > IsolationLevels provide a high-level contract on the behaviour of the
> > StateStore: a user knows that under READ_COMMITTED, they will see writes
> > only after the Task has committed, and under READ_UNCOMMITTED they will
> see
> > writes immediately. No understanding of the details of each
> processing.mode
> > is required, either for IQ users or StateStore implementers.
> >
> > An argument can be made that these contractual guarantees can simply be
> > documented for the processing.mode (i.e. that exactly-once and
> > exactly-once-v2 behave like READ_COMMITTED and at-least-once behaves like
> > READ_UNCOMMITTED), but there are several small issues with this I'd
> prefer
> > to avoid:
> >
> >    - Where would we document these contracts, in a way that is difficult
> >    for users/implementers to miss/ignore?
> >    - It's not clear to users that the processing mode is communicating
> >    an expectation of read isolation, unless they read the documentation.
> > Users
> >    rarely consult documentation unless they feel they need to, so it's
> > likely
> >    this detail would get missed by many users.
> >    - It tightly couples processing modes to read isolation. Adding new
> >    processing modes, or changing the read isolation of existing
> processing
> >    modes would be difficult/impossible.
> >
> > Ultimately, the cost of introducing IsolationLevels is just a single
> > method, since we re-use the existing IsolationLevel enum from Kafka. This
> > gives us a clear place to document the contractual guarantees expected
> > of/provided by StateStores, that is accessible both by the StateStore
> > itself, and by IQ users.
> >
> > (Writing this I've just realised that the StateStore and IQ APIs actually
> > don't provide access to StateStoreContext that IQ users would have direct
> > access to... Perhaps StateStore should expose isolationLevel() itself
> too?)
> >
> > 4.
> > Yeah, I'm not comfortable renaming the metrics in-place either, as it's a
> > backwards incompatible change. My concern is that, if we leave the
> existing
> > "flush" metrics in place, they will be confusing to users. Right now,
> > "flush" metrics record explicit flushes to disk, but under KIP-892, even
> a
> > commit() will not explicitly flush data to disk - RocksDB will decide on
> > when to flush memtables to disk itself.
> >
> > If we keep the existing "flush" metrics, we'd have two options, which
> both
> > seem pretty bad to me:
> >
> >    1. Have them record calls to commit(), which would be misleading, as
> >    data is no longer explicitly "flushed" to disk by this call.
> >    2. Have them record nothing at all, which is equivalent to removing
> the
> >    metrics, except that users will see the metric still exists and so
> > assume
> >    that the metric is correct, and that there's a problem with their
> system
> >    when there isn't.
> >
> > I agree that removing them is also a bad solution, and I'd like some
> > guidance on the best path forward here.
> >
> > 5.
> > Position files are updated on every write to a StateStore. Since our
> writes
> > are now buffered until commit(), we can't update the Position file until
> > commit() has been called, otherwise it would be inconsistent with the
> data
> > in the event of a rollback. Consequently, we need to manage these offsets
> > the same way we manage the checkpoint offsets, and ensure they're only
> > written on commit().
> >
> > 6.
> > Agreed, although I'm not exactly sure yet what tests to write. How
> explicit
> > do we need to be here in the KIP?
> >
> > As for upgrade/downgrade: upgrade is designed to be seamless, and we
> should
> > definitely add some tests around that. Downgrade, it transpires, isn't
> > currently possible, as the extra column family for offset storage is
> > incompatible with the pre-KIP-892 implementation: when you open a RocksDB
> > database, you must open all available column families or receive an
> error.
> > What currently happens on downgrade is that it attempts to open the
> store,
> > throws an error about the offsets column family not being opened, which
> > triggers a wipe and rebuild of the Task. Given that downgrades should be
> > uncommon, I think this is acceptable behaviour, as the end-state is
> > consistent, even if it results in an undesirable state restore.
> >
> > Should I document the upgrade/downgrade behaviour explicitly in the KIP?
> >
> > --
> >
> > Regards,
> > Nick
> >
> >
> > On Mon, 14 Aug 2023 at 22:31, Bruno Cadonna <ca...@apache.org> wrote:
> >
> > > Hi Nick!
> > >
> > > Thanks for the updates!
> > >
> > > 1.
> > > Why does StateStore#flush() default to
> > > StateStore#commit(Collections.emptyMap())?
> > > Since calls to flush() will not exist anymore after this KIP is
> > > released, I would rather throw an unsupported operation exception by
> > > default.
> > >
> > >
> > > 2.
> > > When would a state store return -1 from
> > > StateStore#approximateNumUncommittedBytes() while being transactional?
> > >
> > > Wouldn't StateStore#approximateNumUncommittedBytes() also return 0 if
> > > the state store is transactional but nothing has been written to the
> > > state store yet?
> > >
> > >
> > > 3.
> > > Sorry for bringing this up again. Does this KIP really need to
> introduce
> > > StateStoreContext#isolationLevel()? StateStoreContext has already
> > > appConfigs() which basically exposes the same information, i.e., if EOS
> > > is enabled or not.
> > > In one of your previous e-mails you wrote:
> > >
> > > "My idea was to try to keep the StateStore interface as loosely coupled
> > > from the Streams engine as possible, to give implementers more freedom,
> > > and reduce the amount of internal knowledge required."
> > >
> > > While I understand the intent, I doubt that it decreases the coupling
> of
> > > a StateStore interface and the Streams engine. READ_COMMITTED only
> > > applies to IQ but not to reads by processors. Thus, implementers need
> to
> > > understand how Streams accesses the state stores.
> > >
> > > I would like to hear what others think about this.
> > >
> > >
> > > 4.
> > > Great exposing new metrics for transactional state stores! However, I
> > > would prefer to add new metrics and deprecate (in the docs) the old
> > > ones. You can find examples of deprecated metrics here:
> > > https://kafka.apache.org/documentation/#selector_monitoring
> > >
> > >
> > > 5.
> > > Why does the KIP mention position files? I do not think they are
> related
> > > to transactions or flushes.
> > >
> > >
> > > 6.
> > > I think we will also need to adapt/add integration tests besides unit
> > > tests. Additionally, we probably need integration or system tests to
> > > verify that upgrades and downgrades between transactional and
> > > non-transactional state stores work as expected.
> > >
> > >
> > > Best,
> > > Bruno
> > >
> > >
> > >
> > >
> > >
> > > On 7/21/23 10:34 PM, Nick Telford wrote:
> > > > One more thing: I noted John's suggestion in the KIP, under "Rejected
> > > > Alternatives". I still think it's an idea worth pursuing, but I
> believe
> > > > that it's out of the scope of this KIP, because it solves a different
> > set
> > > > of problems to this KIP, and the scope of this one has already grown
> > > quite
> > > > large!
> > > >
> > > > On Fri, 21 Jul 2023 at 21:33, Nick Telford <ni...@gmail.com>
> > > wrote:
> > > >
> > > >> Hi everyone,
> > > >>
> > > >> I've updated the KIP (
> > > >>
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores
> > > )
> > > >> with the latest changes; mostly bringing back "Atomic Checkpointing"
> > > (for
> > > >> what feels like the 10th time!). I think the one thing missing is
> some
> > > >> changes to metrics (notably the store "flush" metrics will need to
> be
> > > >> renamed to "commit").
> > > >>
> > > >> The reason I brought back Atomic Checkpointing was to decouple store
> > > flush
> > > >> from store commit. This is important, because with Transactional
> > > >> StateStores, we now need to call "flush" on *every* Task commit, and
> > not
> > > >> just when the StateStore is closing, otherwise our transaction
> buffer
> > > will
> > > >> never be written and persisted, instead growing unbounded! I
> > > experimented
> > > >> with some simple solutions, like forcing a store flush whenever the
> > > >> transaction buffer was likely to exceed its configured size, but
> this
> > > was
> > > >> brittle: it prevented the transaction buffer from being configured
> to
> > be
> > > >> unbounded, and it still would have required explicit flushes of
> > RocksDB,
> > > >> yielding sub-optimal performance and memory utilization.
> > > >>
> > > >> I deemed Atomic Checkpointing to be the "right" way to resolve this
> > > >> problem. By ensuring that the changelog offsets that correspond to
> the
> > > most
> > > >> recently written records are always atomically written to the
> > StateStore
> > > >> (by writing them to the same transaction buffer), we can avoid
> > forcibly
> > > >> flushing the RocksDB memtables to disk, letting RocksDB flush them
> > only
> > > >> when necessary, without losing any of our consistency guarantees.
> See
> > > the
> > > >> updated KIP for more info.
> > > >>
> > > >> I have fully implemented these changes, although I'm still not
> > entirely
> > > >> happy with the implementation for segmented StateStores, so I plan
> to
> > > >> refactor that. Despite that, all tests pass. If you'd like to try
> out
> > or
> > > >> review this highly experimental and incomplete branch, it's
> available
> > > here:
> > > >> https://github.com/nicktelford/kafka/tree/KIP-892-3.5.0. Note: it's
> > > built
> > > >> against Kafka 3.5.0 so that I had a stable base to build and test it
> > on,
> > > >> and to enable easy apples-to-apples comparisons in a live
> > environment. I
> > > >> plan to rebase it against trunk once it's nearer completion and has
> > been
> > > >> proven on our main application.
> > > >>
> > > >> I would really appreciate help in reviewing and testing:
> > > >> - Segmented (Versioned, Session and Window) stores
> > > >> - Global stores
> > > >>
> > > >> As I do not currently use either of these, my primary test
> > > environment
> > > >> doesn't test these areas.
> > > >>
> > > >> I'm going on Parental Leave starting next week for a few weeks, so
> > will
> > > >> not have time to move this forward until late August. That said,
> your
> > > >> feedback is welcome and appreciated, I just won't be able to respond
> > as
> > > >> quickly as usual.
> > > >>
> > > >> Regards,
> > > >> Nick
> > > >>
> > > >> On Mon, 3 Jul 2023 at 16:23, Nick Telford <ni...@gmail.com>
> > > wrote:
> > > >>
> > > >>> Hi Bruno
> > > >>>
> > > >>> Yes, that's correct, although the impact on IQ is not something I
> had
> > > >>> considered.
> > > >>>
> > > >>> What about atomically updating the state store from the transaction
> > > >>>> buffer every commit interval and writing the checkpoint (thus,
> > > flushing
> > > >>>> the memtable) every configured amount of data and/or number of
> > commit
> > > >>>> intervals?
> > > >>>>
> > > >>>
> > > >>> I'm not quite sure I follow. Are you suggesting that we add an
> > > additional
> > > >>> config for the max number of commit intervals between checkpoints?
> > That
> > > >>> way, we would checkpoint *either* when the transaction buffers are
> > > nearly
> > > >>> full, *OR* whenever a certain number of commit intervals have
> > elapsed,
> > > >>> whichever comes first?
> > > >>>
> > > >>> That certainly seems reasonable, although this re-ignites an
> earlier
> > > >>> debate about whether a config should be measured in "number of
> commit
> > > >>> intervals", instead of just an absolute time.
> > > >>>
> > > >>> FWIW, I realised that this issue is the reason I was pursuing the
> > > Atomic
> > > >>> Checkpoints, as it de-couples memtable flush from checkpointing,
> > which
> > > >>> enables us to just checkpoint on every commit without any
> performance
> > > >>> impact. Atomic Checkpointing is definitely the "best" solution, but
> > > I'm not
> > > >>> sure if this is enough to bring it back into this KIP.
> > > >>>
> > > >>> I'm currently working on moving all the transactional logic
> directly
> > > into
> > > >>> RocksDBStore itself, which does away with the
> > StateStore#newTransaction
> > > >>> method, and reduces the number of new classes introduced,
> > significantly
> > > >>> reducing the complexity. If it works, and the complexity is
> > drastically
> > > >>> reduced, I may try bringing back Atomic Checkpoints into this KIP.
> > > >>>
> > > >>> Regards,
> > > >>> Nick
> > > >>>
> > > >>> On Mon, 3 Jul 2023 at 15:27, Bruno Cadonna <ca...@apache.org>
> > wrote:
> > > >>>
> > > >>>> Hi Nick,
> > > >>>>
> > > >>>> Thanks for the insights! Very interesting!
> > > >>>>
> > > >>>> As far as I understand, you want to atomically update the state
> > store
> > > >>>> from the transaction buffer, flush the memtable of a state store
> and
> > > >>>> write the checkpoint not after the commit time elapsed but after
> the
> > > >>>> transaction buffer reached a size that would lead to exceeding
> > > >>>> statestore.transaction.buffer.max.bytes before the next commit
> > > interval
> > > >>>> ends.
> > > >>>> That means, the Kafka transaction would commit every commit
> interval
> > > but
> > > >>>> the state store will only be atomically updated roughly every
> > > >>>> statestore.transaction.buffer.max.bytes of data. Also IQ would
> then
> > > only
> > > >>>> see new data roughly every
> statestore.transaction.buffer.max.bytes.
> > > >>>> After a failure the state store needs to restore up to
> > > >>>> statestore.transaction.buffer.max.bytes.
> > > >>>>
> > > >>>> Is this correct?
> > > >>>>
> > > >>>> What about atomically updating the state store from the
> transaction
> > > >>>> buffer every commit interval and writing the checkpoint (thus,
> > > flushing
> > > >>>> the memtable) every configured amount of data and/or number of
> > commit
> > > >>>> intervals? In such a way, we would have the same delay for records
> > > >>>> appearing in output topics and IQ because both would appear when
> the
> > > >>>> Kafka transaction is committed. However, after a failure the state
> > > store
> > > >>>> still needs to restore up to
> statestore.transaction.buffer.max.bytes
> > > and
> > > >>>> it might restore data that is already in the state store because
> the
> > > >>>> checkpoint lags behind the last stable offset (i.e. the last
> > committed
> > > >>>> offset) of the changelog topics. Restoring data that is already in
> > the
> > > >>>> state store is idempotent, so eos should not be violated.
> > > >>>> This solution needs at least one new config to specify when a
> > > checkpoint
> > > >>>> should be written.
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>> A small correction to your previous e-mail that does not change
> > > anything
> > > >>>> you said: Under alos the default commit interval is 30 seconds,
> not
> > > five
> > > >>>> seconds.
> > > >>>>
> > > >>>>
> > > >>>> Best,
> > > >>>> Bruno
> > > >>>>
> > > >>>>
> > > >>>> On 01.07.23 12:37, Nick Telford wrote:
> > > >>>>> Hi everyone,
> > > >>>>>
> > > >>>>> I've begun performance testing my branch on our staging
> > environment,
> > > >>>>> putting it through its paces in our non-trivial application. I'm
> > > >>>> already
> > > >>>>> observing the same increased flush rate that we saw the last time
> > we
> > > >>>>> attempted to use a version of this KIP, but this time, I think I
> > know
> > > >>>> why.
> > > >>>>>
> > > >>>>> Pre-KIP-892, StreamTask#postCommit, which is called at the end of
> > the
> > > >>>> Task
> > > >>>>> commit process, has the following behaviour:
> > > >>>>>
> > > >>>>>      - Under ALOS: checkpoint the state stores. This includes
> > > >>>>>      flushing memtables in RocksDB. This is acceptable because
> the
> > > >>>> default
> > > >>>>>      commit.interval.ms is 5 seconds, so forcibly flushing
> > memtables
> > > >>>> every 5
> > > >>>>>      seconds is acceptable for most applications.
> > > >>>>>      - Under EOS: checkpointing is not done, *unless* it's being
> > > >>>> forced, due
> > > >>>>>      to e.g. the Task closing or being revoked. This means that
> > under
> > > >>>> normal
> > > >>>>>      processing conditions, the state stores will not be
> > > checkpointed,
> > > >>>> and will
> > > >>>>>      not have memtables flushed at all, unless RocksDB decides
> to
> > > >>>> flush them on
> > > >>>>>      its own. Checkpointing stores and force-flushing their
> > memtables
> > > >>>> is only
> > > >>>>>      done when a Task is being closed.
> > > >>>>>
> > > >>>>> Under EOS, KIP-892 needs to checkpoint stores on at least *some*
> > > normal
> > > >>>>> Task commits, in order to write the RocksDB transaction buffers
> to
> > > the
> > > >>>>> state stores, and to ensure the offsets are synced to disk to
> > prevent
> > > >>>>> restores from getting out of hand. Consequently, my current
> > > >>>> implementation
> > > >>>>> calls maybeCheckpoint on *every* Task commit, which is far too
> > > >>>> frequent.
> > > >>>>> This causes checkpoints every 10,000 records, which is a change
> in
> > > >>>> flush
> > > >>>>> behaviour, potentially causing performance problems for some
> > > >>>> applications.
> > > >>>>>
> > > >>>>> I'm looking into possible solutions, and I'm currently leaning
> > > towards
> > > >>>>> using the statestore.transaction.buffer.max.bytes configuration
> to
> > > >>>>> checkpoint Tasks once we are likely to exceed it. This would
> > > >>>> complement the
> > > >>>>> existing "early Task commit" functionality that this
> configuration
> > > >>>>> provides, in the following way:
> > > >>>>>
> > > >>>>>      - Currently, we use statestore.transaction.buffer.max.bytes
> to
> > > >>>> force an
> > > >>>>>      early Task commit if processing more records would cause our
> > > state
> > > >>>> store
> > > >>>>>      transactions to exceed the memory assigned to them.
> > > >>>>>      - New functionality: when a Task *does* commit, we will not
> > > >>>> checkpoint
> > > >>>>>      the stores (and hence flush the transaction buffers) unless
> we
> > > >>>> expect to
> > > >>>>>      cross the statestore.transaction.buffer.max.bytes threshold
> > > before
> > > >>>> the next
> > > >>>>>      commit
> > > >>>>>
> > > >>>>> I'm also open to suggestions.
> > > >>>>>
> > > >>>>> Regards,
> > > >>>>> Nick
> > > >>>>>
> > > >>>>> On Thu, 22 Jun 2023 at 14:06, Nick Telford <
> nick.telford@gmail.com
> > >
> > > >>>> wrote:
> > > >>>>>
> > > >>>>>> Hi Bruno!
> > > >>>>>>
> > > >>>>>> 3.
> > > >>>>>> By "less predictable for users", I meant in terms of
> understanding
> > > the
> > > >>>>>> performance profile under various circumstances. The more
> complex
> > > the
> > > >>>>>> solution, the more difficult it would be for users to understand
> > the
> > > >>>>>> performance they see. For example, spilling records to disk when
> > the
> > > >>>>>> transaction buffer reaches a threshold would, I expect, reduce
> > write
> > > >>>>>> throughput. This reduction in write throughput could be
> > unexpected,
> > > >>>> and
> > > >>>>>> potentially difficult to diagnose/understand for users.
> > > >>>>>> At the moment, I think the "early commit" concept is relatively
> > > >>>>>> straightforward; it's easy to document, and conceptually fairly
> > > >>>> obvious to
> > > >>>>>> users. We could probably add a metric to make it easier to
> > > understand
> > > >>>> when
> > > >>>>>> it happens though.
> > > >>>>>>
> > > >>>>>> 3. (the second one)
> > > >>>>>> The IsolationLevel is *essentially* an indirect way of telling
> > > >>>> StateStores
> > > >>>>>> whether they should be transactional. READ_COMMITTED essentially
> > > >>>> requires
> > > >>>>>> transactions, because it dictates that two threads calling
> > > >>>>>> `newTransaction()` should not see writes from the other
> > transaction
> > > >>>> until
> > > >>>>>> they have been committed. With READ_UNCOMMITTED, all bets are
> off,
> > > and
> > > >>>>>> stores can allow threads to observe written records at any time,
> > > >>>> which is
> > > >>>>>> essentially "no transactions". That said, StateStores are free
> to
> > > >>>> implement
> > > >>>>>> these guarantees however they can, which is a bit more relaxed
> > than
> > > >>>>>> dictating "you must use transactions". For example, with RocksDB
> > we
> > > >>>> would
> > > >>>>>> implement these as READ_COMMITTED == WBWI-based "transactions",
> > > >>>>>> READ_UNCOMMITTED == direct writes to the database. But with
> other
> > > >>>> storage
> > > >>>>>> engines, it might be preferable to *always* use transactions,
> even
> > > >>>> when
> > > >>>>>> unnecessary; or there may be storage engines that don't provide
> > > >>>>>> transactions, but the isolation guarantees can be met using a
> > > >>>> different
> > > >>>>>> technique.
> > > >>>>>> My idea was to try to keep the StateStore interface as loosely
> > > coupled
> > > >>>>>> from the Streams engine as possible, to give implementers more
> > > >>>> freedom, and
> > > >>>>>> reduce the amount of internal knowledge required.
> > > >>>>>> That said, I understand that "IsolationLevel" might not be the
> > right
> > > >>>>>> abstraction, and we can always make it much more explicit if
> > > >>>> required, e.g.
> > > >>>>>> boolean transactional()
> > > >>>>>>
> > > >>>>>> 7-8.
> > > >>>>>> I can make these changes either later today or tomorrow.
> > > >>>>>>
> > > >>>>>> Small update:
> > > >>>>>> I've rebased my branch on trunk and fixed a bunch of issues that
> > > >>>> needed
> > > >>>>>> addressing. Currently, all the tests pass, which is promising,
> but
> > > it
> > > >>>> will
> > > >>>>>> need to undergo some performance testing. I haven't (yet) worked
> > on
> > > >>>>>> removing the `newTransaction()` stuff, but I would expect that,
> > > >>>>>> behaviourally, it should make no difference. The branch is
> > available
> > > >>>> at
> > > >>>>>> https://github.com/nicktelford/kafka/tree/KIP-892-c if anyone
> is
> > > >>>>>> interested in taking an early look.
> > > >>>>>>
> > > >>>>>> Regards,
> > > >>>>>> Nick
> > > >>>>>>
> > > >>>>>> On Thu, 22 Jun 2023 at 11:59, Bruno Cadonna <cadonna@apache.org
> >
> > > >>>> wrote:
> > > >>>>>>
> > > >>>>>>> Hi Nick,
> > > >>>>>>>
> > > >>>>>>> 1.
> > > >>>>>>> Yeah, I agree with you. That was actually also my point. I
> > > understood
> > > >>>>>>> that John was proposing the ingestion path as a way to avoid
> the
> > > >>>> early
> > > >>>>>>> commits. Probably, I misinterpreted the intent.
> > > >>>>>>>
> > > >>>>>>> 2.
> > > >>>>>>> I agree with John here, that actually it is public API. My
> > question
> > > >>>> is
> > > >>>>>>> how this usage pattern affects normal processing.
> > > >>>>>>>
> > > >>>>>>> 3.
> > > >>>>>>> My concern is that checking for the size of the transaction
> > buffer
> > > >>>> and
> > > >>>>>>> maybe triggering an early commit affects the whole processing
> of
> > > >>>> Kafka
> > > >>>>>>> Streams. The transactionality of a state store is not confined
> to
> > > the
> > > >>>>>>> state store itself, but spills over and changes the behavior of
> > > other
> > > >>>>>>> parts of the system. I agree with you that it is a decent
> > > >>>> compromise. I
> > > >>>>>>> just wanted to analyse the downsides and list the options to
> > > overcome
> > > >>>>>>> them. I also agree with you that all options seem quite heavy
> > > >>>> compared
> > > >>>>>>> with your KIP. I do not understand what you mean with "less
> > > >>>> predictable
> > > >>>>>>> for users", though.
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> I found the discussions about the alternatives really
> > interesting.
> > > >>>> But I
> > > >>>>>>> also think that your plan sounds good and we should continue
> with
> > > it!
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> Some comments on your reply to my e-mail on June 20th:
> > > >>>>>>>
> > > >>>>>>> 3.
> > > >>>>>>> Ah, now, I understand the reasoning behind putting isolation
> > level
> > > in
> > > >>>>>>> the state store context. Thanks! Should that also be a way to
> > give
> > > >>>>>>> the state store the opportunity to decide whether to turn on
> > > >>>>>>> transactions or not?
> > > >>>>>>> With my comment, I was more concerned about how do you know if
> a
> > > >>>>>>> checkpoint file needs to be written under EOS, if you do not
> > have a
> > > >>>> way
> > > >>>>>>> to know if the state store is transactional or not. If a state
> > > store
> > > >>>> is
> > > >>>>>>> transactional, the checkpoint file can be written during normal
> > > >>>>>>> processing under EOS. If the state store is not transactional,
> > the
> > > >>>>>>> checkpoint file must not be written under EOS.
> > > >>>>>>>
> > > >>>>>>> 7.
> > > >>>>>>> My point was about not only considering the bytes in memory in
> > > config
> > > >>>>>>> statestore.uncommitted.max.bytes, but also bytes that might be
> > > >>>> spilled
> > > >>>>>>> on disk. Basically, I was wondering whether you should remove
> the
> > > >>>>>>> "memory" in "Maximum number of memory bytes to be used to
> > > >>>>>>> buffer uncommitted state-store records." My thinking was that
> > even
> > > >>>> if a
> > > >>>>>>> state store spills uncommitted bytes to disk, limiting the
> > overall
> > > >>>> bytes
> > > >>>>>>> might make sense. Thinking about it again and considering the
> > > recent
> > > >>>>>>> discussions, it does not make too much sense anymore.
> > > >>>>>>> I like the name statestore.transaction.buffer.max.bytes that
> you
> > > >>>> proposed.
> > > >>>>>>>
> > > >>>>>>> 8.
> > > >>>>>>> A high-level description (without implementation details) of
> how
> > > >>>> Kafka
> > > >>>>>>> Streams will manage the commit of changelog transactions, state
> > > store
> > > >>>>>>> transactions and checkpointing would be great. Would be great
> if
> > > you
> > > >>>>>>> could also add some sentences about the behavior in case of a
> > > >>>> failure.
> > > >>>>>>> For instance how does a transactional state store recover
> after a
> > > >>>>>>> failure or what happens with the transaction buffer, etc. (that
> > is
> > > >>>> what
> > > >>>>>>> I meant by "fail-over" in point 9.)
> > > >>>>>>>
> > > >>>>>>> Best,
> > > >>>>>>> Bruno
> > > >>>>>>>
> > > >>>>>>> On 21.06.23 18:50, Nick Telford wrote:
> > > >>>>>>>> Hi Bruno,
> > > >>>>>>>>
> > > >>>>>>>> 1.
> > > >>>>>>>> Isn't this exactly the same issue that WriteBatchWithIndex
> > > >>>> transactions
> > > >>>>>>>> have, whereby exceeding (or likely to exceed) configured
> memory
> > > >>>> needs to
> > > >>>>>>>> trigger an early commit?
> > > >>>>>>>>
> > > >>>>>>>> 2.
> > > >>>>>>>> This is one of my big concerns. Ultimately, any approach based
> > on
> > > >>>>>>> cracking
> > > >>>>>>>> open RocksDB internals and using it in ways it's not really
> > > designed
> > > >>>>>>> for is
> > > >>>>>>>> likely to have some unforeseen performance or consistency
> issues.
> > > >>>>>>>>
> > > >>>>>>>> 3.
> > > >>>>>>>> What's your motivation for removing these early commits? While
> > not
> > > >>>>>>> ideal, I
> > > >>>>>>>> think they're a decent compromise to ensure consistency whilst
> > > >>>>>>> maintaining
> > > >>>>>>>> good and predictable performance.
> > > >>>>>>>> All 3 of your suggested ideas seem *very* complicated, and
> might
> > > >>>>>>> actually
> > > >>>>>>>> make behaviour less predictable for users as a consequence.
> > > >>>>>>>>
> > > >>>>>>>> I'm a bit concerned that the scope of this KIP is growing a
> bit
> > > out
> > > >>>> of
> > > >>>>>>>> control. While it's good to discuss ideas for future
> > > improvements, I
> > > >>>>>>> think
> > > >>>>>>>> it's important to narrow the scope down to a design that
> > achieves
> > > >>>> the
> > > >>>>>>> most
> > > >>>>>>>> pressing objectives (constant sized restorations during dirty
> > > >>>>>>>> close/unexpected errors). Any design that this KIP produces
> can
> > > >>>>>>> ultimately
> > > >>>>>>>> be changed in the future, especially if the bulk of it is
> > internal
> > > >>>>>>>> behaviour.
> > > >>>>>>>>
> > > >>>>>>>> I'm going to spend some time next week trying to re-work the
> > > >>>> original
> > > >>>>>>>> WriteBatchWithIndex design to remove the newTransaction()
> > method,
> > > >>>> such
> > > >>>>>>> that
> > > >>>>>>>> it's just an implementation detail of RocksDBStore. That way,
> if
> > > we
> > > >>>>>>> want to
> > > >>>>>>>> replace WBWI with something in the future, like the SST file
> > > >>>> management
> > > >>>>>>>> outlined by John, then we can do so with little/no API
> changes.
> > > >>>>>>>>
> > > >>>>>>>> Regards,
> > > >>>>>>>>
> > > >>>>>>>> Nick
> > > >>>>>>>>
> > > >>>>>>>
> > > >>>>>>
> > > >>>>>
> > > >>>>
> > > >>>
> > > >
> > >
> >
>