You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Jeff Klukas <jk...@simple.com> on 2016/04/19 22:40:06 UTC

How to "buffer" a stream with high churn and output only at the end of a window?

Is it true that the aggregation and reduction methods of KStream will emit
a new output message for each incoming message?

I have an application that's copying a Postgres replication stream to a
Kafka topic, and activity tends to be clustered, with many updates to a
given primary key happening in quick succession. I'd like to smooth that
out by buffering the messages in tumbling windows, allowing the updates to
overwrite one another, and emitting output messages only at the end of the
window.

Does the Kafka Streams API provide any hooks that I could use to achieve
this kind of windowed "buffering" or "deduplication" of a stream?

Re: How to "buffer" a stream with high churn and output only at the end of a window?

Posted by Guozhang Wang <wa...@gmail.com>.
Henry,

Thanks for the great feedbacks. I'm making some proposal for adding the
control mechanism for latency v.s. data volume tradeoffs, which I will put
up to wiki once it is done:

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Discussions

We can continue the discussion from there. In the end we need to proposal a
KIP for this feature.


Guozhang


On Wed, Apr 20, 2016 at 10:57 PM, Henry Cai <hc...@pinterest.com.invalid>
wrote:

> My use case is actually myTable.aggregate().to("output_topic"), so I need a
> way to suppress the number of outputs.
>
> I don't think correlating the internal cache flush with the output window
> emit frequency is ideal.  It's too hard for application developer to see
> when the cache will be flushed, we would like to see a clearly defined
> window emit policy.  I think emit on window end (or plus a application
> defined delay) is very easy to understand, it's OK to re-emit when we have
> late-arrival events since this doesn't happen that often (if we use google
> data flow concept, the window end is based on user defined watermark
> function which should already add the buffer for most common processing
> delays).
>
> Another problem with cache flush is it actually flushed quite often, e.g.
> since the in-memory cache doesn't support rangeScan and most of the lookup
> on window-based store needs to do range scan which would trigger the flush
> first.
>
> On Wed, Apr 20, 2016 at 9:13 PM, Jay Kreps <ja...@confluent.io> wrote:
>
> > To summarize a chat session Guozhang and I just had on slack:
> >
> > We currently do dedupe the output for stateful operations (e.g. join,
> > aggregate). They hit an in-memory cache and only produce output to
> > rocksdb/kafka when either that cache fills up or the commit period
> occurs.
> > So the changelog for these operations which is often also the output
> > already gets this deduplication. Controlling the commit frequency and
> cache
> > size is probably the right way to trade off latency of the update vs
> update
> > volume.
> >
> > The operation we don't dedupe is to()/through(). So, say if you do an
> > operation like
> >    myTable.aggregate().filter().map().to("output_topic")
> > Here the aggregation itself (and hence its changelog) isn't the intended
> > output, but rather some transformed version of it. In this case the issue
> > you describe is correct, we don't dedupe. There might be several options
> > here. One would be for the aggregate to produce deduped output lazily.
> The
> > other would be for the to() operator to also dedupe.
> >
> > Personally I feel this idea of caching to suppress output versus is
> > actually a better way to model and think about what's going on than
> trying
> > to have a triggering policy. If you set a triggering policy that says
> "only
> > output at the end of the window" the reality is that if late data comes
> you
> > still have to produce additional outputs. So you don't produce one output
> > at the end but rather potentially any number of outputs. So a simple way
> to
> > think about this is that you produce all updates but optimistically
> > suppress some duplicates for efficiency.
> >
> > -Jay
> >
> > On Wed, Apr 20, 2016 at 5:24 PM, Henry Cai <hc...@pinterest.com.invalid>
> > wrote:
> >
> > > 0.10.0.1 is fine for me, I am actually building from trunk head for
> > streams
> > > package.
> > >
> > > On Wed, Apr 20, 2016 at 5:06 PM, Guozhang Wang <wa...@gmail.com>
> > wrote:
> > >
> > > > I saw that note, thanks for commenting.
> > > >
> > > > I are cutting the next 0.10.0.0 RC next week, so I am not certain if
> it
> > > > will make it for 0.10.0.0. But we can push it to be in 0.10.0.1.
> > > >
> > > > Guozhang
> > > >
> > > > On Wed, Apr 20, 2016 at 4:57 PM, Henry Cai
> <hcai@pinterest.com.invalid
> > >
> > > > wrote:
> > > >
> > > > > Thanks.
> > > > >
> > > > > Do you know when KAFKA-3101 will be implemented?
> > > > >
> > > > > I also add a note to that JIRA for a left outer join use case which
> > > also
> > > > > need buffer support.
> > > > >
> > > > >
> > > > > On Wed, Apr 20, 2016 at 4:42 PM, Guozhang Wang <wangguoz@gmail.com
> >
> > > > wrote:
> > > > >
> > > > > > Henry,
> > > > > >
> > > > > > I thought you were concerned about consumer memory contention.
> > > That's a
> > > > > > valid point, and yes, you need to keep those buffered records in
> a
> > > > > > persistent store.
> > > > > >
> > > > > > As I mentioned we are trying to do optimize the aggregation
> outputs
> > > as
> > > > in
> > > > > >
> > > > > > https://issues.apache.org/jira/browse/KAFKA-3101
> > > > > >
> > > > > > Its idea is very similar to buffering, while we keep the
> aggregated
> > > > > values
> > > > > > in RocksDB, we do not send the updated values for each receiving
> > > record
> > > > > but
> > > > > > only do that based on some policy. More generally we can have a
> > > trigger
> > > > > > mechanism for user to customize when to emit.
> > > > > >
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > >
> > > > > > On Wed, Apr 20, 2016 at 4:03 PM, Henry Cai
> > > <hcai@pinterest.com.invalid
> > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > I think this scheme still has problems.  If during 'holding' I
> > > > > literally
> > > > > > > hold (don't return the method call), I will starve the thread.
> > If
> > > I
> > > > am
> > > > > > > writing the output to a in-memory buffer and let the method
> > > returns,
> > > > > the
> > > > > > > kafka stream will acknowledge the record to upstream queue as
> > > > > processed,
> > > > > > so
> > > > > > > I would lose the record if the node crashed after ack but
> before
> > 10
> > > > > > minutes
> > > > > > > is up.
> > > > > > >
> > > > > > > I guess I need to write the buffered result into a persistent
> > > store,
> > > > > > > another kafka queue or K/V store.
> > > > > > >
> > > > > > > On Wed, Apr 20, 2016 at 3:49 PM, Guozhang Wang <
> > wangguoz@gmail.com
> > > >
> > > > > > wrote:
> > > > > > >
> > > > > > > > By "holding the stream", I assume you are still consuming
> data,
> > > but
> > > > > > just
> > > > > > > > that you only write data every 10 minutes instead of upon
> each
> > > > > received
> > > > > > > > record right?
> > > > > > > >
> > > > > > > > Anyways, in either case, consumer should not have severe
> memory
> > > > issue
> > > > > > as
> > > > > > > > Kafka Streams will pause its consuming when enough data is
> > > buffered
> > > > > at
> > > > > > > the
> > > > > > > > streams end (note that we have two buffers here, the consumer
> > > > buffers
> > > > > > raw
> > > > > > > > bytes, and the streams library take raw bytes and buffer the
> > > > > > > de-serialized
> > > > > > > > objects, and threshold on its own buffer to pause / resume
> the
> > > > > > consumer).
> > > > > > > >
> > > > > > > >
> > > > > > > > Guozhang
> > > > > > > >
> > > > > > > > On Wed, Apr 20, 2016 at 3:35 PM, Henry Cai
> > > > > <hcai@pinterest.com.invalid
> > > > > > >
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > So hold the stream for 15 minutes wouldn't cause too much
> > > > > performance
> > > > > > > > > problems?
> > > > > > > > >
> > > > > > > > > On Wed, Apr 20, 2016 at 3:16 PM, Guozhang Wang <
> > > > wangguoz@gmail.com
> > > > > >
> > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Consumer' buffer does not depend on offset committing,
> once
> > > it
> > > > is
> > > > > > > given
> > > > > > > > > > from the poll() call it is out of the buffer. If offsets
> > are
> > > > not
> > > > > > > > > committed,
> > > > > > > > > > then upon failover it will simply re-consumer these
> records
> > > > again
> > > > > > > from
> > > > > > > > > > Kafka.
> > > > > > > > > >
> > > > > > > > > > Guozhang
> > > > > > > > > >
> > > > > > > > > > On Tue, Apr 19, 2016 at 11:34 PM, Henry Cai
> > > > > > > <hcai@pinterest.com.invalid
> > > > > > > > >
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > For the technique of custom Processor of holding call
> to
> > > > > > > > > > context.forward(),
> > > > > > > > > > > if I hold it for 10 minutes, what does that mean for
> the
> > > > > consumer
> > > > > > > > > > > acknowledgement on source node?
> > > > > > > > > > >
> > > > > > > > > > > I guess if I hold it for 10 minutes, the consumer is
> not
> > > > going
> > > > > to
> > > > > > > ack
> > > > > > > > > to
> > > > > > > > > > > the upstream queue, will that impact the consumer
> > > > performance,
> > > > > > will
> > > > > > > > > > > consumer's kafka client message buffer overflow when
> > there
> > > is
> > > > > no
> > > > > > > ack
> > > > > > > > in
> > > > > > > > > > 10
> > > > > > > > > > > minutes?
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > On Tue, Apr 19, 2016 at 6:10 PM, Guozhang Wang <
> > > > > > wangguoz@gmail.com
> > > > > > > >
> > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Yes we are aware of this behavior and are working on
> > > > > optimizing
> > > > > > > it:
> > > > > > > > > > > >
> > > > > > > > > > > > https://issues.apache.org/jira/browse/KAFKA-3101
> > > > > > > > > > > >
> > > > > > > > > > > > More generally, we are considering to add a "trigger"
> > > > > interface
> > > > > > > > > similar
> > > > > > > > > > > to
> > > > > > > > > > > > the Millwheel model where users can customize when
> they
> > > > want
> > > > > to
> > > > > > > > emit
> > > > > > > > > > > > outputs to the downstream operators. Unfortunately
> for
> > > now
> > > > > > there
> > > > > > > > will
> > > > > > > > > > no
> > > > > > > > > > > > easy workaround for buffering, and you may want to do
> > > this
> > > > in
> > > > > > app
> > > > > > > > > code
> > > > > > > > > > > (for
> > > > > > > > > > > > example, in a customized Processor where you can
> > control
> > > > when
> > > > > > to
> > > > > > > > call
> > > > > > > > > > > > context.forward() ).
> > > > > > > > > > > >
> > > > > > > > > > > > Guozhang
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > On Tue, Apr 19, 2016 at 1:40 PM, Jeff Klukas <
> > > > > > jklukas@simple.com
> > > > > > > >
> > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Is it true that the aggregation and reduction
> methods
> > > of
> > > > > > > KStream
> > > > > > > > > will
> > > > > > > > > > > > emit
> > > > > > > > > > > > > a new output message for each incoming message?
> > > > > > > > > > > > >
> > > > > > > > > > > > > I have an application that's copying a Postgres
> > > > replication
> > > > > > > > stream
> > > > > > > > > > to a
> > > > > > > > > > > > > Kafka topic, and activity tends to be clustered,
> with
> > > > many
> > > > > > > > updates
> > > > > > > > > > to a
> > > > > > > > > > > > > given primary key happening in quick succession.
> I'd
> > > like
> > > > > to
> > > > > > > > smooth
> > > > > > > > > > > that
> > > > > > > > > > > > > out by buffering the messages in tumbling windows,
> > > > allowing
> > > > > > the
> > > > > > > > > > updates
> > > > > > > > > > > > to
> > > > > > > > > > > > > overwrite one another, and emitting output messages
> > > only
> > > > at
> > > > > > the
> > > > > > > > end
> > > > > > > > > > of
> > > > > > > > > > > > the
> > > > > > > > > > > > > window.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Does the Kafka Streams API provide any hooks that I
> > > could
> > > > > use
> > > > > > > to
> > > > > > > > > > > achieve
> > > > > > > > > > > > > this kind of windowed "buffering" or
> "deduplication"
> > > of a
> > > > > > > stream?
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > --
> > > > > > > > > > > > -- Guozhang
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > --
> > > > > > > > > > -- Guozhang
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > --
> > > > > > > > -- Guozhang
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
>



-- 
-- Guozhang

Re: How to "buffer" a stream with high churn and output only at the end of a window?

Posted by Henry Cai <hc...@pinterest.com.INVALID>.
My use case is actually myTable.aggregate().to("output_topic"), so I need a
way to suppress the number of outputs.

I don't think correlating the internal cache flush with the output window
emit frequency is ideal.  It's too hard for application developer to see
when the cache will be flushed, we would like to see a clearly defined
window emit policy.  I think emit on window end (or plus a application
defined delay) is very easy to understand, it's OK to re-emit when we have
late-arrival events since this doesn't happen that often (if we use google
data flow concept, the window end is based on user defined watermark
function which should already add the buffer for most common processing
delays).

