Posted to users@kafka.apache.org by Davood Rafiei <ra...@gmail.com> on 2016/06/16 14:20:54 UTC

Groupby Operator

Hi,


I am trying to use the groupBy operator in a simple example, but I get
strange results.

I have input like this on the "test" topic (key: Long, value: String):
1    Message_1
1    Message_1
2    Message_2
3    Message_3
4    Message_4

I want to get the count of each value, so I expect:
Message_1 2
Message_1 1
Message_2 1
Message_3 1
Message_4 1

Because there is no operator like groupBy(fieldIndex), I assume that
groupBy always works on keys.

So, my program is:

      KTable<Long, String> source = builder.table(longSerde, stringSerde, "test");
      KTable<String, Long> counts = source.groupBy(
          new KeyValueMapper<Long, String, KeyValue<String, String>>() {
            @Override
            public KeyValue<String, String> apply(Long key, String value) {
              return KeyValue.pair(value, value);
            }
          },
          Serdes.String(), Serdes.String()).count("count");
      counts.print();

Instead, I get this output:

Message_1    1
Message_1    0
Message_1    1
Message_1    0
Message_2    1
Message_2    0
Message_3    1
Message_3    0
Message_4    1
Message_4    0

I don't understand this behavior.


Cheers
Davood

Re: Groupby Operator

Posted by Davood Rafiei <ra...@gmail.com>.
Thank you for your thorough explanation, Michael. It helped a lot.

Cheers
Davood

On Thu, Jun 16, 2016 at 5:01 PM, Michael Noll <mi...@confluent.io> wrote:


Re: Groupby Operator

Posted by Michael Noll <mi...@confluent.io>.
Davood,

you are reading the input topic into a KTable, which means that subsequent
records for the same key (such as the key `1`, which appears twice in the
input) are treated as updates to the previous record for that key, not as
additional records. A KStream, by contrast, treats every record as an
independent event, so both records with key `1` would be counted. So I think
what you actually want is to read the input as a KStream instead of a KTable?
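To make the table semantics concrete without running Kafka, here is a rough plain-Java model. It is not Kafka Streams code, and it rests on two assumptions: a table keeps only the latest value per key, and counting a grouped table subtracts a row's old value before adding its new one. The class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model (no Kafka dependency) of counting the values of a table.
// Assumptions: the table keeps only the latest value per key, and the grouped
// count subtracts a row's old value before adding its new value.
public class TableCountSketch {

    // Applies each (key, value) record to the table and returns the final count per value.
    static Map<String, Long> countTableValues(List<Map.Entry<Long, String>> records) {
        Map<Long, String> table = new HashMap<>();   // latest value per key
        Map<String, Long> counts = new TreeMap<>();  // count per value, sorted for printing
        for (Map.Entry<Long, String> r : records) {
            String old = table.put(r.getKey(), r.getValue()); // upsert: replaces any previous row
            if (old != null) {
                counts.merge(old, -1L, Long::sum);  // retract the replaced row from its group
            }
            counts.merge(r.getValue(), 1L, Long::sum); // add the new row to its group
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Map.Entry<Long, String>> input = List.of(
            Map.entry(1L, "Message_1"),
            Map.entry(1L, "Message_1"),  // same key: an update, not a second row
            Map.entry(2L, "Message_2"),
            Map.entry(3L, "Message_3"),
            Map.entry(4L, "Message_4"));
        System.out.println(countTableValues(input));
        // prints {Message_1=1, Message_2=1, Message_3=1, Message_4=1}
    }
}
```

In this model Message_1 ends at 1, not 2: the second record for key `1` replaces the first row instead of adding a new one, which is why counting over a KTable cannot produce the "Message_1 2" the original post expected.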

The following code works for me and seems to do what you're trying to do.
Note that I am reading the input data into a KStream, not a KTable.

Input:
  new KeyValue<>(1, "message1"),
  new KeyValue<>(1, "message1"),
  new KeyValue<>(2, "message2"),
  new KeyValue<>(3, "message3"),
  new KeyValue<>(4, "message4")

Streams topology:

  KStream<Integer, String> input = builder.stream(Serdes.Integer(), Serdes.String(), inputTopic);
  KTable<String, Long> counted = input
      .map((key, value) -> KeyValue.pair(value, value))
      .countByKey(Serdes.String(), "counted");
  counted.to(Serdes.String(), Serdes.Long(), outputTopic);

Output:
  new KeyValue<>("message1", 1L),
  new KeyValue<>("message1", 2L),
  new KeyValue<>("message2", 1L),
  new KeyValue<>("message3", 1L),
  new KeyValue<>("message4", 1L)
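The output above can be reproduced with a small plain-Java model of the `map(...)` plus `countByKey(...)` step: every record is re-keyed by its value, bumps that key's running count, and each bump is emitted as an update. This is a sketch under that assumption, not the Kafka Streams implementation; a real deployment may emit fewer intermediate updates (for example, later releases can cache and coalesce them). The names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (no Kafka dependency) of
//   input.map((key, value) -> KeyValue.pair(value, value)).countByKey(...)
// Each input record is re-keyed by its value and increments that key's count;
// every increment is emitted as an update, like a changelog of the counts.
public class StreamCountSketch {

    static List<Map.Entry<String, Long>> countStreamValues(List<Map.Entry<Integer, String>> records) {
        Map<String, Long> counts = new HashMap<>();
        List<Map.Entry<String, Long>> updates = new ArrayList<>();
        for (Map.Entry<Integer, String> r : records) {
            String newKey = r.getValue();                  // map(): the value becomes the key
            long c = counts.merge(newKey, 1L, Long::sum);  // count: every record adds one
            updates.add(Map.entry(newKey, c));             // emit the updated count
        }
        return updates;
    }

    public static void main(String[] args) {
        List<Map.Entry<Integer, String>> input = List.of(
            Map.entry(1, "message1"),
            Map.entry(1, "message1"),
            Map.entry(2, "message2"),
            Map.entry(3, "message3"),
            Map.entry(4, "message4"));
        for (Map.Entry<String, Long> u : countStreamValues(input)) {
            System.out.println(u.getKey() + " " + u.getValue());
        }
        // prints: message1 1, message1 2, message2 1, message3 1, message4 1 (one per line)
    }
}
```

Unlike the table case, the two records with key `1` are independent events here, so message1 reaches 2.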

Does that help?
Michael




On Thu, Jun 16, 2016 at 4:20 PM, Davood Rafiei <ra...@gmail.com>
wrote:
