You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Damian Guy (JIRA)" <ji...@apache.org> on 2016/11/18 22:41:58 UTC
[jira] [Comment Edited] (KAFKA-4270) ClassCast for Agregation
[ https://issues.apache.org/jira/browse/KAFKA-4270?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15677990#comment-15677990 ]
Damian Guy edited comment on KAFKA-4270 at 11/18/16 10:41 PM:
--------------------------------------------------------------
Hi Mykola,
I'm not entirely sure i follow what you are saying. I think that all you need to do is provide the serdes to the {{groupBy}} method, i.e.,
{{table.groupBy((key, value) -> value.thing, keySerde, valueSerde)}}
Thanks,
Damian
was (Author: damianguy):
Hi Mykola,
I'm not entirely sure i follow what you are saying. I think that all you need to do is provide the serdes to the `groupBy` method, i.e.,
`table.groupBy((key, value) -> value.thing, keySerde, valueSerde)`
Thanks,
Damian
> ClassCast for Agregation
> ------------------------
>
> Key: KAFKA-4270
> URL: https://issues.apache.org/jira/browse/KAFKA-4270
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Mykola Polonskyi
> Assignee: Damian Guy
> Priority: Critical
> Labels: architecture
>
> With defined serdes for intermediate topic in aggregation catch the ClassCastException: from custom class to the ByteArray.
> In debug I saw that defined serde isn't used for creation sinkNode (incide `org.apache.kafka.streams.kstream.internals.KGroupedTableImpl#doAggregate`)
> Instead defined serde inside aggregation call is used default Impl with empty plugs instead of implementations
> {code:koltin}
> userTable.join(
> skicardsTable.groupBy { key, value -> KeyValue(value.skicardInfo.ownerId, value.skicardInfo) }
> .aggregate(
> { mutableSetOf<SkicardInfo>() },
> { ownerId, skicardInfo, accumulator -> accumulator.put(skicardInfo) },
> { ownerId, skicardInfo, accumulator -> accumulator },
> skicardByOwnerIdSerde,
> skicardByOwnerIdTopicName
> ),
> { userCreatedOrUpdated, skicardInfoSet -> UserWithSkicardsCreatedOrUpdated(userCreatedOrUpdated.user, skicardInfoSet) }
> ).to(
> userWithSkicardsTable
> )
> {code}
> I think current behavior of `doAggregate` with serdes and/or stores setting up should be changed because that is incorrect in release 0.10.0.1-cp1 to.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)