You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by Guozhang Wang <wa...@gmail.com> on 2017/02/28 21:54:43 UTC

[DISCUSS] KIP-129: Kafka Streams Exactly-Once Semantics

Hi all,

I have just created KIP-129 to leverage KIP-98 in Kafka Streams and provide
exactly-once processing semantics:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-129%3A+Streams+Exactly-Once+Semantics

This KIP enables Streams users to optionally turn on exactly-once
processing semantics without changing their app code at all by leveraging
the transactional messaging features provided in KIP-98.

The above wiki page provides a high-level view of the proposed changes,
while detailed implementation design can be found in this Google doc:

https://docs.google.com/document/d/1pGZ8xtOOyGwDYgH5vA6h19zOMMaduFK1DAB8_gBYA2c

We would love to hear your comments and suggestions.

Thanks,
-- Guozhang

Re: [DISCUSS] KIP-129: Kafka Streams Exactly-Once Semantics

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Michael,

I don't think we "hand waved" this question, but as you mentioned
anyway, without an implementation it's hard (or even impossible) to
quantify the probability. Nevertheless, I'll try:

First, I want to refer to the "Error Handling" section in the KIP:
https://docs.google.com/document/d/1pGZ8xtOOyGwDYgH5vA6h19zOMMaduFK1DAB8_gBYA2c/edit#heading=h.pfjv95485rsb

It explains that we distinguish 3 failure types:

 (1) For the first type "Rollback and Resume", a store recovery would be
required -- that is a difference to non-EoS. (Will pick this up later on)

 (2) The second type of failure is a "new" EoS failure, that can for
example happen after a GC pause. However, as a task was "moved away"
from the thread hitting this error, no state recovery is required, thus,
we can ignore it for this discussion.

 (3) The last "stop the world" category can be ignored, too. We stop the
world and don't replay any store.


Thus, we only need to discuss error of category (1):

For non-EoS, as we guarantee at-least-once processing, the state store
is just used as-is on a retry. This is ok, as it will only result into
processing a record multiple times, ie, a duplicate update to the store.
For EoS, we need to get a "clean" store on retry to guarantee
exactly-once. For vanilla EoS we wipe out the local store and recreate
it from the changelog (what is expensive).

However, this type of errors are transient broker/network failures. We
can assume, that brokers do fail rarely. Furthermore, we set Producer
retry strategy to infinite (cf.
https://docs.google.com/document/d/1pGZ8xtOOyGwDYgH5vA6h19zOMMaduFK1DAB8_gBYA2c/edit#bookmark=id.45qx8zol0y6r).
This makes such error (on Streams side) even more rare than for non-EoS,
as the Producer mask those error automatically.

There would still be some errors, that the Producer cannot retry, but
Streams can (by aborting a transaction or maybe by recreating a new
Producer for a task). For those "Streams reties" we need the expensive
store recreation. Examples would be InvalidSequenceNumber exception or a
TimeoutException. (cf. KIP-98:
https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit#bookmark=id.5x3010zgtp7o)

Those exception would indicate potential data loss and should only occur
if there is a miss configuration or a bug.

Therefore, I am not too worried about the "worst case outcome" that
Streams-EoS 1.0 would not work in practice.


I hope this arguments mitigate your (basically valid) concerns.



-Matthias



On 3/24/17 9:01 AM, Michael Noll wrote:
> Ah, I think the use of the word "regression" was misleading.
> 
> Let me try to rephrase:  At the moment it is unclear to me whether the
> scenario where we need to rebuild a state store from scratch will be as
> likely, a little bit more likely, much more likely, or very, very much more
> likely to happen than pre-EOS.  So far, I had the impression this question
> was hand-waved a bit.
> 
> Yes, you can enable/disable EOS for Streams, and by default it is disabled,
> so we shouldn't talk about a "regression" here (agreed).  But, if we don't
> understand 1. under which conditions we'd trigger a full rebuild of state
> stores and 2. how likely these conditions are, then I am concerned that we
> could (worst case) end up with a V1 implementation that turns out unusable
> for many users.  And even if we label the new functionality as
> "experimental" (which IIRC is the case), this could still be perceived by
> users in a negative way.  (Just think about how long Mongo has been having
> the stigma of "it loses my data".)
> 
> What I'd suggest us to do is to at least address my point (1) above --
> explaining under which conditions we'd retrigger a full rebuild of state
> stores, what would lead to failed producer transactions, etc.  This should
> be a rather quick exercise, given the offline discussion we had earlier
> this week.  I agree that (2) -- the likelihood of these things to happen --
> is much harder to quantify, particularly at a point where we don't even
> have a fully working implementation (:-P), so this should perhaps wait
> until later.  That said, having a reasonably good understanding of (1)
> should also help us to review the current Streams EOS design, help with
> testing/benchmarking, and also help to possibly improve the V1
> implementation if need be.
> 
> Does that make sense?
> Michael
> 
> 
> 
> 
> 
> 
> On Wed, Mar 22, 2017 at 5:07 PM, Sriram Subramanian <ra...@confluent.io>
> wrote:
> 
>> This is a completely new feature which is controlled by a config. It would
>> be a regression if you upgraded streams and saw a different behavior. That
>> would not happen in this case. This is similar to how we are introducing
>> idempotent producer in core kafka with a config to turn it on. There is no
>> guarantee that the performance of the producer with the config turned on
>> will be the same although eventually we will like to get to it.
>>
>> On Wed, Mar 22, 2017 at 7:12 AM, Michael Noll <mi...@confluent.io>
>> wrote:
>>
>>> I second Eno's concern regarding the impact of Streams EOS on state
>> stores.
>>>
>>>>  We do a full recovery today and the EOS proposal will not make this
>> any
>>> worse.
>>>
>>> Yes, today we do a full state store recovery under certain failures.
>>> However, I think the point (or perhaps: open question) is that, with the
>>> EOS design, there's now an *increased likelihood* of such failures that
>>> trigger full state store recovery.  If this increase is significant,
>> then I
>>> would consider this to be a regression that we should address.
>>>
>>> As Eno said:
>>>
>>>> currently we pay the recovery price for a Kafka Streams instance
>> failure.
>>>> Now we might pay it for a transaction failure. Will transaction
>> failures
>>> be
>>>> more or less common than the previous types of failures?
>>>
>>> Damian voiced similar concerns at the very beginning of this discussion,
>>> not sure what his current opinion is here.
>>>
>>> -Michael
>>>
>>>
>>>
>>>
>>>
>>> On Wed, Mar 22, 2017 at 1:04 AM, Sriram Subramanian <ra...@confluent.io>
>>> wrote:
>>>
>>>> To add to this discussion, I do think we should think about this in
>>>> increments. We do a full recovery today and the EOS proposal will not
>>> make
>>>> this any worse. Using store snapshot is a good option to avoid store
>>>> recovery in the future but as Eno points out, all pluggable stores
>> would
>>>> need to have this ability. W.r.t transaction failures, this should not
>> be
>>>> an issue. We should be simply retrying. There is one optimization we
>> can
>>> do
>>>> for clean shutdowns. We could store a clean shutdown file that contains
>>> the
>>>> input offsets. This file gets written when you close the streams
>>> instance.
>>>> On start, you could can check the offsets from the shutdown file and
>>>> compare it with the offsets we get from the consumer and ensure they
>>> match.
>>>> If they do, you could use the same store instead of recovering.
>> However,
>>> if
>>>> we go with the snapshot approach, this will not be required. My vote
>>> would
>>>> be to implement V1 and solve the bootstrap problem which exist today in
>>> the
>>>> future versions.
>>>>
>>>> On Tue, Mar 21, 2017 at 4:43 PM, Matthias J. Sax <
>> matthias@confluent.io>
>>>> wrote:
>>>>
>>>>> Thanks for your feedback Eno.
>>>>>
>>>>> For now, I still think that the KIP itself does not need to talk
>> about
>>>>> this in more detail, because we apply the same strategy for EoS as
>> for
>>>>> non-EoS (as of 0.10.2).
>>>>>
>>>>> Thus, in case of a clean shutdown, we write the checkpoint file for a
>>>>> store and thus know we can reuse the store. In case of failure, we
>> need
>>>>> to recreate the store from the changelog.
>>>>>
>>>>>> Will a V1 design that relies on plain store recovery from Kafka for
>>>>>> each transaction abort be good enough, or usable?
>>>>>
>>>>> Why should it not be usable? It's the same strategy as used in 0.10.2
>>>>> and it runs in production in many companies already.
>>>>>
>>>>>> however it seems to me we might have a regression of sorts
>>>>>> Now we might pay it for a transaction failure.
>>>>>
>>>>> I would assume transaction failures to be quite rare. Maybe the core
>>> EoS
>>>>> folks can comment here, too.
>>>>>
>>>>>
>>>>>
>>>>> -Matthias
>>>>>
>>>>>
>>>>>
>>>>> On 3/20/17 3:16 PM, Eno Thereska wrote:
>>>>>> Hi Matthias,
>>>>>>
>>>>>> I'd like to see some more info on how you propose to handle
>>>> transactions
>>>>> that involve state stores in the KIP itself. The design doc has info
>>>> about
>>>>> various optimisations like RocksDb snapshots and transactions and
>> such,
>>>> but
>>>>> will there be a user-visible interface that indicates whether a store
>>> has
>>>>> snapshot and/or transactional capabilities? If a user plugs in
>> another
>>>>> store, what guarantees are they expected to get?
>>>>>>
>>>>>> Will a V1 design that relies on plain store recovery from Kafka for
>>>> each
>>>>> transaction abort be good enough, or usable? If your dataset is large
>>>>> (e.g., 200GB) the recovery time might be so large as to effectively
>>>> render
>>>>> that Kafka Streams instance unavailable for tens of minutes. You
>>> mention
>>>>> that is not a regression to what we currently have, however it seems
>> to
>>>> me
>>>>> we might have a regression of sorts: currently we pay the recovery
>>> price
>>>>> for a Kafka Streams instance failure. Now we might pay it for a
>>>> transaction
>>>>> failure. Will transaction failures be more or less common than the
>>>> previous
>>>>> types of failures? I'd like to see this addressed.
>>>>>>
>>>>>> Thanks
>>>>>> Eno
>>>>>>
>>>>>>
>>>>>>
>>>>>>> On 15 Mar 2017, at 22:09, Matthias J. Sax <ma...@confluent.io>
>>>>> wrote:
>>>>>>>
>>>>>>> Just a quick follow up:
>>>>>>>
>>>>>>> Our overall proposal is, to implement KIP-129 as is as a “Stream
>> EoS
>>>>>>> 1.0” version. The raised concerns are all valid, but hard to
>>> quantify
>>>> at
>>>>>>> the moment. Implementing KIP-129, that provides a clean design,
>>> allows
>>>>>>> us to gain more insight in the performance implications. This
>>> enables
>>>>>>> us, to make an educated decision, if the “producer per task” model
>>>>>>> perform wells or not, and if a switch to a “producer per thread”
>>> model
>>>>>>> is mandatory.
>>>>>>>
>>>>>>> We also want to point out, that we can move incrementally from
>>>> "producer
>>>>>>> per task" to "producer per thread" design or apply some
>> incremental
>>>>>>> improvements to "producer per task" (as discussed in the doc).
>> Thus,
>>>>>>> there is not issue with regard to upgrading.
>>>>>>>
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On 3/15/17 2:36 PM, Matthias J. Sax wrote:
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I want to pick up this thread again. As there are some concerns
>>> about
>>>>>>>> the "producer per task" design, we did write up an alternative
>>>>> "producer
>>>>>>>> per thread" design and discuss pros/cons of both approaches:
>>>>>>>>
>>>>>>>> https://docs.google.com/document/d/1CfOJaa6mdg5o7pLf_
>>>>> zXISV4oE0ZeMZwT_sG1QWgL4EE
>>>>>>>>
>>>>>>>>
>>>>>>>> Looking forward to your feedback.
>>>>>>>>
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>>
>>>>>>>> On 3/10/17 3:24 AM, Damian Guy wrote:
>>>>>>>>> Hi Matthias,
>>>>>>>>>
>>>>>>>>> Thanks for the response. I agree with you regarding the use of
>>>>>>>>> PartitionGrouper to reduce the number of tasks. It would be good
>>> to
>>>>> have an
>>>>>>>>> idea of any additional load on the brokers as we increase the
>>> number
>>>>> of
>>>>>>>>> tasks and therefore producers.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Damian
>>>>>>>>>
>>>>>>>>> On Wed, 8 Mar 2017 at 01:45 Matthias J. Sax <
>>> matthias@confluent.io>
>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Damian, Jun,
>>>>>>>>>>
>>>>>>>>>> Thanks for your input.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> About Performance test:
>>>>>>>>>>
>>>>>>>>>> I can follow up with more performance tests using more
>> partitions
>>>> and
>>>>>>>>>> also collecting broker metrics.
>>>>>>>>>>
>>>>>>>>>> However, I want to highlight again, that even if 1000+
>> partitions
>>>>> would
>>>>>>>>>> be problematic, one can simply implement PartitionGrouper
>>> interface
>>>>> and
>>>>>>>>>> reduce the number of tasks to 250 or 100... So I am not sure,
>> if
>>> we
>>>>>>>>>> should block this KIP, even if there might be some performance
>>>>> penalty
>>>>>>>>>> for currently single partitioned tasks.
>>>>>>>>>>
>>>>>>>>>> About memory usage. JXM max-heap and max-off-heap did report
>>> 256MB
>>>>> and
>>>>>>>>>> 133MB for all experiments (thus I did not put it in the
>>>> spreadsheet).
>>>>>>>>>> Thus, using 100 producers (each using a max of 32MB of memory)
>>> was
>>>>> not
>>>>>>>>>> an issue with regard to memory consumption. I did not track
>>>> "current
>>>>>>>>>> head/off-heap" memory as this would require a more advance test
>>>>> setup to
>>>>>>>>>> monitor it over time. If you think this would be required, we
>> can
>>>> do
>>>>>>>>>> some tests though.
>>>>>>>>>>
>>>>>>>>>> However, as 256 MB was enough memory, and there are other
>>>> components
>>>>>>>>>> next to the producers using memory, I don't expect a severely
>>>>> increased
>>>>>>>>>> memory usage. Producer allocate memory on-demand, and if load
>> is
>>>>> shared
>>>>>>>>>> over multiple producers, overall memory usage should stay the
>>> same
>>>>> as a
>>>>>>>>>> single producer should allocate less memory.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> About Batching:
>>>>>>>>>>
>>>>>>>>>> As you can see from the benchmarks (in the detailed view -- I
>>> also
>>>>> added
>>>>>>>>>> some graphs to the summary now) the average batch size gets
>>>> slightly
>>>>>>>>>> decrease with an increased number of partitions. However, there
>>> is
>>>> no
>>>>>>>>>> big difference between "producer per thread" and "producer per
>>>> task"
>>>>>>>>>> scenario.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> About acks:
>>>>>>>>>>
>>>>>>>>>> This is covered by KIP-98 already. If idempotent producer is
>> use,
>>>>> it's
>>>>>>>>>> required to set max.in.flight.requests.per.connection=1 and
>>>> retries
>>>>>> 0
>>>>>>>>>> -- otherwise a config exception will be thrown. For
>> transactions,
>>>>> it's
>>>>>>>>>> further required that acks=-1 to avoid a config exception.
>>>>>>>>>>
>>>>>>>>>> Other bits, like min.isr, replication.factor, etc. (ie, all
>>>>> broker/topic
>>>>>>>>>> configs) are out of scope, and it's user responsibility to set
>>>> those
>>>>>>>>>> values correctly to ensure transactionality and idempotency.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> -Matthias
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On 3/7/17 9:32 AM, Jun Rao wrote:
>>>>>>>>>>> Hi, Guozhang,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for the KIP. A couple of comments.
>>>>>>>>>>>
>>>>>>>>>>> 1. About the impact on producer batching. My understanding is
>>> that
>>>>>>>>>>> typically different sub-topologies in the same task are
>>> publishing
>>>>> to
>>>>>>>>>>> different topics. Since the producer batching happens at the
>>>>>>>>>>> topic/partition level, using a producer per task may not
>> impact
>>>>> batching
>>>>>>>>>>> much.
>>>>>>>>>>>
>>>>>>>>>>> 2. When processing.guarantee is set to exactly_once, do we
>> want
>>> to
>>>>>>>>>> enforce
>>>>>>>>>>> acks to all in the producer? The default acks is 1 and may
>> cause
>>>>> acked
>>>>>>>>>> data
>>>>>>>>>>> to be lost later when the leader changes.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>>
>>>>>>>>>>> Jun
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Mar 7, 2017 at 3:42 AM, Damian Guy <
>>> damian.guy@gmail.com>
>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Matthias,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks. The perf test is a good start but I don't think it
>> goes
>>>> far
>>>>>>>>>> enough.
>>>>>>>>>>>> 100 partitions is not a lot. What happens when there are
>>>> thousands
>>>>> of
>>>>>>>>>>>> partitions? What is the load on the brokers? How much more
>>> memory
>>>>> is
>>>>>>>>>> used
>>>>>>>>>>>> by the Streams App etc?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Damian
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, 7 Mar 2017 at 03:02 Matthias J. Sax <
>>>> matthias@confluent.io
>>>>>>
>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I want to give a first respond:
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1. Producer per task:
>>>>>>>>>>>>>
>>>>>>>>>>>>> First, we did some performance tests, indicating that the
>>>>> performance
>>>>>>>>>>>>> penalty is small. Please have a look here:
>>>>>>>>>>>>>
>>>>>>>>>>>>> https://docs.google.com/spreadsheets/d/18aGOB13-
>>>>>>>>>>>> ibwHJl5VE27HnHBlNkZAIeLF9yQ99FDxkuM/edit?usp=sharing
>>>>>>>>>>>>>
>>>>>>>>>>>>> For the test, we ran with a trunk version and a modified
>>> version
>>>>> that
>>>>>>>>>>>>> uses a producer per task (of course, no transactions, but
>>>>> at-least-once
>>>>>>>>>>>>> semantics). The scaling factor indicates the number of
>> brokers
>>>> and
>>>>>>>>>>>>> (single threaded) Streams instances. We used SimpleBenchmark
>>>> that
>>>>> is
>>>>>>>>>>>>> part of AK code base.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Second, as the design is "producer per task" (and not
>>> "producer
>>>>> per
>>>>>>>>>>>>> partition") it is possible to specify a custom
>>> PartitionGrouper
>>>>> that
>>>>>>>>>>>>> assigns multiple partitions to a single task. Thus, it
>> allows
>>> to
>>>>> reduce
>>>>>>>>>>>>> the number of tasks for scenarios with many partitions.
>> Right
>>>>> now, this
>>>>>>>>>>>>> interface must be implemented solely by the user, but we
>> could
>>>>> also add
>>>>>>>>>>>>> a new config parameter that specifies the
>> max.number.of.tasks
>>> or
>>>>>>>>>>>>> partitions.per.task so that the user can configure this
>>> instead
>>>> of
>>>>>>>>>>>>> implementing the interface.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Third, there is the idea of a "Producer Pool" that would
>> allow
>>>> to
>>>>> share
>>>>>>>>>>>>> resources (network connections, memory, etc) over multiple
>>>>> producers.
>>>>>>>>>>>>> This would allow to separate multiple transaction on the
>>>> producer
>>>>>>>>>> level,
>>>>>>>>>>>>> while resources are shared. There is no detailed design
>>> document
>>>>> yet
>>>>>>>>>> and
>>>>>>>>>>>>> there would be a KIP for this feature.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thus, if there should be any performance problems for high
>>> scale
>>>>>>>>>>>>> scenarios, there are multiple ways to tackle them while
>>> keeping
>>>>> the
>>>>>>>>>>>>> "producer per task" design.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Additionally, a "producer per thread" design would be way
>> more
>>>>> complex
>>>>>>>>>>>>> and I summarized the issues in a separate document. I will
>>> share
>>>>> a link
>>>>>>>>>>>>> to the document soon.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2. StateStore recovery:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Streams EoS will in the first design not allow to exploit
>> the
>>>>>>>>>>>>> improvements that are added for 0.11 at the moment. However,
>>> as
>>>>> 0.10.2
>>>>>>>>>>>>> faces the same issues of potentially long recovery, there is
>>> no
>>>>>>>>>>>>> regression with this regard. Thus, I see those improvements
>> as
>>>>>>>>>>>>> orthogonal or add-ons. Nevertheless, we should try to
>> explore
>>>>> those
>>>>>>>>>>>>> options and if possible get them into 0.11 such that Streams
>>>> with
>>>>> EoS
>>>>>>>>>>>>> gets the same improvements as at-least-once scenario.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> 3. Caching:
>>>>>>>>>>>>>
>>>>>>>>>>>>> We might need to do some experiments to quantify the impact
>> on
>>>>> caching.
>>>>>>>>>>>>> If it's severe, the suggested default commit interval of
>> 100ms
>>>>> could
>>>>>>>>>>>>> also be increased. Also, EoS will not enforce any commit
>>>>> interval, but
>>>>>>>>>>>>> only change the default value. Thus, a user can freely
>>> trade-off
>>>>>>>>>> latency
>>>>>>>>>>>>> vs. caching-effect.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Last but not least, there is the idea to allow
>>>> "read_uncommitted"
>>>>> for
>>>>>>>>>>>>> intermediate topic. This would be an advance design for
>>> Streams
>>>>> EoS
>>>>>>>>>> that
>>>>>>>>>>>>> allows downstream sub-topologies to read uncommitted data
>>>>>>>>>>>>> optimistically. In case of failure, a cascading abort of
>>>>> transactions
>>>>>>>>>>>>> would be required. This change will need another KIP.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> 4. Idempotent Producer:
>>>>>>>>>>>>>
>>>>>>>>>>>>> The transactional part automatically leverages the
>> idempotent
>>>>>>>>>> properties
>>>>>>>>>>>>> of the producer. Idempotency is a requirement:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Note that enable.idempotence must be enabled if a
>>>>> TransactionalId is
>>>>>>>>>>>>> configured.
>>>>>>>>>>>>>
>>>>>>>>>>>>> See
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>> https://docs.google.com/document/d/11Jqy_
>>>>> GjUGtdXJK94XGsEIK7CP1SnQGdp2eF
>>>>>>>>>>>> 0wSw9ra8/edit#bookmark=id.g2xsf9n49puh
>>>>>>>>>>>>>
>>>>>>>>>>>>> All idempotent retries, are handled by the producer
>> internally
>>>>> (with or
>>>>>>>>>>>>> without transaction) if enable.idempotence is set to true.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 3/3/17 3:34 AM, Eno Thereska wrote:
>>>>>>>>>>>>>> Another question:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The KIP doesn’t exactly spell out how it uses the
>> idempotence
>>>>>>>>>> guarantee
>>>>>>>>>>>>> from KIP-98. It seems that only the transactional part is
>>>> needed.
>>>>> Or is
>>>>>>>>>>>> the
>>>>>>>>>>>>> idempotence guarantee working behind the scenes and helping
>>> for
>>>>> some
>>>>>>>>>>>>> scenarios for which it is not worthwhile aborting a
>>> transaction
>>>>> (e.g.,
>>>>>>>>>>>>> retransmitting a record after a temporary network glitch)?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>> Eno
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Mar 2, 2017, at 4:56 PM, Jay Kreps <ja...@confluent.io>
>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I second the concern on with the one producer per task
>>>>> approach. At a
>>>>>>>>>>>>>>> high-level it seems to make sense but I think Damian is
>>>> exactly
>>>>> right
>>>>>>>>>>>>> that
>>>>>>>>>>>>>>> that cuts against the general design of the producer. Many
>>>>> people
>>>>>>>>>> have
>>>>>>>>>>>>> high
>>>>>>>>>>>>>>> input partition counts and will have high task counts as a
>>>>> result. I
>>>>>>>>>>>>> think
>>>>>>>>>>>>>>> processing 1000 partitions should not be an unreasonable
>>> thing
>>>>> to
>>>>>>>>>> want
>>>>>>>>>>>>> to
>>>>>>>>>>>>>>> do.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The tricky bits will be:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>  - Reduced effectiveness of batching (or more latency and
>>>>> memory to
>>>>>>>>>>>> get
>>>>>>>>>>>>>>>  equivalent batching). This doesn't show up in simple
>>>> benchmarks
>>>>>>>>>>>>> because
>>>>>>>>>>>>>>>  much of the penalty is I/O and CPU on the broker and the
>>>>> additional
>>>>>>>>>>>>> threads
>>>>>>>>>>>>>>>  from all the producers can make a single-threaded
>> benchmark
>>>>> seem
>>>>>>>>>>>>> faster.
>>>>>>>>>>>>>>>  - TCP connection explosion. We maintain one connection
>> per
>>>>> broker.
>>>>>>>>>>>>> This
>>>>>>>>>>>>>>>  is already high since each app instance does this. This
>>>> design
>>>>>>>>>>>> though
>>>>>>>>>>>>> will
>>>>>>>>>>>>>>>  add an additional multiplicative factor based on the
>>>> partition
>>>>>>>>>> count
>>>>>>>>>>>>> of the
>>>>>>>>>>>>>>>  input.
>>>>>>>>>>>>>>>  - Connection and metadata request storms. When an
>> instance
>>>> with
>>>>>>>>>> 1000
>>>>>>>>>>>>>>>  tasks starts up it is going to try to create many
>> thousands
>>>> of
>>>>>>>>>>>>> connections
>>>>>>>>>>>>>>>  and issue a thousand metadata requests all at once.
>>>>>>>>>>>>>>>  - Memory usage. We currently default to 64MB per
>> producer.
>>>>> This can
>>>>>>>>>>>> be
>>>>>>>>>>>>>>>  tuned down, but the fact that we are spreading the
>> batching
>>>>> over
>>>>>>>>>>>> more
>>>>>>>>>>>>>>>  producers will fundamentally mean we need a lot more
>> memory
>>>> to
>>>>> get
>>>>>>>>>>>>> good
>>>>>>>>>>>>>>>  perf and the memory usage will change as your task
>>> assignment
>>>>>>>>>>>> changes
>>>>>>>>>>>>> so it
>>>>>>>>>>>>>>>  will be hard to set correctly unless it is done
>>>> automatically.
>>>>>>>>>>>>>>>  - Metrics explosion (1000 producer instances, each with
>>> their
>>>>> own
>>>>>>>>>>>>>>>  metrics to monitor).
>>>>>>>>>>>>>>>  - Thread explosion, 1000 background threads, one per
>>>> producer,
>>>>> each
>>>>>>>>>>>>>>>  sending data.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> -Jay
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Mar 1, 2017 at 3:05 AM, Damian Guy <
>>>>> damian.guy@gmail.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Guozhang,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks for the KIP! This is an important feature for
>> Kafka
>>>>> Streams
>>>>>>>>>>>> and
>>>>>>>>>>>>> will
>>>>>>>>>>>>>>>> help to unlock a bunch of use cases.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I have some concerns/questions:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>  1. Producer per task: I'm worried about the overhead
>> this
>>> is
>>>>> going
>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>  put on both the streams app and the Kafka Brokers. You
>> can
>>>>> easily
>>>>>>>>>>>>>>>> imagine
>>>>>>>>>>>>>>>>  an app consuming thousands of partitions. What load will
>>>> this
>>>>> put
>>>>>>>>>>>> on
>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>  brokers? Am i correct in assuming that there will be
>>>> metadata
>>>>>>>>>>>>> requests
>>>>>>>>>>>>>>>> per
>>>>>>>>>>>>>>>>  Producer? The memory overhead in the streams app will
>> also
>>>>>>>>>> increase
>>>>>>>>>>>>>>>> fairly
>>>>>>>>>>>>>>>>  significantly. Should we adjust
>>>> ProducerConfig.BUFFER_MEMORY_
>>>>>>>>>>>> CONFIG?
>>>>>>>>>>>>>>>>  2. State Store recovery: As we already know, restoring
>> the
>>>>> entire
>>>>>>>>>>>>>>>>  changelog can take an extremely long time. Even with a
>>>> fairly
>>>>>>>>>> small
>>>>>>>>>>>>>>>> dataset
>>>>>>>>>>>>>>>>  and an inappropriately tuned segment size, this can take
>>> way
>>>>> too
>>>>>>>>>>>>> long.
>>>>>>>>>>>>>>>> My
>>>>>>>>>>>>>>>>  concern is that failures happen and then recovery takes
>>>>> "forever"
>>>>>>>>>>>>> and we
>>>>>>>>>>>>>>>>  end up in a situation where we need to change the
>>>>>>>>>> max.poll.interval
>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>  some very large number or else we end up in "rebalance
>>>> hell".
>>>>> I
>>>>>>>>>>>> don't
>>>>>>>>>>>>>>>> think
>>>>>>>>>>>>>>>>  this provides a very good user experience. You mention
>>>> RocksDB
>>>>>>>>>>>>>>>>  checkpointing in the doc - should we explore this idea
>>> some
>>>>> more?
>>>>>>>>>>>>> i.e.,
>>>>>>>>>>>>>>>>  understand the penalty for checkpointing. Maybe
>> checkpoint
>>>>> every
>>>>>>>>>>>> *n*
>>>>>>>>>>>>>>>>   commits?
>>>>>>>>>>>>>>>>  3. What does EoS mean for Caching? If we set the commit
>>>>> interval
>>>>>>>>>> to
>>>>>>>>>>>>>>>>  100ms then the cache is not going to be very effective.
>>>>> Should it
>>>>>>>>>>>>> just
>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>  disabled?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>> Damian
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Tue, 28 Feb 2017 at 21:54 Guozhang Wang <
>>>> wangguoz@gmail.com
>>>>>>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I have just created KIP-129 to leverage KIP-98 in Kafka
>>>>> Streams and
>>>>>>>>>>>>>>>> provide
>>>>>>>>>>>>>>>>> exactly-once processing semantics:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>>>>>>>>>>>>>>> 129%3A+Streams+Exactly-Once+Semantics
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> This KIP enables Streams users to optionally turn on
>>>>> exactly-once
>>>>>>>>>>>>>>>>> processing semantics without changing their app code at
>>> all
>>>> by
>>>>>>>>>>>>> leveraging
>>>>>>>>>>>>>>>>> the transactional messaging features provided in KIP-98.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The above wiki page provides a high-level view of the
>>>> proposed
>>>>>>>>>>>>> changes,
>>>>>>>>>>>>>>>>> while detailed implementation design can be found in
>> this
>>>>> Google
>>>>>>>>>>>> doc:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> https://docs.google.com/document/d/
>>>>> 1pGZ8xtOOyGwDYgH5vA6h19zOMMadu
>>>>>>>>>>>>>>>> FK1DAB8_gBYA2c
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> We would love to hear your comments and suggestions.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>> -- Guozhang
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
> 


Re: [DISCUSS] KIP-129: Kafka Streams Exactly-Once Semantics

Posted by Michael Noll <mi...@confluent.io>.
Ah, I think the use of the word "regression" was misleading.

Let me try to rephrase:  At the moment it is unclear to me whether the
scenario where we need to rebuild a state store from scratch will be as
likely, a little bit more likely, much more likely, or very, very much more
likely to happen than pre-EOS.  So far, I had the impression this question
was hand-waved a bit.

Yes, you can enable/disable EOS for Streams, and by default it is disabled,
so we shouldn't talk about a "regression" here (agreed).  But, if we don't
understand 1. under which conditions we'd trigger a full rebuild of state
stores and 2. how likely these conditions are, then I am concerned that we
could (worst case) end up with a V1 implementation that turns out unusable
for many users.  And even if we label the new functionality as
"experimental" (which IIRC is the case), this could still be perceived by
users in a negative way.  (Just think about how long Mongo has been having
the stigma of "it loses my data".)

What I'd suggest us to do is to at least address my point (1) above --
explaining under which conditions we'd retrigger a full rebuild of state
stores, what would lead to failed producer transactions, etc.  This should
be a rather quick exercise, given the offline discussion we had earlier
this week.  I agree that (2) -- the likelihood of these things to happen --
is much harder to quantify, particularly at a point where we don't even
have a fully working implementation (:-P), so this should perhaps wait
until later.  That said, having a reasonably good understanding of (1)
should also help us to review the current Streams EOS design, help with
testing/benchmarking, and also help to possibly improve the V1
implementation if need be.

Does that make sense?
Michael






On Wed, Mar 22, 2017 at 5:07 PM, Sriram Subramanian <ra...@confluent.io>
wrote:

> This is a completely new feature which is controlled by a config. It would
> be a regression if you upgraded streams and saw a different behavior. That
> would not happen in this case. This is similar to how we are introducing
> idempotent producer in core kafka with a config to turn it on. There is no
> guarantee that the performance of the producer with the config turned on
> will be the same although eventually we will like to get to it.
>
> On Wed, Mar 22, 2017 at 7:12 AM, Michael Noll <mi...@confluent.io>
> wrote:
>
> > I second Eno's concern regarding the impact of Streams EOS on state
> stores.
> >
> > >  We do a full recovery today and the EOS proposal will not make this
> any
> > worse.
> >
> > Yes, today we do a full state store recovery under certain failures.
> > However, I think the point (or perhaps: open question) is that, with the
> > EOS design, there's now an *increased likelihood* of such failures that
> > trigger full state store recovery.  If this increase is significant,
> then I
> > would consider this to be a regression that we should address.
> >
> > As Eno said:
> >
> > > currently we pay the recovery price for a Kafka Streams instance
> failure.
> > > Now we might pay it for a transaction failure. Will transaction
> failures
> > be
> > > more or less common than the previous types of failures?
> >
> > Damian voiced similar concerns at the very beginning of this discussion,
> > not sure what his current opinion is here.
> >
> > -Michael
> >
> >
> >
> >
> >
> > On Wed, Mar 22, 2017 at 1:04 AM, Sriram Subramanian <ra...@confluent.io>
> > wrote:
> >
> > > To add to this discussion, I do think we should think about this in
> > > increments. We do a full recovery today and the EOS proposal will not
> > make
> > > this any worse. Using store snapshot is a good option to avoid store
> > > recovery in the future but as Eno points out, all pluggable stores
> would
> > > need to have this ability. W.r.t transaction failures, this should not
> be
> > > an issue. We should be simply retrying. There is one optimization we
> can
> > do
> > > for clean shutdowns. We could store a clean shutdown file that contains
> > the
> > > input offsets. This file gets written when you close the streams
> > instance.
> > > On start, you could can check the offsets from the shutdown file and
> > > compare it with the offsets we get from the consumer and ensure they
> > match.
> > > If they do, you could use the same store instead of recovering.
> However,
> > if
> > > we go with the snapshot approach, this will not be required. My vote
> > would
> > > be to implement V1 and solve the bootstrap problem which exist today in
> > the
> > > future versions.
> > >
> > > On Tue, Mar 21, 2017 at 4:43 PM, Matthias J. Sax <
> matthias@confluent.io>
> > > wrote:
> > >
> > > > Thanks for your feedback Eno.
> > > >
> > > > For now, I still think that the KIP itself does not need to talk
> about
> > > > this in more detail, because we apply the same strategy for EoS as
> for
> > > > non-EoS (as of 0.10.2).
> > > >
> > > > Thus, in case of a clean shutdown, we write the checkpoint file for a
> > > > store and thus know we can reuse the store. In case of failure, we
> need
> > > > to recreate the store from the changelog.
> > > >
> > > > > Will a V1 design that relies on plain store recovery from Kafka for
> > > > > each transaction abort be good enough, or usable?
> > > >
> > > > Why should it not be usable? It's the same strategy as used in 0.10.2
> > > > and it runs in production in many companies already.
> > > >
> > > > > however it seems to me we might have a regression of sorts
> > > > > Now we might pay it for a transaction failure.
> > > >
> > > > I would assume transaction failures to be quite rare. Maybe the core
> > EoS
> > > > folks can comment here, too.
> > > >
> > > >
> > > >
> > > > -Matthias
> > > >
> > > >
> > > >
> > > > On 3/20/17 3:16 PM, Eno Thereska wrote:
> > > > > Hi Matthias,
> > > > >
> > > > > I'd like to see some more info on how you propose to handle
> > > transactions
> > > > that involve state stores in the KIP itself. The design doc has info
> > > about
> > > > various optimisations like RocksDb snapshots and transactions and
> such,
> > > but
> > > > will there be a user-visible interface that indicates whether a store
> > has
> > > > snapshot and/or transactional capabilities? If a user plugs in
> another
> > > > store, what guarantees are they expected to get?
> > > > >
> > > > > Will a V1 design that relies on plain store recovery from Kafka for
> > > each
> > > > transaction abort be good enough, or usable? If your dataset is large
> > > > (e.g., 200GB) the recovery time might be so large as to effectively
> > > render
> > > > that Kafka Streams instance unavailable for tens of minutes. You
> > mention
> > > > that is not a regression to what we currently have, however it seems
> to
> > > me
> > > > we might have a regression of sorts: currently we pay the recovery
> > price
> > > > for a Kafka Streams instance failure. Now we might pay it for a
> > > transaction
> > > > failure. Will transaction failures be more or less common than the
> > > previous
> > > > types of failures? I'd like to see this addressed.
> > > > >
> > > > > Thanks
> > > > > Eno
> > > > >
> > > > >
> > > > >
> > > > >> On 15 Mar 2017, at 22:09, Matthias J. Sax <ma...@confluent.io>
> > > > wrote:
> > > > >>
> > > > >> Just a quick follow up:
> > > > >>
> > > > >> Our overall proposal is, to implement KIP-129 as is as a “Stream
> EoS
> > > > >> 1.0” version. The raised concerns are all valid, but hard to
> > quantify
> > > at
> > > > >> the moment. Implementing KIP-129, that provides a clean design,
> > allows
> > > > >> us to gain more insight in the performance implications. This
> > enables
> > > > >> us, to make an educated decision, if the “producer per task” model
> > > > >> perform wells or not, and if a switch to a “producer per thread”
> > model
> > > > >> is mandatory.
> > > > >>
> > > > >> We also want to point out, that we can move incrementally from
> > > "producer
> > > > >> per task" to "producer per thread" design or apply some
> incremental
> > > > >> improvements to "producer per task" (as discussed in the doc).
> Thus,
> > > > >> there is not issue with regard to upgrading.
> > > > >>
> > > > >>
> > > > >> -Matthias
> > > > >>
> > > > >>
> > > > >>
> > > > >> On 3/15/17 2:36 PM, Matthias J. Sax wrote:
> > > > >>> Hi,
> > > > >>>
> > > > >>> I want to pick up this thread again. As there are some concerns
> > about
> > > > >>> the "producer per task" design, we did write up an alternative
> > > > "producer
> > > > >>> per thread" design and discuss pros/cons of both approaches:
> > > > >>>
> > > > >>> https://docs.google.com/document/d/1CfOJaa6mdg5o7pLf_
> > > > zXISV4oE0ZeMZwT_sG1QWgL4EE
> > > > >>>
> > > > >>>
> > > > >>> Looking forward to your feedback.
> > > > >>>
> > > > >>>
> > > > >>> -Matthias
> > > > >>>
> > > > >>>
> > > > >>> On 3/10/17 3:24 AM, Damian Guy wrote:
> > > > >>>> Hi Matthias,
> > > > >>>>
> > > > >>>> Thanks for the response. I agree with you regarding the use of
> > > > >>>> PartitionGrouper to reduce the number of tasks. It would be good
> > to
> > > > have an
> > > > >>>> idea of any additional load on the brokers as we increase the
> > number
> > > > of
> > > > >>>> tasks and therefore producers.
> > > > >>>>
> > > > >>>> Thanks,
> > > > >>>> Damian
> > > > >>>>
> > > > >>>> On Wed, 8 Mar 2017 at 01:45 Matthias J. Sax <
> > matthias@confluent.io>
> > > > wrote:
> > > > >>>>
> > > > >>>>> Damian, Jun,
> > > > >>>>>
> > > > >>>>> Thanks for your input.
> > > > >>>>>
> > > > >>>>>
> > > > >>>>> About Performance test:
> > > > >>>>>
> > > > >>>>> I can follow up with more performance tests using more
> partitions
> > > and
> > > > >>>>> also collecting broker metrics.
> > > > >>>>>
> > > > >>>>> However, I want to highlight again, that even if 1000+
> partitions
> > > > would
> > > > >>>>> be problematic, one can simply implement PartitionGrouper
> > interface
> > > > and
> > > > >>>>> reduce the number of tasks to 250 or 100... So I am not sure,
> if
> > we
> > > > >>>>> should block this KIP, even if there might be some performance
> > > > penalty
> > > > >>>>> for currently single partitioned tasks.
> > > > >>>>>
> > > > >>>>> About memory usage. JXM max-heap and max-off-heap did report
> > 256MB
> > > > and
> > > > >>>>> 133MB for all experiments (thus I did not put it in the
> > > spreadsheet).
> > > > >>>>> Thus, using 100 producers (each using a max of 32MB of memory)
> > was
> > > > not
> > > > >>>>> an issue with regard to memory consumption. I did not track
> > > "current
> > > > >>>>> head/off-heap" memory as this would require a more advance test
> > > > setup to
> > > > >>>>> monitor it over time. If you think this would be required, we
> can
> > > do
> > > > >>>>> some tests though.
> > > > >>>>>
> > > > >>>>> However, as 256 MB was enough memory, and there are other
> > > components
> > > > >>>>> next to the producers using memory, I don't expect a severely
> > > > increased
> > > > >>>>> memory usage. Producer allocate memory on-demand, and if load
> is
> > > > shared
> > > > >>>>> over multiple producers, overall memory usage should stay the
> > same
> > > > as a
> > > > >>>>> single producer should allocate less memory.
> > > > >>>>>
> > > > >>>>>
> > > > >>>>> About Batching:
> > > > >>>>>
> > > > >>>>> As you can see from the benchmarks (in the detailed view -- I
> > also
> > > > added
> > > > >>>>> some graphs to the summary now) the average batch size gets
> > > slightly
> > > > >>>>> decrease with an increased number of partitions. However, there
> > is
> > > no
> > > > >>>>> big difference between "producer per thread" and "producer per
> > > task"
> > > > >>>>> scenario.
> > > > >>>>>
> > > > >>>>>
> > > > >>>>> About acks:
> > > > >>>>>
> > > > >>>>> This is covered by KIP-98 already. If idempotent producer is
> use,
> > > > it's
> > > > >>>>> required to set max.in.flight.requests.per.connection=1 and
> > > retries
> > > > > 0
> > > > >>>>> -- otherwise a config exception will be thrown. For
> transactions,
> > > > it's
> > > > >>>>> further required that acks=-1 to avoid a config exception.
> > > > >>>>>
> > > > >>>>> Other bits, like min.isr, replication.factor, etc. (ie, all
> > > > broker/topic
> > > > >>>>> configs) are out of scope, and it's user responsibility to set
> > > those
> > > > >>>>> values correctly to ensure transactionality and idempotency.
> > > > >>>>>
> > > > >>>>>
> > > > >>>>>
> > > > >>>>> -Matthias
> > > > >>>>>
> > > > >>>>>
> > > > >>>>> On 3/7/17 9:32 AM, Jun Rao wrote:
> > > > >>>>>> Hi, Guozhang,
> > > > >>>>>>
> > > > >>>>>> Thanks for the KIP. A couple of comments.
> > > > >>>>>>
> > > > >>>>>> 1. About the impact on producer batching. My understanding is
> > that
> > > > >>>>>> typically different sub-topologies in the same task are
> > publishing
> > > > to
> > > > >>>>>> different topics. Since the producer batching happens at the
> > > > >>>>>> topic/partition level, using a producer per task may not
> impact
> > > > batching
> > > > >>>>>> much.
> > > > >>>>>>
> > > > >>>>>> 2. When processing.guarantee is set to exactly_once, do we
> want
> > to
> > > > >>>>> enforce
> > > > >>>>>> acks to all in the producer? The default acks is 1 and may
> cause
> > > > acked
> > > > >>>>> data
> > > > >>>>>> to be lost later when the leader changes.
> > > > >>>>>>
> > > > >>>>>> Thanks,
> > > > >>>>>>
> > > > >>>>>> Jun
> > > > >>>>>>
> > > > >>>>>> On Tue, Mar 7, 2017 at 3:42 AM, Damian Guy <
> > damian.guy@gmail.com>
> > > > wrote:
> > > > >>>>>>
> > > > >>>>>>> Hi Matthias,
> > > > >>>>>>>
> > > > >>>>>>> Thanks. The perf test is a good start but I don't think it
> goes
> > > far
> > > > >>>>> enough.
> > > > >>>>>>> 100 partitions is not a lot. What happens when there are
> > > thousands
> > > > of
> > > > >>>>>>> partitions? What is the load on the brokers? How much more
> > memory
> > > > is
> > > > >>>>> used
> > > > >>>>>>> by the Streams App etc?
> > > > >>>>>>>
> > > > >>>>>>> Thanks,
> > > > >>>>>>> Damian
> > > > >>>>>>>
> > > > >>>>>>> On Tue, 7 Mar 2017 at 03:02 Matthias J. Sax <
> > > matthias@confluent.io
> > > > >
> > > > >>>>> wrote:
> > > > >>>>>>>
> > > > >>>>>>>> Hi,
> > > > >>>>>>>>
> > > > >>>>>>>> I want to give a first respond:
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>> 1. Producer per task:
> > > > >>>>>>>>
> > > > >>>>>>>> First, we did some performance tests, indicating that the
> > > > performance
> > > > >>>>>>>> penalty is small. Please have a look here:
> > > > >>>>>>>>
> > > > >>>>>>>> https://docs.google.com/spreadsheets/d/18aGOB13-
> > > > >>>>>>> ibwHJl5VE27HnHBlNkZAIeLF9yQ99FDxkuM/edit?usp=sharing
> > > > >>>>>>>>
> > > > >>>>>>>> For the test, we ran with a trunk version and a modified
> > version
> > > > that
> > > > >>>>>>>> uses a producer per task (of course, no transactions, but
> > > > at-least-once
> > > > >>>>>>>> semantics). The scaling factor indicates the number of
> brokers
> > > and
> > > > >>>>>>>> (single threaded) Streams instances. We used SimpleBenchmark
> > > that
> > > > is
> > > > >>>>>>>> part of AK code base.
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>> Second, as the design is "producer per task" (and not
> > "producer
> > > > per
> > > > >>>>>>>> partition") it is possible to specify a custom
> > PartitionGrouper
> > > > that
> > > > >>>>>>>> assigns multiple partitions to a single task. Thus, it
> allows
> > to
> > > > reduce
> > > > >>>>>>>> the number of tasks for scenarios with many partitions.
> Right
> > > > now, this
> > > > >>>>>>>> interface must be implemented solely by the user, but we
> could
> > > > also add
> > > > >>>>>>>> a new config parameter that specifies the
> max.number.of.tasks
> > or
> > > > >>>>>>>> partitions.per.task so that the user can configure this
> > instead
> > > of
> > > > >>>>>>>> implementing the interface.
> > > > >>>>>>>>
> > > > >>>>>>>> Third, there is the idea of a "Producer Pool" that would
> allow
> > > to
> > > > share
> > > > >>>>>>>> resources (network connections, memory, etc) over multiple
> > > > producers.
> > > > >>>>>>>> This would allow to separate multiple transaction on the
> > > producer
> > > > >>>>> level,
> > > > >>>>>>>> while resources are shared. There is no detailed design
> > document
> > > > yet
> > > > >>>>> and
> > > > >>>>>>>> there would be a KIP for this feature.
> > > > >>>>>>>>
> > > > >>>>>>>> Thus, if there should be any performance problems for high
> > scale
> > > > >>>>>>>> scenarios, there are multiple ways to tackle them while
> > keeping
> > > > the
> > > > >>>>>>>> "producer per task" design.
> > > > >>>>>>>>
> > > > >>>>>>>> Additionally, a "producer per thread" design would be way
> more
> > > > complex
> > > > >>>>>>>> and I summarized the issues in a separate document. I will
> > share
> > > > a link
> > > > >>>>>>>> to the document soon.
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>> 2. StateStore recovery:
> > > > >>>>>>>>
> > > > >>>>>>>> Streams EoS will in the first design not allow to exploit
> the
> > > > >>>>>>>> improvements that are added for 0.11 at the moment. However,
> > as
> > > > 0.10.2
> > > > >>>>>>>> faces the same issues of potentially long recovery, there is
> > no
> > > > >>>>>>>> regression with this regard. Thus, I see those improvements
> as
> > > > >>>>>>>> orthogonal or add-ons. Nevertheless, we should try to
> explore
> > > > those
> > > > >>>>>>>> options and if possible get them into 0.11 such that Streams
> > > with
> > > > EoS
> > > > >>>>>>>> gets the same improvements as at-least-once scenario.
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>> 3. Caching:
> > > > >>>>>>>>
> > > > >>>>>>>> We might need to do some experiments to quantify the impact
> on
> > > > caching.
> > > > >>>>>>>> If it's severe, the suggested default commit interval of
> 100ms
> > > > could
> > > > >>>>>>>> also be increased. Also, EoS will not enforce any commit
> > > > interval, but
> > > > >>>>>>>> only change the default value. Thus, a user can freely
> > trade-off
> > > > >>>>> latency
> > > > >>>>>>>> vs. caching-effect.
> > > > >>>>>>>>
> > > > >>>>>>>> Last but not least, there is the idea to allow
> > > "read_uncommitted"
> > > > for
> > > > >>>>>>>> intermediate topic. This would be an advance design for
> > Streams
> > > > EoS
> > > > >>>>> that
> > > > >>>>>>>> allows downstream sub-topologies to read uncommitted data
> > > > >>>>>>>> optimistically. In case of failure, a cascading abort of
> > > > transactions
> > > > >>>>>>>> would be required. This change will need another KIP.
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>> 4. Idempotent Producer:
> > > > >>>>>>>>
> > > > >>>>>>>> The transactional part automatically leverages the
> idempotent
> > > > >>>>> properties
> > > > >>>>>>>> of the producer. Idempotency is a requirement:
> > > > >>>>>>>>
> > > > >>>>>>>>> Note that enable.idempotence must be enabled if a
> > > > TransactionalId is
> > > > >>>>>>>> configured.
> > > > >>>>>>>>
> > > > >>>>>>>> See
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>> https://docs.google.com/document/d/11Jqy_
> > > > GjUGtdXJK94XGsEIK7CP1SnQGdp2eF
> > > > >>>>>>> 0wSw9ra8/edit#bookmark=id.g2xsf9n49puh
> > > > >>>>>>>>
> > > > >>>>>>>> All idempotent retries, are handled by the producer
> internally
> > > > (with or
> > > > >>>>>>>> without transaction) if enable.idempotence is set to true.
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>> -Matthias
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>> On 3/3/17 3:34 AM, Eno Thereska wrote:
> > > > >>>>>>>>> Another question:
> > > > >>>>>>>>>
> > > > >>>>>>>>> The KIP doesn’t exactly spell out how it uses the
> idempotence
> > > > >>>>> guarantee
> > > > >>>>>>>> from KIP-98. It seems that only the transactional part is
> > > needed.
> > > > Or is
> > > > >>>>>>> the
> > > > >>>>>>>> idempotence guarantee working behind the scenes and helping
> > for
> > > > some
> > > > >>>>>>>> scenarios for which it is not worthwhile aborting a
> > transaction
> > > > (e.g.,
> > > > >>>>>>>> retransmitting a record after a temporary network glitch)?
> > > > >>>>>>>>>
> > > > >>>>>>>>> Thanks
> > > > >>>>>>>>> Eno
> > > > >>>>>>>>>
> > > > >>>>>>>>>> On Mar 2, 2017, at 4:56 PM, Jay Kreps <ja...@confluent.io>
> > > wrote:
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> I second the concern on with the one producer per task
> > > > approach. At a
> > > > >>>>>>>>>> high-level it seems to make sense but I think Damian is
> > > exactly
> > > > right
> > > > >>>>>>>> that
> > > > >>>>>>>>>> that cuts against the general design of the producer. Many
> > > > people
> > > > >>>>> have
> > > > >>>>>>>> high
> > > > >>>>>>>>>> input partition counts and will have high task counts as a
> > > > result. I
> > > > >>>>>>>> think
> > > > >>>>>>>>>> processing 1000 partitions should not be an unreasonable
> > thing
> > > > to
> > > > >>>>> want
> > > > >>>>>>>> to
> > > > >>>>>>>>>> do.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> The tricky bits will be:
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>  - Reduced effectiveness of batching (or more latency and
> > > > memory to
> > > > >>>>>>> get
> > > > >>>>>>>>>>  equivalent batching). This doesn't show up in simple
> > > benchmarks
> > > > >>>>>>>> because
> > > > >>>>>>>>>>  much of the penalty is I/O and CPU on the broker and the
> > > > additional
> > > > >>>>>>>> threads
> > > > >>>>>>>>>>  from all the producers can make a single-threaded
> benchmark
> > > > seem
> > > > >>>>>>>> faster.
> > > > >>>>>>>>>>  - TCP connection explosion. We maintain one connection
> per
> > > > broker.
> > > > >>>>>>>> This
> > > > >>>>>>>>>>  is already high since each app instance does this. This
> > > design
> > > > >>>>>>> though
> > > > >>>>>>>> will
> > > > >>>>>>>>>>  add an additional multiplicative factor based on the
> > > partition
> > > > >>>>> count
> > > > >>>>>>>> of the
> > > > >>>>>>>>>>  input.
> > > > >>>>>>>>>>  - Connection and metadata request storms. When an
> instance
> > > with
> > > > >>>>> 1000
> > > > >>>>>>>>>>  tasks starts up it is going to try to create many
> thousands
> > > of
> > > > >>>>>>>> connections
> > > > >>>>>>>>>>  and issue a thousand metadata requests all at once.
> > > > >>>>>>>>>>  - Memory usage. We currently default to 64MB per
> producer.
> > > > This can
> > > > >>>>>>> be
> > > > >>>>>>>>>>  tuned down, but the fact that we are spreading the
> batching
> > > > over
> > > > >>>>>>> more
> > > > >>>>>>>>>>  producers will fundamentally mean we need a lot more
> memory
> > > to
> > > > get
> > > > >>>>>>>> good
> > > > >>>>>>>>>>  perf and the memory usage will change as your task
> > assignment
> > > > >>>>>>> changes
> > > > >>>>>>>> so it
> > > > >>>>>>>>>>  will be hard to set correctly unless it is done
> > > automatically.
> > > > >>>>>>>>>>  - Metrics explosion (1000 producer instances, each with
> > their
> > > > own
> > > > >>>>>>>>>>  metrics to monitor).
> > > > >>>>>>>>>>  - Thread explosion, 1000 background threads, one per
> > > producer,
> > > > each
> > > > >>>>>>>>>>  sending data.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> -Jay
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> On Wed, Mar 1, 2017 at 3:05 AM, Damian Guy <
> > > > damian.guy@gmail.com>
> > > > >>>>>>>> wrote:
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>> Hi Guozhang,
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> Thanks for the KIP! This is an important feature for
> Kafka
> > > > Streams
> > > > >>>>>>> and
> > > > >>>>>>>> will
> > > > >>>>>>>>>>> help to unlock a bunch of use cases.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> I have some concerns/questions:
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>>  1. Producer per task: I'm worried about the overhead
> this
> > is
> > > > going
> > > > >>>>>>> to
> > > > >>>>>>>>>>>  put on both the streams app and the Kafka Brokers. You
> can
> > > > easily
> > > > >>>>>>>>>>> imagine
> > > > >>>>>>>>>>>  an app consuming thousands of partitions. What load will
> > > this
> > > > put
> > > > >>>>>>> on
> > > > >>>>>>>> the
> > > > >>>>>>>>>>>  brokers? Am i correct in assuming that there will be
> > > metadata
> > > > >>>>>>>> requests
> > > > >>>>>>>>>>> per
> > > > >>>>>>>>>>>  Producer? The memory overhead in the streams app will
> also
> > > > >>>>> increase
> > > > >>>>>>>>>>> fairly
> > > > >>>>>>>>>>>  significantly. Should we adjust
> > > ProducerConfig.BUFFER_MEMORY_
> > > > >>>>>>> CONFIG?
> > > > >>>>>>>>>>>  2. State Store recovery: As we already know, restoring
> the
> > > > entire
> > > > >>>>>>>>>>>  changelog can take an extremely long time. Even with a
> > > fairly
> > > > >>>>> small
> > > > >>>>>>>>>>> dataset
> > > > >>>>>>>>>>>  and an inappropriately tuned segment size, this can take
> > way
> > > > too
> > > > >>>>>>>> long.
> > > > >>>>>>>>>>> My
> > > > >>>>>>>>>>>  concern is that failures happen and then recovery takes
> > > > "forever"
> > > > >>>>>>>> and we
> > > > >>>>>>>>>>>  end up in a situation where we need to change the
> > > > >>>>> max.poll.interval
> > > > >>>>>>>> to
> > > > >>>>>>>>>>> be
> > > > >>>>>>>>>>>  some very large number or else we end up in "rebalance
> > > hell".
> > > > I
> > > > >>>>>>> don't
> > > > >>>>>>>>>>> think
> > > > >>>>>>>>>>>  this provides a very good user experience. You mention
> > > RocksDB
> > > > >>>>>>>>>>>  checkpointing in the doc - should we explore this idea
> > some
> > > > more?
> > > > >>>>>>>> i.e.,
> > > > >>>>>>>>>>>  understand the penalty for checkpointing. Maybe
> checkpoint
> > > > every
> > > > >>>>>>> *n*
> > > > >>>>>>>>>>>   commits?
> > > > >>>>>>>>>>>  3. What does EoS mean for Caching? If we set the commit
> > > > interval
> > > > >>>>> to
> > > > >>>>>>>>>>>  100ms then the cache is not going to be very effective.
> > > > Should it
> > > > >>>>>>>> just
> > > > >>>>>>>>>>> be
> > > > >>>>>>>>>>>  disabled?
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> Thanks,
> > > > >>>>>>>>>>> Damian
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> On Tue, 28 Feb 2017 at 21:54 Guozhang Wang <
> > > wangguoz@gmail.com
> > > > >
> > > > >>>>>>> wrote:
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>>> Hi all,
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> I have just created KIP-129 to leverage KIP-98 in Kafka
> > > > Streams and
> > > > >>>>>>>>>>> provide
> > > > >>>>>>>>>>>> exactly-once processing semantics:
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > >>>>>>>>>>> 129%3A+Streams+Exactly-Once+Semantics
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> This KIP enables Streams users to optionally turn on
> > > > exactly-once
> > > > >>>>>>>>>>>> processing semantics without changing their app code at
> > all
> > > by
> > > > >>>>>>>> leveraging
> > > > >>>>>>>>>>>> the transactional messaging features provided in KIP-98.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> The above wiki page provides a high-level view of the
> > > proposed
> > > > >>>>>>>> changes,
> > > > >>>>>>>>>>>> while detailed implementation design can be found in
> this
> > > > Google
> > > > >>>>>>> doc:
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> https://docs.google.com/document/d/
> > > > 1pGZ8xtOOyGwDYgH5vA6h19zOMMadu
> > > > >>>>>>>>>>> FK1DAB8_gBYA2c
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> We would love to hear your comments and suggestions.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> Thanks,
> > > > >>>>>>>>>>>> -- Guozhang
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>
> > > > >>>>>>
> > > > >>>>>
> > > > >>>>>
> > > > >>>>
> > > > >>>
> > > > >>
> > > > >
> > > >
> > > >
> > >
> >
>

Re: [DISCUSS] KIP-129: Kafka Streams Exactly-Once Semantics

Posted by Sriram Subramanian <ra...@confluent.io>.
This is a completely new feature which is controlled by a config. It would
be a regression if you upgraded streams and saw a different behavior. That
would not happen in this case. This is similar to how we are introducing
idempotent producer in core kafka with a config to turn it on. There is no
guarantee that the performance of the producer with the config turned on
will be the same although eventually we will like to get to it.

On Wed, Mar 22, 2017 at 7:12 AM, Michael Noll <mi...@confluent.io> wrote:

> I second Eno's concern regarding the impact of Streams EOS on state stores.
>
> >  We do a full recovery today and the EOS proposal will not make this any
> worse.
>
> Yes, today we do a full state store recovery under certain failures.
> However, I think the point (or perhaps: open question) is that, with the
> EOS design, there's now an *increased likelihood* of such failures that
> trigger full state store recovery.  If this increase is significant, then I
> would consider this to be a regression that we should address.
>
> As Eno said:
>
> > currently we pay the recovery price for a Kafka Streams instance failure.
> > Now we might pay it for a transaction failure. Will transaction failures
> be
> > more or less common than the previous types of failures?
>
> Damian voiced similar concerns at the very beginning of this discussion,
> not sure what his current opinion is here.
>
> -Michael
>
>
>
>
>
> On Wed, Mar 22, 2017 at 1:04 AM, Sriram Subramanian <ra...@confluent.io>
> wrote:
>
> > To add to this discussion, I do think we should think about this in
> > increments. We do a full recovery today and the EOS proposal will not
> make
> > this any worse. Using store snapshot is a good option to avoid store
> > recovery in the future but as Eno points out, all pluggable stores would
> > need to have this ability. W.r.t transaction failures, this should not be
> > an issue. We should be simply retrying. There is one optimization we can
> do
> > for clean shutdowns. We could store a clean shutdown file that contains
> the
> > input offsets. This file gets written when you close the streams
> instance.
> > On start, you could can check the offsets from the shutdown file and
> > compare it with the offsets we get from the consumer and ensure they
> match.
> > If they do, you could use the same store instead of recovering. However,
> if
> > we go with the snapshot approach, this will not be required. My vote
> would
> > be to implement V1 and solve the bootstrap problem which exist today in
> the
> > future versions.
> >
> > On Tue, Mar 21, 2017 at 4:43 PM, Matthias J. Sax <ma...@confluent.io>
> > wrote:
> >
> > > Thanks for your feedback Eno.
> > >
> > > For now, I still think that the KIP itself does not need to talk about
> > > this in more detail, because we apply the same strategy for EoS as for
> > > non-EoS (as of 0.10.2).
> > >
> > > Thus, in case of a clean shutdown, we write the checkpoint file for a
> > > store and thus know we can reuse the store. In case of failure, we need
> > > to recreate the store from the changelog.
> > >
> > > > Will a V1 design that relies on plain store recovery from Kafka for
> > > > each transaction abort be good enough, or usable?
> > >
> > > Why should it not be usable? It's the same strategy as used in 0.10.2
> > > and it runs in production in many companies already.
> > >
> > > > however it seems to me we might have a regression of sorts
> > > > Now we might pay it for a transaction failure.
> > >
> > > I would assume transaction failures to be quite rare. Maybe the core
> EoS
> > > folks can comment here, too.
> > >
> > >
> > >
> > > -Matthias
> > >
> > >
> > >
> > > On 3/20/17 3:16 PM, Eno Thereska wrote:
> > > > Hi Matthias,
> > > >
> > > > I'd like to see some more info on how you propose to handle
> > transactions
> > > that involve state stores in the KIP itself. The design doc has info
> > about
> > > various optimisations like RocksDb snapshots and transactions and such,
> > but
> > > will there be a user-visible interface that indicates whether a store
> has
> > > snapshot and/or transactional capabilities? If a user plugs in another
> > > store, what guarantees are they expected to get?
> > > >
> > > > Will a V1 design that relies on plain store recovery from Kafka for
> > each
> > > transaction abort be good enough, or usable? If your dataset is large
> > > (e.g., 200GB) the recovery time might be so large as to effectively
> > render
> > > that Kafka Streams instance unavailable for tens of minutes. You
> mention
> > > that is not a regression to what we currently have, however it seems to
> > me
> > > we might have a regression of sorts: currently we pay the recovery
> price
> > > for a Kafka Streams instance failure. Now we might pay it for a
> > transaction
> > > failure. Will transaction failures be more or less common than the
> > previous
> > > types of failures? I'd like to see this addressed.
> > > >
> > > > Thanks
> > > > Eno
> > > >
> > > >
> > > >
> > > >> On 15 Mar 2017, at 22:09, Matthias J. Sax <ma...@confluent.io>
> > > wrote:
> > > >>
> > > >> Just a quick follow up:
> > > >>
> > > >> Our overall proposal is, to implement KIP-129 as is as a “Stream EoS
> > > >> 1.0” version. The raised concerns are all valid, but hard to
> quantify
> > at
> > > >> the moment. Implementing KIP-129, that provides a clean design,
> allows
> > > >> us to gain more insight in the performance implications. This
> enables
> > > >> us, to make an educated decision, if the “producer per task” model
> > > >> perform wells or not, and if a switch to a “producer per thread”
> model
> > > >> is mandatory.
> > > >>
> > > >> We also want to point out, that we can move incrementally from
> > "producer
> > > >> per task" to "producer per thread" design or apply some incremental
> > > >> improvements to "producer per task" (as discussed in the doc). Thus,
> > > >> there is not issue with regard to upgrading.
> > > >>
> > > >>
> > > >> -Matthias
> > > >>
> > > >>
> > > >>
> > > >> On 3/15/17 2:36 PM, Matthias J. Sax wrote:
> > > >>> Hi,
> > > >>>
> > > >>> I want to pick up this thread again. As there are some concerns
> about
> > > >>> the "producer per task" design, we did write up an alternative
> > > "producer
> > > >>> per thread" design and discuss pros/cons of both approaches:
> > > >>>
> > > >>> https://docs.google.com/document/d/1CfOJaa6mdg5o7pLf_
> > > zXISV4oE0ZeMZwT_sG1QWgL4EE
> > > >>>
> > > >>>
> > > >>> Looking forward to your feedback.
> > > >>>
> > > >>>
> > > >>> -Matthias
> > > >>>
> > > >>>
> > > >>> On 3/10/17 3:24 AM, Damian Guy wrote:
> > > >>>> Hi Matthias,
> > > >>>>
> > > >>>> Thanks for the response. I agree with you regarding the use of
> > > >>>> PartitionGrouper to reduce the number of tasks. It would be good
> to
> > > have an
> > > >>>> idea of any additional load on the brokers as we increase the
> number
> > > of
> > > >>>> tasks and therefore producers.
> > > >>>>
> > > >>>> Thanks,
> > > >>>> Damian
> > > >>>>
> > > >>>> On Wed, 8 Mar 2017 at 01:45 Matthias J. Sax <
> matthias@confluent.io>
> > > wrote:
> > > >>>>
> > > >>>>> Damian, Jun,
> > > >>>>>
> > > >>>>> Thanks for your input.
> > > >>>>>
> > > >>>>>
> > > >>>>> About Performance test:
> > > >>>>>
> > > >>>>> I can follow up with more performance tests using more partitions
> > and
> > > >>>>> also collecting broker metrics.
> > > >>>>>
> > > >>>>> However, I want to highlight again, that even if 1000+ partitions
> > > would
> > > >>>>> be problematic, one can simply implement PartitionGrouper
> interface
> > > and
> > > >>>>> reduce the number of tasks to 250 or 100... So I am not sure, if
> we
> > > >>>>> should block this KIP, even if there might be some performance
> > > penalty
> > > >>>>> for currently single partitioned tasks.
> > > >>>>>
> > > >>>>> About memory usage. JXM max-heap and max-off-heap did report
> 256MB
> > > and
> > > >>>>> 133MB for all experiments (thus I did not put it in the
> > spreadsheet).
> > > >>>>> Thus, using 100 producers (each using a max of 32MB of memory)
> was
> > > not
> > > >>>>> an issue with regard to memory consumption. I did not track
> > "current
> > > >>>>> head/off-heap" memory as this would require a more advance test
> > > setup to
> > > >>>>> monitor it over time. If you think this would be required, we can
> > do
> > > >>>>> some tests though.
> > > >>>>>
> > > >>>>> However, as 256 MB was enough memory, and there are other
> > components
> > > >>>>> next to the producers using memory, I don't expect a severely
> > > increased
> > > >>>>> memory usage. Producer allocate memory on-demand, and if load is
> > > shared
> > > >>>>> over multiple producers, overall memory usage should stay the
> same
> > > as a
> > > >>>>> single producer should allocate less memory.
> > > >>>>>
> > > >>>>>
> > > >>>>> About Batching:
> > > >>>>>
> > > >>>>> As you can see from the benchmarks (in the detailed view -- I
> also
> > > added
> > > >>>>> some graphs to the summary now) the average batch size gets
> > slightly
> > > >>>>> decrease with an increased number of partitions. However, there
> is
> > no
> > > >>>>> big difference between "producer per thread" and "producer per
> > task"
> > > >>>>> scenario.
> > > >>>>>
> > > >>>>>
> > > >>>>> About acks:
> > > >>>>>
> > > >>>>> This is covered by KIP-98 already. If idempotent producer is use,
> > > it's
> > > >>>>> required to set max.in.flight.requests.per.connection=1 and
> > retries
> > > > 0
> > > >>>>> -- otherwise a config exception will be thrown. For transactions,
> > > it's
> > > >>>>> further required that acks=-1 to avoid a config exception.
> > > >>>>>
> > > >>>>> Other bits, like min.isr, replication.factor, etc. (ie, all
> > > broker/topic
> > > >>>>> configs) are out of scope, and it's user responsibility to set
> > those
> > > >>>>> values correctly to ensure transactionality and idempotency.
> > > >>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>> -Matthias
> > > >>>>>
> > > >>>>>
> > > >>>>> On 3/7/17 9:32 AM, Jun Rao wrote:
> > > >>>>>> Hi, Guozhang,
> > > >>>>>>
> > > >>>>>> Thanks for the KIP. A couple of comments.
> > > >>>>>>
> > > >>>>>> 1. About the impact on producer batching. My understanding is
> that
> > > >>>>>> typically different sub-topologies in the same task are
> publishing
> > > to
> > > >>>>>> different topics. Since the producer batching happens at the
> > > >>>>>> topic/partition level, using a producer per task may not impact
> > > batching
> > > >>>>>> much.
> > > >>>>>>
> > > >>>>>> 2. When processing.guarantee is set to exactly_once, do we want
> to
> > > >>>>> enforce
> > > >>>>>> acks to all in the producer? The default acks is 1 and may cause
> > > acked
> > > >>>>> data
> > > >>>>>> to be lost later when the leader changes.
> > > >>>>>>
> > > >>>>>> Thanks,
> > > >>>>>>
> > > >>>>>> Jun
> > > >>>>>>
> > > >>>>>> On Tue, Mar 7, 2017 at 3:42 AM, Damian Guy <
> damian.guy@gmail.com>
> > > wrote:
> > > >>>>>>
> > > >>>>>>> Hi Matthias,
> > > >>>>>>>
> > > >>>>>>> Thanks. The perf test is a good start but I don't think it goes
> > far
> > > >>>>> enough.
> > > >>>>>>> 100 partitions is not a lot. What happens when there are
> > thousands
> > > of
> > > >>>>>>> partitions? What is the load on the brokers? How much more
> memory
> > > is
> > > >>>>> used
> > > >>>>>>> by the Streams App etc?
> > > >>>>>>>
> > > >>>>>>> Thanks,
> > > >>>>>>> Damian
> > > >>>>>>>
> > > >>>>>>> On Tue, 7 Mar 2017 at 03:02 Matthias J. Sax <
> > matthias@confluent.io
> > > >
> > > >>>>> wrote:
> > > >>>>>>>
> > > >>>>>>>> Hi,
> > > >>>>>>>>
> > > >>>>>>>> I want to give a first respond:
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> 1. Producer per task:
> > > >>>>>>>>
> > > >>>>>>>> First, we did some performance tests, indicating that the
> > > performance
> > > >>>>>>>> penalty is small. Please have a look here:
> > > >>>>>>>>
> > > >>>>>>>> https://docs.google.com/spreadsheets/d/18aGOB13-
> > > >>>>>>> ibwHJl5VE27HnHBlNkZAIeLF9yQ99FDxkuM/edit?usp=sharing
> > > >>>>>>>>
> > > >>>>>>>> For the test, we ran with a trunk version and a modified
> version
> > > that
> > > >>>>>>>> uses a producer per task (of course, no transactions, but
> > > at-least-once
> > > >>>>>>>> semantics). The scaling factor indicates the number of brokers
> > and
> > > >>>>>>>> (single threaded) Streams instances. We used SimpleBenchmark
> > that
> > > is
> > > >>>>>>>> part of AK code base.
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> Second, as the design is "producer per task" (and not
> "producer
> > > per
> > > >>>>>>>> partition") it is possible to specify a custom
> PartitionGrouper
> > > that
> > > >>>>>>>> assigns multiple partitions to a single task. Thus, it allows
> to
> > > reduce
> > > >>>>>>>> the number of tasks for scenarios with many partitions. Right
> > > now, this
> > > >>>>>>>> interface must be implemented solely by the user, but we could
> > > also add
> > > >>>>>>>> a new config parameter that specifies the max.number.of.tasks
> or
> > > >>>>>>>> partitions.per.task so that the user can configure this
> instead
> > of
> > > >>>>>>>> implementing the interface.
> > > >>>>>>>>
> > > >>>>>>>> Third, there is the idea of a "Producer Pool" that would allow
> > to
> > > share
> > > >>>>>>>> resources (network connections, memory, etc) over multiple
> > > producers.
> > > >>>>>>>> This would allow to separate multiple transaction on the
> > producer
> > > >>>>> level,
> > > >>>>>>>> while resources are shared. There is no detailed design
> document
> > > yet
> > > >>>>> and
> > > >>>>>>>> there would be a KIP for this feature.
> > > >>>>>>>>
> > > >>>>>>>> Thus, if there should be any performance problems for high
> scale
> > > >>>>>>>> scenarios, there are multiple ways to tackle them while
> keeping
> > > the
> > > >>>>>>>> "producer per task" design.
> > > >>>>>>>>
> > > >>>>>>>> Additionally, a "producer per thread" design would be way more
> > > complex
> > > >>>>>>>> and I summarized the issues in a separate document. I will
> share
> > > a link
> > > >>>>>>>> to the document soon.
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> 2. StateStore recovery:
> > > >>>>>>>>
> > > >>>>>>>> Streams EoS will in the first design not allow to exploit the
> > > >>>>>>>> improvements that are added for 0.11 at the moment. However,
> as
> > > 0.10.2
> > > >>>>>>>> faces the same issues of potentially long recovery, there is
> no
> > > >>>>>>>> regression with this regard. Thus, I see those improvements as
> > > >>>>>>>> orthogonal or add-ons. Nevertheless, we should try to explore
> > > those
> > > >>>>>>>> options and if possible get them into 0.11 such that Streams
> > with
> > > EoS
> > > >>>>>>>> gets the same improvements as at-least-once scenario.
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> 3. Caching:
> > > >>>>>>>>
> > > >>>>>>>> We might need to do some experiments to quantify the impact on
> > > caching.
> > > >>>>>>>> If it's severe, the suggested default commit interval of 100ms
> > > could
> > > >>>>>>>> also be increased. Also, EoS will not enforce any commit
> > > interval, but
> > > >>>>>>>> only change the default value. Thus, a user can freely
> trade-off
> > > >>>>> latency
> > > >>>>>>>> vs. caching-effect.
> > > >>>>>>>>
> > > >>>>>>>> Last but not least, there is the idea to allow
> > "read_uncommitted"
> > > for
> > > >>>>>>>> intermediate topic. This would be an advance design for
> Streams
> > > EoS
> > > >>>>> that
> > > >>>>>>>> allows downstream sub-topologies to read uncommitted data
> > > >>>>>>>> optimistically. In case of failure, a cascading abort of
> > > transactions
> > > >>>>>>>> would be required. This change will need another KIP.
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> 4. Idempotent Producer:
> > > >>>>>>>>
> > > >>>>>>>> The transactional part automatically leverages the idempotent
> > > >>>>> properties
> > > >>>>>>>> of the producer. Idempotency is a requirement:
> > > >>>>>>>>
> > > >>>>>>>>> Note that enable.idempotence must be enabled if a
> > > TransactionalId is
> > > >>>>>>>> configured.
> > > >>>>>>>>
> > > >>>>>>>> See
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>> https://docs.google.com/document/d/11Jqy_
> > > GjUGtdXJK94XGsEIK7CP1SnQGdp2eF
> > > >>>>>>> 0wSw9ra8/edit#bookmark=id.g2xsf9n49puh
> > > >>>>>>>>
> > > >>>>>>>> All idempotent retries, are handled by the producer internally
> > > (with or
> > > >>>>>>>> without transaction) if enable.idempotence is set to true.
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> -Matthias
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> On 3/3/17 3:34 AM, Eno Thereska wrote:
> > > >>>>>>>>> Another question:
> > > >>>>>>>>>
> > > >>>>>>>>> The KIP doesn’t exactly spell out how it uses the idempotence
> > > >>>>> guarantee
> > > >>>>>>>> from KIP-98. It seems that only the transactional part is
> > needed.
> > > Or is
> > > >>>>>>> the
> > > >>>>>>>> idempotence guarantee working behind the scenes and helping
> for
> > > some
> > > >>>>>>>> scenarios for which it is not worthwhile aborting a
> transaction
> > > (e.g.,
> > > >>>>>>>> retransmitting a record after a temporary network glitch)?
> > > >>>>>>>>>
> > > >>>>>>>>> Thanks
> > > >>>>>>>>> Eno
> > > >>>>>>>>>
> > > >>>>>>>>>> On Mar 2, 2017, at 4:56 PM, Jay Kreps <ja...@confluent.io>
> > wrote:
> > > >>>>>>>>>>
> > > >>>>>>>>>> I second the concern on with the one producer per task
> > > approach. At a
> > > >>>>>>>>>> high-level it seems to make sense but I think Damian is
> > exactly
> > > right
> > > >>>>>>>> that
> > > >>>>>>>>>> that cuts against the general design of the producer. Many
> > > people
> > > >>>>> have
> > > >>>>>>>> high
> > > >>>>>>>>>> input partition counts and will have high task counts as a
> > > result. I
> > > >>>>>>>> think
> > > >>>>>>>>>> processing 1000 partitions should not be an unreasonable
> thing
> > > to
> > > >>>>> want
> > > >>>>>>>> to
> > > >>>>>>>>>> do.
> > > >>>>>>>>>>
> > > >>>>>>>>>> The tricky bits will be:
> > > >>>>>>>>>>
> > > >>>>>>>>>>  - Reduced effectiveness of batching (or more latency and
> > > memory to
> > > >>>>>>> get
> > > >>>>>>>>>>  equivalent batching). This doesn't show up in simple
> > benchmarks
> > > >>>>>>>> because
> > > >>>>>>>>>>  much of the penalty is I/O and CPU on the broker and the
> > > additional
> > > >>>>>>>> threads
> > > >>>>>>>>>>  from all the producers can make a single-threaded benchmark
> > > seem
> > > >>>>>>>> faster.
> > > >>>>>>>>>>  - TCP connection explosion. We maintain one connection per
> > > broker.
> > > >>>>>>>> This
> > > >>>>>>>>>>  is already high since each app instance does this. This
> > design
> > > >>>>>>> though
> > > >>>>>>>> will
> > > >>>>>>>>>>  add an additional multiplicative factor based on the
> > partition
> > > >>>>> count
> > > >>>>>>>> of the
> > > >>>>>>>>>>  input.
> > > >>>>>>>>>>  - Connection and metadata request storms. When an instance
> > with
> > > >>>>> 1000
> > > >>>>>>>>>>  tasks starts up it is going to try to create many thousands
> > of
> > > >>>>>>>> connections
> > > >>>>>>>>>>  and issue a thousand metadata requests all at once.
> > > >>>>>>>>>>  - Memory usage. We currently default to 64MB per producer.
> > > This can
> > > >>>>>>> be
> > > >>>>>>>>>>  tuned down, but the fact that we are spreading the batching
> > > over
> > > >>>>>>> more
> > > >>>>>>>>>>  producers will fundamentally mean we need a lot more memory
> > to
> > > get
> > > >>>>>>>> good
> > > >>>>>>>>>>  perf and the memory usage will change as your task
> assignment
> > > >>>>>>> changes
> > > >>>>>>>> so it
> > > >>>>>>>>>>  will be hard to set correctly unless it is done
> > automatically.
> > > >>>>>>>>>>  - Metrics explosion (1000 producer instances, each with
> their
> > > own
> > > >>>>>>>>>>  metrics to monitor).
> > > >>>>>>>>>>  - Thread explosion, 1000 background threads, one per
> > producer,
> > > each
> > > >>>>>>>>>>  sending data.
> > > >>>>>>>>>>
> > > >>>>>>>>>> -Jay
> > > >>>>>>>>>>
> > > >>>>>>>>>> On Wed, Mar 1, 2017 at 3:05 AM, Damian Guy <
> > > damian.guy@gmail.com>
> > > >>>>>>>> wrote:
> > > >>>>>>>>>>
> > > >>>>>>>>>>> Hi Guozhang,
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Thanks for the KIP! This is an important feature for Kafka
> > > Streams
> > > >>>>>>> and
> > > >>>>>>>> will
> > > >>>>>>>>>>> help to unlock a bunch of use cases.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> I have some concerns/questions:
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>  1. Producer per task: I'm worried about the overhead this
> is
> > > going
> > > >>>>>>> to
> > > >>>>>>>>>>>  put on both the streams app and the Kafka Brokers. You can
> > > easily
> > > >>>>>>>>>>> imagine
> > > >>>>>>>>>>>  an app consuming thousands of partitions. What load will
> > this
> > > put
> > > >>>>>>> on
> > > >>>>>>>> the
> > > >>>>>>>>>>>  brokers? Am i correct in assuming that there will be
> > metadata
> > > >>>>>>>> requests
> > > >>>>>>>>>>> per
> > > >>>>>>>>>>>  Producer? The memory overhead in the streams app will also
> > > >>>>> increase
> > > >>>>>>>>>>> fairly
> > > >>>>>>>>>>>  significantly. Should we adjust
> > ProducerConfig.BUFFER_MEMORY_
> > > >>>>>>> CONFIG?
> > > >>>>>>>>>>>  2. State Store recovery: As we already know, restoring the
> > > entire
> > > >>>>>>>>>>>  changelog can take an extremely long time. Even with a
> > fairly
> > > >>>>> small
> > > >>>>>>>>>>> dataset
> > > >>>>>>>>>>>  and an inappropriately tuned segment size, this can take
> way
> > > too
> > > >>>>>>>> long.
> > > >>>>>>>>>>> My
> > > >>>>>>>>>>>  concern is that failures happen and then recovery takes
> > > "forever"
> > > >>>>>>>> and we
> > > >>>>>>>>>>>  end up in a situation where we need to change the
> > > >>>>> max.poll.interval
> > > >>>>>>>> to
> > > >>>>>>>>>>> be
> > > >>>>>>>>>>>  some very large number or else we end up in "rebalance
> > hell".
> > > I
> > > >>>>>>> don't
> > > >>>>>>>>>>> think
> > > >>>>>>>>>>>  this provides a very good user experience. You mention
> > RocksDB
> > > >>>>>>>>>>>  checkpointing in the doc - should we explore this idea
> some
> > > more?
> > > >>>>>>>> i.e.,
> > > >>>>>>>>>>>  understand the penalty for checkpointing. Maybe checkpoint
> > > every
> > > >>>>>>> *n*
> > > >>>>>>>>>>>   commits?
> > > >>>>>>>>>>>  3. What does EoS mean for Caching? If we set the commit
> > > interval
> > > >>>>> to
> > > >>>>>>>>>>>  100ms then the cache is not going to be very effective.
> > > Should it
> > > >>>>>>>> just
> > > >>>>>>>>>>> be
> > > >>>>>>>>>>>  disabled?
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Thanks,
> > > >>>>>>>>>>> Damian
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> On Tue, 28 Feb 2017 at 21:54 Guozhang Wang <
> > wangguoz@gmail.com
> > > >
> > > >>>>>>> wrote:
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>> Hi all,
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> I have just created KIP-129 to leverage KIP-98 in Kafka
> > > Streams and
> > > >>>>>>>>>>> provide
> > > >>>>>>>>>>>> exactly-once processing semantics:
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > >>>>>>>>>>> 129%3A+Streams+Exactly-Once+Semantics
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> This KIP enables Streams users to optionally turn on
> > > exactly-once
> > > >>>>>>>>>>>> processing semantics without changing their app code at
> all
> > by
> > > >>>>>>>> leveraging
> > > >>>>>>>>>>>> the transactional messaging features provided in KIP-98.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> The above wiki page provides a high-level view of the
> > proposed
> > > >>>>>>>> changes,
> > > >>>>>>>>>>>> while detailed implementation design can be found in this
> > > Google
> > > >>>>>>> doc:
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> https://docs.google.com/document/d/
> > > 1pGZ8xtOOyGwDYgH5vA6h19zOMMadu
> > > >>>>>>>>>>> FK1DAB8_gBYA2c
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> We would love to hear your comments and suggestions.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Thanks,
> > > >>>>>>>>>>>> -- Guozhang
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>
> > > >>>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>
> > > >>>
> > > >>
> > > >
> > >
> > >
> >
>

Re: [DISCUSS] KIP-129: Kafka Streams Exactly-Once Semantics

Posted by Michael Noll <mi...@confluent.io>.
Separate reply because I'm switching topics (no pun intended). :-)

One impact of the Streams EOS design is how we handle failures in Streams.
In the EOS design we have effectively three main failure categories, as far
as I understand:

1. Transient failures (which we now e.g. handle with infinite retries on
the side of producers)
2. Fatal errors aka "stop the world" (nothing we can do about those -- in
the worst case, these might bring down the full app)
3. Producer fence errors (happens when tasks have been migrated out of a
processing thread)

My question is:  how are failures handled that are caused by corrupt
messages, think: "poison pills"?  These typically manifest themselves as
exceptions that are thrown by the serdes.  Into which of the three failure
"categories" above would such "poison pill" exceptions fall?

For the record, today users need to take extra steps to handle such poison
pills (see e.g. [1]), but that isn't very elegant or convenient.  Since we
have already begun discussing how to improve the status quo, I am
interested in understanding the impact of this type of failure on/in the
EOS design.

-Michael


[1]
http://docs.confluent.io/current/streams/faq.html#handling-corrupted-records-and-deserialization-errors-poison-pill-messages




On Wed, Mar 22, 2017 at 3:12 PM, Michael Noll <mi...@confluent.io> wrote:

> I second Eno's concern regarding the impact of Streams EOS on state stores.
>
> >  We do a full recovery today and the EOS proposal will not make this
> any worse.
>
> Yes, today we do a full state store recovery under certain failures.
> However, I think the point (or perhaps: open question) is that, with the
> EOS design, there's now an *increased likelihood* of such failures that
> trigger full state store recovery.  If this increase is significant, then I
> would consider this to be a regression that we should address.
>
> As Eno said:
>
> > currently we pay the recovery price for a Kafka Streams instance
> failure.
> > Now we might pay it for a transaction failure. Will transaction failures
> be
> > more or less common than the previous types of failures?
>
> Damian voiced similar concerns at the very beginning of this discussion,
> not sure what his current opinion is here.
>
> -Michael
>
>
>
>
>
> On Wed, Mar 22, 2017 at 1:04 AM, Sriram Subramanian <ra...@confluent.io>
> wrote:
>
>> To add to this discussion, I do think we should think about this in
>> increments. We do a full recovery today and the EOS proposal will not make
>> this any worse. Using store snapshot is a good option to avoid store
>> recovery in the future but as Eno points out, all pluggable stores would
>> need to have this ability. W.r.t transaction failures, this should not be
>> an issue. We should be simply retrying. There is one optimization we can
>> do
>> for clean shutdowns. We could store a clean shutdown file that contains
>> the
>> input offsets. This file gets written when you close the streams instance.
>> On start, you could can check the offsets from the shutdown file and
>> compare it with the offsets we get from the consumer and ensure they
>> match.
>> If they do, you could use the same store instead of recovering. However,
>> if
>> we go with the snapshot approach, this will not be required. My vote would
>> be to implement V1 and solve the bootstrap problem which exist today in
>> the
>> future versions.
>>
>> On Tue, Mar 21, 2017 at 4:43 PM, Matthias J. Sax <ma...@confluent.io>
>> wrote:
>>
>> > Thanks for your feedback Eno.
>> >
>> > For now, I still think that the KIP itself does not need to talk about
>> > this in more detail, because we apply the same strategy for EoS as for
>> > non-EoS (as of 0.10.2).
>> >
>> > Thus, in case of a clean shutdown, we write the checkpoint file for a
>> > store and thus know we can reuse the store. In case of failure, we need
>> > to recreate the store from the changelog.
>> >
>> > > Will a V1 design that relies on plain store recovery from Kafka for
>> > > each transaction abort be good enough, or usable?
>> >
>> > Why should it not be usable? It's the same strategy as used in 0.10.2
>> > and it runs in production in many companies already.
>> >
>> > > however it seems to me we might have a regression of sorts
>> > > Now we might pay it for a transaction failure.
>> >
>> > I would assume transaction failures to be quite rare. Maybe the core EoS
>> > folks can comment here, too.
>> >
>> >
>> >
>> > -Matthias
>> >
>> >
>> >
>> > On 3/20/17 3:16 PM, Eno Thereska wrote:
>> > > Hi Matthias,
>> > >
>> > > I'd like to see some more info on how you propose to handle
>> transactions
>> > that involve state stores in the KIP itself. The design doc has info
>> about
>> > various optimisations like RocksDb snapshots and transactions and such,
>> but
>> > will there be a user-visible interface that indicates whether a store
>> has
>> > snapshot and/or transactional capabilities? If a user plugs in another
>> > store, what guarantees are they expected to get?
>> > >
>> > > Will a V1 design that relies on plain store recovery from Kafka for
>> each
>> > transaction abort be good enough, or usable? If your dataset is large
>> > (e.g., 200GB) the recovery time might be so large as to effectively
>> render
>> > that Kafka Streams instance unavailable for tens of minutes. You mention
>> > that is not a regression to what we currently have, however it seems to
>> me
>> > we might have a regression of sorts: currently we pay the recovery price
>> > for a Kafka Streams instance failure. Now we might pay it for a
>> transaction
>> > failure. Will transaction failures be more or less common than the
>> previous
>> > types of failures? I'd like to see this addressed.
>> > >
>> > > Thanks
>> > > Eno
>> > >
>> > >
>> > >
>> > >> On 15 Mar 2017, at 22:09, Matthias J. Sax <ma...@confluent.io>
>> > wrote:
>> > >>
>> > >> Just a quick follow up:
>> > >>
>> > >> Our overall proposal is, to implement KIP-129 as is as a “Stream EoS
>> > >> 1.0” version. The raised concerns are all valid, but hard to
>> quantify at
>> > >> the moment. Implementing KIP-129, that provides a clean design,
>> allows
>> > >> us to gain more insight in the performance implications. This enables
>> > >> us, to make an educated decision, if the “producer per task” model
>> > >> perform wells or not, and if a switch to a “producer per thread”
>> model
>> > >> is mandatory.
>> > >>
>> > >> We also want to point out, that we can move incrementally from
>> "producer
>> > >> per task" to "producer per thread" design or apply some incremental
>> > >> improvements to "producer per task" (as discussed in the doc). Thus,
>> > >> there is not issue with regard to upgrading.
>> > >>
>> > >>
>> > >> -Matthias
>> > >>
>> > >>
>> > >>
>> > >> On 3/15/17 2:36 PM, Matthias J. Sax wrote:
>> > >>> Hi,
>> > >>>
>> > >>> I want to pick up this thread again. As there are some concerns
>> about
>> > >>> the "producer per task" design, we did write up an alternative
>> > "producer
>> > >>> per thread" design and discuss pros/cons of both approaches:
>> > >>>
>> > >>> https://docs.google.com/document/d/1CfOJaa6mdg5o7pLf_
>> > zXISV4oE0ZeMZwT_sG1QWgL4EE
>> > >>>
>> > >>>
>> > >>> Looking forward to your feedback.
>> > >>>
>> > >>>
>> > >>> -Matthias
>> > >>>
>> > >>>
>> > >>> On 3/10/17 3:24 AM, Damian Guy wrote:
>> > >>>> Hi Matthias,
>> > >>>>
>> > >>>> Thanks for the response. I agree with you regarding the use of
>> > >>>> PartitionGrouper to reduce the number of tasks. It would be good to
>> > have an
>> > >>>> idea of any additional load on the brokers as we increase the
>> number
>> > of
>> > >>>> tasks and therefore producers.
>> > >>>>
>> > >>>> Thanks,
>> > >>>> Damian
>> > >>>>
>> > >>>> On Wed, 8 Mar 2017 at 01:45 Matthias J. Sax <matthias@confluent.io
>> >
>> > wrote:
>> > >>>>
>> > >>>>> Damian, Jun,
>> > >>>>>
>> > >>>>> Thanks for your input.
>> > >>>>>
>> > >>>>>
>> > >>>>> About Performance test:
>> > >>>>>
>> > >>>>> I can follow up with more performance tests using more partitions
>> and
>> > >>>>> also collecting broker metrics.
>> > >>>>>
>> > >>>>> However, I want to highlight again, that even if 1000+ partitions
>> > would
>> > >>>>> be problematic, one can simply implement PartitionGrouper
>> interface
>> > and
>> > >>>>> reduce the number of tasks to 250 or 100... So I am not sure, if
>> we
>> > >>>>> should block this KIP, even if there might be some performance
>> > penalty
>> > >>>>> for currently single partitioned tasks.
>> > >>>>>
>> > >>>>> About memory usage. JXM max-heap and max-off-heap did report 256MB
>> > and
>> > >>>>> 133MB for all experiments (thus I did not put it in the
>> spreadsheet).
>> > >>>>> Thus, using 100 producers (each using a max of 32MB of memory) was
>> > not
>> > >>>>> an issue with regard to memory consumption. I did not track
>> "current
>> > >>>>> head/off-heap" memory as this would require a more advance test
>> > setup to
>> > >>>>> monitor it over time. If you think this would be required, we can
>> do
>> > >>>>> some tests though.
>> > >>>>>
>> > >>>>> However, as 256 MB was enough memory, and there are other
>> components
>> > >>>>> next to the producers using memory, I don't expect a severely
>> > increased
>> > >>>>> memory usage. Producer allocate memory on-demand, and if load is
>> > shared
>> > >>>>> over multiple producers, overall memory usage should stay the same
>> > as a
>> > >>>>> single producer should allocate less memory.
>> > >>>>>
>> > >>>>>
>> > >>>>> About Batching:
>> > >>>>>
>> > >>>>> As you can see from the benchmarks (in the detailed view -- I also
>> > added
>> > >>>>> some graphs to the summary now) the average batch size gets
>> slightly
>> > >>>>> decrease with an increased number of partitions. However, there
>> is no
>> > >>>>> big difference between "producer per thread" and "producer per
>> task"
>> > >>>>> scenario.
>> > >>>>>
>> > >>>>>
>> > >>>>> About acks:
>> > >>>>>
>> > >>>>> This is covered by KIP-98 already. If idempotent producer is use,
>> > it's
>> > >>>>> required to set max.in.flight.requests.per.connection=1 and
>> retries
>> > > 0
>> > >>>>> -- otherwise a config exception will be thrown. For transactions,
>> > it's
>> > >>>>> further required that acks=-1 to avoid a config exception.
>> > >>>>>
>> > >>>>> Other bits, like min.isr, replication.factor, etc. (ie, all
>> > broker/topic
>> > >>>>> configs) are out of scope, and it's user responsibility to set
>> those
>> > >>>>> values correctly to ensure transactionality and idempotency.
>> > >>>>>
>> > >>>>>
>> > >>>>>
>> > >>>>> -Matthias
>> > >>>>>
>> > >>>>>
>> > >>>>> On 3/7/17 9:32 AM, Jun Rao wrote:
>> > >>>>>> Hi, Guozhang,
>> > >>>>>>
>> > >>>>>> Thanks for the KIP. A couple of comments.
>> > >>>>>>
>> > >>>>>> 1. About the impact on producer batching. My understanding is
>> that
>> > >>>>>> typically different sub-topologies in the same task are
>> publishing
>> > to
>> > >>>>>> different topics. Since the producer batching happens at the
>> > >>>>>> topic/partition level, using a producer per task may not impact
>> > batching
>> > >>>>>> much.
>> > >>>>>>
>> > >>>>>> 2. When processing.guarantee is set to exactly_once, do we want
>> to
>> > >>>>> enforce
>> > >>>>>> acks to all in the producer? The default acks is 1 and may cause
>> > acked
>> > >>>>> data
>> > >>>>>> to be lost later when the leader changes.
>> > >>>>>>
>> > >>>>>> Thanks,
>> > >>>>>>
>> > >>>>>> Jun
>> > >>>>>>
>> > >>>>>> On Tue, Mar 7, 2017 at 3:42 AM, Damian Guy <damian.guy@gmail.com
>> >
>> > wrote:
>> > >>>>>>
>> > >>>>>>> Hi Matthias,
>> > >>>>>>>
>> > >>>>>>> Thanks. The perf test is a good start but I don't think it goes
>> far
>> > >>>>> enough.
>> > >>>>>>> 100 partitions is not a lot. What happens when there are
>> thousands
>> > of
>> > >>>>>>> partitions? What is the load on the brokers? How much more
>> memory
>> > is
>> > >>>>> used
>> > >>>>>>> by the Streams App etc?
>> > >>>>>>>
>> > >>>>>>> Thanks,
>> > >>>>>>> Damian
>> > >>>>>>>
>> > >>>>>>> On Tue, 7 Mar 2017 at 03:02 Matthias J. Sax <
>> matthias@confluent.io
>> > >
>> > >>>>> wrote:
>> > >>>>>>>
>> > >>>>>>>> Hi,
>> > >>>>>>>>
>> > >>>>>>>> I want to give a first respond:
>> > >>>>>>>>
>> > >>>>>>>>
>> > >>>>>>>>
>> > >>>>>>>> 1. Producer per task:
>> > >>>>>>>>
>> > >>>>>>>> First, we did some performance tests, indicating that the
>> > performance
>> > >>>>>>>> penalty is small. Please have a look here:
>> > >>>>>>>>
>> > >>>>>>>> https://docs.google.com/spreadsheets/d/18aGOB13-
>> > >>>>>>> ibwHJl5VE27HnHBlNkZAIeLF9yQ99FDxkuM/edit?usp=sharing
>> > >>>>>>>>
>> > >>>>>>>> For the test, we ran with a trunk version and a modified
>> version
>> > that
>> > >>>>>>>> uses a producer per task (of course, no transactions, but
>> > at-least-once
>> > >>>>>>>> semantics). The scaling factor indicates the number of brokers
>> and
>> > >>>>>>>> (single threaded) Streams instances. We used SimpleBenchmark
>> that
>> > is
>> > >>>>>>>> part of AK code base.
>> > >>>>>>>>
>> > >>>>>>>>
>> > >>>>>>>> Second, as the design is "producer per task" (and not "producer
>> > per
>> > >>>>>>>> partition") it is possible to specify a custom PartitionGrouper
>> > that
>> > >>>>>>>> assigns multiple partitions to a single task. Thus, it allows
>> to
>> > reduce
>> > >>>>>>>> the number of tasks for scenarios with many partitions. Right
>> > now, this
>> > >>>>>>>> interface must be implemented solely by the user, but we could
>> > also add
>> > >>>>>>>> a new config parameter that specifies the max.number.of.tasks
>> or
>> > >>>>>>>> partitions.per.task so that the user can configure this
>> instead of
>> > >>>>>>>> implementing the interface.
>> > >>>>>>>>
>> > >>>>>>>> Third, there is the idea of a "Producer Pool" that would allow
>> to
>> > share
>> > >>>>>>>> resources (network connections, memory, etc) over multiple
>> > producers.
>> > >>>>>>>> This would allow to separate multiple transaction on the
>> producer
>> > >>>>> level,
>> > >>>>>>>> while resources are shared. There is no detailed design
>> document
>> > yet
>> > >>>>> and
>> > >>>>>>>> there would be a KIP for this feature.
>> > >>>>>>>>
>> > >>>>>>>> Thus, if there should be any performance problems for high
>> scale
>> > >>>>>>>> scenarios, there are multiple ways to tackle them while keeping
>> > the
>> > >>>>>>>> "producer per task" design.
>> > >>>>>>>>
>> > >>>>>>>> Additionally, a "producer per thread" design would be way more
>> > complex
>> > >>>>>>>> and I summarized the issues in a separate document. I will
>> share
>> > a link
>> > >>>>>>>> to the document soon.
>> > >>>>>>>>
>> > >>>>>>>>
>> > >>>>>>>>
>> > >>>>>>>> 2. StateStore recovery:
>> > >>>>>>>>
>> > >>>>>>>> Streams EoS will in the first design not allow to exploit the
>> > >>>>>>>> improvements that are added for 0.11 at the moment. However, as
>> > 0.10.2
>> > >>>>>>>> faces the same issues of potentially long recovery, there is no
>> > >>>>>>>> regression with this regard. Thus, I see those improvements as
>> > >>>>>>>> orthogonal or add-ons. Nevertheless, we should try to explore
>> > those
>> > >>>>>>>> options and if possible get them into 0.11 such that Streams
>> with
>> > EoS
>> > >>>>>>>> gets the same improvements as at-least-once scenario.
>> > >>>>>>>>
>> > >>>>>>>>
>> > >>>>>>>>
>> > >>>>>>>> 3. Caching:
>> > >>>>>>>>
>> > >>>>>>>> We might need to do some experiments to quantify the impact on
>> > caching.
>> > >>>>>>>> If it's severe, the suggested default commit interval of 100ms
>> > could
>> > >>>>>>>> also be increased. Also, EoS will not enforce any commit
>> > interval, but
>> > >>>>>>>> only change the default value. Thus, a user can freely
>> trade-off
>> > >>>>> latency
>> > >>>>>>>> vs. caching-effect.
>> > >>>>>>>>
>> > >>>>>>>> Last but not least, there is the idea to allow
>> "read_uncommitted"
>> > for
>> > >>>>>>>> intermediate topic. This would be an advance design for Streams
>> > EoS
>> > >>>>> that
>> > >>>>>>>> allows downstream sub-topologies to read uncommitted data
>> > >>>>>>>> optimistically. In case of failure, a cascading abort of
>> > transactions
>> > >>>>>>>> would be required. This change will need another KIP.
>> > >>>>>>>>
>> > >>>>>>>>
>> > >>>>>>>>
>> > >>>>>>>> 4. Idempotent Producer:
>> > >>>>>>>>
>> > >>>>>>>> The transactional part automatically leverages the idempotent
>> > >>>>> properties
>> > >>>>>>>> of the producer. Idempotency is a requirement:
>> > >>>>>>>>
>> > >>>>>>>>> Note that enable.idempotence must be enabled if a
>> > TransactionalId is
>> > >>>>>>>> configured.
>> > >>>>>>>>
>> > >>>>>>>> See
>> > >>>>>>>>
>> > >>>>>>>>
>> > >>>>> https://docs.google.com/document/d/11Jqy_
>> > GjUGtdXJK94XGsEIK7CP1SnQGdp2eF
>> > >>>>>>> 0wSw9ra8/edit#bookmark=id.g2xsf9n49puh
>> > >>>>>>>>
>> > >>>>>>>> All idempotent retries, are handled by the producer internally
>> > (with or
>> > >>>>>>>> without transaction) if enable.idempotence is set to true.
>> > >>>>>>>>
>> > >>>>>>>>
>> > >>>>>>>>
>> > >>>>>>>> -Matthias
>> > >>>>>>>>
>> > >>>>>>>>
>> > >>>>>>>>
>> > >>>>>>>> On 3/3/17 3:34 AM, Eno Thereska wrote:
>> > >>>>>>>>> Another question:
>> > >>>>>>>>>
>> > >>>>>>>>> The KIP doesn’t exactly spell out how it uses the idempotence
>> > >>>>> guarantee
>> > >>>>>>>> from KIP-98. It seems that only the transactional part is
>> needed.
>> > Or is
>> > >>>>>>> the
>> > >>>>>>>> idempotence guarantee working behind the scenes and helping for
>> > some
>> > >>>>>>>> scenarios for which it is not worthwhile aborting a transaction
>> > (e.g.,
>> > >>>>>>>> retransmitting a record after a temporary network glitch)?
>> > >>>>>>>>>
>> > >>>>>>>>> Thanks
>> > >>>>>>>>> Eno
>> > >>>>>>>>>
>> > >>>>>>>>>> On Mar 2, 2017, at 4:56 PM, Jay Kreps <ja...@confluent.io>
>> wrote:
>> > >>>>>>>>>>
>> > >>>>>>>>>> I second the concern on with the one producer per task
>> > approach. At a
>> > >>>>>>>>>> high-level it seems to make sense but I think Damian is
>> exactly
>> > right
>> > >>>>>>>> that
>> > >>>>>>>>>> that cuts against the general design of the producer. Many
>> > people
>> > >>>>> have
>> > >>>>>>>> high
>> > >>>>>>>>>> input partition counts and will have high task counts as a
>> > result. I
>> > >>>>>>>> think
>> > >>>>>>>>>> processing 1000 partitions should not be an unreasonable
>> thing
>> > to
>> > >>>>> want
>> > >>>>>>>> to
>> > >>>>>>>>>> do.
>> > >>>>>>>>>>
>> > >>>>>>>>>> The tricky bits will be:
>> > >>>>>>>>>>
>> > >>>>>>>>>>  - Reduced effectiveness of batching (or more latency and
>> > memory to
>> > >>>>>>> get
>> > >>>>>>>>>>  equivalent batching). This doesn't show up in simple
>> benchmarks
>> > >>>>>>>> because
>> > >>>>>>>>>>  much of the penalty is I/O and CPU on the broker and the
>> > additional
>> > >>>>>>>> threads
>> > >>>>>>>>>>  from all the producers can make a single-threaded benchmark
>> > seem
>> > >>>>>>>> faster.
>> > >>>>>>>>>>  - TCP connection explosion. We maintain one connection per
>> > broker.
>> > >>>>>>>> This
>> > >>>>>>>>>>  is already high since each app instance does this. This
>> design
>> > >>>>>>> though
>> > >>>>>>>> will
>> > >>>>>>>>>>  add an additional multiplicative factor based on the
>> partition
>> > >>>>> count
>> > >>>>>>>> of the
>> > >>>>>>>>>>  input.
>> > >>>>>>>>>>  - Connection and metadata request storms. When an instance
>> with
>> > >>>>> 1000
>> > >>>>>>>>>>  tasks starts up it is going to try to create many thousands
>> of
>> > >>>>>>>> connections
>> > >>>>>>>>>>  and issue a thousand metadata requests all at once.
>> > >>>>>>>>>>  - Memory usage. We currently default to 64MB per producer.
>> > This can
>> > >>>>>>> be
>> > >>>>>>>>>>  tuned down, but the fact that we are spreading the batching
>> > over
>> > >>>>>>> more
>> > >>>>>>>>>>  producers will fundamentally mean we need a lot more memory
>> to
>> > get
>> > >>>>>>>> good
>> > >>>>>>>>>>  perf and the memory usage will change as your task
>> assignment
>> > >>>>>>> changes
>> > >>>>>>>> so it
>> > >>>>>>>>>>  will be hard to set correctly unless it is done
>> automatically.
>> > >>>>>>>>>>  - Metrics explosion (1000 producer instances, each with
>> their
>> > own
>> > >>>>>>>>>>  metrics to monitor).
>> > >>>>>>>>>>  - Thread explosion, 1000 background threads, one per
>> producer,
>> > each
>> > >>>>>>>>>>  sending data.
>> > >>>>>>>>>>
>> > >>>>>>>>>> -Jay
>> > >>>>>>>>>>
>> > >>>>>>>>>> On Wed, Mar 1, 2017 at 3:05 AM, Damian Guy <
>> > damian.guy@gmail.com>
>> > >>>>>>>> wrote:
>> > >>>>>>>>>>
>> > >>>>>>>>>>> Hi Guozhang,
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> Thanks for the KIP! This is an important feature for Kafka
>> > Streams
>> > >>>>>>> and
>> > >>>>>>>> will
>> > >>>>>>>>>>> help to unlock a bunch of use cases.
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> I have some concerns/questions:
>> > >>>>>>>>>>>
>> > >>>>>>>>>>>  1. Producer per task: I'm worried about the overhead this
>> is
>> > going
>> > >>>>>>> to
>> > >>>>>>>>>>>  put on both the streams app and the Kafka Brokers. You can
>> > easily
>> > >>>>>>>>>>> imagine
>> > >>>>>>>>>>>  an app consuming thousands of partitions. What load will
>> this
>> > put
>> > >>>>>>> on
>> > >>>>>>>> the
>> > >>>>>>>>>>>  brokers? Am i correct in assuming that there will be
>> metadata
>> > >>>>>>>> requests
>> > >>>>>>>>>>> per
>> > >>>>>>>>>>>  Producer? The memory overhead in the streams app will also
>> > >>>>> increase
>> > >>>>>>>>>>> fairly
>> > >>>>>>>>>>>  significantly. Should we adjust
>> ProducerConfig.BUFFER_MEMORY_
>> > >>>>>>> CONFIG?
>> > >>>>>>>>>>>  2. State Store recovery: As we already know, restoring the
>> > entire
>> > >>>>>>>>>>>  changelog can take an extremely long time. Even with a
>> fairly
>> > >>>>> small
>> > >>>>>>>>>>> dataset
>> > >>>>>>>>>>>  and an inappropriately tuned segment size, this can take
>> way
>> > too
>> > >>>>>>>> long.
>> > >>>>>>>>>>> My
>> > >>>>>>>>>>>  concern is that failures happen and then recovery takes
>> > "forever"
>> > >>>>>>>> and we
>> > >>>>>>>>>>>  end up in a situation where we need to change the
>> > >>>>> max.poll.interval
>> > >>>>>>>> to
>> > >>>>>>>>>>> be
>> > >>>>>>>>>>>  some very large number or else we end up in "rebalance
>> hell".
>> > I
>> > >>>>>>> don't
>> > >>>>>>>>>>> think
>> > >>>>>>>>>>>  this provides a very good user experience. You mention
>> RocksDB
>> > >>>>>>>>>>>  checkpointing in the doc - should we explore this idea some
>> > more?
>> > >>>>>>>> i.e.,
>> > >>>>>>>>>>>  understand the penalty for checkpointing. Maybe checkpoint
>> > every
>> > >>>>>>> *n*
>> > >>>>>>>>>>>   commits?
>> > >>>>>>>>>>>  3. What does EoS mean for Caching? If we set the commit
>> > interval
>> > >>>>> to
>> > >>>>>>>>>>>  100ms then the cache is not going to be very effective.
>> > Should it
>> > >>>>>>>> just
>> > >>>>>>>>>>> be
>> > >>>>>>>>>>>  disabled?
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> Thanks,
>> > >>>>>>>>>>> Damian
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> On Tue, 28 Feb 2017 at 21:54 Guozhang Wang <
>> wangguoz@gmail.com
>> > >
>> > >>>>>>> wrote:
>> > >>>>>>>>>>>
>> > >>>>>>>>>>>> Hi all,
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> I have just created KIP-129 to leverage KIP-98 in Kafka
>> > Streams and
>> > >>>>>>>>>>> provide
>> > >>>>>>>>>>>> exactly-once processing semantics:
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> > >>>>>>>>>>> 129%3A+Streams+Exactly-Once+Semantics
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> This KIP enables Streams users to optionally turn on
>> > exactly-once
>> > >>>>>>>>>>>> processing semantics without changing their app code at
>> all by
>> > >>>>>>>> leveraging
>> > >>>>>>>>>>>> the transactional messaging features provided in KIP-98.
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> The above wiki page provides a high-level view of the
>> proposed
>> > >>>>>>>> changes,
>> > >>>>>>>>>>>> while detailed implementation design can be found in this
>> > Google
>> > >>>>>>> doc:
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> https://docs.google.com/document/d/
>> > 1pGZ8xtOOyGwDYgH5vA6h19zOMMadu
>> > >>>>>>>>>>> FK1DAB8_gBYA2c
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> We would love to hear your comments and suggestions.
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> Thanks,
>> > >>>>>>>>>>>> -- Guozhang
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>
>> > >>>>>>>>>
>> > >>>>>>>>
>> > >>>>>>>>
>> > >>>>>>>
>> > >>>>>>
>> > >>>>>
>> > >>>>>
>> > >>>>
>> > >>>
>> > >>
>> > >
>> >
>> >
>>
>
>

Re: [DISCUSS] KIP-129: Kafka Streams Exactly-Once Semantics

Posted by Michael Noll <mi...@confluent.io>.
I second Eno's concern regarding the impact of Streams EOS on state stores.

>  We do a full recovery today and the EOS proposal will not make this any
worse.

Yes, today we do a full state store recovery under certain failures.
However, I think the point (or perhaps: open question) is that, with the
EOS design, there's now an *increased likelihood* of such failures that
trigger full state store recovery.  If this increase is significant, then I
would consider this to be a regression that we should address.

As Eno said:

> currently we pay the recovery price for a Kafka Streams instance failure.
> Now we might pay it for a transaction failure. Will transaction failures
be
> more or less common than the previous types of failures?

Damian voiced similar concerns at the very beginning of this discussion,
not sure what his current opinion is here.

-Michael





On Wed, Mar 22, 2017 at 1:04 AM, Sriram Subramanian <ra...@confluent.io>
wrote:

> To add to this discussion, I do think we should think about this in
> increments. We do a full recovery today and the EOS proposal will not make
> this any worse. Using store snapshot is a good option to avoid store
> recovery in the future but as Eno points out, all pluggable stores would
> need to have this ability. W.r.t transaction failures, this should not be
> an issue. We should be simply retrying. There is one optimization we can do
> for clean shutdowns. We could store a clean shutdown file that contains the
> input offsets. This file gets written when you close the streams instance.
> On start, you could can check the offsets from the shutdown file and
> compare it with the offsets we get from the consumer and ensure they match.
> If they do, you could use the same store instead of recovering. However, if
> we go with the snapshot approach, this will not be required. My vote would
> be to implement V1 and solve the bootstrap problem which exist today in the
> future versions.
>
> On Tue, Mar 21, 2017 at 4:43 PM, Matthias J. Sax <ma...@confluent.io>
> wrote:
>
> > Thanks for your feedback Eno.
> >
> > For now, I still think that the KIP itself does not need to talk about
> > this in more detail, because we apply the same strategy for EoS as for
> > non-EoS (as of 0.10.2).
> >
> > Thus, in case of a clean shutdown, we write the checkpoint file for a
> > store and thus know we can reuse the store. In case of failure, we need
> > to recreate the store from the changelog.
> >
> > > Will a V1 design that relies on plain store recovery from Kafka for
> > > each transaction abort be good enough, or usable?
> >
> > Why should it not be usable? It's the same strategy as used in 0.10.2
> > and it runs in production in many companies already.
> >
> > > however it seems to me we might have a regression of sorts
> > > Now we might pay it for a transaction failure.
> >
> > I would assume transaction failures to be quite rare. Maybe the core EoS
> > folks can comment here, too.
> >
> >
> >
> > -Matthias
> >
> >
> >
> > On 3/20/17 3:16 PM, Eno Thereska wrote:
> > > Hi Matthias,
> > >
> > > I'd like to see some more info on how you propose to handle
> transactions
> > that involve state stores in the KIP itself. The design doc has info
> about
> > various optimisations like RocksDb snapshots and transactions and such,
> but
> > will there be a user-visible interface that indicates whether a store has
> > snapshot and/or transactional capabilities? If a user plugs in another
> > store, what guarantees are they expected to get?
> > >
> > > Will a V1 design that relies on plain store recovery from Kafka for
> each
> > transaction abort be good enough, or usable? If your dataset is large
> > (e.g., 200GB) the recovery time might be so large as to effectively
> render
> > that Kafka Streams instance unavailable for tens of minutes. You mention
> > that is not a regression to what we currently have, however it seems to
> me
> > we might have a regression of sorts: currently we pay the recovery price
> > for a Kafka Streams instance failure. Now we might pay it for a
> transaction
> > failure. Will transaction failures be more or less common than the
> previous
> > types of failures? I'd like to see this addressed.
> > >
> > > Thanks
> > > Eno
> > >
> > >
> > >
> > >> On 15 Mar 2017, at 22:09, Matthias J. Sax <ma...@confluent.io>
> > wrote:
> > >>
> > >> Just a quick follow up:
> > >>
> > >> Our overall proposal is, to implement KIP-129 as is as a “Stream EoS
> > >> 1.0” version. The raised concerns are all valid, but hard to quantify
> at
> > >> the moment. Implementing KIP-129, that provides a clean design, allows
> > >> us to gain more insight in the performance implications. This enables
> > >> us, to make an educated decision, if the “producer per task” model
> > >> perform wells or not, and if a switch to a “producer per thread” model
> > >> is mandatory.
> > >>
> > >> We also want to point out, that we can move incrementally from
> "producer
> > >> per task" to "producer per thread" design or apply some incremental
> > >> improvements to "producer per task" (as discussed in the doc). Thus,
> > >> there is not issue with regard to upgrading.
> > >>
> > >>
> > >> -Matthias
> > >>
> > >>
> > >>
> > >> On 3/15/17 2:36 PM, Matthias J. Sax wrote:
> > >>> Hi,
> > >>>
> > >>> I want to pick up this thread again. As there are some concerns about
> > >>> the "producer per task" design, we did write up an alternative
> > "producer
> > >>> per thread" design and discuss pros/cons of both approaches:
> > >>>
> > >>> https://docs.google.com/document/d/1CfOJaa6mdg5o7pLf_
> > zXISV4oE0ZeMZwT_sG1QWgL4EE
> > >>>
> > >>>
> > >>> Looking forward to your feedback.
> > >>>
> > >>>
> > >>> -Matthias
> > >>>
> > >>>
> > >>> On 3/10/17 3:24 AM, Damian Guy wrote:
> > >>>> Hi Matthias,
> > >>>>
> > >>>> Thanks for the response. I agree with you regarding the use of
> > >>>> PartitionGrouper to reduce the number of tasks. It would be good to
> > have an
> > >>>> idea of any additional load on the brokers as we increase the number
> > of
> > >>>> tasks and therefore producers.
> > >>>>
> > >>>> Thanks,
> > >>>> Damian
> > >>>>
> > >>>> On Wed, 8 Mar 2017 at 01:45 Matthias J. Sax <ma...@confluent.io>
> > wrote:
> > >>>>
> > >>>>> Damian, Jun,
> > >>>>>
> > >>>>> Thanks for your input.
> > >>>>>
> > >>>>>
> > >>>>> About Performance test:
> > >>>>>
> > >>>>> I can follow up with more performance tests using more partitions
> and
> > >>>>> also collecting broker metrics.
> > >>>>>
> > >>>>> However, I want to highlight again, that even if 1000+ partitions
> > would
> > >>>>> be problematic, one can simply implement PartitionGrouper interface
> > and
> > >>>>> reduce the number of tasks to 250 or 100... So I am not sure, if we
> > >>>>> should block this KIP, even if there might be some performance
> > penalty
> > >>>>> for currently single partitioned tasks.
> > >>>>>
> > >>>>> About memory usage. JXM max-heap and max-off-heap did report 256MB
> > and
> > >>>>> 133MB for all experiments (thus I did not put it in the
> spreadsheet).
> > >>>>> Thus, using 100 producers (each using a max of 32MB of memory) was
> > not
> > >>>>> an issue with regard to memory consumption. I did not track
> "current
> > >>>>> head/off-heap" memory as this would require a more advance test
> > setup to
> > >>>>> monitor it over time. If you think this would be required, we can
> do
> > >>>>> some tests though.
> > >>>>>
> > >>>>> However, as 256 MB was enough memory, and there are other
> components
> > >>>>> next to the producers using memory, I don't expect a severely
> > increased
> > >>>>> memory usage. Producer allocate memory on-demand, and if load is
> > shared
> > >>>>> over multiple producers, overall memory usage should stay the same
> > as a
> > >>>>> single producer should allocate less memory.
> > >>>>>
> > >>>>>
> > >>>>> About Batching:
> > >>>>>
> > >>>>> As you can see from the benchmarks (in the detailed view -- I also
> > added
> > >>>>> some graphs to the summary now) the average batch size gets
> slightly
> > >>>>> decrease with an increased number of partitions. However, there is
> no
> > >>>>> big difference between "producer per thread" and "producer per
> task"
> > >>>>> scenario.
> > >>>>>
> > >>>>>
> > >>>>> About acks:
> > >>>>>
> > >>>>> This is covered by KIP-98 already. If idempotent producer is use,
> > it's
> > >>>>> required to set max.in.flight.requests.per.connection=1 and
> retries
> > > 0
> > >>>>> -- otherwise a config exception will be thrown. For transactions,
> > it's
> > >>>>> further required that acks=-1 to avoid a config exception.
> > >>>>>
> > >>>>> Other bits, like min.isr, replication.factor, etc. (ie, all
> > broker/topic
> > >>>>> configs) are out of scope, and it's user responsibility to set
> those
> > >>>>> values correctly to ensure transactionality and idempotency.
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>> -Matthias
> > >>>>>
> > >>>>>
> > >>>>> On 3/7/17 9:32 AM, Jun Rao wrote:
> > >>>>>> Hi, Guozhang,
> > >>>>>>
> > >>>>>> Thanks for the KIP. A couple of comments.
> > >>>>>>
> > >>>>>> 1. About the impact on producer batching. My understanding is that
> > >>>>>> typically different sub-topologies in the same task are publishing
> > to
> > >>>>>> different topics. Since the producer batching happens at the
> > >>>>>> topic/partition level, using a producer per task may not impact
> > batching
> > >>>>>> much.
> > >>>>>>
> > >>>>>> 2. When processing.guarantee is set to exactly_once, do we want to
> > >>>>> enforce
> > >>>>>> acks to all in the producer? The default acks is 1 and may cause
> > acked
> > >>>>> data
> > >>>>>> to be lost later when the leader changes.
> > >>>>>>
> > >>>>>> Thanks,
> > >>>>>>
> > >>>>>> Jun
> > >>>>>>
> > >>>>>> On Tue, Mar 7, 2017 at 3:42 AM, Damian Guy <da...@gmail.com>
> > wrote:
> > >>>>>>
> > >>>>>>> Hi Matthias,
> > >>>>>>>
> > >>>>>>> Thanks. The perf test is a good start but I don't think it goes
> far
> > >>>>> enough.
> > >>>>>>> 100 partitions is not a lot. What happens when there are
> thousands
> > of
> > >>>>>>> partitions? What is the load on the brokers? How much more memory
> > is
> > >>>>> used
> > >>>>>>> by the Streams App etc?
> > >>>>>>>
> > >>>>>>> Thanks,
> > >>>>>>> Damian
> > >>>>>>>
> > >>>>>>> On Tue, 7 Mar 2017 at 03:02 Matthias J. Sax <
> matthias@confluent.io
> > >
> > >>>>> wrote:
> > >>>>>>>
> > >>>>>>>> Hi,
> > >>>>>>>>
> > >>>>>>>> I want to give a first respond:
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> 1. Producer per task:
> > >>>>>>>>
> > >>>>>>>> First, we did some performance tests, indicating that the
> > performance
> > >>>>>>>> penalty is small. Please have a look here:
> > >>>>>>>>
> > >>>>>>>> https://docs.google.com/spreadsheets/d/18aGOB13-
> > >>>>>>> ibwHJl5VE27HnHBlNkZAIeLF9yQ99FDxkuM/edit?usp=sharing
> > >>>>>>>>
> > >>>>>>>> For the test, we ran with a trunk version and a modified version
> > that
> > >>>>>>>> uses a producer per task (of course, no transactions, but
> > at-least-once
> > >>>>>>>> semantics). The scaling factor indicates the number of brokers
> and
> > >>>>>>>> (single threaded) Streams instances. We used SimpleBenchmark
> that
> > is
> > >>>>>>>> part of AK code base.
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> Second, as the design is "producer per task" (and not "producer
> > per
> > >>>>>>>> partition") it is possible to specify a custom PartitionGrouper
> > that
> > >>>>>>>> assigns multiple partitions to a single task. Thus, it allows to
> > reduce
> > >>>>>>>> the number of tasks for scenarios with many partitions. Right
> > now, this
> > >>>>>>>> interface must be implemented solely by the user, but we could
> > also add
> > >>>>>>>> a new config parameter that specifies the max.number.of.tasks or
> > >>>>>>>> partitions.per.task so that the user can configure this instead
> of
> > >>>>>>>> implementing the interface.
> > >>>>>>>>
> > >>>>>>>> Third, there is the idea of a "Producer Pool" that would allow
> to
> > share
> > >>>>>>>> resources (network connections, memory, etc) over multiple
> > producers.
> > >>>>>>>> This would allow to separate multiple transaction on the
> producer
> > >>>>> level,
> > >>>>>>>> while resources are shared. There is no detailed design document
> > yet
> > >>>>> and
> > >>>>>>>> there would be a KIP for this feature.
> > >>>>>>>>
> > >>>>>>>> Thus, if there should be any performance problems for high scale
> > >>>>>>>> scenarios, there are multiple ways to tackle them while keeping
> > the
> > >>>>>>>> "producer per task" design.
> > >>>>>>>>
> > >>>>>>>> Additionally, a "producer per thread" design would be way more
> > complex
> > >>>>>>>> and I summarized the issues in a separate document. I will share
> > a link
> > >>>>>>>> to the document soon.
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> 2. StateStore recovery:
> > >>>>>>>>
> > >>>>>>>> Streams EoS will in the first design not allow to exploit the
> > >>>>>>>> improvements that are added for 0.11 at the moment. However, as
> > 0.10.2
> > >>>>>>>> faces the same issues of potentially long recovery, there is no
> > >>>>>>>> regression with this regard. Thus, I see those improvements as
> > >>>>>>>> orthogonal or add-ons. Nevertheless, we should try to explore
> > those
> > >>>>>>>> options and if possible get them into 0.11 such that Streams
> with
> > EoS
> > >>>>>>>> gets the same improvements as at-least-once scenario.
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> 3. Caching:
> > >>>>>>>>
> > >>>>>>>> We might need to do some experiments to quantify the impact on
> > caching.
> > >>>>>>>> If it's severe, the suggested default commit interval of 100ms
> > could
> > >>>>>>>> also be increased. Also, EoS will not enforce any commit
> > interval, but
> > >>>>>>>> only change the default value. Thus, a user can freely trade-off
> > >>>>> latency
> > >>>>>>>> vs. caching-effect.
> > >>>>>>>>
> > >>>>>>>> Last but not least, there is the idea to allow
> "read_uncommitted"
> > for
> > >>>>>>>> intermediate topic. This would be an advance design for Streams
> > EoS
> > >>>>> that
> > >>>>>>>> allows downstream sub-topologies to read uncommitted data
> > >>>>>>>> optimistically. In case of failure, a cascading abort of
> > transactions
> > >>>>>>>> would be required. This change will need another KIP.
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> 4. Idempotent Producer:
> > >>>>>>>>
> > >>>>>>>> The transactional part automatically leverages the idempotent
> > >>>>> properties
> > >>>>>>>> of the producer. Idempotency is a requirement:
> > >>>>>>>>
> > >>>>>>>>> Note that enable.idempotence must be enabled if a
> > TransactionalId is
> > >>>>>>>> configured.
> > >>>>>>>>
> > >>>>>>>> See
> > >>>>>>>>
> > >>>>>>>>
> > >>>>> https://docs.google.com/document/d/11Jqy_
> > GjUGtdXJK94XGsEIK7CP1SnQGdp2eF
> > >>>>>>> 0wSw9ra8/edit#bookmark=id.g2xsf9n49puh
> > >>>>>>>>
> > >>>>>>>> All idempotent retries, are handled by the producer internally
> > (with or
> > >>>>>>>> without transaction) if enable.idempotence is set to true.
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> -Matthias
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> On 3/3/17 3:34 AM, Eno Thereska wrote:
> > >>>>>>>>> Another question:
> > >>>>>>>>>
> > >>>>>>>>> The KIP doesn’t exactly spell out how it uses the idempotence
> > >>>>> guarantee
> > >>>>>>>> from KIP-98. It seems that only the transactional part is
> needed.
> > Or is
> > >>>>>>> the
> > >>>>>>>> idempotence guarantee working behind the scenes and helping for
> > some
> > >>>>>>>> scenarios for which it is not worthwhile aborting a transaction
> > (e.g.,
> > >>>>>>>> retransmitting a record after a temporary network glitch)?
> > >>>>>>>>>
> > >>>>>>>>> Thanks
> > >>>>>>>>> Eno
> > >>>>>>>>>
> > >>>>>>>>>> On Mar 2, 2017, at 4:56 PM, Jay Kreps <ja...@confluent.io>
> wrote:
> > >>>>>>>>>>
> > >>>>>>>>>> I second the concern on with the one producer per task
> > approach. At a
> > >>>>>>>>>> high-level it seems to make sense but I think Damian is
> exactly
> > right
> > >>>>>>>> that
> > >>>>>>>>>> that cuts against the general design of the producer. Many
> > people
> > >>>>> have
> > >>>>>>>> high
> > >>>>>>>>>> input partition counts and will have high task counts as a
> > result. I
> > >>>>>>>> think
> > >>>>>>>>>> processing 1000 partitions should not be an unreasonable thing
> > to
> > >>>>> want
> > >>>>>>>> to
> > >>>>>>>>>> do.
> > >>>>>>>>>>
> > >>>>>>>>>> The tricky bits will be:
> > >>>>>>>>>>
> > >>>>>>>>>>  - Reduced effectiveness of batching (or more latency and
> > memory to
> > >>>>>>> get
> > >>>>>>>>>>  equivalent batching). This doesn't show up in simple
> benchmarks
> > >>>>>>>> because
> > >>>>>>>>>>  much of the penalty is I/O and CPU on the broker and the
> > additional
> > >>>>>>>> threads
> > >>>>>>>>>>  from all the producers can make a single-threaded benchmark
> > seem
> > >>>>>>>> faster.
> > >>>>>>>>>>  - TCP connection explosion. We maintain one connection per
> > broker.
> > >>>>>>>> This
> > >>>>>>>>>>  is already high since each app instance does this. This
> design
> > >>>>>>> though
> > >>>>>>>> will
> > >>>>>>>>>>  add an additional multiplicative factor based on the
> partition
> > >>>>> count
> > >>>>>>>> of the
> > >>>>>>>>>>  input.
> > >>>>>>>>>>  - Connection and metadata request storms. When an instance
> with
> > >>>>> 1000
> > >>>>>>>>>>  tasks starts up it is going to try to create many thousands
> of
> > >>>>>>>> connections
> > >>>>>>>>>>  and issue a thousand metadata requests all at once.
> > >>>>>>>>>>  - Memory usage. We currently default to 64MB per producer.
> > This can
> > >>>>>>> be
> > >>>>>>>>>>  tuned down, but the fact that we are spreading the batching
> > over
> > >>>>>>> more
> > >>>>>>>>>>  producers will fundamentally mean we need a lot more memory
> to
> > get
> > >>>>>>>> good
> > >>>>>>>>>>  perf and the memory usage will change as your task assignment
> > >>>>>>> changes
> > >>>>>>>> so it
> > >>>>>>>>>>  will be hard to set correctly unless it is done
> automatically.
> > >>>>>>>>>>  - Metrics explosion (1000 producer instances, each with their
> > own
> > >>>>>>>>>>  metrics to monitor).
> > >>>>>>>>>>  - Thread explosion, 1000 background threads, one per
> producer,
> > each
> > >>>>>>>>>>  sending data.
> > >>>>>>>>>>
> > >>>>>>>>>> -Jay
> > >>>>>>>>>>
> > >>>>>>>>>> On Wed, Mar 1, 2017 at 3:05 AM, Damian Guy <
> > damian.guy@gmail.com>
> > >>>>>>>> wrote:
> > >>>>>>>>>>
> > >>>>>>>>>>> Hi Guozhang,
> > >>>>>>>>>>>
> > >>>>>>>>>>> Thanks for the KIP! This is an important feature for Kafka
> > Streams
> > >>>>>>> and
> > >>>>>>>> will
> > >>>>>>>>>>> help to unlock a bunch of use cases.
> > >>>>>>>>>>>
> > >>>>>>>>>>> I have some concerns/questions:
> > >>>>>>>>>>>
> > >>>>>>>>>>>  1. Producer per task: I'm worried about the overhead this is
> > going
> > >>>>>>> to
> > >>>>>>>>>>>  put on both the streams app and the Kafka Brokers. You can
> > easily
> > >>>>>>>>>>> imagine
> > >>>>>>>>>>>  an app consuming thousands of partitions. What load will
> this
> > put
> > >>>>>>> on
> > >>>>>>>> the
> > >>>>>>>>>>>  brokers? Am i correct in assuming that there will be
> metadata
> > >>>>>>>> requests
> > >>>>>>>>>>> per
> > >>>>>>>>>>>  Producer? The memory overhead in the streams app will also
> > >>>>> increase
> > >>>>>>>>>>> fairly
> > >>>>>>>>>>>  significantly. Should we adjust
> ProducerConfig.BUFFER_MEMORY_
> > >>>>>>> CONFIG?
> > >>>>>>>>>>>  2. State Store recovery: As we already know, restoring the
> > entire
> > >>>>>>>>>>>  changelog can take an extremely long time. Even with a
> fairly
> > >>>>> small
> > >>>>>>>>>>> dataset
> > >>>>>>>>>>>  and an inappropriately tuned segment size, this can take way
> > too
> > >>>>>>>> long.
> > >>>>>>>>>>> My
> > >>>>>>>>>>>  concern is that failures happen and then recovery takes
> > "forever"
> > >>>>>>>> and we
> > >>>>>>>>>>>  end up in a situation where we need to change the
> > >>>>> max.poll.interval
> > >>>>>>>> to
> > >>>>>>>>>>> be
> > >>>>>>>>>>>  some very large number or else we end up in "rebalance
> hell".
> > I
> > >>>>>>> don't
> > >>>>>>>>>>> think
> > >>>>>>>>>>>  this provides a very good user experience. You mention
> RocksDB
> > >>>>>>>>>>>  checkpointing in the doc - should we explore this idea some
> > more?
> > >>>>>>>> i.e.,
> > >>>>>>>>>>>  understand the penalty for checkpointing. Maybe checkpoint
> > every
> > >>>>>>> *n*
> > >>>>>>>>>>>   commits?
> > >>>>>>>>>>>  3. What does EoS mean for Caching? If we set the commit
> > interval
> > >>>>> to
> > >>>>>>>>>>>  100ms then the cache is not going to be very effective.
> > Should it
> > >>>>>>>> just
> > >>>>>>>>>>> be
> > >>>>>>>>>>>  disabled?
> > >>>>>>>>>>>
> > >>>>>>>>>>> Thanks,
> > >>>>>>>>>>> Damian
> > >>>>>>>>>>>
> > >>>>>>>>>>> On Tue, 28 Feb 2017 at 21:54 Guozhang Wang <
> wangguoz@gmail.com
> > >
> > >>>>>>> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>>> Hi all,
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> I have just created KIP-129 to leverage KIP-98 in Kafka
> > Streams and
> > >>>>>>>>>>> provide
> > >>>>>>>>>>>> exactly-once processing semantics:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > >>>>>>>>>>> 129%3A+Streams+Exactly-Once+Semantics
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> This KIP enables Streams users to optionally turn on
> > exactly-once
> > >>>>>>>>>>>> processing semantics without changing their app code at all
> by
> > >>>>>>>> leveraging
> > >>>>>>>>>>>> the transactional messaging features provided in KIP-98.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> The above wiki page provides a high-level view of the
> proposed
> > >>>>>>>> changes,
> > >>>>>>>>>>>> while detailed implementation design can be found in this
> > Google
> > >>>>>>> doc:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> https://docs.google.com/document/d/
> > 1pGZ8xtOOyGwDYgH5vA6h19zOMMadu
> > >>>>>>>>>>> FK1DAB8_gBYA2c
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> We would love to hear your comments and suggestions.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>> -- Guozhang
> > >>>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> > >
> >
> >
>

Re: [DISCUSS] KIP-129: Kafka Streams Exactly-Once Semantics

Posted by Sriram Subramanian <ra...@confluent.io>.
To add to this discussion, I do think we should think about this in
increments. We do a full recovery today and the EOS proposal will not make
this any worse. Using store snapshot is a good option to avoid store
recovery in the future but as Eno points out, all pluggable stores would
need to have this ability. W.r.t transaction failures, this should not be
an issue. We should be simply retrying. There is one optimization we can do
for clean shutdowns. We could store a clean shutdown file that contains the
input offsets. This file gets written when you close the streams instance.
On start, you could can check the offsets from the shutdown file and
compare it with the offsets we get from the consumer and ensure they match.
If they do, you could use the same store instead of recovering. However, if
we go with the snapshot approach, this will not be required. My vote would
be to implement V1 and solve the bootstrap problem which exist today in the
future versions.

On Tue, Mar 21, 2017 at 4:43 PM, Matthias J. Sax <ma...@confluent.io>
wrote:

> Thanks for your feedback Eno.
>
> For now, I still think that the KIP itself does not need to talk about
> this in more detail, because we apply the same strategy for EoS as for
> non-EoS (as of 0.10.2).
>
> Thus, in case of a clean shutdown, we write the checkpoint file for a
> store and thus know we can reuse the store. In case of failure, we need
> to recreate the store from the changelog.
>
> > Will a V1 design that relies on plain store recovery from Kafka for
> > each transaction abort be good enough, or usable?
>
> Why should it not be usable? It's the same strategy as used in 0.10.2
> and it runs in production in many companies already.
>
> > however it seems to me we might have a regression of sorts
> > Now we might pay it for a transaction failure.
>
> I would assume transaction failures to be quite rare. Maybe the core EoS
> folks can comment here, too.
>
>
>
> -Matthias
>
>
>
> On 3/20/17 3:16 PM, Eno Thereska wrote:
> > Hi Matthias,
> >
> > I'd like to see some more info on how you propose to handle transactions
> that involve state stores in the KIP itself. The design doc has info about
> various optimisations like RocksDb snapshots and transactions and such, but
> will there be a user-visible interface that indicates whether a store has
> snapshot and/or transactional capabilities? If a user plugs in another
> store, what guarantees are they expected to get?
> >
> > Will a V1 design that relies on plain store recovery from Kafka for each
> transaction abort be good enough, or usable? If your dataset is large
> (e.g., 200GB) the recovery time might be so large as to effectively render
> that Kafka Streams instance unavailable for tens of minutes. You mention
> that is not a regression to what we currently have, however it seems to me
> we might have a regression of sorts: currently we pay the recovery price
> for a Kafka Streams instance failure. Now we might pay it for a transaction
> failure. Will transaction failures be more or less common than the previous
> types of failures? I'd like to see this addressed.
> >
> > Thanks
> > Eno
> >
> >
> >
> >> On 15 Mar 2017, at 22:09, Matthias J. Sax <ma...@confluent.io>
> wrote:
> >>
> >> Just a quick follow up:
> >>
> >> Our overall proposal is, to implement KIP-129 as is as a “Stream EoS
> >> 1.0” version. The raised concerns are all valid, but hard to quantify at
> >> the moment. Implementing KIP-129, that provides a clean design, allows
> >> us to gain more insight in the performance implications. This enables
> >> us, to make an educated decision, if the “producer per task” model
> >> perform wells or not, and if a switch to a “producer per thread” model
> >> is mandatory.
> >>
> >> We also want to point out, that we can move incrementally from "producer
> >> per task" to "producer per thread" design or apply some incremental
> >> improvements to "producer per task" (as discussed in the doc). Thus,
> >> there is not issue with regard to upgrading.
> >>
> >>
> >> -Matthias
> >>
> >>
> >>
> >> On 3/15/17 2:36 PM, Matthias J. Sax wrote:
> >>> Hi,
> >>>
> >>> I want to pick up this thread again. As there are some concerns about
> >>> the "producer per task" design, we did write up an alternative
> "producer
> >>> per thread" design and discuss pros/cons of both approaches:
> >>>
> >>> https://docs.google.com/document/d/1CfOJaa6mdg5o7pLf_
> zXISV4oE0ZeMZwT_sG1QWgL4EE
> >>>
> >>>
> >>> Looking forward to your feedback.
> >>>
> >>>
> >>> -Matthias
> >>>
> >>>
> >>> On 3/10/17 3:24 AM, Damian Guy wrote:
> >>>> Hi Matthias,
> >>>>
> >>>> Thanks for the response. I agree with you regarding the use of
> >>>> PartitionGrouper to reduce the number of tasks. It would be good to
> have an
> >>>> idea of any additional load on the brokers as we increase the number
> of
> >>>> tasks and therefore producers.
> >>>>
> >>>> Thanks,
> >>>> Damian
> >>>>
> >>>> On Wed, 8 Mar 2017 at 01:45 Matthias J. Sax <ma...@confluent.io>
> wrote:
> >>>>
> >>>>> Damian, Jun,
> >>>>>
> >>>>> Thanks for your input.
> >>>>>
> >>>>>
> >>>>> About Performance test:
> >>>>>
> >>>>> I can follow up with more performance tests using more partitions and
> >>>>> also collecting broker metrics.
> >>>>>
> >>>>> However, I want to highlight again, that even if 1000+ partitions
> would
> >>>>> be problematic, one can simply implement PartitionGrouper interface
> and
> >>>>> reduce the number of tasks to 250 or 100... So I am not sure, if we
> >>>>> should block this KIP, even if there might be some performance
> penalty
> >>>>> for currently single partitioned tasks.
> >>>>>
> >>>>> About memory usage. JXM max-heap and max-off-heap did report 256MB
> and
> >>>>> 133MB for all experiments (thus I did not put it in the spreadsheet).
> >>>>> Thus, using 100 producers (each using a max of 32MB of memory) was
> not
> >>>>> an issue with regard to memory consumption. I did not track "current
> >>>>> head/off-heap" memory as this would require a more advance test
> setup to
> >>>>> monitor it over time. If you think this would be required, we can do
> >>>>> some tests though.
> >>>>>
> >>>>> However, as 256 MB was enough memory, and there are other components
> >>>>> next to the producers using memory, I don't expect a severely
> increased
> >>>>> memory usage. Producer allocate memory on-demand, and if load is
> shared
> >>>>> over multiple producers, overall memory usage should stay the same
> as a
> >>>>> single producer should allocate less memory.
> >>>>>
> >>>>>
> >>>>> About Batching:
> >>>>>
> >>>>> As you can see from the benchmarks (in the detailed view -- I also
> added
> >>>>> some graphs to the summary now) the average batch size gets slightly
> >>>>> decrease with an increased number of partitions. However, there is no
> >>>>> big difference between "producer per thread" and "producer per task"
> >>>>> scenario.
> >>>>>
> >>>>>
> >>>>> About acks:
> >>>>>
> >>>>> This is covered by KIP-98 already. If idempotent producer is use,
> it's
> >>>>> required to set max.in.flight.requests.per.connection=1 and retries
> > 0
> >>>>> -- otherwise a config exception will be thrown. For transactions,
> it's
> >>>>> further required that acks=-1 to avoid a config exception.
> >>>>>
> >>>>> Other bits, like min.isr, replication.factor, etc. (ie, all
> broker/topic
> >>>>> configs) are out of scope, and it's user responsibility to set those
> >>>>> values correctly to ensure transactionality and idempotency.
> >>>>>
> >>>>>
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>>
> >>>>> On 3/7/17 9:32 AM, Jun Rao wrote:
> >>>>>> Hi, Guozhang,
> >>>>>>
> >>>>>> Thanks for the KIP. A couple of comments.
> >>>>>>
> >>>>>> 1. About the impact on producer batching. My understanding is that
> >>>>>> typically different sub-topologies in the same task are publishing
> to
> >>>>>> different topics. Since the producer batching happens at the
> >>>>>> topic/partition level, using a producer per task may not impact
> batching
> >>>>>> much.
> >>>>>>
> >>>>>> 2. When processing.guarantee is set to exactly_once, do we want to
> >>>>> enforce
> >>>>>> acks to all in the producer? The default acks is 1 and may cause
> acked
> >>>>> data
> >>>>>> to be lost later when the leader changes.
> >>>>>>
> >>>>>> Thanks,
> >>>>>>
> >>>>>> Jun
> >>>>>>
> >>>>>> On Tue, Mar 7, 2017 at 3:42 AM, Damian Guy <da...@gmail.com>
> wrote:
> >>>>>>
> >>>>>>> Hi Matthias,
> >>>>>>>
> >>>>>>> Thanks. The perf test is a good start but I don't think it goes far
> >>>>> enough.
> >>>>>>> 100 partitions is not a lot. What happens when there are thousands
> of
> >>>>>>> partitions? What is the load on the brokers? How much more memory
> is
> >>>>> used
> >>>>>>> by the Streams App etc?
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> Damian
> >>>>>>>
> >>>>>>> On Tue, 7 Mar 2017 at 03:02 Matthias J. Sax <matthias@confluent.io
> >
> >>>>> wrote:
> >>>>>>>
> >>>>>>>> Hi,
> >>>>>>>>
> >>>>>>>> I want to give a first respond:
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> 1. Producer per task:
> >>>>>>>>
> >>>>>>>> First, we did some performance tests, indicating that the
> performance
> >>>>>>>> penalty is small. Please have a look here:
> >>>>>>>>
> >>>>>>>> https://docs.google.com/spreadsheets/d/18aGOB13-
> >>>>>>> ibwHJl5VE27HnHBlNkZAIeLF9yQ99FDxkuM/edit?usp=sharing
> >>>>>>>>
> >>>>>>>> For the test, we ran with a trunk version and a modified version
> that
> >>>>>>>> uses a producer per task (of course, no transactions, but
> at-least-once
> >>>>>>>> semantics). The scaling factor indicates the number of brokers and
> >>>>>>>> (single threaded) Streams instances. We used SimpleBenchmark that
> is
> >>>>>>>> part of AK code base.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Second, as the design is "producer per task" (and not "producer
> per
> >>>>>>>> partition") it is possible to specify a custom PartitionGrouper
> that
> >>>>>>>> assigns multiple partitions to a single task. Thus, it allows to
> reduce
> >>>>>>>> the number of tasks for scenarios with many partitions. Right
> now, this
> >>>>>>>> interface must be implemented solely by the user, but we could
> also add
> >>>>>>>> a new config parameter that specifies the max.number.of.tasks or
> >>>>>>>> partitions.per.task so that the user can configure this instead of
> >>>>>>>> implementing the interface.
> >>>>>>>>
> >>>>>>>> Third, there is the idea of a "Producer Pool" that would allow to
> share
> >>>>>>>> resources (network connections, memory, etc) over multiple
> producers.
> >>>>>>>> This would allow to separate multiple transaction on the producer
> >>>>> level,
> >>>>>>>> while resources are shared. There is no detailed design document
> yet
> >>>>> and
> >>>>>>>> there would be a KIP for this feature.
> >>>>>>>>
> >>>>>>>> Thus, if there should be any performance problems for high scale
> >>>>>>>> scenarios, there are multiple ways to tackle them while keeping
> the
> >>>>>>>> "producer per task" design.
> >>>>>>>>
> >>>>>>>> Additionally, a "producer per thread" design would be way more
> complex
> >>>>>>>> and I summarized the issues in a separate document. I will share
> a link
> >>>>>>>> to the document soon.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> 2. StateStore recovery:
> >>>>>>>>
> >>>>>>>> Streams EoS will in the first design not allow to exploit the
> >>>>>>>> improvements that are added for 0.11 at the moment. However, as
> 0.10.2
> >>>>>>>> faces the same issues of potentially long recovery, there is no
> >>>>>>>> regression with this regard. Thus, I see those improvements as
> >>>>>>>> orthogonal or add-ons. Nevertheless, we should try to explore
> those
> >>>>>>>> options and if possible get them into 0.11 such that Streams with
> EoS
> >>>>>>>> gets the same improvements as at-least-once scenario.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> 3. Caching:
> >>>>>>>>
> >>>>>>>> We might need to do some experiments to quantify the impact on
> caching.
> >>>>>>>> If it's severe, the suggested default commit interval of 100ms
> could
> >>>>>>>> also be increased. Also, EoS will not enforce any commit
> interval, but
> >>>>>>>> only change the default value. Thus, a user can freely trade-off
> >>>>> latency
> >>>>>>>> vs. caching-effect.
> >>>>>>>>
> >>>>>>>> Last but not least, there is the idea to allow "read_uncommitted"
> for
> >>>>>>>> intermediate topic. This would be an advance design for Streams
> EoS
> >>>>> that
> >>>>>>>> allows downstream sub-topologies to read uncommitted data
> >>>>>>>> optimistically. In case of failure, a cascading abort of
> transactions
> >>>>>>>> would be required. This change will need another KIP.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> 4. Idempotent Producer:
> >>>>>>>>
> >>>>>>>> The transactional part automatically leverages the idempotent
> >>>>> properties
> >>>>>>>> of the producer. Idempotency is a requirement:
> >>>>>>>>
> >>>>>>>>> Note that enable.idempotence must be enabled if a
> TransactionalId is
> >>>>>>>> configured.
> >>>>>>>>
> >>>>>>>> See
> >>>>>>>>
> >>>>>>>>
> >>>>> https://docs.google.com/document/d/11Jqy_
> GjUGtdXJK94XGsEIK7CP1SnQGdp2eF
> >>>>>>> 0wSw9ra8/edit#bookmark=id.g2xsf9n49puh
> >>>>>>>>
> >>>>>>>> All idempotent retries, are handled by the producer internally
> (with or
> >>>>>>>> without transaction) if enable.idempotence is set to true.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> -Matthias
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On 3/3/17 3:34 AM, Eno Thereska wrote:
> >>>>>>>>> Another question:
> >>>>>>>>>
> >>>>>>>>> The KIP doesn’t exactly spell out how it uses the idempotence
> >>>>> guarantee
> >>>>>>>> from KIP-98. It seems that only the transactional part is needed.
> Or is
> >>>>>>> the
> >>>>>>>> idempotence guarantee working behind the scenes and helping for
> some
> >>>>>>>> scenarios for which it is not worthwhile aborting a transaction
> (e.g.,
> >>>>>>>> retransmitting a record after a temporary network glitch)?
> >>>>>>>>>
> >>>>>>>>> Thanks
> >>>>>>>>> Eno
> >>>>>>>>>
> >>>>>>>>>> On Mar 2, 2017, at 4:56 PM, Jay Kreps <ja...@confluent.io> wrote:
> >>>>>>>>>>
> >>>>>>>>>> I second the concern on with the one producer per task
> approach. At a
> >>>>>>>>>> high-level it seems to make sense but I think Damian is exactly
> right
> >>>>>>>> that
> >>>>>>>>>> that cuts against the general design of the producer. Many
> people
> >>>>> have
> >>>>>>>> high
> >>>>>>>>>> input partition counts and will have high task counts as a
> result. I
> >>>>>>>> think
> >>>>>>>>>> processing 1000 partitions should not be an unreasonable thing
> to
> >>>>> want
> >>>>>>>> to
> >>>>>>>>>> do.
> >>>>>>>>>>
> >>>>>>>>>> The tricky bits will be:
> >>>>>>>>>>
> >>>>>>>>>>  - Reduced effectiveness of batching (or more latency and
> memory to
> >>>>>>> get
> >>>>>>>>>>  equivalent batching). This doesn't show up in simple benchmarks
> >>>>>>>> because
> >>>>>>>>>>  much of the penalty is I/O and CPU on the broker and the
> additional
> >>>>>>>> threads
> >>>>>>>>>>  from all the producers can make a single-threaded benchmark
> seem
> >>>>>>>> faster.
> >>>>>>>>>>  - TCP connection explosion. We maintain one connection per
> broker.
> >>>>>>>> This
> >>>>>>>>>>  is already high since each app instance does this. This design
> >>>>>>> though
> >>>>>>>> will
> >>>>>>>>>>  add an additional multiplicative factor based on the partition
> >>>>> count
> >>>>>>>> of the
> >>>>>>>>>>  input.
> >>>>>>>>>>  - Connection and metadata request storms. When an instance with
> >>>>> 1000
> >>>>>>>>>>  tasks starts up it is going to try to create many thousands of
> >>>>>>>> connections
> >>>>>>>>>>  and issue a thousand metadata requests all at once.
> >>>>>>>>>>  - Memory usage. We currently default to 64MB per producer.
> This can
> >>>>>>> be
> >>>>>>>>>>  tuned down, but the fact that we are spreading the batching
> over
> >>>>>>> more
> >>>>>>>>>>  producers will fundamentally mean we need a lot more memory to
> get
> >>>>>>>> good
> >>>>>>>>>>  perf and the memory usage will change as your task assignment
> >>>>>>> changes
> >>>>>>>> so it
> >>>>>>>>>>  will be hard to set correctly unless it is done automatically.
> >>>>>>>>>>  - Metrics explosion (1000 producer instances, each with their
> own
> >>>>>>>>>>  metrics to monitor).
> >>>>>>>>>>  - Thread explosion, 1000 background threads, one per producer,
> each
> >>>>>>>>>>  sending data.
> >>>>>>>>>>
> >>>>>>>>>> -Jay
> >>>>>>>>>>
> >>>>>>>>>> On Wed, Mar 1, 2017 at 3:05 AM, Damian Guy <
> damian.guy@gmail.com>
> >>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi Guozhang,
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks for the KIP! This is an important feature for Kafka
> Streams
> >>>>>>> and
> >>>>>>>> will
> >>>>>>>>>>> help to unlock a bunch of use cases.
> >>>>>>>>>>>
> >>>>>>>>>>> I have some concerns/questions:
> >>>>>>>>>>>
> >>>>>>>>>>>  1. Producer per task: I'm worried about the overhead this is
> going
> >>>>>>> to
> >>>>>>>>>>>  put on both the streams app and the Kafka Brokers. You can
> easily
> >>>>>>>>>>> imagine
> >>>>>>>>>>>  an app consuming thousands of partitions. What load will this
> put
> >>>>>>> on
> >>>>>>>> the
> >>>>>>>>>>>  brokers? Am i correct in assuming that there will be metadata
> >>>>>>>> requests
> >>>>>>>>>>> per
> >>>>>>>>>>>  Producer? The memory overhead in the streams app will also
> >>>>> increase
> >>>>>>>>>>> fairly
> >>>>>>>>>>>  significantly. Should we adjust ProducerConfig.BUFFER_MEMORY_
> >>>>>>> CONFIG?
> >>>>>>>>>>>  2. State Store recovery: As we already know, restoring the
> entire
> >>>>>>>>>>>  changelog can take an extremely long time. Even with a fairly
> >>>>> small
> >>>>>>>>>>> dataset
> >>>>>>>>>>>  and an inappropriately tuned segment size, this can take way
> too
> >>>>>>>> long.
> >>>>>>>>>>> My
> >>>>>>>>>>>  concern is that failures happen and then recovery takes
> "forever"
> >>>>>>>> and we
> >>>>>>>>>>>  end up in a situation where we need to change the
> >>>>> max.poll.interval
> >>>>>>>> to
> >>>>>>>>>>> be
> >>>>>>>>>>>  some very large number or else we end up in "rebalance hell".
> I
> >>>>>>> don't
> >>>>>>>>>>> think
> >>>>>>>>>>>  this provides a very good user experience. You mention RocksDB
> >>>>>>>>>>>  checkpointing in the doc - should we explore this idea some
> more?
> >>>>>>>> i.e.,
> >>>>>>>>>>>  understand the penalty for checkpointing. Maybe checkpoint
> every
> >>>>>>> *n*
> >>>>>>>>>>>   commits?
> >>>>>>>>>>>  3. What does EoS mean for Caching? If we set the commit
> interval
> >>>>> to
> >>>>>>>>>>>  100ms then the cache is not going to be very effective.
> Should it
> >>>>>>>> just
> >>>>>>>>>>> be
> >>>>>>>>>>>  disabled?
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks,
> >>>>>>>>>>> Damian
> >>>>>>>>>>>
> >>>>>>>>>>> On Tue, 28 Feb 2017 at 21:54 Guozhang Wang <wangguoz@gmail.com
> >
> >>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>
> >>>>>>>>>>>> I have just created KIP-129 to leverage KIP-98 in Kafka
> Streams and
> >>>>>>>>>>> provide
> >>>>>>>>>>>> exactly-once processing semantics:
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >>>>>>>>>>> 129%3A+Streams+Exactly-Once+Semantics
> >>>>>>>>>>>>
> >>>>>>>>>>>> This KIP enables Streams users to optionally turn on
> exactly-once
> >>>>>>>>>>>> processing semantics without changing their app code at all by
> >>>>>>>> leveraging
> >>>>>>>>>>>> the transactional messaging features provided in KIP-98.
> >>>>>>>>>>>>
> >>>>>>>>>>>> The above wiki page provides a high-level view of the proposed
> >>>>>>>> changes,
> >>>>>>>>>>>> while detailed implementation design can be found in this
> Google
> >>>>>>> doc:
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> https://docs.google.com/document/d/
> 1pGZ8xtOOyGwDYgH5vA6h19zOMMadu
> >>>>>>>>>>> FK1DAB8_gBYA2c
> >>>>>>>>>>>>
> >>>>>>>>>>>> We would love to hear your comments and suggestions.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks,
> >>>>>>>>>>>> -- Guozhang
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>>
> >>>>
> >>>
> >>
> >
>
>

Re: [DISCUSS] KIP-129: Kafka Streams Exactly-Once Semantics

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Thanks for your feedback Eno.

For now, I still think that the KIP itself does not need to talk about
this in more detail, because we apply the same strategy for EoS as for
non-EoS (as of 0.10.2).

Thus, in case of a clean shutdown, we write the checkpoint file for a
store and thus know we can reuse the store. In case of failure, we need
to recreate the store from the changelog.

> Will a V1 design that relies on plain store recovery from Kafka for
> each transaction abort be good enough, or usable?

Why should it not be usable? It's the same strategy as used in 0.10.2
and it runs in production in many companies already.

> however it seems to me we might have a regression of sorts
> Now we might pay it for a transaction failure.

I would assume transaction failures to be quite rare. Maybe the core EoS
folks can comment here, too.



-Matthias



On 3/20/17 3:16 PM, Eno Thereska wrote:
> Hi Matthias,
> 
> I'd like to see some more info on how you propose to handle transactions that involve state stores in the KIP itself. The design doc has info about various optimisations like RocksDb snapshots and transactions and such, but will there be a user-visible interface that indicates whether a store has snapshot and/or transactional capabilities? If a user plugs in another store, what guarantees are they expected to get? 
> 
> Will a V1 design that relies on plain store recovery from Kafka for each transaction abort be good enough, or usable? If your dataset is large (e.g., 200GB) the recovery time might be so large as to effectively render that Kafka Streams instance unavailable for tens of minutes. You mention that is not a regression to what we currently have, however it seems to me we might have a regression of sorts: currently we pay the recovery price for a Kafka Streams instance failure. Now we might pay it for a transaction failure. Will transaction failures be more or less common than the previous types of failures? I'd like to see this addressed.
> 
> Thanks
> Eno
> 
> 
> 
>> On 15 Mar 2017, at 22:09, Matthias J. Sax <ma...@confluent.io> wrote:
>>
>> Just a quick follow up:
>>
>> Our overall proposal is, to implement KIP-129 as is as a “Stream EoS
>> 1.0” version. The raised concerns are all valid, but hard to quantify at
>> the moment. Implementing KIP-129, that provides a clean design, allows
>> us to gain more insight in the performance implications. This enables
>> us, to make an educated decision, if the “producer per task” model
>> perform wells or not, and if a switch to a “producer per thread” model
>> is mandatory.
>>
>> We also want to point out, that we can move incrementally from "producer
>> per task" to "producer per thread" design or apply some incremental
>> improvements to "producer per task" (as discussed in the doc). Thus,
>> there is not issue with regard to upgrading.
>>
>>
>> -Matthias
>>
>>
>>
>> On 3/15/17 2:36 PM, Matthias J. Sax wrote:
>>> Hi,
>>>
>>> I want to pick up this thread again. As there are some concerns about
>>> the "producer per task" design, we did write up an alternative "producer
>>> per thread" design and discuss pros/cons of both approaches:
>>>
>>> https://docs.google.com/document/d/1CfOJaa6mdg5o7pLf_zXISV4oE0ZeMZwT_sG1QWgL4EE
>>>
>>>
>>> Looking forward to your feedback.
>>>
>>>
>>> -Matthias
>>>
>>>
>>> On 3/10/17 3:24 AM, Damian Guy wrote:
>>>> Hi Matthias,
>>>>
>>>> Thanks for the response. I agree with you regarding the use of
>>>> PartitionGrouper to reduce the number of tasks. It would be good to have an
>>>> idea of any additional load on the brokers as we increase the number of
>>>> tasks and therefore producers.
>>>>
>>>> Thanks,
>>>> Damian
>>>>
>>>> On Wed, 8 Mar 2017 at 01:45 Matthias J. Sax <ma...@confluent.io> wrote:
>>>>
>>>>> Damian, Jun,
>>>>>
>>>>> Thanks for your input.
>>>>>
>>>>>
>>>>> About Performance test:
>>>>>
>>>>> I can follow up with more performance tests using more partitions and
>>>>> also collecting broker metrics.
>>>>>
>>>>> However, I want to highlight again, that even if 1000+ partitions would
>>>>> be problematic, one can simply implement PartitionGrouper interface and
>>>>> reduce the number of tasks to 250 or 100... So I am not sure, if we
>>>>> should block this KIP, even if there might be some performance penalty
>>>>> for currently single partitioned tasks.
>>>>>
>>>>> About memory usage. JXM max-heap and max-off-heap did report 256MB and
>>>>> 133MB for all experiments (thus I did not put it in the spreadsheet).
>>>>> Thus, using 100 producers (each using a max of 32MB of memory) was not
>>>>> an issue with regard to memory consumption. I did not track "current
>>>>> head/off-heap" memory as this would require a more advance test setup to
>>>>> monitor it over time. If you think this would be required, we can do
>>>>> some tests though.
>>>>>
>>>>> However, as 256 MB was enough memory, and there are other components
>>>>> next to the producers using memory, I don't expect a severely increased
>>>>> memory usage. Producer allocate memory on-demand, and if load is shared
>>>>> over multiple producers, overall memory usage should stay the same as a
>>>>> single producer should allocate less memory.
>>>>>
>>>>>
>>>>> About Batching:
>>>>>
>>>>> As you can see from the benchmarks (in the detailed view -- I also added
>>>>> some graphs to the summary now) the average batch size gets slightly
>>>>> decrease with an increased number of partitions. However, there is no
>>>>> big difference between "producer per thread" and "producer per task"
>>>>> scenario.
>>>>>
>>>>>
>>>>> About acks:
>>>>>
>>>>> This is covered by KIP-98 already. If idempotent producer is use, it's
>>>>> required to set max.in.flight.requests.per.connection=1 and retries > 0
>>>>> -- otherwise a config exception will be thrown. For transactions, it's
>>>>> further required that acks=-1 to avoid a config exception.
>>>>>
>>>>> Other bits, like min.isr, replication.factor, etc. (ie, all broker/topic
>>>>> configs) are out of scope, and it's user responsibility to set those
>>>>> values correctly to ensure transactionality and idempotency.
>>>>>
>>>>>
>>>>>
>>>>> -Matthias
>>>>>
>>>>>
>>>>> On 3/7/17 9:32 AM, Jun Rao wrote:
>>>>>> Hi, Guozhang,
>>>>>>
>>>>>> Thanks for the KIP. A couple of comments.
>>>>>>
>>>>>> 1. About the impact on producer batching. My understanding is that
>>>>>> typically different sub-topologies in the same task are publishing to
>>>>>> different topics. Since the producer batching happens at the
>>>>>> topic/partition level, using a producer per task may not impact batching
>>>>>> much.
>>>>>>
>>>>>> 2. When processing.guarantee is set to exactly_once, do we want to
>>>>> enforce
>>>>>> acks to all in the producer? The default acks is 1 and may cause acked
>>>>> data
>>>>>> to be lost later when the leader changes.
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Jun
>>>>>>
>>>>>> On Tue, Mar 7, 2017 at 3:42 AM, Damian Guy <da...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Matthias,
>>>>>>>
>>>>>>> Thanks. The perf test is a good start but I don't think it goes far
>>>>> enough.
>>>>>>> 100 partitions is not a lot. What happens when there are thousands of
>>>>>>> partitions? What is the load on the brokers? How much more memory is
>>>>> used
>>>>>>> by the Streams App etc?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Damian
>>>>>>>
>>>>>>> On Tue, 7 Mar 2017 at 03:02 Matthias J. Sax <ma...@confluent.io>
>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I want to give a first respond:
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> 1. Producer per task:
>>>>>>>>
>>>>>>>> First, we did some performance tests, indicating that the performance
>>>>>>>> penalty is small. Please have a look here:
>>>>>>>>
>>>>>>>> https://docs.google.com/spreadsheets/d/18aGOB13-
>>>>>>> ibwHJl5VE27HnHBlNkZAIeLF9yQ99FDxkuM/edit?usp=sharing
>>>>>>>>
>>>>>>>> For the test, we ran with a trunk version and a modified version that
>>>>>>>> uses a producer per task (of course, no transactions, but at-least-once
>>>>>>>> semantics). The scaling factor indicates the number of brokers and
>>>>>>>> (single threaded) Streams instances. We used SimpleBenchmark that is
>>>>>>>> part of AK code base.
>>>>>>>>
>>>>>>>>
>>>>>>>> Second, as the design is "producer per task" (and not "producer per
>>>>>>>> partition") it is possible to specify a custom PartitionGrouper that
>>>>>>>> assigns multiple partitions to a single task. Thus, it allows to reduce
>>>>>>>> the number of tasks for scenarios with many partitions. Right now, this
>>>>>>>> interface must be implemented solely by the user, but we could also add
>>>>>>>> a new config parameter that specifies the max.number.of.tasks or
>>>>>>>> partitions.per.task so that the user can configure this instead of
>>>>>>>> implementing the interface.
>>>>>>>>
>>>>>>>> Third, there is the idea of a "Producer Pool" that would allow to share
>>>>>>>> resources (network connections, memory, etc) over multiple producers.
>>>>>>>> This would allow to separate multiple transaction on the producer
>>>>> level,
>>>>>>>> while resources are shared. There is no detailed design document yet
>>>>> and
>>>>>>>> there would be a KIP for this feature.
>>>>>>>>
>>>>>>>> Thus, if there should be any performance problems for high scale
>>>>>>>> scenarios, there are multiple ways to tackle them while keeping the
>>>>>>>> "producer per task" design.
>>>>>>>>
>>>>>>>> Additionally, a "producer per thread" design would be way more complex
>>>>>>>> and I summarized the issues in a separate document. I will share a link
>>>>>>>> to the document soon.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> 2. StateStore recovery:
>>>>>>>>
>>>>>>>> Streams EoS will in the first design not allow to exploit the
>>>>>>>> improvements that are added for 0.11 at the moment. However, as 0.10.2
>>>>>>>> faces the same issues of potentially long recovery, there is no
>>>>>>>> regression with this regard. Thus, I see those improvements as
>>>>>>>> orthogonal or add-ons. Nevertheless, we should try to explore those
>>>>>>>> options and if possible get them into 0.11 such that Streams with EoS
>>>>>>>> gets the same improvements as at-least-once scenario.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> 3. Caching:
>>>>>>>>
>>>>>>>> We might need to do some experiments to quantify the impact on caching.
>>>>>>>> If it's severe, the suggested default commit interval of 100ms could
>>>>>>>> also be increased. Also, EoS will not enforce any commit interval, but
>>>>>>>> only change the default value. Thus, a user can freely trade-off
>>>>> latency
>>>>>>>> vs. caching-effect.
>>>>>>>>
>>>>>>>> Last but not least, there is the idea to allow "read_uncommitted" for
>>>>>>>> intermediate topic. This would be an advance design for Streams EoS
>>>>> that
>>>>>>>> allows downstream sub-topologies to read uncommitted data
>>>>>>>> optimistically. In case of failure, a cascading abort of transactions
>>>>>>>> would be required. This change will need another KIP.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> 4. Idempotent Producer:
>>>>>>>>
>>>>>>>> The transactional part automatically leverages the idempotent
>>>>> properties
>>>>>>>> of the producer. Idempotency is a requirement:
>>>>>>>>
>>>>>>>>> Note that enable.idempotence must be enabled if a TransactionalId is
>>>>>>>> configured.
>>>>>>>>
>>>>>>>> See
>>>>>>>>
>>>>>>>>
>>>>> https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF
>>>>>>> 0wSw9ra8/edit#bookmark=id.g2xsf9n49puh
>>>>>>>>
>>>>>>>> All idempotent retries, are handled by the producer internally (with or
>>>>>>>> without transaction) if enable.idempotence is set to true.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On 3/3/17 3:34 AM, Eno Thereska wrote:
>>>>>>>>> Another question:
>>>>>>>>>
>>>>>>>>> The KIP doesn’t exactly spell out how it uses the idempotence
>>>>> guarantee
>>>>>>>> from KIP-98. It seems that only the transactional part is needed. Or is
>>>>>>> the
>>>>>>>> idempotence guarantee working behind the scenes and helping for some
>>>>>>>> scenarios for which it is not worthwhile aborting a transaction (e.g.,
>>>>>>>> retransmitting a record after a temporary network glitch)?
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Eno
>>>>>>>>>
>>>>>>>>>> On Mar 2, 2017, at 4:56 PM, Jay Kreps <ja...@confluent.io> wrote:
>>>>>>>>>>
>>>>>>>>>> I second the concern on with the one producer per task approach. At a
>>>>>>>>>> high-level it seems to make sense but I think Damian is exactly right
>>>>>>>> that
>>>>>>>>>> that cuts against the general design of the producer. Many people
>>>>> have
>>>>>>>> high
>>>>>>>>>> input partition counts and will have high task counts as a result. I
>>>>>>>> think
>>>>>>>>>> processing 1000 partitions should not be an unreasonable thing to
>>>>> want
>>>>>>>> to
>>>>>>>>>> do.
>>>>>>>>>>
>>>>>>>>>> The tricky bits will be:
>>>>>>>>>>
>>>>>>>>>>  - Reduced effectiveness of batching (or more latency and memory to
>>>>>>> get
>>>>>>>>>>  equivalent batching). This doesn't show up in simple benchmarks
>>>>>>>> because
>>>>>>>>>>  much of the penalty is I/O and CPU on the broker and the additional
>>>>>>>> threads
>>>>>>>>>>  from all the producers can make a single-threaded benchmark seem
>>>>>>>> faster.
>>>>>>>>>>  - TCP connection explosion. We maintain one connection per broker.
>>>>>>>> This
>>>>>>>>>>  is already high since each app instance does this. This design
>>>>>>> though
>>>>>>>> will
>>>>>>>>>>  add an additional multiplicative factor based on the partition
>>>>> count
>>>>>>>> of the
>>>>>>>>>>  input.
>>>>>>>>>>  - Connection and metadata request storms. When an instance with
>>>>> 1000
>>>>>>>>>>  tasks starts up it is going to try to create many thousands of
>>>>>>>> connections
>>>>>>>>>>  and issue a thousand metadata requests all at once.
>>>>>>>>>>  - Memory usage. We currently default to 64MB per producer. This can
>>>>>>> be
>>>>>>>>>>  tuned down, but the fact that we are spreading the batching over
>>>>>>> more
>>>>>>>>>>  producers will fundamentally mean we need a lot more memory to get
>>>>>>>> good
>>>>>>>>>>  perf and the memory usage will change as your task assignment
>>>>>>> changes
>>>>>>>> so it
>>>>>>>>>>  will be hard to set correctly unless it is done automatically.
>>>>>>>>>>  - Metrics explosion (1000 producer instances, each with their own
>>>>>>>>>>  metrics to monitor).
>>>>>>>>>>  - Thread explosion, 1000 background threads, one per producer, each
>>>>>>>>>>  sending data.
>>>>>>>>>>
>>>>>>>>>> -Jay
>>>>>>>>>>
>>>>>>>>>> On Wed, Mar 1, 2017 at 3:05 AM, Damian Guy <da...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Guozhang,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for the KIP! This is an important feature for Kafka Streams
>>>>>>> and
>>>>>>>> will
>>>>>>>>>>> help to unlock a bunch of use cases.
>>>>>>>>>>>
>>>>>>>>>>> I have some concerns/questions:
>>>>>>>>>>>
>>>>>>>>>>>  1. Producer per task: I'm worried about the overhead this is going
>>>>>>> to
>>>>>>>>>>>  put on both the streams app and the Kafka Brokers. You can easily
>>>>>>>>>>> imagine
>>>>>>>>>>>  an app consuming thousands of partitions. What load will this put
>>>>>>> on
>>>>>>>> the
>>>>>>>>>>>  brokers? Am i correct in assuming that there will be metadata
>>>>>>>> requests
>>>>>>>>>>> per
>>>>>>>>>>>  Producer? The memory overhead in the streams app will also
>>>>> increase
>>>>>>>>>>> fairly
>>>>>>>>>>>  significantly. Should we adjust ProducerConfig.BUFFER_MEMORY_
>>>>>>> CONFIG?
>>>>>>>>>>>  2. State Store recovery: As we already know, restoring the entire
>>>>>>>>>>>  changelog can take an extremely long time. Even with a fairly
>>>>> small
>>>>>>>>>>> dataset
>>>>>>>>>>>  and an inappropriately tuned segment size, this can take way too
>>>>>>>> long.
>>>>>>>>>>> My
>>>>>>>>>>>  concern is that failures happen and then recovery takes "forever"
>>>>>>>> and we
>>>>>>>>>>>  end up in a situation where we need to change the
>>>>> max.poll.interval
>>>>>>>> to
>>>>>>>>>>> be
>>>>>>>>>>>  some very large number or else we end up in "rebalance hell". I
>>>>>>> don't
>>>>>>>>>>> think
>>>>>>>>>>>  this provides a very good user experience. You mention RocksDB
>>>>>>>>>>>  checkpointing in the doc - should we explore this idea some more?
>>>>>>>> i.e.,
>>>>>>>>>>>  understand the penalty for checkpointing. Maybe checkpoint every
>>>>>>> *n*
>>>>>>>>>>>   commits?
>>>>>>>>>>>  3. What does EoS mean for Caching? If we set the commit interval
>>>>> to
>>>>>>>>>>>  100ms then the cache is not going to be very effective. Should it
>>>>>>>> just
>>>>>>>>>>> be
>>>>>>>>>>>  disabled?
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Damian
>>>>>>>>>>>
>>>>>>>>>>> On Tue, 28 Feb 2017 at 21:54 Guozhang Wang <wa...@gmail.com>
>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>
>>>>>>>>>>>> I have just created KIP-129 to leverage KIP-98 in Kafka Streams and
>>>>>>>>>>> provide
>>>>>>>>>>>> exactly-once processing semantics:
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>>>>>>>>>> 129%3A+Streams+Exactly-Once+Semantics
>>>>>>>>>>>>
>>>>>>>>>>>> This KIP enables Streams users to optionally turn on exactly-once
>>>>>>>>>>>> processing semantics without changing their app code at all by
>>>>>>>> leveraging
>>>>>>>>>>>> the transactional messaging features provided in KIP-98.
>>>>>>>>>>>>
>>>>>>>>>>>> The above wiki page provides a high-level view of the proposed
>>>>>>>> changes,
>>>>>>>>>>>> while detailed implementation design can be found in this Google
>>>>>>> doc:
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> https://docs.google.com/document/d/1pGZ8xtOOyGwDYgH5vA6h19zOMMadu
>>>>>>>>>>> FK1DAB8_gBYA2c
>>>>>>>>>>>>
>>>>>>>>>>>> We would love to hear your comments and suggestions.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> -- Guozhang
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
> 


Re: [DISCUSS] KIP-129: Kafka Streams Exactly-Once Semantics

Posted by Eno Thereska <en...@gmail.com>.
Hi Matthias,

I'd like to see some more info on how you propose to handle transactions that involve state stores in the KIP itself. The design doc has info about various optimisations like RocksDb snapshots and transactions and such, but will there be a user-visible interface that indicates whether a store has snapshot and/or transactional capabilities? If a user plugs in another store, what guarantees are they expected to get? 

Will a V1 design that relies on plain store recovery from Kafka for each transaction abort be good enough, or usable? If your dataset is large (e.g., 200GB) the recovery time might be so large as to effectively render that Kafka Streams instance unavailable for tens of minutes. You mention that is not a regression to what we currently have, however it seems to me we might have a regression of sorts: currently we pay the recovery price for a Kafka Streams instance failure. Now we might pay it for a transaction failure. Will transaction failures be more or less common than the previous types of failures? I'd like to see this addressed.

Thanks
Eno



> On 15 Mar 2017, at 22:09, Matthias J. Sax <ma...@confluent.io> wrote:
> 
> Just a quick follow up:
> 
> Our overall proposal is, to implement KIP-129 as is as a “Stream EoS
> 1.0” version. The raised concerns are all valid, but hard to quantify at
> the moment. Implementing KIP-129, that provides a clean design, allows
> us to gain more insight in the performance implications. This enables
> us, to make an educated decision, if the “producer per task” model
> perform wells or not, and if a switch to a “producer per thread” model
> is mandatory.
> 
> We also want to point out, that we can move incrementally from "producer
> per task" to "producer per thread" design or apply some incremental
> improvements to "producer per task" (as discussed in the doc). Thus,
> there is not issue with regard to upgrading.
> 
> 
> -Matthias
> 
> 
> 
> On 3/15/17 2:36 PM, Matthias J. Sax wrote:
>> Hi,
>> 
>> I want to pick up this thread again. As there are some concerns about
>> the "producer per task" design, we did write up an alternative "producer
>> per thread" design and discuss pros/cons of both approaches:
>> 
>> https://docs.google.com/document/d/1CfOJaa6mdg5o7pLf_zXISV4oE0ZeMZwT_sG1QWgL4EE
>> 
>> 
>> Looking forward to your feedback.
>> 
>> 
>> -Matthias
>> 
>> 
>> On 3/10/17 3:24 AM, Damian Guy wrote:
>>> Hi Matthias,
>>> 
>>> Thanks for the response. I agree with you regarding the use of
>>> PartitionGrouper to reduce the number of tasks. It would be good to have an
>>> idea of any additional load on the brokers as we increase the number of
>>> tasks and therefore producers.
>>> 
>>> Thanks,
>>> Damian
>>> 
>>> On Wed, 8 Mar 2017 at 01:45 Matthias J. Sax <ma...@confluent.io> wrote:
>>> 
>>>> Damian, Jun,
>>>> 
>>>> Thanks for your input.
>>>> 
>>>> 
>>>> About Performance test:
>>>> 
>>>> I can follow up with more performance tests using more partitions and
>>>> also collecting broker metrics.
>>>> 
>>>> However, I want to highlight again, that even if 1000+ partitions would
>>>> be problematic, one can simply implement PartitionGrouper interface and
>>>> reduce the number of tasks to 250 or 100... So I am not sure, if we
>>>> should block this KIP, even if there might be some performance penalty
>>>> for currently single partitioned tasks.
>>>> 
>>>> About memory usage. JXM max-heap and max-off-heap did report 256MB and
>>>> 133MB for all experiments (thus I did not put it in the spreadsheet).
>>>> Thus, using 100 producers (each using a max of 32MB of memory) was not
>>>> an issue with regard to memory consumption. I did not track "current
>>>> head/off-heap" memory as this would require a more advance test setup to
>>>> monitor it over time. If you think this would be required, we can do
>>>> some tests though.
>>>> 
>>>> However, as 256 MB was enough memory, and there are other components
>>>> next to the producers using memory, I don't expect a severely increased
>>>> memory usage. Producer allocate memory on-demand, and if load is shared
>>>> over multiple producers, overall memory usage should stay the same as a
>>>> single producer should allocate less memory.
>>>> 
>>>> 
>>>> About Batching:
>>>> 
>>>> As you can see from the benchmarks (in the detailed view -- I also added
>>>> some graphs to the summary now) the average batch size gets slightly
>>>> decrease with an increased number of partitions. However, there is no
>>>> big difference between "producer per thread" and "producer per task"
>>>> scenario.
>>>> 
>>>> 
>>>> About acks:
>>>> 
>>>> This is covered by KIP-98 already. If idempotent producer is use, it's
>>>> required to set max.in.flight.requests.per.connection=1 and retries > 0
>>>> -- otherwise a config exception will be thrown. For transactions, it's
>>>> further required that acks=-1 to avoid a config exception.
>>>> 
>>>> Other bits, like min.isr, replication.factor, etc. (ie, all broker/topic
>>>> configs) are out of scope, and it's user responsibility to set those
>>>> values correctly to ensure transactionality and idempotency.
>>>> 
>>>> 
>>>> 
>>>> -Matthias
>>>> 
>>>> 
>>>> On 3/7/17 9:32 AM, Jun Rao wrote:
>>>>> Hi, Guozhang,
>>>>> 
>>>>> Thanks for the KIP. A couple of comments.
>>>>> 
>>>>> 1. About the impact on producer batching. My understanding is that
>>>>> typically different sub-topologies in the same task are publishing to
>>>>> different topics. Since the producer batching happens at the
>>>>> topic/partition level, using a producer per task may not impact batching
>>>>> much.
>>>>> 
>>>>> 2. When processing.guarantee is set to exactly_once, do we want to
>>>> enforce
>>>>> acks to all in the producer? The default acks is 1 and may cause acked
>>>> data
>>>>> to be lost later when the leader changes.
>>>>> 
>>>>> Thanks,
>>>>> 
>>>>> Jun
>>>>> 
>>>>> On Tue, Mar 7, 2017 at 3:42 AM, Damian Guy <da...@gmail.com> wrote:
>>>>> 
>>>>>> Hi Matthias,
>>>>>> 
>>>>>> Thanks. The perf test is a good start but I don't think it goes far
>>>> enough.
>>>>>> 100 partitions is not a lot. What happens when there are thousands of
>>>>>> partitions? What is the load on the brokers? How much more memory is
>>>> used
>>>>>> by the Streams App etc?
>>>>>> 
>>>>>> Thanks,
>>>>>> Damian
>>>>>> 
>>>>>> On Tue, 7 Mar 2017 at 03:02 Matthias J. Sax <ma...@confluent.io>
>>>> wrote:
>>>>>> 
>>>>>>> Hi,
>>>>>>> 
>>>>>>> I want to give a first respond:
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 1. Producer per task:
>>>>>>> 
>>>>>>> First, we did some performance tests, indicating that the performance
>>>>>>> penalty is small. Please have a look here:
>>>>>>> 
>>>>>>> https://docs.google.com/spreadsheets/d/18aGOB13-
>>>>>> ibwHJl5VE27HnHBlNkZAIeLF9yQ99FDxkuM/edit?usp=sharing
>>>>>>> 
>>>>>>> For the test, we ran with a trunk version and a modified version that
>>>>>>> uses a producer per task (of course, no transactions, but at-least-once
>>>>>>> semantics). The scaling factor indicates the number of brokers and
>>>>>>> (single threaded) Streams instances. We used SimpleBenchmark that is
>>>>>>> part of AK code base.
>>>>>>> 
>>>>>>> 
>>>>>>> Second, as the design is "producer per task" (and not "producer per
>>>>>>> partition") it is possible to specify a custom PartitionGrouper that
>>>>>>> assigns multiple partitions to a single task. Thus, it allows to reduce
>>>>>>> the number of tasks for scenarios with many partitions. Right now, this
>>>>>>> interface must be implemented solely by the user, but we could also add
>>>>>>> a new config parameter that specifies the max.number.of.tasks or
>>>>>>> partitions.per.task so that the user can configure this instead of
>>>>>>> implementing the interface.
>>>>>>> 
>>>>>>> Third, there is the idea of a "Producer Pool" that would allow to share
>>>>>>> resources (network connections, memory, etc) over multiple producers.
>>>>>>> This would allow to separate multiple transaction on the producer
>>>> level,
>>>>>>> while resources are shared. There is no detailed design document yet
>>>> and
>>>>>>> there would be a KIP for this feature.
>>>>>>> 
>>>>>>> Thus, if there should be any performance problems for high scale
>>>>>>> scenarios, there are multiple ways to tackle them while keeping the
>>>>>>> "producer per task" design.
>>>>>>> 
>>>>>>> Additionally, a "producer per thread" design would be way more complex
>>>>>>> and I summarized the issues in a separate document. I will share a link
>>>>>>> to the document soon.
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 2. StateStore recovery:
>>>>>>> 
>>>>>>> Streams EoS will in the first design not allow to exploit the
>>>>>>> improvements that are added for 0.11 at the moment. However, as 0.10.2
>>>>>>> faces the same issues of potentially long recovery, there is no
>>>>>>> regression with this regard. Thus, I see those improvements as
>>>>>>> orthogonal or add-ons. Nevertheless, we should try to explore those
>>>>>>> options and if possible get them into 0.11 such that Streams with EoS
>>>>>>> gets the same improvements as at-least-once scenario.
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 3. Caching:
>>>>>>> 
>>>>>>> We might need to do some experiments to quantify the impact on caching.
>>>>>>> If it's severe, the suggested default commit interval of 100ms could
>>>>>>> also be increased. Also, EoS will not enforce any commit interval, but
>>>>>>> only change the default value. Thus, a user can freely trade-off
>>>> latency
>>>>>>> vs. caching-effect.
>>>>>>> 
>>>>>>> Last but not least, there is the idea to allow "read_uncommitted" for
>>>>>>> intermediate topic. This would be an advance design for Streams EoS
>>>> that
>>>>>>> allows downstream sub-topologies to read uncommitted data
>>>>>>> optimistically. In case of failure, a cascading abort of transactions
>>>>>>> would be required. This change will need another KIP.
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 4. Idempotent Producer:
>>>>>>> 
>>>>>>> The transactional part automatically leverages the idempotent
>>>> properties
>>>>>>> of the producer. Idempotency is a requirement:
>>>>>>> 
>>>>>>>> Note that enable.idempotence must be enabled if a TransactionalId is
>>>>>>> configured.
>>>>>>> 
>>>>>>> See
>>>>>>> 
>>>>>>> 
>>>> https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF
>>>>>> 0wSw9ra8/edit#bookmark=id.g2xsf9n49puh
>>>>>>> 
>>>>>>> All idempotent retries, are handled by the producer internally (with or
>>>>>>> without transaction) if enable.idempotence is set to true.
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> -Matthias
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> On 3/3/17 3:34 AM, Eno Thereska wrote:
>>>>>>>> Another question:
>>>>>>>> 
>>>>>>>> The KIP doesn’t exactly spell out how it uses the idempotence
>>>> guarantee
>>>>>>> from KIP-98. It seems that only the transactional part is needed. Or is
>>>>>> the
>>>>>>> idempotence guarantee working behind the scenes and helping for some
>>>>>>> scenarios for which it is not worthwhile aborting a transaction (e.g.,
>>>>>>> retransmitting a record after a temporary network glitch)?
>>>>>>>> 
>>>>>>>> Thanks
>>>>>>>> Eno
>>>>>>>> 
>>>>>>>>> On Mar 2, 2017, at 4:56 PM, Jay Kreps <ja...@confluent.io> wrote:
>>>>>>>>> 
>>>>>>>>> I second the concern on with the one producer per task approach. At a
>>>>>>>>> high-level it seems to make sense but I think Damian is exactly right
>>>>>>> that
>>>>>>>>> that cuts against the general design of the producer. Many people
>>>> have
>>>>>>> high
>>>>>>>>> input partition counts and will have high task counts as a result. I
>>>>>>> think
>>>>>>>>> processing 1000 partitions should not be an unreasonable thing to
>>>> want
>>>>>>> to
>>>>>>>>> do.
>>>>>>>>> 
>>>>>>>>> The tricky bits will be:
>>>>>>>>> 
>>>>>>>>>  - Reduced effectiveness of batching (or more latency and memory to
>>>>>> get
>>>>>>>>>  equivalent batching). This doesn't show up in simple benchmarks
>>>>>>> because
>>>>>>>>>  much of the penalty is I/O and CPU on the broker and the additional
>>>>>>> threads
>>>>>>>>>  from all the producers can make a single-threaded benchmark seem
>>>>>>> faster.
>>>>>>>>>  - TCP connection explosion. We maintain one connection per broker.
>>>>>>> This
>>>>>>>>>  is already high since each app instance does this. This design
>>>>>> though
>>>>>>> will
>>>>>>>>>  add an additional multiplicative factor based on the partition
>>>> count
>>>>>>> of the
>>>>>>>>>  input.
>>>>>>>>>  - Connection and metadata request storms. When an instance with
>>>> 1000
>>>>>>>>>  tasks starts up it is going to try to create many thousands of
>>>>>>> connections
>>>>>>>>>  and issue a thousand metadata requests all at once.
>>>>>>>>>  - Memory usage. We currently default to 64MB per producer. This can
>>>>>> be
>>>>>>>>>  tuned down, but the fact that we are spreading the batching over
>>>>>> more
>>>>>>>>>  producers will fundamentally mean we need a lot more memory to get
>>>>>>> good
>>>>>>>>>  perf and the memory usage will change as your task assignment
>>>>>> changes
>>>>>>> so it
>>>>>>>>>  will be hard to set correctly unless it is done automatically.
>>>>>>>>>  - Metrics explosion (1000 producer instances, each with their own
>>>>>>>>>  metrics to monitor).
>>>>>>>>>  - Thread explosion, 1000 background threads, one per producer, each
>>>>>>>>>  sending data.
>>>>>>>>> 
>>>>>>>>> -Jay
>>>>>>>>> 
>>>>>>>>> On Wed, Mar 1, 2017 at 3:05 AM, Damian Guy <da...@gmail.com>
>>>>>>> wrote:
>>>>>>>>> 
>>>>>>>>>> Hi Guozhang,
>>>>>>>>>> 
>>>>>>>>>> Thanks for the KIP! This is an important feature for Kafka Streams
>>>>>> and
>>>>>>> will
>>>>>>>>>> help to unlock a bunch of use cases.
>>>>>>>>>> 
>>>>>>>>>> I have some concerns/questions:
>>>>>>>>>> 
>>>>>>>>>>  1. Producer per task: I'm worried about the overhead this is going
>>>>>> to
>>>>>>>>>>  put on both the streams app and the Kafka Brokers. You can easily
>>>>>>>>>> imagine
>>>>>>>>>>  an app consuming thousands of partitions. What load will this put
>>>>>> on
>>>>>>> the
>>>>>>>>>>  brokers? Am i correct in assuming that there will be metadata
>>>>>>> requests
>>>>>>>>>> per
>>>>>>>>>>  Producer? The memory overhead in the streams app will also
>>>> increase
>>>>>>>>>> fairly
>>>>>>>>>>  significantly. Should we adjust ProducerConfig.BUFFER_MEMORY_
>>>>>> CONFIG?
>>>>>>>>>>  2. State Store recovery: As we already know, restoring the entire
>>>>>>>>>>  changelog can take an extremely long time. Even with a fairly
>>>> small
>>>>>>>>>> dataset
>>>>>>>>>>  and an inappropriately tuned segment size, this can take way too
>>>>>>> long.
>>>>>>>>>> My
>>>>>>>>>>  concern is that failures happen and then recovery takes "forever"
>>>>>>> and we
>>>>>>>>>>  end up in a situation where we need to change the
>>>> max.poll.interval
>>>>>>> to
>>>>>>>>>> be
>>>>>>>>>>  some very large number or else we end up in "rebalance hell". I
>>>>>> don't
>>>>>>>>>> think
>>>>>>>>>>  this provides a very good user experience. You mention RocksDB
>>>>>>>>>>  checkpointing in the doc - should we explore this idea some more?
>>>>>>> i.e.,
>>>>>>>>>>  understand the penalty for checkpointing. Maybe checkpoint every
>>>>>> *n*
>>>>>>>>>>   commits?
>>>>>>>>>>  3. What does EoS mean for Caching? If we set the commit interval
>>>> to
>>>>>>>>>>  100ms then the cache is not going to be very effective. Should it
>>>>>>> just
>>>>>>>>>> be
>>>>>>>>>>  disabled?
>>>>>>>>>> 
>>>>>>>>>> Thanks,
>>>>>>>>>> Damian
>>>>>>>>>> 
>>>>>>>>>> On Tue, 28 Feb 2017 at 21:54 Guozhang Wang <wa...@gmail.com>
>>>>>> wrote:
>>>>>>>>>> 
>>>>>>>>>>> Hi all,
>>>>>>>>>>> 
>>>>>>>>>>> I have just created KIP-129 to leverage KIP-98 in Kafka Streams and
>>>>>>>>>> provide
>>>>>>>>>>> exactly-once processing semantics:
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>>>>>>>>> 129%3A+Streams+Exactly-Once+Semantics
>>>>>>>>>>> 
>>>>>>>>>>> This KIP enables Streams users to optionally turn on exactly-once
>>>>>>>>>>> processing semantics without changing their app code at all by
>>>>>>> leveraging
>>>>>>>>>>> the transactional messaging features provided in KIP-98.
>>>>>>>>>>> 
>>>>>>>>>>> The above wiki page provides a high-level view of the proposed
>>>>>>> changes,
>>>>>>>>>>> while detailed implementation design can be found in this Google
>>>>>> doc:
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> https://docs.google.com/document/d/1pGZ8xtOOyGwDYgH5vA6h19zOMMadu
>>>>>>>>>> FK1DAB8_gBYA2c
>>>>>>>>>>> 
>>>>>>>>>>> We would love to hear your comments and suggestions.
>>>>>>>>>>> 
>>>>>>>>>>> Thanks,
>>>>>>>>>>> -- Guozhang
>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>>> 
>>> 
>> 
> 


Re: [DISCUSS] KIP-129: Kafka Streams Exactly-Once Semantics

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Just a quick follow up:

Our overall proposal is, to implement KIP-129 as is as a “Stream EoS
1.0” version. The raised concerns are all valid, but hard to quantify at
the moment. Implementing KIP-129, that provides a clean design, allows
us to gain more insight in the performance implications. This enables
us, to make an educated decision, if the “producer per task” model
perform wells or not, and if a switch to a “producer per thread” model
is mandatory.

We also want to point out, that we can move incrementally from "producer
per task" to "producer per thread" design or apply some incremental
improvements to "producer per task" (as discussed in the doc). Thus,
there is not issue with regard to upgrading.


-Matthias



On 3/15/17 2:36 PM, Matthias J. Sax wrote:
> Hi,
> 
> I want to pick up this thread again. As there are some concerns about
> the "producer per task" design, we did write up an alternative "producer
> per thread" design and discuss pros/cons of both approaches:
> 
> https://docs.google.com/document/d/1CfOJaa6mdg5o7pLf_zXISV4oE0ZeMZwT_sG1QWgL4EE
> 
> 
> Looking forward to your feedback.
> 
> 
> -Matthias
> 
> 
> On 3/10/17 3:24 AM, Damian Guy wrote:
>> Hi Matthias,
>>
>> Thanks for the response. I agree with you regarding the use of
>> PartitionGrouper to reduce the number of tasks. It would be good to have an
>> idea of any additional load on the brokers as we increase the number of
>> tasks and therefore producers.
>>
>> Thanks,
>> Damian
>>
>> On Wed, 8 Mar 2017 at 01:45 Matthias J. Sax <ma...@confluent.io> wrote:
>>
>>> Damian, Jun,
>>>
>>> Thanks for your input.
>>>
>>>
>>> About Performance test:
>>>
>>> I can follow up with more performance tests using more partitions and
>>> also collecting broker metrics.
>>>
>>> However, I want to highlight again, that even if 1000+ partitions would
>>> be problematic, one can simply implement PartitionGrouper interface and
>>> reduce the number of tasks to 250 or 100... So I am not sure, if we
>>> should block this KIP, even if there might be some performance penalty
>>> for currently single partitioned tasks.
>>>
>>> About memory usage. JXM max-heap and max-off-heap did report 256MB and
>>> 133MB for all experiments (thus I did not put it in the spreadsheet).
>>> Thus, using 100 producers (each using a max of 32MB of memory) was not
>>> an issue with regard to memory consumption. I did not track "current
>>> head/off-heap" memory as this would require a more advance test setup to
>>> monitor it over time. If you think this would be required, we can do
>>> some tests though.
>>>
>>> However, as 256 MB was enough memory, and there are other components
>>> next to the producers using memory, I don't expect a severely increased
>>> memory usage. Producer allocate memory on-demand, and if load is shared
>>> over multiple producers, overall memory usage should stay the same as a
>>> single producer should allocate less memory.
>>>
>>>
>>> About Batching:
>>>
>>> As you can see from the benchmarks (in the detailed view -- I also added
>>> some graphs to the summary now) the average batch size gets slightly
>>> decrease with an increased number of partitions. However, there is no
>>> big difference between "producer per thread" and "producer per task"
>>> scenario.
>>>
>>>
>>> About acks:
>>>
>>> This is covered by KIP-98 already. If idempotent producer is use, it's
>>> required to set max.in.flight.requests.per.connection=1 and retries > 0
>>> -- otherwise a config exception will be thrown. For transactions, it's
>>> further required that acks=-1 to avoid a config exception.
>>>
>>> Other bits, like min.isr, replication.factor, etc. (ie, all broker/topic
>>> configs) are out of scope, and it's user responsibility to set those
>>> values correctly to ensure transactionality and idempotency.
>>>
>>>
>>>
>>> -Matthias
>>>
>>>
>>> On 3/7/17 9:32 AM, Jun Rao wrote:
>>>> Hi, Guozhang,
>>>>
>>>> Thanks for the KIP. A couple of comments.
>>>>
>>>> 1. About the impact on producer batching. My understanding is that
>>>> typically different sub-topologies in the same task are publishing to
>>>> different topics. Since the producer batching happens at the
>>>> topic/partition level, using a producer per task may not impact batching
>>>> much.
>>>>
>>>> 2. When processing.guarantee is set to exactly_once, do we want to
>>> enforce
>>>> acks to all in the producer? The default acks is 1 and may cause acked
>>> data
>>>> to be lost later when the leader changes.
>>>>
>>>> Thanks,
>>>>
>>>> Jun
>>>>
>>>> On Tue, Mar 7, 2017 at 3:42 AM, Damian Guy <da...@gmail.com> wrote:
>>>>
>>>>> Hi Matthias,
>>>>>
>>>>> Thanks. The perf test is a good start but I don't think it goes far
>>> enough.
>>>>> 100 partitions is not a lot. What happens when there are thousands of
>>>>> partitions? What is the load on the brokers? How much more memory is
>>> used
>>>>> by the Streams App etc?
>>>>>
>>>>> Thanks,
>>>>> Damian
>>>>>
>>>>> On Tue, 7 Mar 2017 at 03:02 Matthias J. Sax <ma...@confluent.io>
>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I want to give a first respond:
>>>>>>
>>>>>>
>>>>>>
>>>>>> 1. Producer per task:
>>>>>>
>>>>>> First, we did some performance tests, indicating that the performance
>>>>>> penalty is small. Please have a look here:
>>>>>>
>>>>>> https://docs.google.com/spreadsheets/d/18aGOB13-
>>>>> ibwHJl5VE27HnHBlNkZAIeLF9yQ99FDxkuM/edit?usp=sharing
>>>>>>
>>>>>> For the test, we ran with a trunk version and a modified version that
>>>>>> uses a producer per task (of course, no transactions, but at-least-once
>>>>>> semantics). The scaling factor indicates the number of brokers and
>>>>>> (single threaded) Streams instances. We used SimpleBenchmark that is
>>>>>> part of AK code base.
>>>>>>
>>>>>>
>>>>>> Second, as the design is "producer per task" (and not "producer per
>>>>>> partition") it is possible to specify a custom PartitionGrouper that
>>>>>> assigns multiple partitions to a single task. Thus, it allows to reduce
>>>>>> the number of tasks for scenarios with many partitions. Right now, this
>>>>>> interface must be implemented solely by the user, but we could also add
>>>>>> a new config parameter that specifies the max.number.of.tasks or
>>>>>> partitions.per.task so that the user can configure this instead of
>>>>>> implementing the interface.
>>>>>>
>>>>>> Third, there is the idea of a "Producer Pool" that would allow to share
>>>>>> resources (network connections, memory, etc) over multiple producers.
>>>>>> This would allow to separate multiple transaction on the producer
>>> level,
>>>>>> while resources are shared. There is no detailed design document yet
>>> and
>>>>>> there would be a KIP for this feature.
>>>>>>
>>>>>> Thus, if there should be any performance problems for high scale
>>>>>> scenarios, there are multiple ways to tackle them while keeping the
>>>>>> "producer per task" design.
>>>>>>
>>>>>> Additionally, a "producer per thread" design would be way more complex
>>>>>> and I summarized the issues in a separate document. I will share a link
>>>>>> to the document soon.
>>>>>>
>>>>>>
>>>>>>
>>>>>> 2. StateStore recovery:
>>>>>>
>>>>>> Streams EoS will in the first design not allow to exploit the
>>>>>> improvements that are added for 0.11 at the moment. However, as 0.10.2
>>>>>> faces the same issues of potentially long recovery, there is no
>>>>>> regression with this regard. Thus, I see those improvements as
>>>>>> orthogonal or add-ons. Nevertheless, we should try to explore those
>>>>>> options and if possible get them into 0.11 such that Streams with EoS
>>>>>> gets the same improvements as at-least-once scenario.
>>>>>>
>>>>>>
>>>>>>
>>>>>> 3. Caching:
>>>>>>
>>>>>> We might need to do some experiments to quantify the impact on caching.
>>>>>> If it's severe, the suggested default commit interval of 100ms could
>>>>>> also be increased. Also, EoS will not enforce any commit interval, but
>>>>>> only change the default value. Thus, a user can freely trade-off
>>> latency
>>>>>> vs. caching-effect.
>>>>>>
>>>>>> Last but not least, there is the idea to allow "read_uncommitted" for
>>>>>> intermediate topic. This would be an advance design for Streams EoS
>>> that
>>>>>> allows downstream sub-topologies to read uncommitted data
>>>>>> optimistically. In case of failure, a cascading abort of transactions
>>>>>> would be required. This change will need another KIP.
>>>>>>
>>>>>>
>>>>>>
>>>>>> 4. Idempotent Producer:
>>>>>>
>>>>>> The transactional part automatically leverages the idempotent
>>> properties
>>>>>> of the producer. Idempotency is a requirement:
>>>>>>
>>>>>>> Note that enable.idempotence must be enabled if a TransactionalId is
>>>>>> configured.
>>>>>>
>>>>>> See
>>>>>>
>>>>>>
>>> https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF
>>>>> 0wSw9ra8/edit#bookmark=id.g2xsf9n49puh
>>>>>>
>>>>>> All idempotent retries, are handled by the producer internally (with or
>>>>>> without transaction) if enable.idempotence is set to true.
>>>>>>
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>>
>>>>>>
>>>>>> On 3/3/17 3:34 AM, Eno Thereska wrote:
>>>>>>> Another question:
>>>>>>>
>>>>>>> The KIP doesn’t exactly spell out how it uses the idempotence
>>> guarantee
>>>>>> from KIP-98. It seems that only the transactional part is needed. Or is
>>>>> the
>>>>>> idempotence guarantee working behind the scenes and helping for some
>>>>>> scenarios for which it is not worthwhile aborting a transaction (e.g.,
>>>>>> retransmitting a record after a temporary network glitch)?
>>>>>>>
>>>>>>> Thanks
>>>>>>> Eno
>>>>>>>
>>>>>>>> On Mar 2, 2017, at 4:56 PM, Jay Kreps <ja...@confluent.io> wrote:
>>>>>>>>
>>>>>>>> I second the concern on with the one producer per task approach. At a
>>>>>>>> high-level it seems to make sense but I think Damian is exactly right
>>>>>> that
>>>>>>>> that cuts against the general design of the producer. Many people
>>> have
>>>>>> high
>>>>>>>> input partition counts and will have high task counts as a result. I
>>>>>> think
>>>>>>>> processing 1000 partitions should not be an unreasonable thing to
>>> want
>>>>>> to
>>>>>>>> do.
>>>>>>>>
>>>>>>>> The tricky bits will be:
>>>>>>>>
>>>>>>>>   - Reduced effectiveness of batching (or more latency and memory to
>>>>> get
>>>>>>>>   equivalent batching). This doesn't show up in simple benchmarks
>>>>>> because
>>>>>>>>   much of the penalty is I/O and CPU on the broker and the additional
>>>>>> threads
>>>>>>>>   from all the producers can make a single-threaded benchmark seem
>>>>>> faster.
>>>>>>>>   - TCP connection explosion. We maintain one connection per broker.
>>>>>> This
>>>>>>>>   is already high since each app instance does this. This design
>>>>> though
>>>>>> will
>>>>>>>>   add an additional multiplicative factor based on the partition
>>> count
>>>>>> of the
>>>>>>>>   input.
>>>>>>>>   - Connection and metadata request storms. When an instance with
>>> 1000
>>>>>>>>   tasks starts up it is going to try to create many thousands of
>>>>>> connections
>>>>>>>>   and issue a thousand metadata requests all at once.
>>>>>>>>   - Memory usage. We currently default to 64MB per producer. This can
>>>>> be
>>>>>>>>   tuned down, but the fact that we are spreading the batching over
>>>>> more
>>>>>>>>   producers will fundamentally mean we need a lot more memory to get
>>>>>> good
>>>>>>>>   perf and the memory usage will change as your task assignment
>>>>> changes
>>>>>> so it
>>>>>>>>   will be hard to set correctly unless it is done automatically.
>>>>>>>>   - Metrics explosion (1000 producer instances, each with their own
>>>>>>>>   metrics to monitor).
>>>>>>>>   - Thread explosion, 1000 background threads, one per producer, each
>>>>>>>>   sending data.
>>>>>>>>
>>>>>>>> -Jay
>>>>>>>>
>>>>>>>> On Wed, Mar 1, 2017 at 3:05 AM, Damian Guy <da...@gmail.com>
>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Guozhang,
>>>>>>>>>
>>>>>>>>> Thanks for the KIP! This is an important feature for Kafka Streams
>>>>> and
>>>>>> will
>>>>>>>>> help to unlock a bunch of use cases.
>>>>>>>>>
>>>>>>>>> I have some concerns/questions:
>>>>>>>>>
>>>>>>>>>   1. Producer per task: I'm worried about the overhead this is going
>>>>> to
>>>>>>>>>   put on both the streams app and the Kafka Brokers. You can easily
>>>>>>>>> imagine
>>>>>>>>>   an app consuming thousands of partitions. What load will this put
>>>>> on
>>>>>> the
>>>>>>>>>   brokers? Am i correct in assuming that there will be metadata
>>>>>> requests
>>>>>>>>> per
>>>>>>>>>   Producer? The memory overhead in the streams app will also
>>> increase
>>>>>>>>> fairly
>>>>>>>>>   significantly. Should we adjust ProducerConfig.BUFFER_MEMORY_
>>>>> CONFIG?
>>>>>>>>>   2. State Store recovery: As we already know, restoring the entire
>>>>>>>>>   changelog can take an extremely long time. Even with a fairly
>>> small
>>>>>>>>> dataset
>>>>>>>>>   and an inappropriately tuned segment size, this can take way too
>>>>>> long.
>>>>>>>>> My
>>>>>>>>>   concern is that failures happen and then recovery takes "forever"
>>>>>> and we
>>>>>>>>>   end up in a situation where we need to change the
>>> max.poll.interval
>>>>>> to
>>>>>>>>> be
>>>>>>>>>   some very large number or else we end up in "rebalance hell". I
>>>>> don't
>>>>>>>>> think
>>>>>>>>>   this provides a very good user experience. You mention RocksDB
>>>>>>>>>   checkpointing in the doc - should we explore this idea some more?
>>>>>> i.e.,
>>>>>>>>>   understand the penalty for checkpointing. Maybe checkpoint every
>>>>> *n*
>>>>>>>>>    commits?
>>>>>>>>>   3. What does EoS mean for Caching? If we set the commit interval
>>> to
>>>>>>>>>   100ms then the cache is not going to be very effective. Should it
>>>>>> just
>>>>>>>>> be
>>>>>>>>>   disabled?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Damian
>>>>>>>>>
>>>>>>>>> On Tue, 28 Feb 2017 at 21:54 Guozhang Wang <wa...@gmail.com>
>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi all,
>>>>>>>>>>
>>>>>>>>>> I have just created KIP-129 to leverage KIP-98 in Kafka Streams and
>>>>>>>>> provide
>>>>>>>>>> exactly-once processing semantics:
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>>>>>>>> 129%3A+Streams+Exactly-Once+Semantics
>>>>>>>>>>
>>>>>>>>>> This KIP enables Streams users to optionally turn on exactly-once
>>>>>>>>>> processing semantics without changing their app code at all by
>>>>>> leveraging
>>>>>>>>>> the transactional messaging features provided in KIP-98.
>>>>>>>>>>
>>>>>>>>>> The above wiki page provides a high-level view of the proposed
>>>>>> changes,
>>>>>>>>>> while detailed implementation design can be found in this Google
>>>>> doc:
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> https://docs.google.com/document/d/1pGZ8xtOOyGwDYgH5vA6h19zOMMadu
>>>>>>>>> FK1DAB8_gBYA2c
>>>>>>>>>>
>>>>>>>>>> We would love to hear your comments and suggestions.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> -- Guozhang
>>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>>
>>
> 


Re: [DISCUSS] KIP-129: Kafka Streams Exactly-Once Semantics

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Hi,

I want to pick up this thread again. As there are some concerns about
the "producer per task" design, we did write up an alternative "producer
per thread" design and discuss pros/cons of both approaches:

https://docs.google.com/document/d/1CfOJaa6mdg5o7pLf_zXISV4oE0ZeMZwT_sG1QWgL4EE


Looking forward to your feedback.


-Matthias


On 3/10/17 3:24 AM, Damian Guy wrote:
> Hi Matthias,
> 
> Thanks for the response. I agree with you regarding the use of
> PartitionGrouper to reduce the number of tasks. It would be good to have an
> idea of any additional load on the brokers as we increase the number of
> tasks and therefore producers.
> 
> Thanks,
> Damian
> 
> On Wed, 8 Mar 2017 at 01:45 Matthias J. Sax <ma...@confluent.io> wrote:
> 
>> Damian, Jun,
>>
>> Thanks for your input.
>>
>>
>> About Performance test:
>>
>> I can follow up with more performance tests using more partitions and
>> also collecting broker metrics.
>>
>> However, I want to highlight again, that even if 1000+ partitions would
>> be problematic, one can simply implement PartitionGrouper interface and
>> reduce the number of tasks to 250 or 100... So I am not sure, if we
>> should block this KIP, even if there might be some performance penalty
>> for currently single partitioned tasks.
>>
>> About memory usage. JXM max-heap and max-off-heap did report 256MB and
>> 133MB for all experiments (thus I did not put it in the spreadsheet).
>> Thus, using 100 producers (each using a max of 32MB of memory) was not
>> an issue with regard to memory consumption. I did not track "current
>> head/off-heap" memory as this would require a more advance test setup to
>> monitor it over time. If you think this would be required, we can do
>> some tests though.
>>
>> However, as 256 MB was enough memory, and there are other components
>> next to the producers using memory, I don't expect a severely increased
>> memory usage. Producer allocate memory on-demand, and if load is shared
>> over multiple producers, overall memory usage should stay the same as a
>> single producer should allocate less memory.
>>
>>
>> About Batching:
>>
>> As you can see from the benchmarks (in the detailed view -- I also added
>> some graphs to the summary now) the average batch size gets slightly
>> decrease with an increased number of partitions. However, there is no
>> big difference between "producer per thread" and "producer per task"
>> scenario.
>>
>>
>> About acks:
>>
>> This is covered by KIP-98 already. If idempotent producer is use, it's
>> required to set max.in.flight.requests.per.connection=1 and retries > 0
>> -- otherwise a config exception will be thrown. For transactions, it's
>> further required that acks=-1 to avoid a config exception.
>>
>> Other bits, like min.isr, replication.factor, etc. (ie, all broker/topic
>> configs) are out of scope, and it's user responsibility to set those
>> values correctly to ensure transactionality and idempotency.
>>
>>
>>
>> -Matthias
>>
>>
>> On 3/7/17 9:32 AM, Jun Rao wrote:
>>> Hi, Guozhang,
>>>
>>> Thanks for the KIP. A couple of comments.
>>>
>>> 1. About the impact on producer batching. My understanding is that
>>> typically different sub-topologies in the same task are publishing to
>>> different topics. Since the producer batching happens at the
>>> topic/partition level, using a producer per task may not impact batching
>>> much.
>>>
>>> 2. When processing.guarantee is set to exactly_once, do we want to
>> enforce
>>> acks to all in the producer? The default acks is 1 and may cause acked
>> data
>>> to be lost later when the leader changes.
>>>
>>> Thanks,
>>>
>>> Jun
>>>
>>> On Tue, Mar 7, 2017 at 3:42 AM, Damian Guy <da...@gmail.com> wrote:
>>>
>>>> Hi Matthias,
>>>>
>>>> Thanks. The perf test is a good start but I don't think it goes far
>> enough.
>>>> 100 partitions is not a lot. What happens when there are thousands of
>>>> partitions? What is the load on the brokers? How much more memory is
>> used
>>>> by the Streams App etc?
>>>>
>>>> Thanks,
>>>> Damian
>>>>
>>>> On Tue, 7 Mar 2017 at 03:02 Matthias J. Sax <ma...@confluent.io>
>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I want to give a first respond:
>>>>>
>>>>>
>>>>>
>>>>> 1. Producer per task:
>>>>>
>>>>> First, we did some performance tests, indicating that the performance
>>>>> penalty is small. Please have a look here:
>>>>>
>>>>> https://docs.google.com/spreadsheets/d/18aGOB13-
>>>> ibwHJl5VE27HnHBlNkZAIeLF9yQ99FDxkuM/edit?usp=sharing
>>>>>
>>>>> For the test, we ran with a trunk version and a modified version that
>>>>> uses a producer per task (of course, no transactions, but at-least-once
>>>>> semantics). The scaling factor indicates the number of brokers and
>>>>> (single threaded) Streams instances. We used SimpleBenchmark that is
>>>>> part of AK code base.
>>>>>
>>>>>
>>>>> Second, as the design is "producer per task" (and not "producer per
>>>>> partition") it is possible to specify a custom PartitionGrouper that
>>>>> assigns multiple partitions to a single task. Thus, it allows to reduce
>>>>> the number of tasks for scenarios with many partitions. Right now, this
>>>>> interface must be implemented solely by the user, but we could also add
>>>>> a new config parameter that specifies the max.number.of.tasks or
>>>>> partitions.per.task so that the user can configure this instead of
>>>>> implementing the interface.
>>>>>
>>>>> Third, there is the idea of a "Producer Pool" that would allow to share
>>>>> resources (network connections, memory, etc) over multiple producers.
>>>>> This would allow to separate multiple transaction on the producer
>> level,
>>>>> while resources are shared. There is no detailed design document yet
>> and
>>>>> there would be a KIP for this feature.
>>>>>
>>>>> Thus, if there should be any performance problems for high scale
>>>>> scenarios, there are multiple ways to tackle them while keeping the
>>>>> "producer per task" design.
>>>>>
>>>>> Additionally, a "producer per thread" design would be way more complex
>>>>> and I summarized the issues in a separate document. I will share a link
>>>>> to the document soon.
>>>>>
>>>>>
>>>>>
>>>>> 2. StateStore recovery:
>>>>>
>>>>> Streams EoS will in the first design not allow to exploit the
>>>>> improvements that are added for 0.11 at the moment. However, as 0.10.2
>>>>> faces the same issues of potentially long recovery, there is no
>>>>> regression with this regard. Thus, I see those improvements as
>>>>> orthogonal or add-ons. Nevertheless, we should try to explore those
>>>>> options and if possible get them into 0.11 such that Streams with EoS
>>>>> gets the same improvements as at-least-once scenario.
>>>>>
>>>>>
>>>>>
>>>>> 3. Caching:
>>>>>
>>>>> We might need to do some experiments to quantify the impact on caching.
>>>>> If it's severe, the suggested default commit interval of 100ms could
>>>>> also be increased. Also, EoS will not enforce any commit interval, but
>>>>> only change the default value. Thus, a user can freely trade-off
>> latency
>>>>> vs. caching-effect.
>>>>>
>>>>> Last but not least, there is the idea to allow "read_uncommitted" for
>>>>> intermediate topic. This would be an advance design for Streams EoS
>> that
>>>>> allows downstream sub-topologies to read uncommitted data
>>>>> optimistically. In case of failure, a cascading abort of transactions
>>>>> would be required. This change will need another KIP.
>>>>>
>>>>>
>>>>>
>>>>> 4. Idempotent Producer:
>>>>>
>>>>> The transactional part automatically leverages the idempotent
>> properties
>>>>> of the producer. Idempotency is a requirement:
>>>>>
>>>>>> Note that enable.idempotence must be enabled if a TransactionalId is
>>>>> configured.
>>>>>
>>>>> See
>>>>>
>>>>>
>> https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF
>>>> 0wSw9ra8/edit#bookmark=id.g2xsf9n49puh
>>>>>
>>>>> All idempotent retries, are handled by the producer internally (with or
>>>>> without transaction) if enable.idempotence is set to true.
>>>>>
>>>>>
>>>>>
>>>>> -Matthias
>>>>>
>>>>>
>>>>>
>>>>> On 3/3/17 3:34 AM, Eno Thereska wrote:
>>>>>> Another question:
>>>>>>
>>>>>> The KIP doesn’t exactly spell out how it uses the idempotence
>> guarantee
>>>>> from KIP-98. It seems that only the transactional part is needed. Or is
>>>> the
>>>>> idempotence guarantee working behind the scenes and helping for some
>>>>> scenarios for which it is not worthwhile aborting a transaction (e.g.,
>>>>> retransmitting a record after a temporary network glitch)?
>>>>>>
>>>>>> Thanks
>>>>>> Eno
>>>>>>
>>>>>>> On Mar 2, 2017, at 4:56 PM, Jay Kreps <ja...@confluent.io> wrote:
>>>>>>>
>>>>>>> I second the concern on with the one producer per task approach. At a
>>>>>>> high-level it seems to make sense but I think Damian is exactly right
>>>>> that
>>>>>>> that cuts against the general design of the producer. Many people
>> have
>>>>> high
>>>>>>> input partition counts and will have high task counts as a result. I
>>>>> think
>>>>>>> processing 1000 partitions should not be an unreasonable thing to
>> want
>>>>> to
>>>>>>> do.
>>>>>>>
>>>>>>> The tricky bits will be:
>>>>>>>
>>>>>>>   - Reduced effectiveness of batching (or more latency and memory to
>>>> get
>>>>>>>   equivalent batching). This doesn't show up in simple benchmarks
>>>>> because
>>>>>>>   much of the penalty is I/O and CPU on the broker and the additional
>>>>> threads
>>>>>>>   from all the producers can make a single-threaded benchmark seem
>>>>> faster.
>>>>>>>   - TCP connection explosion. We maintain one connection per broker.
>>>>> This
>>>>>>>   is already high since each app instance does this. This design
>>>> though
>>>>> will
>>>>>>>   add an additional multiplicative factor based on the partition
>> count
>>>>> of the
>>>>>>>   input.
>>>>>>>   - Connection and metadata request storms. When an instance with
>> 1000
>>>>>>>   tasks starts up it is going to try to create many thousands of
>>>>> connections
>>>>>>>   and issue a thousand metadata requests all at once.
>>>>>>>   - Memory usage. We currently default to 64MB per producer. This can
>>>> be
>>>>>>>   tuned down, but the fact that we are spreading the batching over
>>>> more
>>>>>>>   producers will fundamentally mean we need a lot more memory to get
>>>>> good
>>>>>>>   perf and the memory usage will change as your task assignment
>>>> changes
>>>>> so it
>>>>>>>   will be hard to set correctly unless it is done automatically.
>>>>>>>   - Metrics explosion (1000 producer instances, each with their own
>>>>>>>   metrics to monitor).
>>>>>>>   - Thread explosion, 1000 background threads, one per producer, each
>>>>>>>   sending data.
>>>>>>>
>>>>>>> -Jay
>>>>>>>
>>>>>>> On Wed, Mar 1, 2017 at 3:05 AM, Damian Guy <da...@gmail.com>
>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Guozhang,
>>>>>>>>
>>>>>>>> Thanks for the KIP! This is an important feature for Kafka Streams
>>>> and
>>>>> will
>>>>>>>> help to unlock a bunch of use cases.
>>>>>>>>
>>>>>>>> I have some concerns/questions:
>>>>>>>>
>>>>>>>>   1. Producer per task: I'm worried about the overhead this is going
>>>> to
>>>>>>>>   put on both the streams app and the Kafka Brokers. You can easily
>>>>>>>> imagine
>>>>>>>>   an app consuming thousands of partitions. What load will this put
>>>> on
>>>>> the
>>>>>>>>   brokers? Am i correct in assuming that there will be metadata
>>>>> requests
>>>>>>>> per
>>>>>>>>   Producer? The memory overhead in the streams app will also
>> increase
>>>>>>>> fairly
>>>>>>>>   significantly. Should we adjust ProducerConfig.BUFFER_MEMORY_
>>>> CONFIG?
>>>>>>>>   2. State Store recovery: As we already know, restoring the entire
>>>>>>>>   changelog can take an extremely long time. Even with a fairly
>> small
>>>>>>>> dataset
>>>>>>>>   and an inappropriately tuned segment size, this can take way too
>>>>> long.
>>>>>>>> My
>>>>>>>>   concern is that failures happen and then recovery takes "forever"
>>>>> and we
>>>>>>>>   end up in a situation where we need to change the
>> max.poll.interval
>>>>> to
>>>>>>>> be
>>>>>>>>   some very large number or else we end up in "rebalance hell". I
>>>> don't
>>>>>>>> think
>>>>>>>>   this provides a very good user experience. You mention RocksDB
>>>>>>>>   checkpointing in the doc - should we explore this idea some more?
>>>>> i.e.,
>>>>>>>>   understand the penalty for checkpointing. Maybe checkpoint every
>>>> *n*
>>>>>>>>    commits?
>>>>>>>>   3. What does EoS mean for Caching? If we set the commit interval
>> to
>>>>>>>>   100ms then the cache is not going to be very effective. Should it
>>>>> just
>>>>>>>> be
>>>>>>>>   disabled?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Damian
>>>>>>>>
>>>>>>>> On Tue, 28 Feb 2017 at 21:54 Guozhang Wang <wa...@gmail.com>
>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi all,
>>>>>>>>>
>>>>>>>>> I have just created KIP-129 to leverage KIP-98 in Kafka Streams and
>>>>>>>> provide
>>>>>>>>> exactly-once processing semantics:
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>>>>>>> 129%3A+Streams+Exactly-Once+Semantics
>>>>>>>>>
>>>>>>>>> This KIP enables Streams users to optionally turn on exactly-once
>>>>>>>>> processing semantics without changing their app code at all by
>>>>> leveraging
>>>>>>>>> the transactional messaging features provided in KIP-98.
>>>>>>>>>
>>>>>>>>> The above wiki page provides a high-level view of the proposed
>>>>> changes,
>>>>>>>>> while detailed implementation design can be found in this Google
>>>> doc:
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> https://docs.google.com/document/d/1pGZ8xtOOyGwDYgH5vA6h19zOMMadu
>>>>>>>> FK1DAB8_gBYA2c
>>>>>>>>>
>>>>>>>>> We would love to hear your comments and suggestions.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> -- Guozhang
>>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>>
> 


Re: [DISCUSS] KIP-129: Kafka Streams Exactly-Once Semantics

Posted by Damian Guy <da...@gmail.com>.
Hi Matthias,

Thanks for the response. I agree with you regarding the use of
PartitionGrouper to reduce the number of tasks. It would be good to have an
idea of any additional load on the brokers as we increase the number of
tasks and therefore producers.

Thanks,
Damian

On Wed, 8 Mar 2017 at 01:45 Matthias J. Sax <ma...@confluent.io> wrote:

> Damian, Jun,
>
> Thanks for your input.
>
>
> About Performance test:
>
> I can follow up with more performance tests using more partitions and
> also collecting broker metrics.
>
> However, I want to highlight again, that even if 1000+ partitions would
> be problematic, one can simply implement PartitionGrouper interface and
> reduce the number of tasks to 250 or 100... So I am not sure, if we
> should block this KIP, even if there might be some performance penalty
> for currently single partitioned tasks.
>
> About memory usage. JXM max-heap and max-off-heap did report 256MB and
> 133MB for all experiments (thus I did not put it in the spreadsheet).
> Thus, using 100 producers (each using a max of 32MB of memory) was not
> an issue with regard to memory consumption. I did not track "current
> head/off-heap" memory as this would require a more advance test setup to
> monitor it over time. If you think this would be required, we can do
> some tests though.
>
> However, as 256 MB was enough memory, and there are other components
> next to the producers using memory, I don't expect a severely increased
> memory usage. Producer allocate memory on-demand, and if load is shared
> over multiple producers, overall memory usage should stay the same as a
> single producer should allocate less memory.
>
>
> About Batching:
>
> As you can see from the benchmarks (in the detailed view -- I also added
> some graphs to the summary now) the average batch size gets slightly
> decrease with an increased number of partitions. However, there is no
> big difference between "producer per thread" and "producer per task"
> scenario.
>
>
> About acks:
>
> This is covered by KIP-98 already. If idempotent producer is use, it's
> required to set max.in.flight.requests.per.connection=1 and retries > 0
> -- otherwise a config exception will be thrown. For transactions, it's
> further required that acks=-1 to avoid a config exception.
>
> Other bits, like min.isr, replication.factor, etc. (ie, all broker/topic
> configs) are out of scope, and it's user responsibility to set those
> values correctly to ensure transactionality and idempotency.
>
>
>
> -Matthias
>
>
> On 3/7/17 9:32 AM, Jun Rao wrote:
> > Hi, Guozhang,
> >
> > Thanks for the KIP. A couple of comments.
> >
> > 1. About the impact on producer batching. My understanding is that
> > typically different sub-topologies in the same task are publishing to
> > different topics. Since the producer batching happens at the
> > topic/partition level, using a producer per task may not impact batching
> > much.
> >
> > 2. When processing.guarantee is set to exactly_once, do we want to
> enforce
> > acks to all in the producer? The default acks is 1 and may cause acked
> data
> > to be lost later when the leader changes.
> >
> > Thanks,
> >
> > Jun
> >
> > On Tue, Mar 7, 2017 at 3:42 AM, Damian Guy <da...@gmail.com> wrote:
> >
> >> Hi Matthias,
> >>
> >> Thanks. The perf test is a good start but I don't think it goes far
> enough.
> >> 100 partitions is not a lot. What happens when there are thousands of
> >> partitions? What is the load on the brokers? How much more memory is
> used
> >> by the Streams App etc?
> >>
> >> Thanks,
> >> Damian
> >>
> >> On Tue, 7 Mar 2017 at 03:02 Matthias J. Sax <ma...@confluent.io>
> wrote:
> >>
> >>> Hi,
> >>>
> >>> I want to give a first respond:
> >>>
> >>>
> >>>
> >>> 1. Producer per task:
> >>>
> >>> First, we did some performance tests, indicating that the performance
> >>> penalty is small. Please have a look here:
> >>>
> >>> https://docs.google.com/spreadsheets/d/18aGOB13-
> >> ibwHJl5VE27HnHBlNkZAIeLF9yQ99FDxkuM/edit?usp=sharing
> >>>
> >>> For the test, we ran with a trunk version and a modified version that
> >>> uses a producer per task (of course, no transactions, but at-least-once
> >>> semantics). The scaling factor indicates the number of brokers and
> >>> (single threaded) Streams instances. We used SimpleBenchmark that is
> >>> part of AK code base.
> >>>
> >>>
> >>> Second, as the design is "producer per task" (and not "producer per
> >>> partition") it is possible to specify a custom PartitionGrouper that
> >>> assigns multiple partitions to a single task. Thus, it allows to reduce
> >>> the number of tasks for scenarios with many partitions. Right now, this
> >>> interface must be implemented solely by the user, but we could also add
> >>> a new config parameter that specifies the max.number.of.tasks or
> >>> partitions.per.task so that the user can configure this instead of
> >>> implementing the interface.
> >>>
> >>> Third, there is the idea of a "Producer Pool" that would allow to share
> >>> resources (network connections, memory, etc) over multiple producers.
> >>> This would allow to separate multiple transaction on the producer
> level,
> >>> while resources are shared. There is no detailed design document yet
> and
> >>> there would be a KIP for this feature.
> >>>
> >>> Thus, if there should be any performance problems for high scale
> >>> scenarios, there are multiple ways to tackle them while keeping the
> >>> "producer per task" design.
> >>>
> >>> Additionally, a "producer per thread" design would be way more complex
> >>> and I summarized the issues in a separate document. I will share a link
> >>> to the document soon.
> >>>
> >>>
> >>>
> >>> 2. StateStore recovery:
> >>>
> >>> Streams EoS will in the first design not allow to exploit the
> >>> improvements that are added for 0.11 at the moment. However, as 0.10.2
> >>> faces the same issues of potentially long recovery, there is no
> >>> regression with this regard. Thus, I see those improvements as
> >>> orthogonal or add-ons. Nevertheless, we should try to explore those
> >>> options and if possible get them into 0.11 such that Streams with EoS
> >>> gets the same improvements as at-least-once scenario.
> >>>
> >>>
> >>>
> >>> 3. Caching:
> >>>
> >>> We might need to do some experiments to quantify the impact on caching.
> >>> If it's severe, the suggested default commit interval of 100ms could
> >>> also be increased. Also, EoS will not enforce any commit interval, but
> >>> only change the default value. Thus, a user can freely trade-off
> latency
> >>> vs. caching-effect.
> >>>
> >>> Last but not least, there is the idea to allow "read_uncommitted" for
> >>> intermediate topic. This would be an advance design for Streams EoS
> that
> >>> allows downstream sub-topologies to read uncommitted data
> >>> optimistically. In case of failure, a cascading abort of transactions
> >>> would be required. This change will need another KIP.
> >>>
> >>>
> >>>
> >>> 4. Idempotent Producer:
> >>>
> >>> The transactional part automatically leverages the idempotent
> properties
> >>> of the producer. Idempotency is a requirement:
> >>>
> >>>> Note that enable.idempotence must be enabled if a TransactionalId is
> >>> configured.
> >>>
> >>> See
> >>>
> >>>
> https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF
> >> 0wSw9ra8/edit#bookmark=id.g2xsf9n49puh
> >>>
> >>> All idempotent retries, are handled by the producer internally (with or
> >>> without transaction) if enable.idempotence is set to true.
> >>>
> >>>
> >>>
> >>> -Matthias
> >>>
> >>>
> >>>
> >>> On 3/3/17 3:34 AM, Eno Thereska wrote:
> >>>> Another question:
> >>>>
> >>>> The KIP doesn’t exactly spell out how it uses the idempotence
> guarantee
> >>> from KIP-98. It seems that only the transactional part is needed. Or is
> >> the
> >>> idempotence guarantee working behind the scenes and helping for some
> >>> scenarios for which it is not worthwhile aborting a transaction (e.g.,
> >>> retransmitting a record after a temporary network glitch)?
> >>>>
> >>>> Thanks
> >>>> Eno
> >>>>
> >>>>> On Mar 2, 2017, at 4:56 PM, Jay Kreps <ja...@confluent.io> wrote:
> >>>>>
> >>>>> I second the concern on with the one producer per task approach. At a
> >>>>> high-level it seems to make sense but I think Damian is exactly right
> >>> that
> >>>>> that cuts against the general design of the producer. Many people
> have
> >>> high
> >>>>> input partition counts and will have high task counts as a result. I
> >>> think
> >>>>> processing 1000 partitions should not be an unreasonable thing to
> want
> >>> to
> >>>>> do.
> >>>>>
> >>>>> The tricky bits will be:
> >>>>>
> >>>>>   - Reduced effectiveness of batching (or more latency and memory to
> >> get
> >>>>>   equivalent batching). This doesn't show up in simple benchmarks
> >>> because
> >>>>>   much of the penalty is I/O and CPU on the broker and the additional
> >>> threads
> >>>>>   from all the producers can make a single-threaded benchmark seem
> >>> faster.
> >>>>>   - TCP connection explosion. We maintain one connection per broker.
> >>> This
> >>>>>   is already high since each app instance does this. This design
> >> though
> >>> will
> >>>>>   add an additional multiplicative factor based on the partition
> count
> >>> of the
> >>>>>   input.
> >>>>>   - Connection and metadata request storms. When an instance with
> 1000
> >>>>>   tasks starts up it is going to try to create many thousands of
> >>> connections
> >>>>>   and issue a thousand metadata requests all at once.
> >>>>>   - Memory usage. We currently default to 64MB per producer. This can
> >> be
> >>>>>   tuned down, but the fact that we are spreading the batching over
> >> more
> >>>>>   producers will fundamentally mean we need a lot more memory to get
> >>> good
> >>>>>   perf and the memory usage will change as your task assignment
> >> changes
> >>> so it
> >>>>>   will be hard to set correctly unless it is done automatically.
> >>>>>   - Metrics explosion (1000 producer instances, each with their own
> >>>>>   metrics to monitor).
> >>>>>   - Thread explosion, 1000 background threads, one per producer, each
> >>>>>   sending data.
> >>>>>
> >>>>> -Jay
> >>>>>
> >>>>> On Wed, Mar 1, 2017 at 3:05 AM, Damian Guy <da...@gmail.com>
> >>> wrote:
> >>>>>
> >>>>>> Hi Guozhang,
> >>>>>>
> >>>>>> Thanks for the KIP! This is an important feature for Kafka Streams
> >> and
> >>> will
> >>>>>> help to unlock a bunch of use cases.
> >>>>>>
> >>>>>> I have some concerns/questions:
> >>>>>>
> >>>>>>   1. Producer per task: I'm worried about the overhead this is going
> >> to
> >>>>>>   put on both the streams app and the Kafka Brokers. You can easily
> >>>>>> imagine
> >>>>>>   an app consuming thousands of partitions. What load will this put
> >> on
> >>> the
> >>>>>>   brokers? Am i correct in assuming that there will be metadata
> >>> requests
> >>>>>> per
> >>>>>>   Producer? The memory overhead in the streams app will also
> increase
> >>>>>> fairly
> >>>>>>   significantly. Should we adjust ProducerConfig.BUFFER_MEMORY_
> >> CONFIG?
> >>>>>>   2. State Store recovery: As we already know, restoring the entire
> >>>>>>   changelog can take an extremely long time. Even with a fairly
> small
> >>>>>> dataset
> >>>>>>   and an inappropriately tuned segment size, this can take way too
> >>> long.
> >>>>>> My
> >>>>>>   concern is that failures happen and then recovery takes "forever"
> >>> and we
> >>>>>>   end up in a situation where we need to change the
> max.poll.interval
> >>> to
> >>>>>> be
> >>>>>>   some very large number or else we end up in "rebalance hell". I
> >> don't
> >>>>>> think
> >>>>>>   this provides a very good user experience. You mention RocksDB
> >>>>>>   checkpointing in the doc - should we explore this idea some more?
> >>> i.e.,
> >>>>>>   understand the penalty for checkpointing. Maybe checkpoint every
> >> *n*
> >>>>>>    commits?
> >>>>>>   3. What does EoS mean for Caching? If we set the commit interval
> to
> >>>>>>   100ms then the cache is not going to be very effective. Should it
> >>> just
> >>>>>> be
> >>>>>>   disabled?
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Damian
> >>>>>>
> >>>>>> On Tue, 28 Feb 2017 at 21:54 Guozhang Wang <wa...@gmail.com>
> >> wrote:
> >>>>>>
> >>>>>>> Hi all,
> >>>>>>>
> >>>>>>> I have just created KIP-129 to leverage KIP-98 in Kafka Streams and
> >>>>>> provide
> >>>>>>> exactly-once processing semantics:
> >>>>>>>
> >>>>>>>
> >>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >>>>>> 129%3A+Streams+Exactly-Once+Semantics
> >>>>>>>
> >>>>>>> This KIP enables Streams users to optionally turn on exactly-once
> >>>>>>> processing semantics without changing their app code at all by
> >>> leveraging
> >>>>>>> the transactional messaging features provided in KIP-98.
> >>>>>>>
> >>>>>>> The above wiki page provides a high-level view of the proposed
> >>> changes,
> >>>>>>> while detailed implementation design can be found in this Google
> >> doc:
> >>>>>>>
> >>>>>>>
> >>>>>>> https://docs.google.com/document/d/1pGZ8xtOOyGwDYgH5vA6h19zOMMadu
> >>>>>> FK1DAB8_gBYA2c
> >>>>>>>
> >>>>>>> We would love to hear your comments and suggestions.
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> -- Guozhang
> >>>>>>>
> >>>>>>
> >>>>
> >>>
> >>>
> >>
> >
>
>

Re: [DISCUSS] KIP-129: Kafka Streams Exactly-Once Semantics

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Damian, Jun,

Thanks for your input.


About Performance test:

I can follow up with more performance tests using more partitions and
also collecting broker metrics.

However, I want to highlight again, that even if 1000+ partitions would
be problematic, one can simply implement PartitionGrouper interface and
reduce the number of tasks to 250 or 100... So I am not sure, if we
should block this KIP, even if there might be some performance penalty
for currently single partitioned tasks.

About memory usage. JXM max-heap and max-off-heap did report 256MB and
133MB for all experiments (thus I did not put it in the spreadsheet).
Thus, using 100 producers (each using a max of 32MB of memory) was not
an issue with regard to memory consumption. I did not track "current
head/off-heap" memory as this would require a more advance test setup to
monitor it over time. If you think this would be required, we can do
some tests though.

However, as 256 MB was enough memory, and there are other components
next to the producers using memory, I don't expect a severely increased
memory usage. Producer allocate memory on-demand, and if load is shared
over multiple producers, overall memory usage should stay the same as a
single producer should allocate less memory.


About Batching:

As you can see from the benchmarks (in the detailed view -- I also added
some graphs to the summary now) the average batch size gets slightly
decrease with an increased number of partitions. However, there is no
big difference between "producer per thread" and "producer per task"
scenario.


About acks:

This is covered by KIP-98 already. If idempotent producer is use, it's
required to set max.in.flight.requests.per.connection=1 and retries > 0
-- otherwise a config exception will be thrown. For transactions, it's
further required that acks=-1 to avoid a config exception.

Other bits, like min.isr, replication.factor, etc. (ie, all broker/topic
configs) are out of scope, and it's user responsibility to set those
values correctly to ensure transactionality and idempotency.



-Matthias


On 3/7/17 9:32 AM, Jun Rao wrote:
> Hi, Guozhang,
> 
> Thanks for the KIP. A couple of comments.
> 
> 1. About the impact on producer batching. My understanding is that
> typically different sub-topologies in the same task are publishing to
> different topics. Since the producer batching happens at the
> topic/partition level, using a producer per task may not impact batching
> much.
> 
> 2. When processing.guarantee is set to exactly_once, do we want to enforce
> acks to all in the producer? The default acks is 1 and may cause acked data
> to be lost later when the leader changes.
> 
> Thanks,
> 
> Jun
> 
> On Tue, Mar 7, 2017 at 3:42 AM, Damian Guy <da...@gmail.com> wrote:
> 
>> Hi Matthias,
>>
>> Thanks. The perf test is a good start but I don't think it goes far enough.
>> 100 partitions is not a lot. What happens when there are thousands of
>> partitions? What is the load on the brokers? How much more memory is used
>> by the Streams App etc?
>>
>> Thanks,
>> Damian
>>
>> On Tue, 7 Mar 2017 at 03:02 Matthias J. Sax <ma...@confluent.io> wrote:
>>
>>> Hi,
>>>
>>> I want to give a first respond:
>>>
>>>
>>>
>>> 1. Producer per task:
>>>
>>> First, we did some performance tests, indicating that the performance
>>> penalty is small. Please have a look here:
>>>
>>> https://docs.google.com/spreadsheets/d/18aGOB13-
>> ibwHJl5VE27HnHBlNkZAIeLF9yQ99FDxkuM/edit?usp=sharing
>>>
>>> For the test, we ran with a trunk version and a modified version that
>>> uses a producer per task (of course, no transactions, but at-least-once
>>> semantics). The scaling factor indicates the number of brokers and
>>> (single threaded) Streams instances. We used SimpleBenchmark that is
>>> part of AK code base.
>>>
>>>
>>> Second, as the design is "producer per task" (and not "producer per
>>> partition") it is possible to specify a custom PartitionGrouper that
>>> assigns multiple partitions to a single task. Thus, it allows to reduce
>>> the number of tasks for scenarios with many partitions. Right now, this
>>> interface must be implemented solely by the user, but we could also add
>>> a new config parameter that specifies the max.number.of.tasks or
>>> partitions.per.task so that the user can configure this instead of
>>> implementing the interface.
>>>
>>> Third, there is the idea of a "Producer Pool" that would allow to share
>>> resources (network connections, memory, etc) over multiple producers.
>>> This would allow to separate multiple transaction on the producer level,
>>> while resources are shared. There is no detailed design document yet and
>>> there would be a KIP for this feature.
>>>
>>> Thus, if there should be any performance problems for high scale
>>> scenarios, there are multiple ways to tackle them while keeping the
>>> "producer per task" design.
>>>
>>> Additionally, a "producer per thread" design would be way more complex
>>> and I summarized the issues in a separate document. I will share a link
>>> to the document soon.
>>>
>>>
>>>
>>> 2. StateStore recovery:
>>>
>>> Streams EoS will in the first design not allow to exploit the
>>> improvements that are added for 0.11 at the moment. However, as 0.10.2
>>> faces the same issues of potentially long recovery, there is no
>>> regression with this regard. Thus, I see those improvements as
>>> orthogonal or add-ons. Nevertheless, we should try to explore those
>>> options and if possible get them into 0.11 such that Streams with EoS
>>> gets the same improvements as at-least-once scenario.
>>>
>>>
>>>
>>> 3. Caching:
>>>
>>> We might need to do some experiments to quantify the impact on caching.
>>> If it's severe, the suggested default commit interval of 100ms could
>>> also be increased. Also, EoS will not enforce any commit interval, but
>>> only change the default value. Thus, a user can freely trade-off latency
>>> vs. caching-effect.
>>>
>>> Last but not least, there is the idea to allow "read_uncommitted" for
>>> intermediate topic. This would be an advance design for Streams EoS that
>>> allows downstream sub-topologies to read uncommitted data
>>> optimistically. In case of failure, a cascading abort of transactions
>>> would be required. This change will need another KIP.
>>>
>>>
>>>
>>> 4. Idempotent Producer:
>>>
>>> The transactional part automatically leverages the idempotent properties
>>> of the producer. Idempotency is a requirement:
>>>
>>>> Note that enable.idempotence must be enabled if a TransactionalId is
>>> configured.
>>>
>>> See
>>>
>>> https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF
>> 0wSw9ra8/edit#bookmark=id.g2xsf9n49puh
>>>
>>> All idempotent retries, are handled by the producer internally (with or
>>> without transaction) if enable.idempotence is set to true.
>>>
>>>
>>>
>>> -Matthias
>>>
>>>
>>>
>>> On 3/3/17 3:34 AM, Eno Thereska wrote:
>>>> Another question:
>>>>
>>>> The KIP doesn’t exactly spell out how it uses the idempotence guarantee
>>> from KIP-98. It seems that only the transactional part is needed. Or is
>> the
>>> idempotence guarantee working behind the scenes and helping for some
>>> scenarios for which it is not worthwhile aborting a transaction (e.g.,
>>> retransmitting a record after a temporary network glitch)?
>>>>
>>>> Thanks
>>>> Eno
>>>>
>>>>> On Mar 2, 2017, at 4:56 PM, Jay Kreps <ja...@confluent.io> wrote:
>>>>>
>>>>> I second the concern on with the one producer per task approach. At a
>>>>> high-level it seems to make sense but I think Damian is exactly right
>>> that
>>>>> that cuts against the general design of the producer. Many people have
>>> high
>>>>> input partition counts and will have high task counts as a result. I
>>> think
>>>>> processing 1000 partitions should not be an unreasonable thing to want
>>> to
>>>>> do.
>>>>>
>>>>> The tricky bits will be:
>>>>>
>>>>>   - Reduced effectiveness of batching (or more latency and memory to
>> get
>>>>>   equivalent batching). This doesn't show up in simple benchmarks
>>> because
>>>>>   much of the penalty is I/O and CPU on the broker and the additional
>>> threads
>>>>>   from all the producers can make a single-threaded benchmark seem
>>> faster.
>>>>>   - TCP connection explosion. We maintain one connection per broker.
>>> This
>>>>>   is already high since each app instance does this. This design
>> though
>>> will
>>>>>   add an additional multiplicative factor based on the partition count
>>> of the
>>>>>   input.
>>>>>   - Connection and metadata request storms. When an instance with 1000
>>>>>   tasks starts up it is going to try to create many thousands of
>>> connections
>>>>>   and issue a thousand metadata requests all at once.
>>>>>   - Memory usage. We currently default to 64MB per producer. This can
>> be
>>>>>   tuned down, but the fact that we are spreading the batching over
>> more
>>>>>   producers will fundamentally mean we need a lot more memory to get
>>> good
>>>>>   perf and the memory usage will change as your task assignment
>> changes
>>> so it
>>>>>   will be hard to set correctly unless it is done automatically.
>>>>>   - Metrics explosion (1000 producer instances, each with their own
>>>>>   metrics to monitor).
>>>>>   - Thread explosion, 1000 background threads, one per producer, each
>>>>>   sending data.
>>>>>
>>>>> -Jay
>>>>>
>>>>> On Wed, Mar 1, 2017 at 3:05 AM, Damian Guy <da...@gmail.com>
>>> wrote:
>>>>>
>>>>>> Hi Guozhang,
>>>>>>
>>>>>> Thanks for the KIP! This is an important feature for Kafka Streams
>> and
>>> will
>>>>>> help to unlock a bunch of use cases.
>>>>>>
>>>>>> I have some concerns/questions:
>>>>>>
>>>>>>   1. Producer per task: I'm worried about the overhead this is going
>> to
>>>>>>   put on both the streams app and the Kafka Brokers. You can easily
>>>>>> imagine
>>>>>>   an app consuming thousands of partitions. What load will this put
>> on
>>> the
>>>>>>   brokers? Am i correct in assuming that there will be metadata
>>> requests
>>>>>> per
>>>>>>   Producer? The memory overhead in the streams app will also increase
>>>>>> fairly
>>>>>>   significantly. Should we adjust ProducerConfig.BUFFER_MEMORY_
>> CONFIG?
>>>>>>   2. State Store recovery: As we already know, restoring the entire
>>>>>>   changelog can take an extremely long time. Even with a fairly small
>>>>>> dataset
>>>>>>   and an inappropriately tuned segment size, this can take way too
>>> long.
>>>>>> My
>>>>>>   concern is that failures happen and then recovery takes "forever"
>>> and we
>>>>>>   end up in a situation where we need to change the max.poll.interval
>>> to
>>>>>> be
>>>>>>   some very large number or else we end up in "rebalance hell". I
>> don't
>>>>>> think
>>>>>>   this provides a very good user experience. You mention RocksDB
>>>>>>   checkpointing in the doc - should we explore this idea some more?
>>> i.e.,
>>>>>>   understand the penalty for checkpointing. Maybe checkpoint every
>> *n*
>>>>>>    commits?
>>>>>>   3. What does EoS mean for Caching? If we set the commit interval to
>>>>>>   100ms then the cache is not going to be very effective. Should it
>>> just
>>>>>> be
>>>>>>   disabled?
>>>>>>
>>>>>> Thanks,
>>>>>> Damian
>>>>>>
>>>>>> On Tue, 28 Feb 2017 at 21:54 Guozhang Wang <wa...@gmail.com>
>> wrote:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> I have just created KIP-129 to leverage KIP-98 in Kafka Streams and
>>>>>> provide
>>>>>>> exactly-once processing semantics:
>>>>>>>
>>>>>>>
>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>>>>> 129%3A+Streams+Exactly-Once+Semantics
>>>>>>>
>>>>>>> This KIP enables Streams users to optionally turn on exactly-once
>>>>>>> processing semantics without changing their app code at all by
>>> leveraging
>>>>>>> the transactional messaging features provided in KIP-98.
>>>>>>>
>>>>>>> The above wiki page provides a high-level view of the proposed
>>> changes,
>>>>>>> while detailed implementation design can be found in this Google
>> doc:
>>>>>>>
>>>>>>>
>>>>>>> https://docs.google.com/document/d/1pGZ8xtOOyGwDYgH5vA6h19zOMMadu
>>>>>> FK1DAB8_gBYA2c
>>>>>>>
>>>>>>> We would love to hear your comments and suggestions.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> -- Guozhang
>>>>>>>
>>>>>>
>>>>
>>>
>>>
>>
> 


Re: [DISCUSS] KIP-129: Kafka Streams Exactly-Once Semantics

Posted by Jun Rao <ju...@confluent.io>.
Hi, Guozhang,

Thanks for the KIP. A couple of comments.

1. About the impact on producer batching. My understanding is that
typically different sub-topologies in the same task are publishing to
different topics. Since the producer batching happens at the
topic/partition level, using a producer per task may not impact batching
much.

2. When processing.guarantee is set to exactly_once, do we want to enforce
acks to all in the producer? The default acks is 1 and may cause acked data
to be lost later when the leader changes.

Thanks,

Jun

On Tue, Mar 7, 2017 at 3:42 AM, Damian Guy <da...@gmail.com> wrote:

> Hi Matthias,
>
> Thanks. The perf test is a good start but I don't think it goes far enough.
> 100 partitions is not a lot. What happens when there are thousands of
> partitions? What is the load on the brokers? How much more memory is used
> by the Streams App etc?
>
> Thanks,
> Damian
>
> On Tue, 7 Mar 2017 at 03:02 Matthias J. Sax <ma...@confluent.io> wrote:
>
> > Hi,
> >
> > I want to give a first respond:
> >
> >
> >
> > 1. Producer per task:
> >
> > First, we did some performance tests, indicating that the performance
> > penalty is small. Please have a look here:
> >
> > https://docs.google.com/spreadsheets/d/18aGOB13-
> ibwHJl5VE27HnHBlNkZAIeLF9yQ99FDxkuM/edit?usp=sharing
> >
> > For the test, we ran with a trunk version and a modified version that
> > uses a producer per task (of course, no transactions, but at-least-once
> > semantics). The scaling factor indicates the number of brokers and
> > (single threaded) Streams instances. We used SimpleBenchmark that is
> > part of AK code base.
> >
> >
> > Second, as the design is "producer per task" (and not "producer per
> > partition") it is possible to specify a custom PartitionGrouper that
> > assigns multiple partitions to a single task. Thus, it allows to reduce
> > the number of tasks for scenarios with many partitions. Right now, this
> > interface must be implemented solely by the user, but we could also add
> > a new config parameter that specifies the max.number.of.tasks or
> > partitions.per.task so that the user can configure this instead of
> > implementing the interface.
> >
> > Third, there is the idea of a "Producer Pool" that would allow to share
> > resources (network connections, memory, etc) over multiple producers.
> > This would allow to separate multiple transaction on the producer level,
> > while resources are shared. There is no detailed design document yet and
> > there would be a KIP for this feature.
> >
> > Thus, if there should be any performance problems for high scale
> > scenarios, there are multiple ways to tackle them while keeping the
> > "producer per task" design.
> >
> > Additionally, a "producer per thread" design would be way more complex
> > and I summarized the issues in a separate document. I will share a link
> > to the document soon.
> >
> >
> >
> > 2. StateStore recovery:
> >
> > Streams EoS will in the first design not allow to exploit the
> > improvements that are added for 0.11 at the moment. However, as 0.10.2
> > faces the same issues of potentially long recovery, there is no
> > regression with this regard. Thus, I see those improvements as
> > orthogonal or add-ons. Nevertheless, we should try to explore those
> > options and if possible get them into 0.11 such that Streams with EoS
> > gets the same improvements as at-least-once scenario.
> >
> >
> >
> > 3. Caching:
> >
> > We might need to do some experiments to quantify the impact on caching.
> > If it's severe, the suggested default commit interval of 100ms could
> > also be increased. Also, EoS will not enforce any commit interval, but
> > only change the default value. Thus, a user can freely trade-off latency
> > vs. caching-effect.
> >
> > Last but not least, there is the idea to allow "read_uncommitted" for
> > intermediate topic. This would be an advance design for Streams EoS that
> > allows downstream sub-topologies to read uncommitted data
> > optimistically. In case of failure, a cascading abort of transactions
> > would be required. This change will need another KIP.
> >
> >
> >
> > 4. Idempotent Producer:
> >
> > The transactional part automatically leverages the idempotent properties
> > of the producer. Idempotency is a requirement:
> >
> > > Note that enable.idempotence must be enabled if a TransactionalId is
> > configured.
> >
> > See
> >
> > https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF
> 0wSw9ra8/edit#bookmark=id.g2xsf9n49puh
> >
> > All idempotent retries, are handled by the producer internally (with or
> > without transaction) if enable.idempotence is set to true.
> >
> >
> >
> > -Matthias
> >
> >
> >
> > On 3/3/17 3:34 AM, Eno Thereska wrote:
> > > Another question:
> > >
> > > The KIP doesn’t exactly spell out how it uses the idempotence guarantee
> > from KIP-98. It seems that only the transactional part is needed. Or is
> the
> > idempotence guarantee working behind the scenes and helping for some
> > scenarios for which it is not worthwhile aborting a transaction (e.g.,
> > retransmitting a record after a temporary network glitch)?
> > >
> > > Thanks
> > > Eno
> > >
> > >> On Mar 2, 2017, at 4:56 PM, Jay Kreps <ja...@confluent.io> wrote:
> > >>
> > >> I second the concern on with the one producer per task approach. At a
> > >> high-level it seems to make sense but I think Damian is exactly right
> > that
> > >> that cuts against the general design of the producer. Many people have
> > high
> > >> input partition counts and will have high task counts as a result. I
> > think
> > >> processing 1000 partitions should not be an unreasonable thing to want
> > to
> > >> do.
> > >>
> > >> The tricky bits will be:
> > >>
> > >>   - Reduced effectiveness of batching (or more latency and memory to
> get
> > >>   equivalent batching). This doesn't show up in simple benchmarks
> > because
> > >>   much of the penalty is I/O and CPU on the broker and the additional
> > threads
> > >>   from all the producers can make a single-threaded benchmark seem
> > faster.
> > >>   - TCP connection explosion. We maintain one connection per broker.
> > This
> > >>   is already high since each app instance does this. This design
> though
> > will
> > >>   add an additional multiplicative factor based on the partition count
> > of the
> > >>   input.
> > >>   - Connection and metadata request storms. When an instance with 1000
> > >>   tasks starts up it is going to try to create many thousands of
> > connections
> > >>   and issue a thousand metadata requests all at once.
> > >>   - Memory usage. We currently default to 64MB per producer. This can
> be
> > >>   tuned down, but the fact that we are spreading the batching over
> more
> > >>   producers will fundamentally mean we need a lot more memory to get
> > good
> > >>   perf and the memory usage will change as your task assignment
> changes
> > so it
> > >>   will be hard to set correctly unless it is done automatically.
> > >>   - Metrics explosion (1000 producer instances, each with their own
> > >>   metrics to monitor).
> > >>   - Thread explosion, 1000 background threads, one per producer, each
> > >>   sending data.
> > >>
> > >> -Jay
> > >>
> > >> On Wed, Mar 1, 2017 at 3:05 AM, Damian Guy <da...@gmail.com>
> > wrote:
> > >>
> > >>> Hi Guozhang,
> > >>>
> > >>> Thanks for the KIP! This is an important feature for Kafka Streams
> and
> > will
> > >>> help to unlock a bunch of use cases.
> > >>>
> > >>> I have some concerns/questions:
> > >>>
> > >>>   1. Producer per task: I'm worried about the overhead this is going
> to
> > >>>   put on both the streams app and the Kafka Brokers. You can easily
> > >>> imagine
> > >>>   an app consuming thousands of partitions. What load will this put
> on
> > the
> > >>>   brokers? Am i correct in assuming that there will be metadata
> > requests
> > >>> per
> > >>>   Producer? The memory overhead in the streams app will also increase
> > >>> fairly
> > >>>   significantly. Should we adjust ProducerConfig.BUFFER_MEMORY_
> CONFIG?
> > >>>   2. State Store recovery: As we already know, restoring the entire
> > >>>   changelog can take an extremely long time. Even with a fairly small
> > >>> dataset
> > >>>   and an inappropriately tuned segment size, this can take way too
> > long.
> > >>> My
> > >>>   concern is that failures happen and then recovery takes "forever"
> > and we
> > >>>   end up in a situation where we need to change the max.poll.interval
> > to
> > >>> be
> > >>>   some very large number or else we end up in "rebalance hell". I
> don't
> > >>> think
> > >>>   this provides a very good user experience. You mention RocksDB
> > >>>   checkpointing in the doc - should we explore this idea some more?
> > i.e.,
> > >>>   understand the penalty for checkpointing. Maybe checkpoint every
> *n*
> > >>>    commits?
> > >>>   3. What does EoS mean for Caching? If we set the commit interval to
> > >>>   100ms then the cache is not going to be very effective. Should it
> > just
> > >>> be
> > >>>   disabled?
> > >>>
> > >>> Thanks,
> > >>> Damian
> > >>>
> > >>> On Tue, 28 Feb 2017 at 21:54 Guozhang Wang <wa...@gmail.com>
> wrote:
> > >>>
> > >>>> Hi all,
> > >>>>
> > >>>> I have just created KIP-129 to leverage KIP-98 in Kafka Streams and
> > >>> provide
> > >>>> exactly-once processing semantics:
> > >>>>
> > >>>>
> > >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > >>> 129%3A+Streams+Exactly-Once+Semantics
> > >>>>
> > >>>> This KIP enables Streams users to optionally turn on exactly-once
> > >>>> processing semantics without changing their app code at all by
> > leveraging
> > >>>> the transactional messaging features provided in KIP-98.
> > >>>>
> > >>>> The above wiki page provides a high-level view of the proposed
> > changes,
> > >>>> while detailed implementation design can be found in this Google
> doc:
> > >>>>
> > >>>>
> > >>>> https://docs.google.com/document/d/1pGZ8xtOOyGwDYgH5vA6h19zOMMadu
> > >>> FK1DAB8_gBYA2c
> > >>>>
> > >>>> We would love to hear your comments and suggestions.
> > >>>>
> > >>>> Thanks,
> > >>>> -- Guozhang
> > >>>>
> > >>>
> > >
> >
> >
>

Re: [DISCUSS] KIP-129: Kafka Streams Exactly-Once Semantics

Posted by Damian Guy <da...@gmail.com>.
Hi Matthias,

Thanks. The perf test is a good start but I don't think it goes far enough.
100 partitions is not a lot. What happens when there are thousands of
partitions? What is the load on the brokers? How much more memory is used
by the Streams App etc?

Thanks,
Damian

On Tue, 7 Mar 2017 at 03:02 Matthias J. Sax <ma...@confluent.io> wrote:

> Hi,
>
> I want to give a first respond:
>
>
>
> 1. Producer per task:
>
> First, we did some performance tests, indicating that the performance
> penalty is small. Please have a look here:
>
> https://docs.google.com/spreadsheets/d/18aGOB13-ibwHJl5VE27HnHBlNkZAIeLF9yQ99FDxkuM/edit?usp=sharing
>
> For the test, we ran with a trunk version and a modified version that
> uses a producer per task (of course, no transactions, but at-least-once
> semantics). The scaling factor indicates the number of brokers and
> (single threaded) Streams instances. We used SimpleBenchmark that is
> part of AK code base.
>
>
> Second, as the design is "producer per task" (and not "producer per
> partition") it is possible to specify a custom PartitionGrouper that
> assigns multiple partitions to a single task. Thus, it allows to reduce
> the number of tasks for scenarios with many partitions. Right now, this
> interface must be implemented solely by the user, but we could also add
> a new config parameter that specifies the max.number.of.tasks or
> partitions.per.task so that the user can configure this instead of
> implementing the interface.
>
> Third, there is the idea of a "Producer Pool" that would allow to share
> resources (network connections, memory, etc) over multiple producers.
> This would allow to separate multiple transaction on the producer level,
> while resources are shared. There is no detailed design document yet and
> there would be a KIP for this feature.
>
> Thus, if there should be any performance problems for high scale
> scenarios, there are multiple ways to tackle them while keeping the
> "producer per task" design.
>
> Additionally, a "producer per thread" design would be way more complex
> and I summarized the issues in a separate document. I will share a link
> to the document soon.
>
>
>
> 2. StateStore recovery:
>
> Streams EoS will in the first design not allow to exploit the
> improvements that are added for 0.11 at the moment. However, as 0.10.2
> faces the same issues of potentially long recovery, there is no
> regression with this regard. Thus, I see those improvements as
> orthogonal or add-ons. Nevertheless, we should try to explore those
> options and if possible get them into 0.11 such that Streams with EoS
> gets the same improvements as at-least-once scenario.
>
>
>
> 3. Caching:
>
> We might need to do some experiments to quantify the impact on caching.
> If it's severe, the suggested default commit interval of 100ms could
> also be increased. Also, EoS will not enforce any commit interval, but
> only change the default value. Thus, a user can freely trade-off latency
> vs. caching-effect.
>
> Last but not least, there is the idea to allow "read_uncommitted" for
> intermediate topic. This would be an advance design for Streams EoS that
> allows downstream sub-topologies to read uncommitted data
> optimistically. In case of failure, a cascading abort of transactions
> would be required. This change will need another KIP.
>
>
>
> 4. Idempotent Producer:
>
> The transactional part automatically leverages the idempotent properties
> of the producer. Idempotency is a requirement:
>
> > Note that enable.idempotence must be enabled if a TransactionalId is
> configured.
>
> See
>
> https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit#bookmark=id.g2xsf9n49puh
>
> All idempotent retries, are handled by the producer internally (with or
> without transaction) if enable.idempotence is set to true.
>
>
>
> -Matthias
>
>
>
> On 3/3/17 3:34 AM, Eno Thereska wrote:
> > Another question:
> >
> > The KIP doesn’t exactly spell out how it uses the idempotence guarantee
> from KIP-98. It seems that only the transactional part is needed. Or is the
> idempotence guarantee working behind the scenes and helping for some
> scenarios for which it is not worthwhile aborting a transaction (e.g.,
> retransmitting a record after a temporary network glitch)?
> >
> > Thanks
> > Eno
> >
> >> On Mar 2, 2017, at 4:56 PM, Jay Kreps <ja...@confluent.io> wrote:
> >>
> >> I second the concern on with the one producer per task approach. At a
> >> high-level it seems to make sense but I think Damian is exactly right
> that
> >> that cuts against the general design of the producer. Many people have
> high
> >> input partition counts and will have high task counts as a result. I
> think
> >> processing 1000 partitions should not be an unreasonable thing to want
> to
> >> do.
> >>
> >> The tricky bits will be:
> >>
> >>   - Reduced effectiveness of batching (or more latency and memory to get
> >>   equivalent batching). This doesn't show up in simple benchmarks
> because
> >>   much of the penalty is I/O and CPU on the broker and the additional
> threads
> >>   from all the producers can make a single-threaded benchmark seem
> faster.
> >>   - TCP connection explosion. We maintain one connection per broker.
> This
> >>   is already high since each app instance does this. This design though
> will
> >>   add an additional multiplicative factor based on the partition count
> of the
> >>   input.
> >>   - Connection and metadata request storms. When an instance with 1000
> >>   tasks starts up it is going to try to create many thousands of
> connections
> >>   and issue a thousand metadata requests all at once.
> >>   - Memory usage. We currently default to 64MB per producer. This can be
> >>   tuned down, but the fact that we are spreading the batching over more
> >>   producers will fundamentally mean we need a lot more memory to get
> good
> >>   perf and the memory usage will change as your task assignment changes
> so it
> >>   will be hard to set correctly unless it is done automatically.
> >>   - Metrics explosion (1000 producer instances, each with their own
> >>   metrics to monitor).
> >>   - Thread explosion, 1000 background threads, one per producer, each
> >>   sending data.
> >>
> >> -Jay
> >>
> >> On Wed, Mar 1, 2017 at 3:05 AM, Damian Guy <da...@gmail.com>
> wrote:
> >>
> >>> Hi Guozhang,
> >>>
> >>> Thanks for the KIP! This is an important feature for Kafka Streams and
> will
> >>> help to unlock a bunch of use cases.
> >>>
> >>> I have some concerns/questions:
> >>>
> >>>   1. Producer per task: I'm worried about the overhead this is going to
> >>>   put on both the streams app and the Kafka Brokers. You can easily
> >>> imagine
> >>>   an app consuming thousands of partitions. What load will this put on
> the
> >>>   brokers? Am i correct in assuming that there will be metadata
> requests
> >>> per
> >>>   Producer? The memory overhead in the streams app will also increase
> >>> fairly
> >>>   significantly. Should we adjust ProducerConfig.BUFFER_MEMORY_CONFIG?
> >>>   2. State Store recovery: As we already know, restoring the entire
> >>>   changelog can take an extremely long time. Even with a fairly small
> >>> dataset
> >>>   and an inappropriately tuned segment size, this can take way too
> long.
> >>> My
> >>>   concern is that failures happen and then recovery takes "forever"
> and we
> >>>   end up in a situation where we need to change the max.poll.interval
> to
> >>> be
> >>>   some very large number or else we end up in "rebalance hell". I don't
> >>> think
> >>>   this provides a very good user experience. You mention RocksDB
> >>>   checkpointing in the doc - should we explore this idea some more?
> i.e.,
> >>>   understand the penalty for checkpointing. Maybe checkpoint every *n*
> >>>    commits?
> >>>   3. What does EoS mean for Caching? If we set the commit interval to
> >>>   100ms then the cache is not going to be very effective. Should it
> just
> >>> be
> >>>   disabled?
> >>>
> >>> Thanks,
> >>> Damian
> >>>
> >>> On Tue, 28 Feb 2017 at 21:54 Guozhang Wang <wa...@gmail.com> wrote:
> >>>
> >>>> Hi all,
> >>>>
> >>>> I have just created KIP-129 to leverage KIP-98 in Kafka Streams and
> >>> provide
> >>>> exactly-once processing semantics:
> >>>>
> >>>>
> >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >>> 129%3A+Streams+Exactly-Once+Semantics
> >>>>
> >>>> This KIP enables Streams users to optionally turn on exactly-once
> >>>> processing semantics without changing their app code at all by
> leveraging
> >>>> the transactional messaging features provided in KIP-98.
> >>>>
> >>>> The above wiki page provides a high-level view of the proposed
> changes,
> >>>> while detailed implementation design can be found in this Google doc:
> >>>>
> >>>>
> >>>> https://docs.google.com/document/d/1pGZ8xtOOyGwDYgH5vA6h19zOMMadu
> >>> FK1DAB8_gBYA2c
> >>>>
> >>>> We would love to hear your comments and suggestions.
> >>>>
> >>>> Thanks,
> >>>> -- Guozhang
> >>>>
> >>>
> >
>
>

Re: [DISCUSS] KIP-129: Kafka Streams Exactly-Once Semantics

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Hi,

I want to give a first respond:



1. Producer per task:

First, we did some performance tests, indicating that the performance
penalty is small. Please have a look here:
https://docs.google.com/spreadsheets/d/18aGOB13-ibwHJl5VE27HnHBlNkZAIeLF9yQ99FDxkuM/edit?usp=sharing

For the test, we ran with a trunk version and a modified version that
uses a producer per task (of course, no transactions, but at-least-once
semantics). The scaling factor indicates the number of brokers and
(single threaded) Streams instances. We used SimpleBenchmark that is
part of AK code base.


Second, as the design is "producer per task" (and not "producer per
partition") it is possible to specify a custom PartitionGrouper that
assigns multiple partitions to a single task. Thus, it allows to reduce
the number of tasks for scenarios with many partitions. Right now, this
interface must be implemented solely by the user, but we could also add
a new config parameter that specifies the max.number.of.tasks or
partitions.per.task so that the user can configure this instead of
implementing the interface.

Third, there is the idea of a "Producer Pool" that would allow to share
resources (network connections, memory, etc) over multiple producers.
This would allow to separate multiple transaction on the producer level,
while resources are shared. There is no detailed design document yet and
there would be a KIP for this feature.

Thus, if there should be any performance problems for high scale
scenarios, there are multiple ways to tackle them while keeping the
"producer per task" design.

Additionally, a "producer per thread" design would be way more complex
and I summarized the issues in a separate document. I will share a link
to the document soon.



2. StateStore recovery:

Streams EoS will in the first design not allow to exploit the
improvements that are added for 0.11 at the moment. However, as 0.10.2
faces the same issues of potentially long recovery, there is no
regression with this regard. Thus, I see those improvements as
orthogonal or add-ons. Nevertheless, we should try to explore those
options and if possible get them into 0.11 such that Streams with EoS
gets the same improvements as at-least-once scenario.



3. Caching:

We might need to do some experiments to quantify the impact on caching.
If it's severe, the suggested default commit interval of 100ms could
also be increased. Also, EoS will not enforce any commit interval, but
only change the default value. Thus, a user can freely trade-off latency
vs. caching-effect.

Last but not least, there is the idea to allow "read_uncommitted" for
intermediate topic. This would be an advance design for Streams EoS that
allows downstream sub-topologies to read uncommitted data
optimistically. In case of failure, a cascading abort of transactions
would be required. This change will need another KIP.



4. Idempotent Producer:

The transactional part automatically leverages the idempotent properties
of the producer. Idempotency is a requirement:

> Note that enable.idempotence must be enabled if a TransactionalId is configured.

See
https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit#bookmark=id.g2xsf9n49puh

All idempotent retries, are handled by the producer internally (with or
without transaction) if enable.idempotence is set to true.



-Matthias



On 3/3/17 3:34 AM, Eno Thereska wrote:
> Another question: 
> 
> The KIP doesn’t exactly spell out how it uses the idempotence guarantee from KIP-98. It seems that only the transactional part is needed. Or is the idempotence guarantee working behind the scenes and helping for some scenarios for which it is not worthwhile aborting a transaction (e.g., retransmitting a record after a temporary network glitch)?
> 
> Thanks
> Eno
> 
>> On Mar 2, 2017, at 4:56 PM, Jay Kreps <ja...@confluent.io> wrote:
>>
>> I second the concern on with the one producer per task approach. At a
>> high-level it seems to make sense but I think Damian is exactly right that
>> that cuts against the general design of the producer. Many people have high
>> input partition counts and will have high task counts as a result. I think
>> processing 1000 partitions should not be an unreasonable thing to want to
>> do.
>>
>> The tricky bits will be:
>>
>>   - Reduced effectiveness of batching (or more latency and memory to get
>>   equivalent batching). This doesn't show up in simple benchmarks because
>>   much of the penalty is I/O and CPU on the broker and the additional threads
>>   from all the producers can make a single-threaded benchmark seem faster.
>>   - TCP connection explosion. We maintain one connection per broker. This
>>   is already high since each app instance does this. This design though will
>>   add an additional multiplicative factor based on the partition count of the
>>   input.
>>   - Connection and metadata request storms. When an instance with 1000
>>   tasks starts up it is going to try to create many thousands of connections
>>   and issue a thousand metadata requests all at once.
>>   - Memory usage. We currently default to 64MB per producer. This can be
>>   tuned down, but the fact that we are spreading the batching over more
>>   producers will fundamentally mean we need a lot more memory to get good
>>   perf and the memory usage will change as your task assignment changes so it
>>   will be hard to set correctly unless it is done automatically.
>>   - Metrics explosion (1000 producer instances, each with their own
>>   metrics to monitor).
>>   - Thread explosion, 1000 background threads, one per producer, each
>>   sending data.
>>
>> -Jay
>>
>> On Wed, Mar 1, 2017 at 3:05 AM, Damian Guy <da...@gmail.com> wrote:
>>
>>> Hi Guozhang,
>>>
>>> Thanks for the KIP! This is an important feature for Kafka Streams and will
>>> help to unlock a bunch of use cases.
>>>
>>> I have some concerns/questions:
>>>
>>>   1. Producer per task: I'm worried about the overhead this is going to
>>>   put on both the streams app and the Kafka Brokers. You can easily
>>> imagine
>>>   an app consuming thousands of partitions. What load will this put on the
>>>   brokers? Am i correct in assuming that there will be metadata requests
>>> per
>>>   Producer? The memory overhead in the streams app will also increase
>>> fairly
>>>   significantly. Should we adjust ProducerConfig.BUFFER_MEMORY_CONFIG?
>>>   2. State Store recovery: As we already know, restoring the entire
>>>   changelog can take an extremely long time. Even with a fairly small
>>> dataset
>>>   and an inappropriately tuned segment size, this can take way too long.
>>> My
>>>   concern is that failures happen and then recovery takes "forever" and we
>>>   end up in a situation where we need to change the max.poll.interval to
>>> be
>>>   some very large number or else we end up in "rebalance hell". I don't
>>> think
>>>   this provides a very good user experience. You mention RocksDB
>>>   checkpointing in the doc - should we explore this idea some more? i.e.,
>>>   understand the penalty for checkpointing. Maybe checkpoint every *n*
>>>    commits?
>>>   3. What does EoS mean for Caching? If we set the commit interval to
>>>   100ms then the cache is not going to be very effective. Should it just
>>> be
>>>   disabled?
>>>
>>> Thanks,
>>> Damian
>>>
>>> On Tue, 28 Feb 2017 at 21:54 Guozhang Wang <wa...@gmail.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I have just created KIP-129 to leverage KIP-98 in Kafka Streams and
>>> provide
>>>> exactly-once processing semantics:
>>>>
>>>>
>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>> 129%3A+Streams+Exactly-Once+Semantics
>>>>
>>>> This KIP enables Streams users to optionally turn on exactly-once
>>>> processing semantics without changing their app code at all by leveraging
>>>> the transactional messaging features provided in KIP-98.
>>>>
>>>> The above wiki page provides a high-level view of the proposed changes,
>>>> while detailed implementation design can be found in this Google doc:
>>>>
>>>>
>>>> https://docs.google.com/document/d/1pGZ8xtOOyGwDYgH5vA6h19zOMMadu
>>> FK1DAB8_gBYA2c
>>>>
>>>> We would love to hear your comments and suggestions.
>>>>
>>>> Thanks,
>>>> -- Guozhang
>>>>
>>>
> 


Re: [DISCUSS] KIP-129: Kafka Streams Exactly-Once Semantics

Posted by Eno Thereska <en...@gmail.com>.
Another question: 

The KIP doesn’t exactly spell out how it uses the idempotence guarantee from KIP-98. It seems that only the transactional part is needed. Or is the idempotence guarantee working behind the scenes and helping for some scenarios for which it is not worthwhile aborting a transaction (e.g., retransmitting a record after a temporary network glitch)?

Thanks
Eno

> On Mar 2, 2017, at 4:56 PM, Jay Kreps <ja...@confluent.io> wrote:
> 
> I second the concern on with the one producer per task approach. At a
> high-level it seems to make sense but I think Damian is exactly right that
> that cuts against the general design of the producer. Many people have high
> input partition counts and will have high task counts as a result. I think
> processing 1000 partitions should not be an unreasonable thing to want to
> do.
> 
> The tricky bits will be:
> 
>   - Reduced effectiveness of batching (or more latency and memory to get
>   equivalent batching). This doesn't show up in simple benchmarks because
>   much of the penalty is I/O and CPU on the broker and the additional threads
>   from all the producers can make a single-threaded benchmark seem faster.
>   - TCP connection explosion. We maintain one connection per broker. This
>   is already high since each app instance does this. This design though will
>   add an additional multiplicative factor based on the partition count of the
>   input.
>   - Connection and metadata request storms. When an instance with 1000
>   tasks starts up it is going to try to create many thousands of connections
>   and issue a thousand metadata requests all at once.
>   - Memory usage. We currently default to 64MB per producer. This can be
>   tuned down, but the fact that we are spreading the batching over more
>   producers will fundamentally mean we need a lot more memory to get good
>   perf and the memory usage will change as your task assignment changes so it
>   will be hard to set correctly unless it is done automatically.
>   - Metrics explosion (1000 producer instances, each with their own
>   metrics to monitor).
>   - Thread explosion, 1000 background threads, one per producer, each
>   sending data.
> 
> -Jay
> 
> On Wed, Mar 1, 2017 at 3:05 AM, Damian Guy <da...@gmail.com> wrote:
> 
>> Hi Guozhang,
>> 
>> Thanks for the KIP! This is an important feature for Kafka Streams and will
>> help to unlock a bunch of use cases.
>> 
>> I have some concerns/questions:
>> 
>>   1. Producer per task: I'm worried about the overhead this is going to
>>   put on both the streams app and the Kafka Brokers. You can easily
>> imagine
>>   an app consuming thousands of partitions. What load will this put on the
>>   brokers? Am i correct in assuming that there will be metadata requests
>> per
>>   Producer? The memory overhead in the streams app will also increase
>> fairly
>>   significantly. Should we adjust ProducerConfig.BUFFER_MEMORY_CONFIG?
>>   2. State Store recovery: As we already know, restoring the entire
>>   changelog can take an extremely long time. Even with a fairly small
>> dataset
>>   and an inappropriately tuned segment size, this can take way too long.
>> My
>>   concern is that failures happen and then recovery takes "forever" and we
>>   end up in a situation where we need to change the max.poll.interval to
>> be
>>   some very large number or else we end up in "rebalance hell". I don't
>> think
>>   this provides a very good user experience. You mention RocksDB
>>   checkpointing in the doc - should we explore this idea some more? i.e.,
>>   understand the penalty for checkpointing. Maybe checkpoint every *n*
>>    commits?
>>   3. What does EoS mean for Caching? If we set the commit interval to
>>   100ms then the cache is not going to be very effective. Should it just
>> be
>>   disabled?
>> 
>> Thanks,
>> Damian
>> 
>> On Tue, 28 Feb 2017 at 21:54 Guozhang Wang <wa...@gmail.com> wrote:
>> 
>>> Hi all,
>>> 
>>> I have just created KIP-129 to leverage KIP-98 in Kafka Streams and
>> provide
>>> exactly-once processing semantics:
>>> 
>>> 
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> 129%3A+Streams+Exactly-Once+Semantics
>>> 
>>> This KIP enables Streams users to optionally turn on exactly-once
>>> processing semantics without changing their app code at all by leveraging
>>> the transactional messaging features provided in KIP-98.
>>> 
>>> The above wiki page provides a high-level view of the proposed changes,
>>> while detailed implementation design can be found in this Google doc:
>>> 
>>> 
>>> https://docs.google.com/document/d/1pGZ8xtOOyGwDYgH5vA6h19zOMMadu
>> FK1DAB8_gBYA2c
>>> 
>>> We would love to hear your comments and suggestions.
>>> 
>>> Thanks,
>>> -- Guozhang
>>> 
>> 


Re: [DISCUSS] KIP-129: Kafka Streams Exactly-Once Semantics

Posted by Jay Kreps <ja...@confluent.io>.
I second the concern on with the one producer per task approach. At a
high-level it seems to make sense but I think Damian is exactly right that
that cuts against the general design of the producer. Many people have high
input partition counts and will have high task counts as a result. I think
processing 1000 partitions should not be an unreasonable thing to want to
do.

The tricky bits will be:

   - Reduced effectiveness of batching (or more latency and memory to get
   equivalent batching). This doesn't show up in simple benchmarks because
   much of the penalty is I/O and CPU on the broker and the additional threads
   from all the producers can make a single-threaded benchmark seem faster.
   - TCP connection explosion. We maintain one connection per broker. This
   is already high since each app instance does this. This design though will
   add an additional multiplicative factor based on the partition count of the
   input.
   - Connection and metadata request storms. When an instance with 1000
   tasks starts up it is going to try to create many thousands of connections
   and issue a thousand metadata requests all at once.
   - Memory usage. We currently default to 64MB per producer. This can be
   tuned down, but the fact that we are spreading the batching over more
   producers will fundamentally mean we need a lot more memory to get good
   perf and the memory usage will change as your task assignment changes so it
   will be hard to set correctly unless it is done automatically.
   - Metrics explosion (1000 producer instances, each with their own
   metrics to monitor).
   - Thread explosion, 1000 background threads, one per producer, each
   sending data.

-Jay

On Wed, Mar 1, 2017 at 3:05 AM, Damian Guy <da...@gmail.com> wrote:

> Hi Guozhang,
>
> Thanks for the KIP! This is an important feature for Kafka Streams and will
> help to unlock a bunch of use cases.
>
> I have some concerns/questions:
>
>    1. Producer per task: I'm worried about the overhead this is going to
>    put on both the streams app and the Kafka Brokers. You can easily
> imagine
>    an app consuming thousands of partitions. What load will this put on the
>    brokers? Am i correct in assuming that there will be metadata requests
> per
>    Producer? The memory overhead in the streams app will also increase
> fairly
>    significantly. Should we adjust ProducerConfig.BUFFER_MEMORY_CONFIG?
>    2. State Store recovery: As we already know, restoring the entire
>    changelog can take an extremely long time. Even with a fairly small
> dataset
>    and an inappropriately tuned segment size, this can take way too long.
> My
>    concern is that failures happen and then recovery takes "forever" and we
>    end up in a situation where we need to change the max.poll.interval to
> be
>    some very large number or else we end up in "rebalance hell". I don't
> think
>    this provides a very good user experience. You mention RocksDB
>    checkpointing in the doc - should we explore this idea some more? i.e.,
>    understand the penalty for checkpointing. Maybe checkpoint every *n*
>     commits?
>    3. What does EoS mean for Caching? If we set the commit interval to
>    100ms then the cache is not going to be very effective. Should it just
> be
>    disabled?
>
> Thanks,
> Damian
>
> On Tue, 28 Feb 2017 at 21:54 Guozhang Wang <wa...@gmail.com> wrote:
>
> > Hi all,
> >
> > I have just created KIP-129 to leverage KIP-98 in Kafka Streams and
> provide
> > exactly-once processing semantics:
> >
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 129%3A+Streams+Exactly-Once+Semantics
> >
> > This KIP enables Streams users to optionally turn on exactly-once
> > processing semantics without changing their app code at all by leveraging
> > the transactional messaging features provided in KIP-98.
> >
> > The above wiki page provides a high-level view of the proposed changes,
> > while detailed implementation design can be found in this Google doc:
> >
> >
> > https://docs.google.com/document/d/1pGZ8xtOOyGwDYgH5vA6h19zOMMadu
> FK1DAB8_gBYA2c
> >
> > We would love to hear your comments and suggestions.
> >
> > Thanks,
> > -- Guozhang
> >
>

Re: [DISCUSS] KIP-129: Kafka Streams Exactly-Once Semantics

Posted by Damian Guy <da...@gmail.com>.
Hi Guozhang,

Thanks for the KIP! This is an important feature for Kafka Streams and will
help to unlock a bunch of use cases.

I have some concerns/questions:

   1. Producer per task: I'm worried about the overhead this is going to
   put on both the streams app and the Kafka Brokers. You can easily imagine
   an app consuming thousands of partitions. What load will this put on the
   brokers? Am i correct in assuming that there will be metadata requests per
   Producer? The memory overhead in the streams app will also increase fairly
   significantly. Should we adjust ProducerConfig.BUFFER_MEMORY_CONFIG?
   2. State Store recovery: As we already know, restoring the entire
   changelog can take an extremely long time. Even with a fairly small dataset
   and an inappropriately tuned segment size, this can take way too long. My
   concern is that failures happen and then recovery takes "forever" and we
   end up in a situation where we need to change the max.poll.interval to be
   some very large number or else we end up in "rebalance hell". I don't think
   this provides a very good user experience. You mention RocksDB
   checkpointing in the doc - should we explore this idea some more? i.e.,
   understand the penalty for checkpointing. Maybe checkpoint every *n*
    commits?
   3. What does EoS mean for Caching? If we set the commit interval to
   100ms then the cache is not going to be very effective. Should it just be
   disabled?

Thanks,
Damian

On Tue, 28 Feb 2017 at 21:54 Guozhang Wang <wa...@gmail.com> wrote:

> Hi all,
>
> I have just created KIP-129 to leverage KIP-98 in Kafka Streams and provide
> exactly-once processing semantics:
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-129%3A+Streams+Exactly-Once+Semantics
>
> This KIP enables Streams users to optionally turn on exactly-once
> processing semantics without changing their app code at all by leveraging
> the transactional messaging features provided in KIP-98.
>
> The above wiki page provides a high-level view of the proposed changes,
> while detailed implementation design can be found in this Google doc:
>
>
> https://docs.google.com/document/d/1pGZ8xtOOyGwDYgH5vA6h19zOMMaduFK1DAB8_gBYA2c
>
> We would love to hear your comments and suggestions.
>
> Thanks,
> -- Guozhang
>