Posted to user@spark.apache.org by Spark Enthusiast <sp...@yahoo.in> on 2015/08/23 17:56:12 UTC
How to parse multiple event types using Kafka
Folks,
I use the following Streaming API from KafkaUtils:
public JavaPairInputDStream<String, String> inputDStream() {
    // topics, brokers and streamingContext are fields of the enclosing class
    HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put(Tokens.KAFKA_BROKER_LIST_TOKEN.getRealTokenName(), brokers);
    return KafkaUtils.createDirectStream(
        streamingContext,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        topicsSet
    );
}
I catch the messages using:
JavaDStream<String> messages = inputDStream.map(new Function<Tuple2<String, String>, String>() {
    @Override
    public String call(Tuple2<String, String> tuple2) {
        // keep only the message value, dropping the key
        return tuple2._2();
    }
});
My problem is that each of these Kafka topics streams a different message type. How do I distinguish messages of type1 from messages of type2, and so on?
I tried the following:
private class ParseEvents<T> implements Function<String, T> {
    final Class<T> parameterClass;

    private ParseEvents(Class<T> parameterClass) {
        this.parameterClass = parameterClass;
    }

    public T call(String message) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        T parsedMessage = null;
        try {
            parsedMessage = mapper.readValue(message, this.parameterClass);
        } catch (Exception e1) {
            // assuming an SLF4J-style logger, which substitutes {} placeholders, not %s
            logger.error("Ignoring Unknown Message {}", message);
        }
        return parsedMessage;
    }
}

JavaDStream<Type1> type1Events = messages.map(new ParseEvents<Type1>(Type1.class));
JavaDStream<Type2> type2Events = messages.map(new ParseEvents<Type2>(Type2.class));
JavaDStream<Type3> type3Events = messages.map(new ParseEvents<Type3>(Type3.class));
But this does not work: the type1 stream also receives type2 messages and ignores them. Is there a clean way of handling this?
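Part of what goes wrong is that each messages.map(...) call runs its parser over every message in the stream, so type1Events gets one element for every type2 and type3 message as well, and that element is the null ParseEvents returns after logging. The sketch below uses plain java.util.stream as a stand-in for JavaDStream (an assumption made so it runs without Spark); parseType1 and its "type1:" prefix format are hypothetical. It contrasts map, which keeps a null per rejected message, with flatMap, which drops them.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class TrialParse {
    private static final String TYPE1_PREFIX = "type1:";

    // Hypothetical Type1 parser: accepts only "type1:<payload>" strings.
    // It stands in for mapper.readValue(message, Type1.class) failing on
    // a message of another type.
    static Optional<String> parseType1(String message) {
        if (message.startsWith(TYPE1_PREFIX)) {
            return Optional.of(message.substring(TYPE1_PREFIX.length()));
        }
        return Optional.empty();
    }

    // Like DStream.map: exactly one output per input, so every rejected
    // message becomes a null element in the result.
    static List<String> mapKeepingNulls(List<String> messages,
                                        Function<String, Optional<String>> parser) {
        return messages.stream()
                .map(message -> parser.apply(message).orElse(null))
                .collect(Collectors.toList());
    }

    // Like DStream.flatMap: zero or one output per input, so rejected
    // messages are dropped instead of turning into nulls.
    static List<String> flatMapDroppingRejects(List<String> messages,
                                               Function<String, Optional<String>> parser) {
        return messages.stream()
                .flatMap(message -> {
                    Optional<String> parsed = parser.apply(message);
                    return parsed.isPresent() ? Stream.of(parsed.get()) : Stream.<String>empty();
                })
                .collect(Collectors.toList());
    }
}
```

In the Spark 1.x Java API the same idea would use messages.flatMap(...) with a FlatMapFunction whose call returns Collections.emptyList() on failure and Collections.singletonList(parsed) on success. This still tries every parser on every message; the topic-based approach in the reply avoids that trial parsing.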
Re: How to parse multiple event types using Kafka
Posted by Cody Koeninger <co...@koeninger.org>.
Each Spark partition will contain messages from only a single Kafka
topic-partition. Use HasOffsetRanges to tell which Kafka topic-partition
it's from. See the docs:
http://spark.apache.org/docs/latest/streaming-kafka-integration.html
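A minimal sketch of the dispatch idea, assuming RDD partition i is described by offsetRanges[i] as the reply says. To stay runnable without Spark, TopicRange is a hypothetical stand-in for Spark's OffsetRange, a partition is modelled as a List<String>, and the topic-to-parser map parsersByTopic is hypothetical; in the original code each parser would wrap something like mapper.readValue(message, Type1.class).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for Spark's OffsetRange, keeping only the topic
// and partition fields this sketch needs.
final class TopicRange {
    final String topic;
    final int partition;

    TopicRange(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }
}

final class TopicDispatch {
    // Parses all messages of one RDD partition with the parser registered for
    // that partition's topic. Assumes offsetRanges[partitionIndex] describes
    // the Kafka topic-partition the RDD partition was read from.
    static List<Object> parsePartition(int partitionIndex,
                                       List<String> messages,
                                       TopicRange[] offsetRanges,
                                       Map<String, Function<String, ?>> parsersByTopic) {
        String topic = offsetRanges[partitionIndex].topic;
        Function<String, ?> parser = parsersByTopic.get(topic);
        if (parser == null) {
            throw new IllegalArgumentException("No parser registered for topic " + topic);
        }
        // One lookup per partition; no message is tried against the wrong parser.
        List<Object> parsed = new ArrayList<>(messages.size());
        for (String message : messages) {
            parsed.add(parser.apply(message));
        }
        return parsed;
    }
}
```

In an actual job, the array would come from ((HasOffsetRanges) rdd.rdd()).offsetRanges() and the partition index from mapPartitionsWithIndex or TaskContext.get().partitionId(). The cast only succeeds on RDDs of the stream returned by createDirectStream itself (for example inside foreachRDD or transform on inputDStream), not on a derived stream such as messages, and the index-to-range correspondence holds only until a shuffle or repartition changes the partitioning.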