Posted to users@kafka.apache.org by Davood Rafiei <ra...@gmail.com> on 2016/06/16 14:20:54 UTC
Groupby Operator
Hi,
I am trying to use the groupBy operator in a simple example, but I get
strange results.
The "test" topic contains (Long, String) records like these:
1 Message_1
1 Message_1
2 Message_2
3 Message_3
4 Message_4
I want to get counts of each value. So:
Message_1 2
Message_1 1
Message_2 1
Message_3 1
Message_4 1
Because there is no operator like groupBy(fieldIndex), I assume that
groupBy always works on keys.
So, my program is:
KTable<Long, String> source = builder.table(longSerde, stringSerde, "test");
KTable<String, Long> counts = source
    .groupBy(new KeyValueMapper<Long, String, KeyValue<String, String>>() {
        @Override
        public KeyValue<String, String> apply(Long key, String value) {
            // Re-key each record by its value so that the count groups by value.
            return KeyValue.pair(value, value);
        }
    }, Serdes.String(), Serdes.String())
    .count("count");
counts.print();
And I get this output as a result:
Message_1 1
Message_1 0
Message_1 1
Message_1 0
Message_2 1
Message_2 0
Message_3 1
Message_3 0
Message_4 1
Message_4 0
I don't understand this behavior.
Cheers
Davood
Re: Groupby Operator
Posted by Davood Rafiei <ra...@gmail.com>.
Thank you for your thorough explanation Michael. It helped a lot.
Cheers
Davood
On Thu, Jun 16, 2016 at 5:01 PM, Michael Noll <mi...@confluent.io> wrote:
> Davood,
>
> you are reading the input topic into a KTable, which means that subsequent
> records for the same key (such as the key `1`, which appears twice in the
> input messages/records) will be considered as updates to any previous
> records for that key. So I think what you actually want to do is read the
> input as a KStream instead of a KTable?
>
> The following code works for me, it looks like what you're trying to do.
> Note that I am reading the input data into a KStream, not a KTable.
>
> Input:
> new KeyValue<>(1, "message1"),
> new KeyValue<>(1, "message1"),
> new KeyValue<>(2, "message2"),
> new KeyValue<>(3, "message3"),
> new KeyValue<>(4, "message4")
>
> Streams topology:
>
> KStream<Integer, String> input = builder.stream(Serdes.Integer(),
> Serdes.String(), inputTopic);
> KTable<String, Long> counted = input
> .map((key, value) -> KeyValue.pair(value, value))
> .countByKey(Serdes.String(), "counted");
> counted.to(Serdes.String(), Serdes.Long(), outputTopic);
>
> Output:
> new KeyValue<>("message1", 1L),
> new KeyValue<>("message1", 2L),
> new KeyValue<>("message2", 1L),
> new KeyValue<>("message3", 1L),
> new KeyValue<>("message4", 1L)
>
> Does that help?
> Michael
Re: Groupby Operator
Posted by Michael Noll <mi...@confluent.io>.
Davood,
you are reading the input topic into a KTable, which means that subsequent
records for the same key (such as the key `1`, which appears twice in the
input messages/records) will be considered as updates to any previous
records for that key. So I think what you actually want to do is read the
input as a KStream instead of a KTable?
The following code works for me, and it looks like what you're trying to do.
Note that I am reading the input data into a KStream, not a KTable.
Input:
new KeyValue<>(1, "message1"),
new KeyValue<>(1, "message1"),
new KeyValue<>(2, "message2"),
new KeyValue<>(3, "message3"),
new KeyValue<>(4, "message4")
Streams topology:
KStream<Integer, String> input = builder.stream(Serdes.Integer(), Serdes.String(), inputTopic);
KTable<String, Long> counted = input
    .map((key, value) -> KeyValue.pair(value, value))
    .countByKey(Serdes.String(), "counted");
counted.to(Serdes.String(), Serdes.Long(), outputTopic);
Output:
new KeyValue<>("message1", 1L),
new KeyValue<>("message1", 2L),
new KeyValue<>("message2", 1L),
new KeyValue<>("message3", 1L),
new KeyValue<>("message4", 1L)
Does that help?
Michael
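To make the KStream-versus-KTable distinction in this reply concrete, here is a minimal sketch in plain Java with no Kafka dependency. The class and method names (StreamVsTableCount, countAsStream, countAsTable) are hypothetical, and the table model is a simplification that assumes an update to an existing key first retracts the old value's contribution and then adds the new one. It is not the Kafka Streams implementation and does not reproduce the exact output from the original question; it only illustrates why a count over a table can stay at 1 and pass through 0 when the same key arrives twice.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StreamVsTableCount {

    // Stream semantics: every record is an independent event,
    // so each occurrence of a value increments that value's count.
    static List<String> countAsStream(String[] values) {
        Map<String, Long> counts = new HashMap<>();
        List<String> changelog = new ArrayList<>();
        for (String value : values) {
            long updated = counts.merge(value, 1L, Long::sum);
            changelog.add(value + " " + updated);
        }
        return changelog;
    }

    // Table semantics (simplified assumption): a record with an existing key
    // replaces the previous record, so the old value is subtracted from its
    // group before the new value is added.
    static List<String> countAsTable(long[] keys, String[] values) {
        Map<Long, String> latestByKey = new HashMap<>();
        Map<String, Long> counts = new HashMap<>();
        List<String> changelog = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            String previous = latestByKey.put(keys[i], values[i]);
            if (previous != null) {
                long afterRetract = counts.merge(previous, -1L, Long::sum);
                changelog.add(previous + " " + afterRetract);
            }
            long afterAdd = counts.merge(values[i], 1L, Long::sum);
            changelog.add(values[i] + " " + afterAdd);
        }
        return changelog;
    }

    public static void main(String[] args) {
        long[] keys = {1L, 1L, 2L, 3L, 4L};
        String[] values = {"Message_1", "Message_1", "Message_2", "Message_3", "Message_4"};
        System.out.println("stream: " + countAsStream(values));
        System.out.println("table:  " + countAsTable(keys, values));
    }
}
```

In this model the stream count for Message_1 reaches 2, matching the output listed in this reply, while the table count for Message_1 goes 1, 0, 1 because the second record with key 1 is treated as a replacement rather than a new event.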