You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Kurt Ostfeld (JIRA)" <ji...@apache.org> on 2017/01/10 00:54:58 UTC

[jira] [Created] (KAFKA-4612) Simple Kafka Streams app causes "ClassCastException: java.lang.String cannot be cast to [B"

Kurt Ostfeld created KAFKA-4612:
-----------------------------------

             Summary: Simple Kafka Streams app causes "ClassCastException: java.lang.String cannot be cast to [B"
                 Key: KAFKA-4612
                 URL: https://issues.apache.org/jira/browse/KAFKA-4612
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 0.10.1.1
         Environment: Virtual Machine using Debian 8 + Confluent Platform 3.1.1.
            Reporter: Kurt Ostfeld
         Attachments: KafkaIsolatedBug.tar.gz

I've attached a minimal single source file project that reliably reproduces this issue.

This project does the following:

1) Create test input data. Produces a single random (String,String) record into two diferent topics "topicInput" and "topicTable"

2) Creates and runs a Kafka Streams application:

    val kafkaTable: KTable[String, String] = builder.table(Serdes.String, Serdes.String, "topicTable", "topicTable")
    val incomingRecords: KStream[String, String] = builder.stream(Serdes.String, Serdes.String, "topicInput")
    val reKeyedRecords: KStream[String, String] = incomingRecords.selectKey((k, _) => k)
    val joinedRecords: KStream[String, String] = reKeyedRecords.leftJoin(kafkaTable, (s1: String, _: String) => s1)
    joinedRecords.to(Serdes.String, Serdes.String, "topicOutput")

This reliably generates the following error:

[error] (StreamThread-1) java.lang.ClassCastException: java.lang.String cannot be cast to [B
java.lang.ClassCastException: java.lang.String cannot be cast to [B
	at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:18)
	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:63)
	at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:82)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
	at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44)
	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
	at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43)
	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
	at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:180)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)

One caveat: I'm running this on a Confluent Platform 3.1.1 instance which uses Kafka 0.10.1.0 since there is no newer Confluent Platform available. The Kafka Streams project is built using "kafka-clients" and "kafka-streams" version 0.10.1.1. If I use 0.10.1.0, I reliably hit bug https://issues.apache.org/jira/browse/KAFKA-4355. I am not sure if there is any issue using 0.10.1.1 libraries with a Confluent Platform running Kafka 0.10.1.0. I will obviously try the next Confluent Platform binary when it is available.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)