Posted to dev@kafka.apache.org by "Andrew (JIRA)" <ji...@apache.org> on 2019/05/02 16:58:00 UTC

[jira] [Created] (KAFKA-8317) ClassCastException using KTable.suppress()

Andrew created KAFKA-8317:
-----------------------------

             Summary: ClassCastException using KTable.suppress()
                 Key: KAFKA-8317
                 URL: https://issues.apache.org/jira/browse/KAFKA-8317
             Project: Kafka
          Issue Type: Bug
            Reporter: Andrew


I am trying to use `KTable.suppress()` and I am getting the following error:

{Code}
java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed cannot be cast to java.lang.String
    at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
    at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.buffer(KTableSuppressProcessor.java:95)
    at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:87)
    at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:40)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
{Code}
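To illustrate what the trace shows, here is a minimal sketch that needs no Kafka dependency. It assumes the suppress buffer ended up with a String key serializer while the keys flowing through it were `Windowed` keys; the trace does not say why that serializer was chosen. `SerdeMismatchDemo`, its nested `Serializer`, `StringSerializer`, and `Windowed` types, and the `bufferKey` helper are hypothetical stand-ins, not the real Kafka classes.

```java
import java.nio.charset.StandardCharsets;

public class SerdeMismatchDemo {

    // Hypothetical stand-in for a generic serializer interface.
    interface Serializer<T> {
        byte[] serialize(T data);
    }

    // Hypothetical stand-in for a serializer that only understands String keys.
    static final class StringSerializer implements Serializer<String> {
        @Override
        public byte[] serialize(String data) {
            return data.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Hypothetical stand-in for a windowed key: the original key plus window bounds.
    static final class Windowed<K> {
        final K key;
        final long startMs;
        final long endMs;

        Windowed(K key, long startMs, long endMs) {
            this.key = key;
            this.startMs = startMs;
            this.endMs = endMs;
        }
    }

    // Mimics a buffer that trusts whatever serializer it was configured with.
    // Generics are erased at runtime, so the unchecked cast itself never fails;
    // the failure only surfaces inside serialize().
    @SuppressWarnings("unchecked")
    static <K> byte[] bufferKey(Serializer<?> configured, K key) {
        return ((Serializer<K>) configured).serialize(key);
    }

    // Returns true if serializing the key with the given serializer throws ClassCastException.
    static boolean failsWithClassCast(Serializer<?> configured, Object key) {
        try {
            bufferKey(configured, key);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Serializer<?> configured = new StringSerializer();
        // A plain String key matches the serializer.
        System.out.println(failsWithClassCast(configured, "plain-key"));
        // A windowed key does not, and the cast fails at serialization time.
        System.out.println(failsWithClassCast(configured, new Windowed<>("plain-key", 0L, 1000L)));
    }
}
```

In this sketch the mismatch compiles cleanly and only fails at runtime, inside the serializer, which matches the top frame of the trace above.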

My code is as follows:

{Code}
        final KTable<Windowed<Object>, GenericRecord> groupTable = groupedStream
                .aggregate(lastAggregator, lastAggregator, materialized);

        final KTable<Windowed<Object>, GenericRecord> suppressedTable = groupTable.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));

        // write the change-log stream to the topic
        suppressedTable.toStream((k, v) -> k.key())
                .mapValues(joinValueMapper::apply)
                .to(props.joinTopic());
{Code}


The code works without `suppressedTable` (that is, without the `suppress()` step). What am I doing wrong?


Someone else has encountered the same issue:

https://gist.github.com/robie2011/1caa4772b60b5a6f993e6f98e792a380


Slack conversation: https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556633088239800


