Posted to dev@kafka.apache.org by "Matthias J. Sax (JIRA)" <ji...@apache.org> on 2017/01/10 01:42:58 UTC

[jira] [Commented] (KAFKA-4612) Simple Kafka Streams app causes "ClassCastException: java.lang.String cannot be cast to [B"

    [ https://issues.apache.org/jira/browse/KAFKA-4612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15813501#comment-15813501 ] 

Matthias J. Sax commented on KAFKA-4612:
----------------------------------------

Did you configure global default key and value Serdes via {{StreamsConfig}} using the parameters {{KEY_SERDE_CLASS_CONFIG}} and {{VALUE_SERDE_CLASS_CONFIG}}? I guess it's not a library issue in the strict sense but just a misconfiguration. The problem is that the data is repartitioned after {{.selectKey}} by writing it to a topic, and the correct Serde is not found for that write (we have to assume that the key type changes in {{.selectKey}} and thus fall back to the global default Serdes). Right now, {{.selectKey}} does not allow specifying a new key Serde (which is kind of a problem). A workaround would be to put a {{.through}} after {{.selectKey}}, as it allows specifying the required Serdes.
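
A rough sketch of both options against the 0.10.1.x API, reusing the names from the reported code. The application id, the bootstrap server, and the topic name "topicRekeyed" are placeholders I made up for illustration; the topic passed to {{.through}} has to be created manually beforehand. Either option on its own should be enough.

```scala
import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder}

object SerdeWorkaroundSketch {
  // Option 1: set global default Serdes. They are used whenever an
  // operator (e.g. the repartitioning after selectKey) has no explicit Serde.
  val props = new Properties()
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-4612-sketch")  // placeholder
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")  // placeholder
  props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
  props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)

  // Option 2: write the re-keyed stream through an explicit topic so the
  // Serdes for the new key and the value are stated at that point.
  val builder = new KStreamBuilder()
  val incomingRecords: KStream[String, String] =
    builder.stream(Serdes.String, Serdes.String, "topicInput")
  val reKeyedRecords: KStream[String, String] =
    incomingRecords.selectKey((k, _) => k)
  val reKeyedWithSerdes: KStream[String, String] =
    reKeyedRecords.through(Serdes.String, Serdes.String, "topicRekeyed")  // placeholder topic
}
```

With option 2, the join would then be applied to {{reKeyedWithSerdes}} instead of {{reKeyedRecords}}.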

Btw: I think you can omit {{.selectKey}} in your example, as it does not set a new key anyway. (This change by itself should actually fix the problem: the repartitioning, which is not necessarily required in your case, is avoided, and even if it is required, the library knows that the data type is String and can use the correct Serde automatically.)
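
For reference, this is roughly what the reported topology would look like without {{.selectKey}}. It is only a sketch derived from the snippet in the issue description (same topics, same 0.10.1.x API), not something I have run against the attached project.

```scala
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder, KTable}

object NoSelectKeySketch {
  val builder = new KStreamBuilder()

  val kafkaTable: KTable[String, String] =
    builder.table(Serdes.String, Serdes.String, "topicTable", "topicTable")
  val incomingRecords: KStream[String, String] =
    builder.stream(Serdes.String, Serdes.String, "topicInput")

  // Join directly on the source stream: the key is unchanged, so the
  // stream is not marked for repartitioning before the join.
  val joinedRecords: KStream[String, String] =
    incomingRecords.leftJoin(kafkaTable, (s1: String, _: String) => s1)

  joinedRecords.to(Serdes.String, Serdes.String, "topicOutput")
}
```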

Please verify. (Please do not close the JIRA even if my hints resolve the problem; we might want to add an overload for {{.selectKey}} that allows specifying a new key Serde.)


> Simple Kafka Streams app causes "ClassCastException: java.lang.String cannot be cast to [B"
> -------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-4612
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4612
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.1.1
>         Environment: Virtual Machine using Debian 8 + Confluent Platform 3.1.1.
>            Reporter: Kurt Ostfeld
>         Attachments: KafkaIsolatedBug.tar.gz
>
>
> I've attached a minimal single source file project that reliably reproduces this issue.
> This project does the following:
> 1) Create test input data. Produces a single random (String,String) record into two different topics "topicInput" and "topicTable"
> 2) Creates and runs a Kafka Streams application:
>     val kafkaTable: KTable[String, String] = builder.table(Serdes.String, Serdes.String, "topicTable", "topicTable")
>     val incomingRecords: KStream[String, String] = builder.stream(Serdes.String, Serdes.String, "topicInput")
>     val reKeyedRecords: KStream[String, String] = incomingRecords.selectKey((k, _) => k)
>     val joinedRecords: KStream[String, String] = reKeyedRecords.leftJoin(kafkaTable, (s1: String, _: String) => s1)
>     joinedRecords.to(Serdes.String, Serdes.String, "topicOutput")
> This reliably generates the following error:
> [error] (StreamThread-1) java.lang.ClassCastException: java.lang.String cannot be cast to [B
> java.lang.ClassCastException: java.lang.String cannot be cast to [B
> 	at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:18)
> 	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:63)
> 	at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:82)
> 	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
> 	at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44)
> 	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> 	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
> 	at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43)
> 	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> 	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
> 	at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
> 	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:180)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
> One caveat: I'm running this on a Confluent Platform 3.1.1 instance which uses Kafka 0.10.1.0 since there is no newer Confluent Platform available. The Kafka Streams project is built using "kafka-clients" and "kafka-streams" version 0.10.1.1. If I use 0.10.1.0, I reliably hit bug https://issues.apache.org/jira/browse/KAFKA-4355. I am not sure if there is any issue using 0.10.1.1 libraries with a Confluent Platform running Kafka 0.10.1.0. I will obviously try the next Confluent Platform binary when it is available.


