Posted to users@kafka.apache.org by Marco Abitabile <ma...@gmail.com> on 2017/03/27 11:01:48 UTC

[Kafka Streams] - problem joining 2 different streams

Hi all,

I'm struggling with an apparently simple problem.
I'm joining 2 different streams:

Stream1. User activity data, with key, value --> <String, JsonObject>
Stream2. User location data (such as the city name), with key, value --> <String, String>

Keys are homogeneous in content and represent the ID of the user's device.

The error thrown is:
Exception in thread "StreamThread-2" java.lang.ClassCastException: com.mytest.JsonObject cannot be cast to java.lang.String
at com.mytest.serdes.JsonObjectSerde$1.serialize(JsonObjectSerde.java:49)
at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:176)
at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:109)
at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:101)
at org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:65)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)

This is the code I'm running:

//Other Stream: User Location, a string with the name of the city the user is in (like "San Francisco")
KStreamBuilder locationStreamBuilder = new KStreamBuilder();
KStream<String, String> userLocationStream =
    locationStreamBuilder.stream(stringSerde, stringSerde, "userLocationStreamData");
KStream<String, String> locationKstream =
    userLocationStream.map(MyStreamUtils::enhanceWithAreaDetails);
locationKstream.to("user_location");
KafkaStreams userLocationKafkaStream =
    new KafkaStreams(locationStreamBuilder, propsLocation);
userLocationKafkaStream.start();

//This Stream: User Activity
KStreamBuilder activityStreamBuilder = new KStreamBuilder();
KStream<String, JsonObject> activity =
    activityStreamBuilder.stream(stringSerde, jsonSerde, "activityStreamData");
activity.filter(MyStreamUtils::filterOutFakeUsers)
    .map(MyStreamUtils::enhanceWithScoreDetails)
    .join(
        locationKstream,
        MyStreamUtils::locationActivityJoiner,
        JoinWindows.of(1000).until(1000 * 60 * 5),
        stringSerde, jsonSerde, stringSerde)
    .to("usersWithLocation");

KafkaStreams userActivityStream = new KafkaStreams(builder, propsActivity);
userActivityStream.start();

And MyStreamUtils::locationActivityJoiner does:

public static JsonObject locationActivityJoiner(JsonObject activity, String loc) {
    JsonObject join = activity.copy();
    join.put("city" , loc);
    return join;
}


Basically, it seems that locationActivityJoiner receives elements from the
activity KStream as both its left and right arguments, while I was expecting
it to receive an activity element (a JsonObject) and a userLocation element
(a String).

How is this possible? I can't see what I'm doing wrong.
Do you have any clue why this is happening?

Thanks a lot for your support and work.

Best
Marco

Re: [Kafka Streams] - problem joining 2 different streams

Posted by Damian Guy <da...@gmail.com>.
Hi Marco,

It looks like you are creating 2 independent instances of KafkaStreams and
trying to join across those instances. This won't work, and I'm surprised
it let you get that far without some other exception.

You should remove this bit:
> KafkaStreams userLocationKafkaStream =
>     new KafkaStreams(locationStreamBuilder, propsLocation);
> userLocationKafkaStream.start();
>
> //This Stream: User Activity
> KStreamBuilder activityStreamBuilder = new KStreamBuilder();

and build the input streams you want to join from the same builder, i.e.,
the original builder that you created. You then just start one instance of
KafkaStreams.
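
As a rough illustration of the single-builder layout, here is a minimal sketch using the KStreamBuilder API that appears in this thread (it was deprecated in later Kafka releases). Several things are assumptions made only to keep the example self-contained: both streams carry String values instead of the custom JsonObject and its serde, the joiner simply concatenates the two values, the filter/map steps and the "user_location" output are left out, and the class name, application id, and bootstrap address are placeholders. As with any KStream-KStream join, the two input topics are assumed to be co-partitioned.

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

// Hypothetical class name; not part of the original post.
public class SingleBuilderJoin {

    public static void main(String[] args) {
        Serde<String> stringSerde = Serdes.String();

        // One builder holds the whole topology, so both sides of the join
        // belong to the same KafkaStreams instance.
        KStreamBuilder builder = new KStreamBuilder();

        // Assumption: String values on both sides (the original post uses
        // a custom JsonObject serde for the activity stream).
        KStream<String, String> location =
            builder.stream(stringSerde, stringSerde, "userLocationStreamData");
        KStream<String, String> activity =
            builder.stream(stringSerde, stringSerde, "activityStreamData");

        activity
            .join(
                location,
                // Placeholder joiner: left value is the activity, right is the location.
                (act, loc) -> act + "," + loc,
                JoinWindows.of(1000).until(1000 * 60 * 5),
                stringSerde, stringSerde, stringSerde)
            .to(stringSerde, stringSerde, "usersWithLocation");

        Properties props = new Properties();
        // Placeholder configuration values.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "activity-location-join");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // A single KafkaStreams instance runs the complete topology.
        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();
    }
}
```

The point of the layout is that every stream taking part in the join is created from the same builder, and only that builder is passed to KafkaStreams.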

HTH,
Damian