Another problem with cache flush is it actually flushed quite often, e.g.
since the in-memory cache doesn't support rangeScan and most of the lookup
on window-based store needs to do range scan which would trigger the flush
first.

On Wed, Apr 20, 2016 at 9:13 PM, Jay Kreps <ja...@confluent.io> wrote:

> To summarize a chat session Guozhang and I just had on slack:
>
> We currently do dedupe the output for stateful operations (e.g. join,
> aggregate). They hit an in-memory cache and only produce output to
> rocksdb/kafka when either that cache fills up or the commit period occurs.
> So the changelog for these operations which is often also the output
> already gets this deduplication. Controlling the commit frequency and cache
> size is probably the right way to trade off latency of the update vs update
> volume.
>
> The operation we don't dedupe is to()/through(). So, say if you do an
> operation like
>    myTable.aggregate().filter().map().to("output_topic")
> Here the aggregation itself (and hence its changelog) isn't the intended
> output, but rather some transformed version of it. In this case the issue
> you describe is correct, we don't dedupe. There might be several options
> here. One would be for the aggregate to produce deduped output lazily. The
> other would be for the to() operator to also dedupe.
>
> Personally I feel this idea of caching to suppress output versus is
> actually a better way to model and think about what's going on than trying
> to have a triggering policy. If you set a triggering policy that says "only
> output at the end of the window" the reality is that if late data comes you
> still have to produce additional outputs. So you don't produce one output
> at the end but rather potentially any number of outputs. So a simple way to
> think about this is that you produce all updates but optimistically
> suppress some duplicates for efficiency.
>
> -Jay
>
> On Wed, Apr 20, 2016 at 5:24 PM, Henry Cai <hc...@pinterest.com.invalid>
> wrote:
>
> > 0.10.0.1 is fine for me, I am actually building from trunk head for
> streams
> > package.
> >
> > On Wed, Apr 20, 2016 at 5:06 PM, Guozhang Wang <wa...@gmail.com>
> wrote:
> >
> > > I saw that note, thanks for commenting.
> > >
> > > I are cutting the next 0.10.0.0 RC next week, so I am not certain if it
> > > will make it for 0.10.0.0. But we can push it to be in 0.10.0.1.
> > >
> > > Guozhang
> > >
> > > On Wed, Apr 20, 2016 at 4:57 PM, Henry Cai <hcai@pinterest.com.invalid
> >
> > > wrote:
> > >
> > > > Thanks.
> > > >
> > > > Do you know when KAFKA-3101 will be implemented?
> > > >
> > > > I also add a note to that JIRA for a left outer join use case which
> > also
> > > > need buffer support.
> > > >
> > > >
> > > > On Wed, Apr 20, 2016 at 4:42 PM, Guozhang Wang <wa...@gmail.com>
> > > wrote:
> > > >
> > > > > Henry,
> > > > >
> > > > > I thought you were concerned about consumer memory contention.
> > That's a
> > > > > valid point, and yes, you need to keep those buffered records in a
> > > > > persistent store.
> > > > >
> > > > > As I mentioned we are trying to do optimize the aggregation outputs
> > as
> > > in
> > > > >
> > > > > https://issues.apache.org/jira/browse/KAFKA-3101
> > > > >
> > > > > Its idea is very similar to buffering, while we keep the aggregated
> > > > values
> > > > > in RocksDB, we do not send the updated values for each receiving
> > record
> > > > but
> > > > > only do that based on some policy. More generally we can have a
> > trigger
> > > > > mechanism for user to customize when to emit.
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > > On Wed, Apr 20, 2016 at 4:03 PM, Henry Cai
> > <hcai@pinterest.com.invalid
> > > >
> > > > > wrote:
> > > > >
> > > > > > I think this scheme still has problems.  If during 'holding' I
> > > > literally
> > > > > > hold (don't return the method call), I will starve the thread.
> If
> > I
> > > am
> > > > > > writing the output to a in-memory buffer and let the method
> > returns,
> > > > the
> > > > > > kafka stream will acknowledge the record to upstream queue as
> > > > processed,
> > > > > so
> > > > > > I would lose the record if the node crashed after ack but before
> 10
> > > > > minutes
> > > > > > is up.
> > > > > >
> > > > > > I guess I need to write the buffered result into a persistent
> > store,
> > > > > > another kafka queue or K/V store.
> > > > > >
> > > > > > On Wed, Apr 20, 2016 at 3:49 PM, Guozhang Wang <
> wangguoz@gmail.com
> > >
> > > > > wrote:
> > > > > >
> > > > > > > By "holding the stream", I assume you are still consuming data,
> > but
> > > > > just
> > > > > > > that you only write data every 10 minutes instead of upon each
> > > > received
> > > > > > > record right?
> > > > > > >
> > > > > > > Anyways, in either case, consumer should not have severe memory
> > > issue
> > > > > as
> > > > > > > Kafka Streams will pause its consuming when enough data is
> > buffered
> > > > at
> > > > > > the
> > > > > > > streams end (note that we have two buffers here, the consumer
> > > buffers
> > > > > raw
> > > > > > > bytes, and the streams library take raw bytes and buffer the
> > > > > > de-serialized
> > > > > > > objects, and threshold on its own buffer to pause / resume the
> > > > > consumer).
> > > > > > >
> > > > > > >
> > > > > > > Guozhang
> > > > > > >
> > > > > > > On Wed, Apr 20, 2016 at 3:35 PM, Henry Cai
> > > > <hcai@pinterest.com.invalid
> > > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > > > So hold the stream for 15 minutes wouldn't cause too much
> > > > performance
> > > > > > > > problems?
> > > > > > > >
> > > > > > > > On Wed, Apr 20, 2016 at 3:16 PM, Guozhang Wang <
> > > wangguoz@gmail.com
> > > > >
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Consumer' buffer does not depend on offset committing, once
> > it
> > > is
> > > > > > given
> > > > > > > > > from the poll() call it is out of the buffer. If offsets
> are
> > > not
> > > > > > > > committed,
> > > > > > > > > then upon failover it will simply re-consumer these records
> > > again
> > > > > > from
> > > > > > > > > Kafka.
> > > > > > > > >
> > > > > > > > > Guozhang
> > > > > > > > >
> > > > > > > > > On Tue, Apr 19, 2016 at 11:34 PM, Henry Cai
> > > > > > <hcai@pinterest.com.invalid
> > > > > > > >
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > For the technique of custom Processor of holding call to
> > > > > > > > > context.forward(),
> > > > > > > > > > if I hold it for 10 minutes, what does that mean for the
> > > > consumer
> > > > > > > > > > acknowledgement on source node?
> > > > > > > > > >
> > > > > > > > > > I guess if I hold it for 10 minutes, the consumer is not
> > > going
> > > > to
> > > > > > ack
> > > > > > > > to
> > > > > > > > > > the upstream queue, will that impact the consumer
> > > performance,
> > > > > will
> > > > > > > > > > consumer's kafka client message buffer overflow when
> there
> > is
> > > > no
> > > > > > ack
> > > > > > > in
> > > > > > > > > 10
> > > > > > > > > > minutes?
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Tue, Apr 19, 2016 at 6:10 PM, Guozhang Wang <
> > > > > wangguoz@gmail.com
> > > > > > >
> > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Yes we are aware of this behavior and are working on
> > > > optimizing
> > > > > > it:
> > > > > > > > > > >
> > > > > > > > > > > https://issues.apache.org/jira/browse/KAFKA-3101
> > > > > > > > > > >
> > > > > > > > > > > More generally, we are considering to add a "trigger"
> > > > interface
> > > > > > > > similar
> > > > > > > > > > to
> > > > > > > > > > > the Millwheel model where users can customize when they
> > > want
> > > > to
> > > > > > > emit
> > > > > > > > > > > outputs to the downstream operators. Unfortunately for
> > now
> > > > > there
> > > > > > > will
> > > > > > > > > no
> > > > > > > > > > > easy workaround for buffering, and you may want to do
> > this
> > > in
> > > > > app
> > > > > > > > code
> > > > > > > > > > (for
> > > > > > > > > > > example, in a customized Processor where you can
> control
> > > when
> > > > > to
> > > > > > > call
> > > > > > > > > > > context.forward() ).
> > > > > > > > > > >
> > > > > > > > > > > Guozhang
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > On Tue, Apr 19, 2016 at 1:40 PM, Jeff Klukas <
> > > > > jklukas@simple.com
> > > > > > >
> > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Is it true that the aggregation and reduction methods
> > of
> > > > > > KStream
> > > > > > > > will
> > > > > > > > > > > emit
> > > > > > > > > > > > a new output message for each incoming message?
> > > > > > > > > > > >
> > > > > > > > > > > > I have an application that's copying a Postgres
> > > replication
> > > > > > > stream
> > > > > > > > > to a
> > > > > > > > > > > > Kafka topic, and activity tends to be clustered, with
> > > many
> > > > > > > updates
> > > > > > > > > to a
> > > > > > > > > > > > given primary key happening in quick succession. I'd
> > like
> > > > to
> > > > > > > smooth
> > > > > > > > > > that
> > > > > > > > > > > > out by buffering the messages in tumbling windows,
> > > allowing
> > > > > the
> > > > > > > > > updates
> > > > > > > > > > > to
> > > > > > > > > > > > overwrite one another, and emitting output messages
> > only
> > > at
> > > > > the
> > > > > > > end
> > > > > > > > > of
> > > > > > > > > > > the
> > > > > > > > > > > > window.
> > > > > > > > > > > >
> > > > > > > > > > > > Does the Kafka Streams API provide any hooks that I
> > could
> > > > use
> > > > > > to
> > > > > > > > > > achieve
> > > > > > > > > > > > this kind of windowed "buffering" or "deduplication"
> > of a
> > > > > > stream?
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > --
> > > > > > > > > > > -- Guozhang
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > --
> > > > > > > > > -- Guozhang
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > -- Guozhang
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>

Re: How to "buffer" a stream with high churn and output only at the end of a window?

Posted by Jay Kreps <ja...@confluent.io>.
To summarize a chat session Guozhang and I just had on slack:

We currently do dedupe the output for stateful operations (e.g. join,
aggregate). They hit an in-memory cache and only produce output to
rocksdb/kafka when either that cache fills up or the commit period occurs.
So the changelog for these operations which is often also the output
already gets this deduplication. Controlling the commit frequency and cache
size is probably the right way to trade off latency of the update vs update
volume.

The operation we don't dedupe is to()/through(). So, say if you do an
operation like
   myTable.aggregate().filter().map().to("output_topic")
Here the aggregation itself (and hence its changelog) isn't the intended
output, but rather some transformed version of it. In this case the issue
you describe is correct, we don't dedupe. There might be several options
here. One would be for the aggregate to produce deduped output lazily. The
other would be for the to() operator to also dedupe.

Personally I feel this idea of caching to suppress output versus is
actually a better way to model and think about what's going on than trying
to have a triggering policy. If you set a triggering policy that says "only
output at the end of the window" the reality is that if late data comes you
still have to produce additional outputs. So you don't produce one output
at the end but rather potentially any number of outputs. So a simple way to
think about this is that you produce all updates but optimistically
suppress some duplicates for efficiency.

-Jay

On Wed, Apr 20, 2016 at 5:24 PM, Henry Cai <hc...@pinterest.com.invalid>
wrote:

> 0.10.0.1 is fine for me, I am actually building from trunk head for streams
> package.
>
> On Wed, Apr 20, 2016 at 5:06 PM, Guozhang Wang <wa...@gmail.com> wrote:
>
> > I saw that note, thanks for commenting.
> >
> > I are cutting the next 0.10.0.0 RC next week, so I am not certain if it
> > will make it for 0.10.0.0. But we can push it to be in 0.10.0.1.
> >
> > Guozhang
> >
> > On Wed, Apr 20, 2016 at 4:57 PM, Henry Cai <hc...@pinterest.com.invalid>
> > wrote:
> >
> > > Thanks.
> > >
> > > Do you know when KAFKA-3101 will be implemented?
> > >
> > > I also add a note to that JIRA for a left outer join use case which
> also
> > > need buffer support.
> > >
> > >
> > > On Wed, Apr 20, 2016 at 4:42 PM, Guozhang Wang <wa...@gmail.com>
> > wrote:
> > >
> > > > Henry,
> > > >
> > > > I thought you were concerned about consumer memory contention.
> That's a
> > > > valid point, and yes, you need to keep those buffered records in a
> > > > persistent store.
> > > >
> > > > As I mentioned we are trying to do optimize the aggregation outputs
> as
> > in
> > > >
> > > > https://issues.apache.org/jira/browse/KAFKA-3101
> > > >
> > > > Its idea is very similar to buffering, while we keep the aggregated
> > > values
> > > > in RocksDB, we do not send the updated values for each receiving
> record
> > > but
> > > > only do that based on some policy. More generally we can have a
> trigger
> > > > mechanism for user to customize when to emit.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Wed, Apr 20, 2016 at 4:03 PM, Henry Cai
> <hcai@pinterest.com.invalid
> > >
> > > > wrote:
> > > >
> > > > > I think this scheme still has problems.  If during 'holding' I
> > > literally
> > > > > hold (don't return the method call), I will starve the thread.  If
> I
> > am
> > > > > writing the output to a in-memory buffer and let the method
> returns,
> > > the
> > > > > kafka stream will acknowledge the record to upstream queue as
> > > processed,
> > > > so
> > > > > I would lose the record if the node crashed after ack but before 10
> > > > minutes
> > > > > is up.
> > > > >
> > > > > I guess I need to write the buffered result into a persistent
> store,
> > > > > another kafka queue or K/V store.
> > > > >
> > > > > On Wed, Apr 20, 2016 at 3:49 PM, Guozhang Wang <wangguoz@gmail.com
> >
> > > > wrote:
> > > > >
> > > > > > By "holding the stream", I assume you are still consuming data,
> but
> > > > just
> > > > > > that you only write data every 10 minutes instead of upon each
> > > received
> > > > > > record right?
> > > > > >
> > > > > > Anyways, in either case, consumer should not have severe memory
> > issue
> > > > as
> > > > > > Kafka Streams will pause its consuming when enough data is
> buffered
> > > at
> > > > > the
> > > > > > streams end (note that we have two buffers here, the consumer
> > buffers
> > > > raw
> > > > > > bytes, and the streams library take raw bytes and buffer the
> > > > > de-serialized
> > > > > > objects, and threshold on its own buffer to pause / resume the
> > > > consumer).
> > > > > >
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > > On Wed, Apr 20, 2016 at 3:35 PM, Henry Cai
> > > <hcai@pinterest.com.invalid
> > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > So hold the stream for 15 minutes wouldn't cause too much
> > > performance
> > > > > > > problems?
> > > > > > >
> > > > > > > On Wed, Apr 20, 2016 at 3:16 PM, Guozhang Wang <
> > wangguoz@gmail.com
> > > >
> > > > > > wrote:
> > > > > > >
> > > > > > > > Consumer' buffer does not depend on offset committing, once
> it
> > is
> > > > > given
> > > > > > > > from the poll() call it is out of the buffer. If offsets are
> > not
> > > > > > > committed,
> > > > > > > > then upon failover it will simply re-consumer these records
> > again
> > > > > from
> > > > > > > > Kafka.
> > > > > > > >
> > > > > > > > Guozhang
> > > > > > > >
> > > > > > > > On Tue, Apr 19, 2016 at 11:34 PM, Henry Cai
> > > > > <hcai@pinterest.com.invalid
> > > > > > >
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > For the technique of custom Processor of holding call to
> > > > > > > > context.forward(),
> > > > > > > > > if I hold it for 10 minutes, what does that mean for the
> > > consumer
> > > > > > > > > acknowledgement on source node?
> > > > > > > > >
> > > > > > > > > I guess if I hold it for 10 minutes, the consumer is not
> > going
> > > to
> > > > > ack
> > > > > > > to
> > > > > > > > > the upstream queue, will that impact the consumer
> > performance,
> > > > will
> > > > > > > > > consumer's kafka client message buffer overflow when there
> is
> > > no
> > > > > ack
> > > > > > in
> > > > > > > > 10
> > > > > > > > > minutes?
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Tue, Apr 19, 2016 at 6:10 PM, Guozhang Wang <
> > > > wangguoz@gmail.com
> > > > > >
> > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Yes we are aware of this behavior and are working on
> > > optimizing
> > > > > it:
> > > > > > > > > >
> > > > > > > > > > https://issues.apache.org/jira/browse/KAFKA-3101
> > > > > > > > > >
> > > > > > > > > > More generally, we are considering to add a "trigger"
> > > interface
> > > > > > > similar
> > > > > > > > > to
> > > > > > > > > > the Millwheel model where users can customize when they
> > want
> > > to
> > > > > > emit
> > > > > > > > > > outputs to the downstream operators. Unfortunately for
> now
> > > > there
> > > > > > will
> > > > > > > > no
> > > > > > > > > > easy workaround for buffering, and you may want to do
> this
> > in
> > > > app
> > > > > > > code
> > > > > > > > > (for
> > > > > > > > > > example, in a customized Processor where you can control
> > when
> > > > to
> > > > > > call
> > > > > > > > > > context.forward() ).
> > > > > > > > > >
> > > > > > > > > > Guozhang
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Tue, Apr 19, 2016 at 1:40 PM, Jeff Klukas <
> > > > jklukas@simple.com
> > > > > >
> > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Is it true that the aggregation and reduction methods
> of
> > > > > KStream
> > > > > > > will
> > > > > > > > > > emit
> > > > > > > > > > > a new output message for each incoming message?
> > > > > > > > > > >
> > > > > > > > > > > I have an application that's copying a Postgres
> > replication
> > > > > > stream
> > > > > > > > to a
> > > > > > > > > > > Kafka topic, and activity tends to be clustered, with
> > many
> > > > > > updates
> > > > > > > > to a
> > > > > > > > > > > given primary key happening in quick succession. I'd
> like
> > > to
> > > > > > smooth
> > > > > > > > > that
> > > > > > > > > > > out by buffering the messages in tumbling windows,
> > allowing
> > > > the
> > > > > > > > updates
> > > > > > > > > > to
> > > > > > > > > > > overwrite one another, and emitting output messages
> only
> > at
> > > > the
> > > > > > end
> > > > > > > > of
> > > > > > > > > > the
> > > > > > > > > > > window.
> > > > > > > > > > >
> > > > > > > > > > > Does the Kafka Streams API provide any hooks that I
> could
> > > use
> > > > > to
> > > > > > > > > achieve
> > > > > > > > > > > this kind of windowed "buffering" or "deduplication"
> of a
> > > > > stream?
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > --
> > > > > > > > > > -- Guozhang
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > --
> > > > > > > > -- Guozhang
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>

Re: How to "buffer" a stream with high churn and output only at the end of a window?

Posted by Henry Cai <hc...@pinterest.com.INVALID>.
0.10.0.1 is fine for me, I am actually building from trunk head for streams
package.

On Wed, Apr 20, 2016 at 5:06 PM, Guozhang Wang <wa...@gmail.com> wrote:

> I saw that note, thanks for commenting.
>
> I are cutting the next 0.10.0.0 RC next week, so I am not certain if it
> will make it for 0.10.0.0. But we can push it to be in 0.10.0.1.
>
> Guozhang
>
> On Wed, Apr 20, 2016 at 4:57 PM, Henry Cai <hc...@pinterest.com.invalid>
> wrote:
>
> > Thanks.
> >
> > Do you know when KAFKA-3101 will be implemented?
> >
> > I also add a note to that JIRA for a left outer join use case which also
> > need buffer support.
> >
> >
> > On Wed, Apr 20, 2016 at 4:42 PM, Guozhang Wang <wa...@gmail.com>
> wrote:
> >
> > > Henry,
> > >
> > > I thought you were concerned about consumer memory contention. That's a
> > > valid point, and yes, you need to keep those buffered records in a
> > > persistent store.
> > >
> > > As I mentioned we are trying to do optimize the aggregation outputs as
> in
> > >
> > > https://issues.apache.org/jira/browse/KAFKA-3101
> > >
> > > Its idea is very similar to buffering, while we keep the aggregated
> > values
> > > in RocksDB, we do not send the updated values for each receiving record
> > but
> > > only do that based on some policy. More generally we can have a trigger
> > > mechanism for user to customize when to emit.
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Wed, Apr 20, 2016 at 4:03 PM, Henry Cai <hcai@pinterest.com.invalid
> >
> > > wrote:
> > >
> > > > I think this scheme still has problems.  If during 'holding' I
> > literally
> > > > hold (don't return the method call), I will starve the thread.  If I
> am
> > > > writing the output to a in-memory buffer and let the method returns,
> > the
> > > > kafka stream will acknowledge the record to upstream queue as
> > processed,
> > > so
> > > > I would lose the record if the node crashed after ack but before 10
> > > minutes
> > > > is up.
> > > >
> > > > I guess I need to write the buffered result into a persistent store,
> > > > another kafka queue or K/V store.
> > > >
> > > > On Wed, Apr 20, 2016 at 3:49 PM, Guozhang Wang <wa...@gmail.com>
> > > wrote:
> > > >
> > > > > By "holding the stream", I assume you are still consuming data, but
> > > just
> > > > > that you only write data every 10 minutes instead of upon each
> > received
> > > > > record right?
> > > > >
> > > > > Anyways, in either case, consumer should not have severe memory
> issue
> > > as
> > > > > Kafka Streams will pause its consuming when enough data is buffered
> > at
> > > > the
> > > > > streams end (note that we have two buffers here, the consumer
> buffers
> > > raw
> > > > > bytes, and the streams library take raw bytes and buffer the
> > > > de-serialized
> > > > > objects, and threshold on its own buffer to pause / resume the
> > > consumer).
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Wed, Apr 20, 2016 at 3:35 PM, Henry Cai
> > <hcai@pinterest.com.invalid
> > > >
> > > > > wrote:
> > > > >
> > > > > > So hold the stream for 15 minutes wouldn't cause too much
> > performance
> > > > > > problems?
> > > > > >
> > > > > > On Wed, Apr 20, 2016 at 3:16 PM, Guozhang Wang <
> wangguoz@gmail.com
> > >
> > > > > wrote:
> > > > > >
> > > > > > > Consumer' buffer does not depend on offset committing, once it
> is
> > > > given
> > > > > > > from the poll() call it is out of the buffer. If offsets are
> not
> > > > > > committed,
> > > > > > > then upon failover it will simply re-consumer these records
> again
> > > > from
> > > > > > > Kafka.
> > > > > > >
> > > > > > > Guozhang
> > > > > > >
> > > > > > > On Tue, Apr 19, 2016 at 11:34 PM, Henry Cai
> > > > <hcai@pinterest.com.invalid
> > > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > > > For the technique of custom Processor of holding call to
> > > > > > > context.forward(),
> > > > > > > > if I hold it for 10 minutes, what does that mean for the
> > consumer
> > > > > > > > acknowledgement on source node?
> > > > > > > >
> > > > > > > > I guess if I hold it for 10 minutes, the consumer is not
> going
> > to
> > > > ack
> > > > > > to
> > > > > > > > the upstream queue, will that impact the consumer
> performance,
> > > will
> > > > > > > > consumer's kafka client message buffer overflow when there is
> > no
> > > > ack
> > > > > in
> > > > > > > 10
> > > > > > > > minutes?
> > > > > > > >
> > > > > > > >
> > > > > > > > On Tue, Apr 19, 2016 at 6:10 PM, Guozhang Wang <
> > > wangguoz@gmail.com
> > > > >
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Yes we are aware of this behavior and are working on
> > optimizing
> > > > it:
> > > > > > > > >
> > > > > > > > > https://issues.apache.org/jira/browse/KAFKA-3101
> > > > > > > > >
> > > > > > > > > More generally, we are considering to add a "trigger"
> > interface
> > > > > > similar
> > > > > > > > to
> > > > > > > > > the Millwheel model where users can customize when they
> want
> > to
> > > > > emit
> > > > > > > > > outputs to the downstream operators. Unfortunately for now
> > > there
> > > > > will
> > > > > > > no
> > > > > > > > > easy workaround for buffering, and you may want to do this
> in
> > > app
> > > > > > code
> > > > > > > > (for
> > > > > > > > > example, in a customized Processor where you can control
> when
> > > to
> > > > > call
> > > > > > > > > context.forward() ).
> > > > > > > > >
> > > > > > > > > Guozhang
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Tue, Apr 19, 2016 at 1:40 PM, Jeff Klukas <
> > > jklukas@simple.com
> > > > >
> > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Is it true that the aggregation and reduction methods of
> > > > KStream
> > > > > > will
> > > > > > > > > emit
> > > > > > > > > > a new output message for each incoming message?
> > > > > > > > > >
> > > > > > > > > > I have an application that's copying a Postgres
> replication
> > > > > stream
> > > > > > > to a
> > > > > > > > > > Kafka topic, and activity tends to be clustered, with
> many
> > > > > updates
> > > > > > > to a
> > > > > > > > > > given primary key happening in quick succession. I'd like
> > to
> > > > > smooth
> > > > > > > > that
> > > > > > > > > > out by buffering the messages in tumbling windows,
> allowing
> > > the
> > > > > > > updates
> > > > > > > > > to
> > > > > > > > > > overwrite one another, and emitting output messages only
> at
> > > the
> > > > > end
> > > > > > > of
> > > > > > > > > the
> > > > > > > > > > window.
> > > > > > > > > >
> > > > > > > > > > Does the Kafka Streams API provide any hooks that I could
> > use
> > > > to
> > > > > > > > achieve
> > > > > > > > > > this kind of windowed "buffering" or "deduplication" of a
> > > > stream?
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > --
> > > > > > > > > -- Guozhang
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > -- Guozhang
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>

Re: How to "buffer" a stream with high churn and output only at the end of a window?

Posted by Guozhang Wang <wa...@gmail.com>.
I saw that note, thanks for commenting.

I are cutting the next 0.10.0.0 RC next week, so I am not certain if it
will make it for 0.10.0.0. But we can push it to be in 0.10.0.1.

Guozhang

On Wed, Apr 20, 2016 at 4:57 PM, Henry Cai <hc...@pinterest.com.invalid>
wrote:

> Thanks.
>
> Do you know when KAFKA-3101 will be implemented?
>
> I also add a note to that JIRA for a left outer join use case which also
> need buffer support.
>
>
> On Wed, Apr 20, 2016 at 4:42 PM, Guozhang Wang <wa...@gmail.com> wrote:
>
> > Henry,
> >
> > I thought you were concerned about consumer memory contention. That's a
> > valid point, and yes, you need to keep those buffered records in a
> > persistent store.
> >
> > As I mentioned we are trying to do optimize the aggregation outputs as in
> >
> > https://issues.apache.org/jira/browse/KAFKA-3101
> >
> > Its idea is very similar to buffering, while we keep the aggregated
> values
> > in RocksDB, we do not send the updated values for each receiving record
> but
> > only do that based on some policy. More generally we can have a trigger
> > mechanism for user to customize when to emit.
> >
> >
> > Guozhang
> >
> >
> > On Wed, Apr 20, 2016 at 4:03 PM, Henry Cai <hc...@pinterest.com.invalid>
> > wrote:
> >
> > > I think this scheme still has problems.  If during 'holding' I
> literally
> > > hold (don't return the method call), I will starve the thread.  If I am
> > > writing the output to a in-memory buffer and let the method returns,
> the
> > > kafka stream will acknowledge the record to upstream queue as
> processed,
> > so
> > > I would lose the record if the node crashed after ack but before 10
> > minutes
> > > is up.
> > >
> > > I guess I need to write the buffered result into a persistent store,
> > > another kafka queue or K/V store.
> > >
> > > On Wed, Apr 20, 2016 at 3:49 PM, Guozhang Wang <wa...@gmail.com>
> > wrote:
> > >
> > > > By "holding the stream", I assume you are still consuming data, but
> > just
> > > > that you only write data every 10 minutes instead of upon each
> received
> > > > record right?
> > > >
> > > > Anyways, in either case, consumer should not have severe memory issue
> > as
> > > > Kafka Streams will pause its consuming when enough data is buffered
> at
> > > the
> > > > streams end (note that we have two buffers here, the consumer buffers
> > raw
> > > > bytes, and the streams library take raw bytes and buffer the
> > > de-serialized
> > > > objects, and threshold on its own buffer to pause / resume the
> > consumer).
> > > >
> > > >
> > > > Guozhang
> > > >
> > > > On Wed, Apr 20, 2016 at 3:35 PM, Henry Cai
> <hcai@pinterest.com.invalid
> > >
> > > > wrote:
> > > >
> > > > > So hold the stream for 15 minutes wouldn't cause too much
> performance
> > > > > problems?
> > > > >
> > > > > On Wed, Apr 20, 2016 at 3:16 PM, Guozhang Wang <wangguoz@gmail.com
> >
> > > > wrote:
> > > > >
> > > > > > Consumer' buffer does not depend on offset committing, once it is
> > > given
> > > > > > from the poll() call it is out of the buffer. If offsets are not
> > > > > committed,
> > > > > > then upon failover it will simply re-consumer these records again
> > > from
> > > > > > Kafka.
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > > On Tue, Apr 19, 2016 at 11:34 PM, Henry Cai
> > > <hcai@pinterest.com.invalid
> > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > For the technique of custom Processor of holding call to
> > > > > > context.forward(),
> > > > > > > if I hold it for 10 minutes, what does that mean for the
> consumer
> > > > > > > acknowledgement on source node?
> > > > > > >
> > > > > > > I guess if I hold it for 10 minutes, the consumer is not going
> to
> > > ack
> > > > > to
> > > > > > > the upstream queue, will that impact the consumer performance,
> > will
> > > > > > > consumer's kafka client message buffer overflow when there is
> no
> > > ack
> > > > in
> > > > > > 10
> > > > > > > minutes?
> > > > > > >
> > > > > > >
> > > > > > > On Tue, Apr 19, 2016 at 6:10 PM, Guozhang Wang <
> > wangguoz@gmail.com
> > > >
> > > > > > wrote:
> > > > > > >
> > > > > > > > Yes we are aware of this behavior and are working on
> optimizing
> > > it:
> > > > > > > >
> > > > > > > > https://issues.apache.org/jira/browse/KAFKA-3101
> > > > > > > >
> > > > > > > > More generally, we are considering to add a "trigger"
> interface
> > > > > similar
> > > > > > > to
> > > > > > > > the Millwheel model where users can customize when they want
> to
> > > > emit
> > > > > > > > outputs to the downstream operators. Unfortunately for now
> > there
> > > > will
> > > > > > no
> > > > > > > > easy workaround for buffering, and you may want to do this in
> > app
> > > > > code
> > > > > > > (for
> > > > > > > > example, in a customized Processor where you can control when
> > to
> > > > call
> > > > > > > > context.forward() ).
> > > > > > > >
> > > > > > > > Guozhang
> > > > > > > >
> > > > > > > >
> > > > > > > > On Tue, Apr 19, 2016 at 1:40 PM, Jeff Klukas <
> > jklukas@simple.com
> > > >
> > > > > > wrote:
> > > > > > > >
> > > > > > > > > Is it true that the aggregation and reduction methods of
> > > KStream
> > > > > will
> > > > > > > > emit
> > > > > > > > > a new output message for each incoming message?
> > > > > > > > >
> > > > > > > > > I have an application that's copying a Postgres replication
> > > > stream
> > > > > > to a
> > > > > > > > > Kafka topic, and activity tends to be clustered, with many
> > > > updates
> > > > > > to a
> > > > > > > > > given primary key happening in quick succession. I'd like
> to
> > > > smooth
> > > > > > > that
> > > > > > > > > out by buffering the messages in tumbling windows, allowing
> > the
> > > > > > updates
> > > > > > > > to
> > > > > > > > > overwrite one another, and emitting output messages only at
> > the
> > > > end
> > > > > > of
> > > > > > > > the
> > > > > > > > > window.
> > > > > > > > >
> > > > > > > > > Does the Kafka Streams API provide any hooks that I could
> use
> > > to
> > > > > > > achieve
> > > > > > > > > this kind of windowed "buffering" or "deduplication" of a
> > > stream?
> > > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > --
> > > > > > > > -- Guozhang
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang

Re: How to "buffer" a stream with high churn and output only at the end of a window?

Posted by Henry Cai <hc...@pinterest.com.INVALID>.
Thanks.

Do you know when KAFKA-3101 will be implemented?

I also add a note to that JIRA for a left outer join use case which also
need buffer support.


On Wed, Apr 20, 2016 at 4:42 PM, Guozhang Wang <wa...@gmail.com> wrote:

> Henry,
>
> I thought you were concerned about consumer memory contention. That's a
> valid point, and yes, you need to keep those buffered records in a
> persistent store.
>
> As I mentioned we are trying to do optimize the aggregation outputs as in
>
> https://issues.apache.org/jira/browse/KAFKA-3101
>
> Its idea is very similar to buffering, while we keep the aggregated values
> in RocksDB, we do not send the updated values for each receiving record but
> only do that based on some policy. More generally we can have a trigger
> mechanism for user to customize when to emit.
>
>
> Guozhang
>
>
> On Wed, Apr 20, 2016 at 4:03 PM, Henry Cai <hc...@pinterest.com.invalid>
> wrote:
>
> > I think this scheme still has problems.  If during 'holding' I literally
> > hold (don't return the method call), I will starve the thread.  If I am
> > writing the output to a in-memory buffer and let the method returns, the
> > kafka stream will acknowledge the record to upstream queue as processed,
> so
> > I would lose the record if the node crashed after ack but before 10
> minutes
> > is up.
> >
> > I guess I need to write the buffered result into a persistent store,
> > another kafka queue or K/V store.
> >
> > On Wed, Apr 20, 2016 at 3:49 PM, Guozhang Wang <wa...@gmail.com>
> wrote:
> >
> > > By "holding the stream", I assume you are still consuming data, but
> just
> > > that you only write data every 10 minutes instead of upon each received
> > > record right?
> > >
> > > Anyways, in either case, consumer should not have severe memory issue
> as
> > > Kafka Streams will pause its consuming when enough data is buffered at
> > the
> > > streams end (note that we have two buffers here, the consumer buffers
> raw
> > > bytes, and the streams library take raw bytes and buffer the
> > de-serialized
> > > objects, and threshold on its own buffer to pause / resume the
> consumer).
> > >
> > >
> > > Guozhang
> > >
> > > On Wed, Apr 20, 2016 at 3:35 PM, Henry Cai <hcai@pinterest.com.invalid
> >
> > > wrote:
> > >
> > > > So hold the stream for 15 minutes wouldn't cause too much performance
> > > > problems?
> > > >
> > > > On Wed, Apr 20, 2016 at 3:16 PM, Guozhang Wang <wa...@gmail.com>
> > > wrote:
> > > >
> > > > > Consumer' buffer does not depend on offset committing, once it is
> > given
> > > > > from the poll() call it is out of the buffer. If offsets are not
> > > > committed,
> > > > > then upon failover it will simply re-consumer these records again
> > from
> > > > > Kafka.
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Tue, Apr 19, 2016 at 11:34 PM, Henry Cai
> > <hcai@pinterest.com.invalid
> > > >
> > > > > wrote:
> > > > >
> > > > > > For the technique of custom Processor of holding call to
> > > > > context.forward(),
> > > > > > if I hold it for 10 minutes, what does that mean for the consumer
> > > > > > acknowledgement on source node?
> > > > > >
> > > > > > I guess if I hold it for 10 minutes, the consumer is not going to
> > ack
> > > > to
> > > > > > the upstream queue, will that impact the consumer performance,
> will
> > > > > > consumer's kafka client message buffer overflow when there is no
> > ack
> > > in
> > > > > 10
> > > > > > minutes?
> > > > > >
> > > > > >
> > > > > > On Tue, Apr 19, 2016 at 6:10 PM, Guozhang Wang <
> wangguoz@gmail.com
> > >
> > > > > wrote:
> > > > > >
> > > > > > > Yes we are aware of this behavior and are working on optimizing
> > it:
> > > > > > >
> > > > > > > https://issues.apache.org/jira/browse/KAFKA-3101
> > > > > > >
> > > > > > > More generally, we are considering to add a "trigger" interface
> > > > similar
> > > > > > to
> > > > > > > the Millwheel model where users can customize when they want to
> > > emit
> > > > > > > outputs to the downstream operators. Unfortunately for now
> there
> > > will
> > > > > no
> > > > > > > easy workaround for buffering, and you may want to do this in
> app
> > > > code
> > > > > > (for
> > > > > > > example, in a customized Processor where you can control when
> to
> > > call
> > > > > > > context.forward() ).
> > > > > > >
> > > > > > > Guozhang
> > > > > > >
> > > > > > >
> > > > > > > On Tue, Apr 19, 2016 at 1:40 PM, Jeff Klukas <
> jklukas@simple.com
> > >
> > > > > wrote:
> > > > > > >
> > > > > > > > Is it true that the aggregation and reduction methods of
> > KStream
> > > > will
> > > > > > > emit
> > > > > > > > a new output message for each incoming message?
> > > > > > > >
> > > > > > > > I have an application that's copying a Postgres replication
> > > stream
> > > > > to a
> > > > > > > > Kafka topic, and activity tends to be clustered, with many
> > > updates
> > > > > to a
> > > > > > > > given primary key happening in quick succession. I'd like to
> > > smooth
> > > > > > that
> > > > > > > > out by buffering the messages in tumbling windows, allowing
> the
> > > > > updates
> > > > > > > to
> > > > > > > > overwrite one another, and emitting output messages only at
> the
> > > end
> > > > > of
> > > > > > > the
> > > > > > > > window.
> > > > > > > >
> > > > > > > > Does the Kafka Streams API provide any hooks that I could use
> > to
> > > > > > achieve
> > > > > > > > this kind of windowed "buffering" or "deduplication" of a
> > stream?
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > -- Guozhang
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>

Re: How to "buffer" a stream with high churn and output only at the end of a window?

Posted by Guozhang Wang <wa...@gmail.com>.
Henry,

I thought you were concerned about consumer memory contention. That's a
valid point, and yes, you need to keep those buffered records in a
persistent store.

As I mentioned we are trying to do optimize the aggregation outputs as in

https://issues.apache.org/jira/browse/KAFKA-3101

Its idea is very similar to buffering, while we keep the aggregated values
in RocksDB, we do not send the updated values for each receiving record but
only do that based on some policy. More generally we can have a trigger
mechanism for user to customize when to emit.


Guozhang


On Wed, Apr 20, 2016 at 4:03 PM, Henry Cai <hc...@pinterest.com.invalid>
wrote:

> I think this scheme still has problems.  If during 'holding' I literally
> hold (don't return the method call), I will starve the thread.  If I am
> writing the output to a in-memory buffer and let the method returns, the
> kafka stream will acknowledge the record to upstream queue as processed, so
> I would lose the record if the node crashed after ack but before 10 minutes
> is up.
>
> I guess I need to write the buffered result into a persistent store,
> another kafka queue or K/V store.
>
> On Wed, Apr 20, 2016 at 3:49 PM, Guozhang Wang <wa...@gmail.com> wrote:
>
> > By "holding the stream", I assume you are still consuming data, but just
> > that you only write data every 10 minutes instead of upon each received
> > record right?
> >
> > Anyways, in either case, consumer should not have severe memory issue as
> > Kafka Streams will pause its consuming when enough data is buffered at
> the
> > streams end (note that we have two buffers here, the consumer buffers raw
> > bytes, and the streams library take raw bytes and buffer the
> de-serialized
> > objects, and threshold on its own buffer to pause / resume the consumer).
> >
> >
> > Guozhang
> >
> > On Wed, Apr 20, 2016 at 3:35 PM, Henry Cai <hc...@pinterest.com.invalid>
> > wrote:
> >
> > > So hold the stream for 15 minutes wouldn't cause too much performance
> > > problems?
> > >
> > > On Wed, Apr 20, 2016 at 3:16 PM, Guozhang Wang <wa...@gmail.com>
> > wrote:
> > >
> > > > Consumer' buffer does not depend on offset committing, once it is
> given
> > > > from the poll() call it is out of the buffer. If offsets are not
> > > committed,
> > > > then upon failover it will simply re-consumer these records again
> from
> > > > Kafka.
> > > >
> > > > Guozhang
> > > >
> > > > On Tue, Apr 19, 2016 at 11:34 PM, Henry Cai
> <hcai@pinterest.com.invalid
> > >
> > > > wrote:
> > > >
> > > > > For the technique of custom Processor of holding call to
> > > > context.forward(),
> > > > > if I hold it for 10 minutes, what does that mean for the consumer
> > > > > acknowledgement on source node?
> > > > >
> > > > > I guess if I hold it for 10 minutes, the consumer is not going to
> ack
> > > to
> > > > > the upstream queue, will that impact the consumer performance, will
> > > > > consumer's kafka client message buffer overflow when there is no
> ack
> > in
> > > > 10
> > > > > minutes?
> > > > >
> > > > >
> > > > > On Tue, Apr 19, 2016 at 6:10 PM, Guozhang Wang <wangguoz@gmail.com
> >
> > > > wrote:
> > > > >
> > > > > > Yes we are aware of this behavior and are working on optimizing
> it:
> > > > > >
> > > > > > https://issues.apache.org/jira/browse/KAFKA-3101
> > > > > >
> > > > > > More generally, we are considering to add a "trigger" interface
> > > similar
> > > > > to
> > > > > > the Millwheel model where users can customize when they want to
> > emit
> > > > > > outputs to the downstream operators. Unfortunately for now there
> > will
> > > > no
> > > > > > easy workaround for buffering, and you may want to do this in app
> > > code
> > > > > (for
> > > > > > example, in a customized Processor where you can control when to
> > call
> > > > > > context.forward() ).
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > >
> > > > > > On Tue, Apr 19, 2016 at 1:40 PM, Jeff Klukas <jklukas@simple.com
> >
> > > > wrote:
> > > > > >
> > > > > > > Is it true that the aggregation and reduction methods of
> KStream
> > > will
> > > > > > emit
> > > > > > > a new output message for each incoming message?
> > > > > > >
> > > > > > > I have an application that's copying a Postgres replication
> > stream
> > > > to a
> > > > > > > Kafka topic, and activity tends to be clustered, with many
> > updates
> > > > to a
> > > > > > > given primary key happening in quick succession. I'd like to
> > smooth
> > > > > that
> > > > > > > out by buffering the messages in tumbling windows, allowing the
> > > > updates
> > > > > > to
> > > > > > > overwrite one another, and emitting output messages only at the
> > end
> > > > of
> > > > > > the
> > > > > > > window.
> > > > > > >
> > > > > > > Does the Kafka Streams API provide any hooks that I could use
> to
> > > > > achieve
> > > > > > > this kind of windowed "buffering" or "deduplication" of a
> stream?
> > > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang

Re: How to "buffer" a stream with high churn and output only at the end of a window?

Posted by Henry Cai <hc...@pinterest.com.INVALID>.
I think this scheme still has problems.  If during 'holding' I literally
hold (don't return the method call), I will starve the thread.  If I am
writing the output to a in-memory buffer and let the method returns, the
kafka stream will acknowledge the record to upstream queue as processed, so
I would lose the record if the node crashed after ack but before 10 minutes
is up.

I guess I need to write the buffered result into a persistent store,
another kafka queue or K/V store.

On Wed, Apr 20, 2016 at 3:49 PM, Guozhang Wang <wa...@gmail.com> wrote:

> By "holding the stream", I assume you are still consuming data, but just
> that you only write data every 10 minutes instead of upon each received
> record right?
>
> Anyways, in either case, consumer should not have severe memory issue as
> Kafka Streams will pause its consuming when enough data is buffered at the
> streams end (note that we have two buffers here, the consumer buffers raw
> bytes, and the streams library take raw bytes and buffer the de-serialized
> objects, and threshold on its own buffer to pause / resume the consumer).
>
>
> Guozhang
>
> On Wed, Apr 20, 2016 at 3:35 PM, Henry Cai <hc...@pinterest.com.invalid>
> wrote:
>
> > So hold the stream for 15 minutes wouldn't cause too much performance
> > problems?
> >
> > On Wed, Apr 20, 2016 at 3:16 PM, Guozhang Wang <wa...@gmail.com>
> wrote:
> >
> > > Consumer' buffer does not depend on offset committing, once it is given
> > > from the poll() call it is out of the buffer. If offsets are not
> > committed,
> > > then upon failover it will simply re-consumer these records again from
> > > Kafka.
> > >
> > > Guozhang
> > >
> > > On Tue, Apr 19, 2016 at 11:34 PM, Henry Cai <hcai@pinterest.com.invalid
> >
> > > wrote:
> > >
> > > > For the technique of custom Processor of holding call to
> > > context.forward(),
> > > > if I hold it for 10 minutes, what does that mean for the consumer
> > > > acknowledgement on source node?
> > > >
> > > > I guess if I hold it for 10 minutes, the consumer is not going to ack
> > to
> > > > the upstream queue, will that impact the consumer performance, will
> > > > consumer's kafka client message buffer overflow when there is no ack
> in
> > > 10
> > > > minutes?
> > > >
> > > >
> > > > On Tue, Apr 19, 2016 at 6:10 PM, Guozhang Wang <wa...@gmail.com>
> > > wrote:
> > > >
> > > > > Yes we are aware of this behavior and are working on optimizing it:
> > > > >
> > > > > https://issues.apache.org/jira/browse/KAFKA-3101
> > > > >
> > > > > More generally, we are considering to add a "trigger" interface
> > similar
> > > > to
> > > > > the Millwheel model where users can customize when they want to
> emit
> > > > > outputs to the downstream operators. Unfortunately for now there
> will
> > > no
> > > > > easy workaround for buffering, and you may want to do this in app
> > code
> > > > (for
> > > > > example, in a customized Processor where you can control when to
> call
> > > > > context.forward() ).
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > > On Tue, Apr 19, 2016 at 1:40 PM, Jeff Klukas <jk...@simple.com>
> > > wrote:
> > > > >
> > > > > > Is it true that the aggregation and reduction methods of KStream
> > will
> > > > > emit
> > > > > > a new output message for each incoming message?
> > > > > >
> > > > > > I have an application that's copying a Postgres replication
> stream
> > > to a
> > > > > > Kafka topic, and activity tends to be clustered, with many
> updates
> > > to a
> > > > > > given primary key happening in quick succession. I'd like to
> smooth
> > > > that
> > > > > > out by buffering the messages in tumbling windows, allowing the
> > > updates
> > > > > to
> > > > > > overwrite one another, and emitting output messages only at the
> end
> > > of
> > > > > the
> > > > > > window.
> > > > > >
> > > > > > Does the Kafka Streams API provide any hooks that I could use to
> > > > achieve
> > > > > > this kind of windowed "buffering" or "deduplication" of a stream?
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>

Re: How to "buffer" a stream with high churn and output only at the end of a window?

Posted by Guozhang Wang <wa...@gmail.com>.
By "holding the stream", I assume you are still consuming data, but just
that you only write data every 10 minutes instead of upon each received
record right?

Anyways, in either case, consumer should not have severe memory issue as
Kafka Streams will pause its consuming when enough data is buffered at the
streams end (note that we have two buffers here, the consumer buffers raw
bytes, and the streams library take raw bytes and buffer the de-serialized
objects, and threshold on its own buffer to pause / resume the consumer).


Guozhang

On Wed, Apr 20, 2016 at 3:35 PM, Henry Cai <hc...@pinterest.com.invalid>
wrote:

> So hold the stream for 15 minutes wouldn't cause too much performance
> problems?
>
> On Wed, Apr 20, 2016 at 3:16 PM, Guozhang Wang <wa...@gmail.com> wrote:
>
> > Consumer' buffer does not depend on offset committing, once it is given
> > from the poll() call it is out of the buffer. If offsets are not
> committed,
> > then upon failover it will simply re-consumer these records again from
> > Kafka.
> >
> > Guozhang
> >
> > On Tue, Apr 19, 2016 at 11:34 PM, Henry Cai <hc...@pinterest.com.invalid>
> > wrote:
> >
> > > For the technique of custom Processor of holding call to
> > context.forward(),
> > > if I hold it for 10 minutes, what does that mean for the consumer
> > > acknowledgement on source node?
> > >
> > > I guess if I hold it for 10 minutes, the consumer is not going to ack
> to
> > > the upstream queue, will that impact the consumer performance, will
> > > consumer's kafka client message buffer overflow when there is no ack in
> > 10
> > > minutes?
> > >
> > >
> > > On Tue, Apr 19, 2016 at 6:10 PM, Guozhang Wang <wa...@gmail.com>
> > wrote:
> > >
> > > > Yes we are aware of this behavior and are working on optimizing it:
> > > >
> > > > https://issues.apache.org/jira/browse/KAFKA-3101
> > > >
> > > > More generally, we are considering to add a "trigger" interface
> similar
> > > to
> > > > the Millwheel model where users can customize when they want to emit
> > > > outputs to the downstream operators. Unfortunately for now there will
> > no
> > > > easy workaround for buffering, and you may want to do this in app
> code
> > > (for
> > > > example, in a customized Processor where you can control when to call
> > > > context.forward() ).
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Tue, Apr 19, 2016 at 1:40 PM, Jeff Klukas <jk...@simple.com>
> > wrote:
> > > >
> > > > > Is it true that the aggregation and reduction methods of KStream
> will
> > > > emit
> > > > > a new output message for each incoming message?
> > > > >
> > > > > I have an application that's copying a Postgres replication stream
> > to a
> > > > > Kafka topic, and activity tends to be clustered, with many updates
> > to a
> > > > > given primary key happening in quick succession. I'd like to smooth
> > > that
> > > > > out by buffering the messages in tumbling windows, allowing the
> > updates
> > > > to
> > > > > overwrite one another, and emitting output messages only at the end
> > of
> > > > the
> > > > > window.
> > > > >
> > > > > Does the Kafka Streams API provide any hooks that I could use to
> > > achieve
> > > > > this kind of windowed "buffering" or "deduplication" of a stream?
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang

Re: How to "buffer" a stream with high churn and output only at the end of a window?

Posted by Henry Cai <hc...@pinterest.com.INVALID>.
So hold the stream for 15 minutes wouldn't cause too much performance
problems?

On Wed, Apr 20, 2016 at 3:16 PM, Guozhang Wang <wa...@gmail.com> wrote:

> Consumer' buffer does not depend on offset committing, once it is given
> from the poll() call it is out of the buffer. If offsets are not committed,
> then upon failover it will simply re-consumer these records again from
> Kafka.
>
> Guozhang
>
> On Tue, Apr 19, 2016 at 11:34 PM, Henry Cai <hc...@pinterest.com.invalid>
> wrote:
>
> > For the technique of custom Processor of holding call to
> context.forward(),
> > if I hold it for 10 minutes, what does that mean for the consumer
> > acknowledgement on source node?
> >
> > I guess if I hold it for 10 minutes, the consumer is not going to ack to
> > the upstream queue, will that impact the consumer performance, will
> > consumer's kafka client message buffer overflow when there is no ack in
> 10
> > minutes?
> >
> >
> > On Tue, Apr 19, 2016 at 6:10 PM, Guozhang Wang <wa...@gmail.com>
> wrote:
> >
> > > Yes we are aware of this behavior and are working on optimizing it:
> > >
> > > https://issues.apache.org/jira/browse/KAFKA-3101
> > >
> > > More generally, we are considering to add a "trigger" interface similar
> > to
> > > the Millwheel model where users can customize when they want to emit
> > > outputs to the downstream operators. Unfortunately for now there will
> no
> > > easy workaround for buffering, and you may want to do this in app code
> > (for
> > > example, in a customized Processor where you can control when to call
> > > context.forward() ).
> > >
> > > Guozhang
> > >
> > >
> > > On Tue, Apr 19, 2016 at 1:40 PM, Jeff Klukas <jk...@simple.com>
> wrote:
> > >
> > > > Is it true that the aggregation and reduction methods of KStream will
> > > emit
> > > > a new output message for each incoming message?
> > > >
> > > > I have an application that's copying a Postgres replication stream
> to a
> > > > Kafka topic, and activity tends to be clustered, with many updates
> to a
> > > > given primary key happening in quick succession. I'd like to smooth
> > that
> > > > out by buffering the messages in tumbling windows, allowing the
> updates
> > > to
> > > > overwrite one another, and emitting output messages only at the end
> of
> > > the
> > > > window.
> > > >
> > > > Does the Kafka Streams API provide any hooks that I could use to
> > achieve
> > > > this kind of windowed "buffering" or "deduplication" of a stream?
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>

Re: How to "buffer" a stream with high churn and output only at the end of a window?

Posted by Guozhang Wang <wa...@gmail.com>.
Consumer' buffer does not depend on offset committing, once it is given
from the poll() call it is out of the buffer. If offsets are not committed,
then upon failover it will simply re-consumer these records again from
Kafka.

Guozhang

On Tue, Apr 19, 2016 at 11:34 PM, Henry Cai <hc...@pinterest.com.invalid>
wrote:

> For the technique of custom Processor of holding call to context.forward(),
> if I hold it for 10 minutes, what does that mean for the consumer
> acknowledgement on source node?
>
> I guess if I hold it for 10 minutes, the consumer is not going to ack to
> the upstream queue, will that impact the consumer performance, will
> consumer's kafka client message buffer overflow when there is no ack in 10
> minutes?
>
>
> On Tue, Apr 19, 2016 at 6:10 PM, Guozhang Wang <wa...@gmail.com> wrote:
>
> > Yes we are aware of this behavior and are working on optimizing it:
> >
> > https://issues.apache.org/jira/browse/KAFKA-3101
> >
> > More generally, we are considering to add a "trigger" interface similar
> to
> > the Millwheel model where users can customize when they want to emit
> > outputs to the downstream operators. Unfortunately for now there will no
> > easy workaround for buffering, and you may want to do this in app code
> (for
> > example, in a customized Processor where you can control when to call
> > context.forward() ).
> >
> > Guozhang
> >
> >
> > On Tue, Apr 19, 2016 at 1:40 PM, Jeff Klukas <jk...@simple.com> wrote:
> >
> > > Is it true that the aggregation and reduction methods of KStream will
> > emit
> > > a new output message for each incoming message?
> > >
> > > I have an application that's copying a Postgres replication stream to a
> > > Kafka topic, and activity tends to be clustered, with many updates to a
> > > given primary key happening in quick succession. I'd like to smooth
> that
> > > out by buffering the messages in tumbling windows, allowing the updates
> > to
> > > overwrite one another, and emitting output messages only at the end of
> > the
> > > window.
> > >
> > > Does the Kafka Streams API provide any hooks that I could use to
> achieve
> > > this kind of windowed "buffering" or "deduplication" of a stream?
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang

Re: How to "buffer" a stream with high churn and output only at the end of a window?

Posted by Henry Cai <hc...@pinterest.com.INVALID>.
For the technique of custom Processor of holding call to context.forward(),
if I hold it for 10 minutes, what does that mean for the consumer
acknowledgement on source node?

I guess if I hold it for 10 minutes, the consumer is not going to ack to
the upstream queue, will that impact the consumer performance, will
consumer's kafka client message buffer overflow when there is no ack in 10
minutes?


On Tue, Apr 19, 2016 at 6:10 PM, Guozhang Wang <wa...@gmail.com> wrote:

> Yes we are aware of this behavior and are working on optimizing it:
>
> https://issues.apache.org/jira/browse/KAFKA-3101
>
> More generally, we are considering to add a "trigger" interface similar to
> the Millwheel model where users can customize when they want to emit
> outputs to the downstream operators. Unfortunately for now there will no
> easy workaround for buffering, and you may want to do this in app code (for
> example, in a customized Processor where you can control when to call
> context.forward() ).
>
> Guozhang
>
>
> On Tue, Apr 19, 2016 at 1:40 PM, Jeff Klukas <jk...@simple.com> wrote:
>
> > Is it true that the aggregation and reduction methods of KStream will
> emit
> > a new output message for each incoming message?
> >
> > I have an application that's copying a Postgres replication stream to a
> > Kafka topic, and activity tends to be clustered, with many updates to a
> > given primary key happening in quick succession. I'd like to smooth that
> > out by buffering the messages in tumbling windows, allowing the updates
> to
> > overwrite one another, and emitting output messages only at the end of
> the
> > window.
> >
> > Does the Kafka Streams API provide any hooks that I could use to achieve
> > this kind of windowed "buffering" or "deduplication" of a stream?
> >
>
>
>
> --
> -- Guozhang
>

Re: How to "buffer" a stream with high churn and output only at the end of a window?

Posted by Guozhang Wang <wa...@gmail.com>.
Yes we are aware of this behavior and are working on optimizing it:

https://issues.apache.org/jira/browse/KAFKA-3101

More generally, we are considering to add a "trigger" interface similar to
the Millwheel model where users can customize when they want to emit
outputs to the downstream operators. Unfortunately for now there will no
easy workaround for buffering, and you may want to do this in app code (for
example, in a customized Processor where you can control when to call
context.forward() ).

Guozhang


On Tue, Apr 19, 2016 at 1:40 PM, Jeff Klukas <jk...@simple.com> wrote:

> Is it true that the aggregation and reduction methods of KStream will emit
> a new output message for each incoming message?
>
> I have an application that's copying a Postgres replication stream to a
> Kafka topic, and activity tends to be clustered, with many updates to a
> given primary key happening in quick succession. I'd like to smooth that
> out by buffering the messages in tumbling windows, allowing the updates to
> overwrite one another, and emitting output messages only at the end of the
> window.
>
> Does the Kafka Streams API provide any hooks that I could use to achieve
> this kind of windowed "buffering" or "deduplication" of a stream?
>



-- 
-- Guozhang