Posted to jira@kafka.apache.org by "Matthias J. Sax (Jira)" <ji...@apache.org> on 2021/08/13 16:50:00 UTC

[jira] [Commented] (KAFKA-10475) Using same key reports different count of records for groupBy() and groupByKey() in Kafka Streaming Application

    [ https://issues.apache.org/jira/browse/KAFKA-10475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17398781#comment-17398781 ] 

Matthias J. Sax commented on KAFKA-10475:
-----------------------------------------

In the end, `groupBy(...)` and `groupByKey()` are not 100% equivalent; rather, `groupBy()` is equivalent to `selectKey(...).groupByKey()`.
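
For illustration, a minimal sketch of that equivalence (assuming a `KStream<String, Input>` named `stream` and the key-building mapper from the report below; both pipelines yield the same grouping, including the repartition step):

{code:java}
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;

// groupBy(mapper) ...
KGroupedStream<String, Input> viaGroupBy =
    stream.groupBy((key, value) -> value.A + value.B + value.C);

// ... behaves like selectKey(mapper) followed by groupByKey()
KGroupedStream<String, Input> viaSelectKey =
    stream.selectKey((key, value) -> value.A + value.B + value.C)
          .groupByKey();
{code}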

The difference between `groupBy()` and `groupByKey()` (assuming that you are effectively not changing the key) is still the repartition topic. Thus, data might actually be partitioned differently in the repartition topic than in the input topic, as we don't know how data is partitioned in the input topic. Did you verify that the partitioning did not change? Also, if you rerun the program multiple times, do you always lose the same records, and can you identify them individually?
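
To check the partitioning, the partition that Kafka's default partitioner assigns to a key can be computed directly. A minimal sketch (assuming string keys; `numPartitions` must match the topic in question, and the result only matches the input topic if the upstream producer used the default partitioner):

{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;

// Default partitioner for keyed records: murmur2 hash of the serialized key,
// mapped into [0, numPartitions).
static int defaultPartition(String key, int numPartitions) {
    byte[] keyBytes = Serdes.String().serializer().serialize(null, key);
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
{code}

If the input topic and the repartition topic have different partition counts, or the input topic was written with a custom partitioner, records with the same key can end up on different partitions in the two topics.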

> Using same key reports different count of records for groupBy() and groupByKey() in Kafka Streaming Application
> ---------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-10475
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10475
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.3.0
>         Environment: Kafka Cluster:
> Kafka Version: kafka_2.12-2.6.0
> openjdk version "1.8.0_265"
> Kafka Streams:
> Kafka Streams Version: 2.3.0
> openjdk version "11.0.8"
>            Reporter: Saad Rasool
>            Assignee: Divya Guduru
>            Priority: Major
>
>  
> We are experiencing what amounts to “lost packets” in our stream processing when we group on custom key values with groupBy(). We have a single processor node, with a source topic from which we read packets; we group and aggregate those packets and produce output based on a computation that requires access to a state store.
>  
> Below are more details of the problem and of how we have tried to understand it so far:
> *Overview:* We are setting up a Kafka Streams application in which we have to perform windowed operations. We group devices based on a specific key. The following are the sample fields we use for groupBy():
>  
> ||Field Name ||Field Value||
> |A|12|
> |B|abc|
> |C|x13|
>  
> Sample key based on the above data: 12abcx13, where key = Field(A) + Field(B) + Field(C).
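>  
> A minimal sketch of such a key builder (assuming a value class `Input` with string fields `A`, `B`, and `C`; the delimiter, which the sample key above does not use, avoids collisions between distinct field combinations):
>  
> {code:java}
> import org.apache.kafka.streams.kstream.KeyValueMapper;
> 
> // Builds the grouping key, e.g. "12-abc-x13"; the delimiter prevents
> // distinct combinations such as ("1", "2abc") and ("12", "abc") from
> // colliding into the same key.
> KeyValueMapper<String, Input, String> keyBuilder =
>         (key, value) -> value.A + "-" + value.B + "-" + value.C;
> {code}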
> *Problem:* We get a different count of records for the same key in two scenarios:
> # when specifying the key ourselves using groupBy(), and
> # when using groupByKey() to group the data on the ‘Input Kafka Topic’ partitioning key.
> *Description:* We first used the groupBy() function of Kafka Streams to group the devices by the key above. In this case, the streams application dropped several records and produced fewer records than expected. However, when we did not specify our own custom grouping with groupBy() and instead used groupByKey() to key the data on the original incoming Kafka partition key, we got exactly the number of records we expected.
> To check that our custom groupBy() function used the exact same keys as the input topic, we compared both keys within the code. The input topic key and the custom key were exactly the same.
> We have therefore concluded that there is some internal functionality of groupBy() that we do not understand, because of which groupBy() and groupByKey() report different counts for the same key. We have searched multiple forums but have been unable to find the reason for this behavior.
> *Code Snippet:*
> With groupByKey():
>   
> {code:java}
> KStream<String, Output> myStream = this.stream
>     // keep only the latest record per key within each hopping window
>     .groupByKey()
>     .windowedBy(TimeWindows.of(Duration.ofMinutes(windowTime))
>         .advanceBy(Duration.ofMinutes(advanceByTime))
>         .grace(Duration.ofSeconds(gracePeriod)))
>     .reduce((value1, value2) -> value2)
>     // emit only once the window closes
>     .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
>     .toStream()
>     .transform(new myTransformer(this.store.name()), this.store.name());
> {code}
>  
>   
> With groupBy():
>   
> {code:java}
> KStream<String, Output> myStream = this.stream
>     .groupBy((key, value) -> value.A + value.B + value.C,
>         Grouped.with(Serdes.String(), SerdesCollection.getInputSerdes()))
>     .windowedBy(TimeWindows.of(Duration.ofMinutes(windowTime))
>         .advanceBy(Duration.ofMinutes(advanceByTime))
>         .grace(Duration.ofSeconds(gracePeriod)))
>     .reduce((value1, value2) -> value2)
>     .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
>     .toStream()
>     .transform(new myTransformer(this.store.name()), this.store.name());
> {code}
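>  
> As a debugging aid, a hedged variation of the groupBy() call above: giving the grouping an explicit name (the name "device-grouping" is made up here) makes the internal repartition topic easy to identify and consume, so per-key counts can be compared before and after repartitioning:
>  
> {code:java}
> .groupBy((key, value) -> value.A + value.B + value.C,
>         Grouped.with("device-grouping", Serdes.String(), SerdesCollection.getInputSerdes()))
> // creates an internal topic named <application.id>-device-grouping-repartition,
> // which can be read with kafka-console-consumer to count records per key
> {code}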
>  
>   
> ||*Kafka Cluster Setup*||Value||
> |Number of Nodes|3|
> |CPU Cores|2|
> |RAM|8 GB|
>  
> ||*Streaming Application Setup*||Version||
> |Kafka Streams Version|2.3.0|
> |openjdk version|11.0.8|


