Posted to users@kafka.apache.org by Dmitry Minkovsky <dm...@gmail.com> on 2018/01/16 18:46:04 UTC

Re: How can I repartition/rebalance topics processed by a Kafka Streams topology?

> Thus, only left/outer KStream-KStream and KStream-KTable join have some
> runtime dependencies. For more details about join, check out this blog
> post: https://www.confluent.io/blog/crossing-streams-joins-apache-kafka/

So I am trying to reprocess a topology and seem to have encountered this.
I posted my question to
https://stackoverflow.com/questions/48287840/kafka-streams-topology-does-not-replay-correctly.
I fear that this will not be something I can work around :(

On Sat, Dec 9, 2017 at 7:52 PM, Matthias J. Sax <ma...@confluent.io>
wrote:

> About timestamps: embedding timestamps in the payload itself is not
> really necessary IMHO. Each record has a meta-data timestamp that provides
> the exact same semantics. If you just copy data from one topic to
> another, the timestamp can be preserved (using a plain consumer/producer
> and setting the timestamp of the input record explicitly as the timestamp
> for the output record -- for Streams, it could be that "some" timestamps
> get altered as we apply slightly different timestamp inference
> logic -- but there are plans to improve this inference so that the
> timestamp is preserved exactly in Streams, too).
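>
> A rough sketch of such a copy job with the plain clients (untested; topic
> names, serdes, and configs are placeholders):
>
>     import java.util.Collections;
>     import java.util.Properties;
>     import org.apache.kafka.clients.consumer.ConsumerRecord;
>     import org.apache.kafka.clients.consumer.KafkaConsumer;
>     import org.apache.kafka.clients.producer.KafkaProducer;
>     import org.apache.kafka.clients.producer.ProducerRecord;
>
>     public class TopicCopy {
>       public static void main(String[] args) {
>         Properties cProps = new Properties();
>         cProps.put("bootstrap.servers", "localhost:9092");
>         cProps.put("group.id", "topic-copy");
>         cProps.put("key.deserializer",
>             "org.apache.kafka.common.serialization.ByteArrayDeserializer");
>         cProps.put("value.deserializer",
>             "org.apache.kafka.common.serialization.ByteArrayDeserializer");
>
>         Properties pProps = new Properties();
>         pProps.put("bootstrap.servers", "localhost:9092");
>         pProps.put("key.serializer",
>             "org.apache.kafka.common.serialization.ByteArraySerializer");
>         pProps.put("value.serializer",
>             "org.apache.kafka.common.serialization.ByteArraySerializer");
>
>         try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(cProps);
>              KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(pProps)) {
>           consumer.subscribe(Collections.singletonList("old-topic"));
>           while (true) {
>             for (ConsumerRecord<byte[], byte[]> rec : consumer.poll(1000)) {
>               // pass the input record's timestamp explicitly so it is preserved;
>               // partition is null so the new topic re-partitions by key
>               producer.send(new ProducerRecord<>("new-topic", null,
>                   rec.timestamp(), rec.key(), rec.value()));
>             }
>           }
>         }
>       }
>     }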
>
> With regard to flow control: it depends on the operators you use. Some
> are fully deterministic, others have some runtime dependencies. Fully
> deterministic are all aggregations (non-windowed and windowed), as well
> as the inner KStream-KStream join and all variants (inner/left/outer) of
> the KTable-KTable join.
>
> > If the consumer reads P2 before P1, will the task still
> > properly align these two records given their timestamps for the correct
> > inner join, assuming both records are within the record buffer?
>
> This will always be computed correctly, even if both records are not in
> the buffer at the same time :)
>
>
> Thus, only left/outer KStream-KStream and KStream-KTable join have some
> runtime dependencies. For more details about join, check out this blog
> post: https://www.confluent.io/blog/crossing-streams-joins-apache-kafka/
>
> Btw: we are aware of some weaknesses in the current implementation and
> it's on our road map to strengthen our guarantees -- also with regard to
> the internally used record buffer, time management in general, and
> operator semantics.
>
> Note though: Kafka guarantees offset-based ordering, not
> timestamp-ordering. Thus, Kafka Streams also processes records in offset
> order. This implies that records might be out-of-order with regard to
> their timestamps, but our operators are implemented to handle this case
> correctly (minus some known issues, as mentioned above, that we are going
> to fix in future releases).
>
>
> Stateless: I mean, if you write a program that only uses stateless
> operators like filter/map but not aggregation/joins.
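>
> For example, roughly (just a sketch; topic names are made up):
>
>     StreamsBuilder builder = new StreamsBuilder();
>     builder.<String, String>stream("input-topic")
>            .filter((key, value) -> value != null)    // stateless
>            .mapValues(value -> value.toUpperCase())  // stateless
>            .to("output-topic");
>     // no aggregations or joins -> no local state and no internal
>     // changelog/repartition topics to worry about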
>
>
>
> -Matthias
>
>
> On 12/9/17 11:59 AM, Dmitry Minkovsky wrote:
> >> How large is the record buffer? Is it configurable?
> >
> > I seem to have just discovered the answer to this:
> > buffered.records.per.partition
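> >
> > e.g. (value here is just an example; I believe the default is 1000):
> >
> >     Properties props = new Properties();
> >     // max number of records buffered per partition before pausing consumption
> >     props.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 2000);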
> >
> > On Sat, Dec 9, 2017 at 2:48 PM, Dmitry Minkovsky <dm...@gmail.com>
> > wrote:
> >
> >> Hi Matthias, yes that definitely helps. A few thoughts inline below.
> >>
> >> Thank you!
> >>
> >> On Fri, Dec 8, 2017 at 4:21 PM, Matthias J. Sax <ma...@confluent.io>
> >> wrote:
> >>
> >>> Hard to give a generic answer.
> >>>
> >>> 1. We recommend over-partitioning your input topics to start with (to
> >>> avoid having to add new partitions later on); problem avoidance is the
> >>> best strategy. There will be some overhead for this on the broker side,
> >>> obviously, but it's not too big.
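> >>>
> >>> For example, with the AdminClient you could create the input topic up
> >>> front with more partitions than you currently need (sketch; numbers and
> >>> configs are made up):
> >>>
> >>>     Properties props = new Properties();
> >>>     props.put("bootstrap.servers", "localhost:9092");
> >>>     try (AdminClient admin = AdminClient.create(props)) {
> >>>       // 50 partitions, replication factor 3 -- more partitions than
> >>>       // initially needed, so they never have to be added later
> >>>       admin.createTopics(Collections.singleton(
> >>>           new NewTopic("input-topic", 50, (short) 3)));
> >>>     }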
> >>>
> >>
> >> Yes, I will definitely be doing this.
> >>
> >>
> >>>
> >>> 2. Not sure why you would need a new cluster? You can just create a new
> >>> topic in the same cluster and let Kafka Streams read from there.
> >>>
> >>
> >> Motivated by fear of disturbing/manipulating a production cluster and
> >> the relative ease of putting up a new cluster. Perhaps that fear is
> >> irrational. I could alternatively just prefix topics.
> >>
> >>
> >>>
> >>> 3. Depending on your state requirements, you could also run two
> >>> applications in parallel -- the new one reads from the new input topic
> >>> with more partitions, and you configure your producer to write to the
> >>> new topic (or maybe even dual-write to both). Once your new application
> >>> is ramped up, you can stop the old one.
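> >>>
> >>> Roughly, the new instance would just get its own application.id and
> >>> read from the new topic (sketch; names are placeholders):
> >>>
> >>>     Properties props = new Properties();
> >>>     props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app-v2"); // fresh state
> >>>     props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> >>>
> >>>     StreamsBuilder builder = new StreamsBuilder();
> >>>     builder.stream("input-topic-v2");  // the new topic with more partitions
> >>>     // ... same processing logic as the old application ...
> >>>     new KafkaStreams(builder.build(), props).start();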
> >>>
> >>
> >> Yes, this is my plan for migrations. If I could run it past you:
> >>
> >> (i) Write input topics from the old prefix to the new prefix.
> >> (ii) Start the new Kafka Streams application against the new prefix.
> >> (iii) When the two applications are in sync, stop writing to the old
> >> topics
> >>
> >> Since I will be copying from the old prefix to the new prefix, it seems
> >> essential here to have timestamps embedded in the data records, along
> >> with a custom timestamp extractor.
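> >>
> >> Something along these lines, I think (sketch; MyEvent and its timestamp
> >> field are hypothetical, assuming the payload carries an epoch-millis
> >> timestamp):
> >>
> >>     public class PayloadTimestampExtractor implements TimestampExtractor {
> >>       @Override
> >>       public long extract(ConsumerRecord<Object, Object> record,
> >>                           long previousTimestamp) {
> >>         MyEvent event = (MyEvent) record.value(); // hypothetical payload type
> >>         return event.getTimestampMillis();        // timestamp embedded in payload
> >>       }
> >>     }
> >>
> >>     // registered via:
> >>     // props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
> >>     //           PayloadTimestampExtractor.class);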
> >>
> >> I really wish I could get some more flavor on "Flow Control With
> >> Timestamps
> >> <https://docs.confluent.io/current/streams/architecture.html#flow-control-with-timestamps>"
> >> in this regard. Assuming my timestamps are monotonically increasing
> >> within each input topic, from my reading of that section it still
> >> appears that the result of reprocessing input topics is
> >> non-deterministic beyond the "records in its stream record buffer".
> >> Some seemingly crucial sentences:
> >>
> >>> *This flow control is best-effort because it is not always possible to
> >>> strictly enforce execution order across streams by record timestamp; in
> >>> fact, in order to enforce strict execution ordering, one must either
> >>> wait until the system has received all the records from all streams
> >>> (which may be quite infeasible in practice) or inject additional
> >>> information about timestamp boundaries or heuristic estimates such as
> >>> MillWheel’s watermarks.*
> >>
> >>
> >> Practically, how am I to understand this? How large is the record
> >> buffer? Is it configurable?
> >>
> >> For example, suppose I am re-processing an inner join on partitions P1
> >> (left) and P2 (right). In the original processing, record K1V1T1 was
> >> recorded onto P1, then some time later record K1V2T2 was recorded onto
> >> P2. As a result, K1V2T2 was joined with K1V1T1. Now, during
> >> re-processing, P1 and P2 contain historical data and the Kafka Streams
> >> consumers can read P2 before P1. If the consumer reads P2 before P1,
> >> will the task still properly align these two records given their
> >> timestamps for the correct inner join, assuming both records are within
> >> the record buffer? I've experimented with this, but unfortunately I
> >> didn't have time to really set up good experiments to satisfy myself.
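> >>
> >> For reference, the kind of join I have in mind (sketch; the value joiner
> >> and window size are made up):
> >>
> >>     KStream<String, String> left = builder.stream("left-topic");    // P1
> >>     KStream<String, String> right = builder.stream("right-topic");  // P2
> >>     left.join(right,
> >>         (l, r) -> l + "/" + r,                         // value joiner
> >>         JoinWindows.of(TimeUnit.MINUTES.toMillis(5)))  // timestamp-based window
> >>       .to("joined-topic");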
> >>
> >>
> >>> 4. If you really need to add new partitions, you need to fix up all
> >>> topics manually -- including all topics Kafka Streams created for you.
> >>> Adding partitions messes up all your state, as the key-based
> >>> partitioning changes. This implies that your application must be
> >>> stopped! Thus, if you have zero-downtime requirements, you can't do
> >>> this at all.
> >>>
> >>> 5. If you have a stateless application, all those issues go away
> >>> though, and you can even add new partitions during runtime.
> >>>
> >>>
> >> Stateless in what sense? Kafka Streams seems to be all about aligning
> >> and manipulating state to create more state. Are you referring to
> >> internal state, specifically?
> >>
> >>
> >>
> >>>
> >>> Hope this helps.
> >>>
> >>>
> >>> -Matthias
> >>>
> >>>
> >>>
> >>> On 12/8/17 11:02 AM, Dmitry Minkovsky wrote:
> >>>> I am about to put a topology into production and I am concerned that
> >>>> I don't know how to repartition/rebalance the topics in the event that
> >>>> I need to add more partitions.
> >>>>
> >>>> My inclination is that I should spin up a new cluster and run some
> >>>> kind of consumer/producer combination that takes data from the
> >>>> previous cluster and writes it to the new cluster. A new instance of
> >>>> the Kafka Streams application then works against this new cluster. But
> >>>> I'm not sure how to best execute this, or whether this approach is
> >>>> sound at all. I am imagining many things may go wrong. Without going
> >>>> into further speculation, what is the best way to do this?
> >>>>
> >>>> Thank you,
> >>>> Dmitry
> >>>>
> >>>
> >>>
> >>
> >
>
>