Posted to dev@kafka.apache.org by Damian Guy <da...@gmail.com> on 2016/04/17 18:59:42 UTC

KTable.count(...)

Hi,

I'm slightly confused by KTable.count(..). The javadoc says:

 Count number of records of this stream by the selected key into a new
instance of {@link KTable}.

So... if I send 5 records with the same key to the input topic, as per the code below

final KafkaProducer<String, Integer> producer =
        new KafkaProducer<>(producerProperties, new StringSerializer(), new IntegerSerializer());
for (int i = 0; i < 5; i++) {
    producer.send(new ProducerRecord<>("input", "A", i));
}
producer.flush();

and then set up my stream like so:

final KStreamBuilder builder = new KStreamBuilder();
final KTable<String, Integer> table =
        builder.table(Serdes.String(), Serdes.Integer(), "input");
final KTable<String, Long> count = table.count(new KeyValueMapper<String, Integer, String>() {
    @Override
    public String apply(final String key, final Integer value) {
        return key;
    }
}, Serdes.String(), Serdes.Integer(), "count");

count.to(Serdes.String(), Serdes.Long(), "count");

When I then consume the data from the "count" topic, I expected to
eventually get a record where the key is A and the value is 5, i.e. the
number of times the key A was seen in the input stream. However, this is
not the case. What I actually receive on the count topic is:
A:1
A:2
A:1
A:2
A:1
A:2
A:1
A:2
A:1

Is this expected behaviour? Have I misunderstood how count is supposed to
work?

Also, KTable.count(KeyValueMapper<K,V,K1> selector, String name) causes a
NullPointerException

Thanks,
Damian

Re: KTable.count(...)

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Damian,

The semantics of count() still depend on the KTable, whose records are
changelog entries keyed by the record key. So in your example the KTable is
really an updating "source table" with a single row whose value is updated
five times. KTable.count(...) essentially means "count the records for each
key in the source table", which will always return:

A: 1

And you also see "A: 2" because internally the results are propagated as
Change<> objects, i.e. pairs of old and new values, so what you actually get is:

New: A=>1, Old: null
New: A=>2, Old: A=>1 (after applying the addition)
New: A=>1, Old: A=>2 (after applying the subtraction)
New: A=>2, Old: A=>1 (after applying the addition)
New: A=>1, Old: A=>2 (after applying the subtraction)
....
New: A=>1, Old: A=>2 (after applying the subtraction)

And the final result should always be A=>1.
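
For what it's worth, the alternating output can be reproduced with a toy
simulation of that add/subtract bookkeeping. This is only an illustration of
the idea described above, not the actual Streams internals:

import java.util.HashMap;
import java.util.Map;

// Toy simulation of the add/subtract bookkeeping described above (not the
// actual Streams internals): every update to the single-row source table
// arrives as a <new, old> pair; the count aggregate first adds for the new
// value and then subtracts for the old one, emitting a result after each step.
public class CountChangelogSketch {
    public static void main(String[] args) {
        Map<String, Long> counts = new HashMap<>();
        Integer oldValue = null;
        for (int newValue = 0; newValue < 5; newValue++) {
            counts.merge("A", 1L, Long::sum);            // addition for the new value
            System.out.println("A:" + counts.get("A"));
            if (oldValue != null) {                      // subtraction for the old value
                counts.merge("A", -1L, Long::sum);
                System.out.println("A:" + counts.get("A"));
            }
            oldValue = newValue;
        }
        // prints A:1 A:2 A:1 A:2 A:1 A:2 A:1 A:2 A:1 - the sequence seen on the "count" topic
    }
}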


Generally speaking, any aggregation over a KTable's primary key is not very
meaningful, since the table only ever contains one record per key.
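
If the goal is instead to count how many records arrive per key, i.e. to
treat the topic as an event stream rather than a changelog, you would read
it as a KStream. A rough sketch, assuming KStream#countByKey is available in
the build you are on (the exact method name and signature may differ):

final KStreamBuilder builder = new KStreamBuilder();
// Read the topic as a record stream: every record counts as an independent event.
final KStream<String, Integer> stream =
        builder.stream(Serdes.String(), Serdes.Integer(), "input");
// Count occurrences per key; for the example input this converges to A:5.
// "count-store" is just an arbitrary name for the backing state store.
final KTable<String, Long> counts = stream.countByKey(Serdes.String(), "count-store");
counts.to(Serdes.String(), Serdes.Long(), "count");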


"KTable.count(KeyValueMapper<K,V,K1> selector, String name) causes a
NullPointerException": this seems like a bug, would you like to file a JIRA
to keep track of investigation?


Guozhang


-- 
-- Guozhang

Re: KTable.count(...)

Posted by Damian Guy <da...@gmail.com>.
Hi Liquan,

Thanks for getting back to me and pointing me to the Confluent docs. Based
on what I read and my own assumptions, I'd expect the data consumed from
the output topic to be:

A:1
A:2
A:3
A:4
A:5

What am I missing?

Thanks,
Damian


Re: KTable.count(...)

Posted by Liquan Pei <li...@gmail.com>.
Hi Damian,

I am new to Kafka Streams as well, so my answer might not be 100% precise.
In a KTable, records with the same key are treated as updates rather than
independent events, so aggregating over the same key effectively
de-duplicates them. The docs for the tech preview contain some explanation
of this behavior:

http://docs.confluent.io/2.1.0-alpha1/streams/concepts.html#ktable-changelog-stream
http://docs.confluent.io/2.1.0-alpha1/streams/quickstart.html#inspect-the-output-data
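
To make the update-vs-event distinction concrete, here is a toy sketch
(plain Java, nothing Streams-specific, just an illustration) of what
interpreting those five records as table updates means:

import java.util.HashMap;
import java.util.Map;

// Toy illustration of the update semantics above: the five producer records
// are five updates to the same row, so the resulting "table" holds one row,
// and counting rows per key can only ever give A:1.
public class TableUpdateSketch {
    public static void main(String[] args) {
        Map<String, Integer> table = new HashMap<>();
        for (int i = 0; i < 5; i++) {
            table.put("A", i); // each record overwrites the previous value for "A"
        }
        System.out.println(table);        // {A=4} -> latest value wins
        System.out.println(table.size()); // 1     -> one row for key "A"
    }
}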

Maybe we can update the Javadoc to make this behavior more explicit?

Thanks,
Liquan


-- 
Liquan Pei
Software Engineer, Confluent Inc