You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Guozhang Wang (JIRA)" <ji...@apache.org> on 2016/11/11 21:04:58 UTC
[jira] [Updated] (KAFKA-4270) ClassCast for Agregation
[ https://issues.apache.org/jira/browse/KAFKA-4270?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Guozhang Wang updated KAFKA-4270:
---------------------------------
Priority: Critical (was: Major)
> ClassCast for Agregation
> ------------------------
>
> Key: KAFKA-4270
> URL: https://issues.apache.org/jira/browse/KAFKA-4270
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Mykola Polonskyi
> Priority: Critical
> Labels: architecture
>
> With defined serdes for intermediate topic in aggregation catch the ClassCastException: from custom class to the ByteArray.
> In debug I saw that defined serde isn't used for creation sinkNode (incide `org.apache.kafka.streams.kstream.internals.KGroupedTableImpl#doAggregate`)
> Instead defined serde inside aggregation call is used default Impl with empty plugs instead of implementations
> {code:koltin}
> userTable.join(
> skicardsTable.groupBy { key, value -> KeyValue(value.skicardInfo.ownerId, value.skicardInfo) }
> .aggregate(
> { mutableSetOf<SkicardInfo>() },
> { ownerId, skicardInfo, accumulator -> accumulator.put(skicardInfo) },
> { ownerId, skicardInfo, accumulator -> accumulator },
> skicardByOwnerIdSerde,
> skicardByOwnerIdTopicName
> ),
> { userCreatedOrUpdated, skicardInfoSet -> UserWithSkicardsCreatedOrUpdated(userCreatedOrUpdated.user, skicardInfoSet) }
> ).to(
> userWithSkicardsTable
> )
> {code}
> I think current behavior of `doAggregate` with serdes and/or stores setting up should be changed because that is incorrect in release 0.10.0.1-cp1 to.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)