You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Bill Bejeck (JIRA)" <ji...@apache.org> on 2019/05/02 18:59:00 UTC

[jira] [Comment Edited] (KAFKA-8317) ClassCastException using KTable.suppress()

    [ https://issues.apache.org/jira/browse/KAFKA-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16831864#comment-16831864 ] 

Bill Bejeck edited comment on KAFKA-8317 at 5/2/19 6:58 PM:
------------------------------------------------------------

Hi [~the4thamigo_uk],

Thanks for reporting this. What version are you using?

In the meantime would something like the following suit your needs?
{noformat}
.suppress(untilTimeLimit(ofMillis(<Some Millis value>), maxRecords(<Max Records Num>).emitEarlyWhenFull())){noformat}
-Bill


was (Author: bbejeck):
Hi [~the4thamigo_uk],

Thanks for reporting this. What version are you using?

In the meantime would something like the following work?
{noformat}
.suppress(untilTimeLimit(ofMillis(<Some Millis value>), maxRecords(<Max Records Num>).emitEarlyWhenFull())){noformat}
-Bill

> ClassCastException using KTable.suppress()
> ------------------------------------------
>
>                 Key: KAFKA-8317
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8317
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Andrew
>            Priority: Major
>
> I am trying to use `KTable.suppress()` and I am getting the following error :
> {Code}
> java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed cannot be cast to java.lang.String
>     at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
>     at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.buffer(KTableSuppressProcessor.java:95)
>     at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:87)
>     at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:40)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
> {Code}
> My code is as follows :
> {Code}
>         final KTable<Windowed<Object>, GenericRecord> groupTable = groupedStream
>                 .aggregate(lastAggregator, lastAggregator, materialized);
>         final KTable<Windowed<Object>, GenericRecord> suppressedTable = groupTable.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));
>         // write the change-log stream to the topic
>         suppressedTable.toStream((k, v) -> k.key())
>                 .mapValues(joinValueMapper::apply)
>                 .to(props.joinTopic());
> {Code}
> The code without using `suppressedTable` works... what am i doing wrong.
> Someone else has encountered the same issue :
> https://gist.github.com/robie2011/1caa4772b60b5a6f993e6f98e792a380
> Slack conversation : https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556633088239800



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)