Posted to user@spark.apache.org by Spark Enthusiast <sp...@yahoo.in> on 2015/08/23 17:56:12 UTC

How to parse multiple event types using Kafka

Folks,
I use the following Streaming API from KafkaUtils:
public JavaPairInputDStream<String, String> inputDStream() {

    HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put(Tokens.KAFKA_BROKER_LIST_TOKEN.getRealTokenName(), brokers);

    return KafkaUtils.createDirectStream(
            streamingContext,
            String.class,
            String.class,
            StringDecoder.class,
            StringDecoder.class,
            kafkaParams,
            topicsSet
    );

}

I catch the messages using:

JavaDStream<String> messages = inputDStream.map(new Function<Tuple2<String, String>, String>() {
    @Override
    public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
    }
});
My problem is that each of these Kafka topics streams a different message type. How do I distinguish messages of type1 from messages of type2, and so on?
I tried the following:
private class ParseEvents<T> implements Function<String, T> {
    final Class<T> parameterClass;

    private ParseEvents(Class<T> parameterClass) {
        this.parameterClass = parameterClass;
    }

    public T call(String message) throws Exception {
        ObjectMapper mapper = new ObjectMapper();

        T parsedMessage = null;
        try {
            parsedMessage = mapper.readValue(message, this.parameterClass);
        } catch (Exception e1) {
            // A message that does not fit parameterClass ends up here and is mapped to null.
            // {} is the SLF4J/Log4j 2 placeholder; %s is not substituted by those loggers.
            logger.error("Ignoring Unknown Message {}", message);
        }
        return parsedMessage;
    }
}

JavaDStream<Type1> type1Events = messages.map(new ParseEvents<Type1>(Type1.class));
JavaDStream<Type2> type2Events = messages.map(new ParseEvents<Type2>(Type2.class));
JavaDStream<Type3> type3Events = messages.map(new ParseEvents<Type3>(Type3.class));
But this does not work: the Type1 parser also receives the Type2 messages and discards them as unknown. Is there a clean way to handle this?
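The symptom follows from how map works: each of the three map calls is an independent transformation over the same messages stream, so every parser is applied to every message regardless of its topic. A minimal plain-Java model of that, with Spark and Jackson left out; the class MapEveryMessage and the "type1:" prefix check are hypothetical stand-ins for messages.map(...) and for mapper.readValue failing on a message of the wrong shape:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class MapEveryMessage {

    // Models messages.map(parser): the parser is applied to EVERY element,
    // and a failed parse becomes null, as in ParseEvents.call.
    static <T> List<T> mapAll(List<String> messages, Function<String, T> parser) {
        List<T> out = new ArrayList<>();
        for (String m : messages) {
            out.add(parser.apply(m));
        }
        return out;
    }

    // Hypothetical parser standing in for mapper.readValue(message, clazz):
    // it accepts only messages with the expected prefix and returns null otherwise.
    static Function<String, String> parserFor(String expectedPrefix) {
        return m -> m.startsWith(expectedPrefix) ? m.substring(expectedPrefix.length()) : null;
    }
}
```

Each of type1Events, type2Events and type3Events therefore has one element per incoming message, with null wherever that parser did not recognize the message.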



Re: How to parse multiple event types using Kafka

Posted by Cody Koeninger <co...@koeninger.org>.
Each Spark partition will contain messages from only a single Kafka
topic-partition. Use HasOffsetRanges to tell which Kafka topic-partition
it's from. See the docs:
http://spark.apache.org/docs/latest/streaming-kafka-integration.html
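A minimal sketch of that idea, assuming the Spark 1.x direct-stream API used in the question (KafkaUtils.createDirectStream with StringDecoder). Spark itself is left out so the routing logic stands alone: the names TopicRouting and groupByTopic are hypothetical, and the list of partitions plus the per-partition topic array stand in for one batch RDD and its offset ranges. In a real job the topics would come from ((HasOffsetRanges) rdd.rdd()).offsetRanges(), where partition i of the RDD was read from offsetRanges[i].topic(); per the docs, that cast only succeeds on the RDD seen by the first method called on the direct stream, not further down a chain.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of routing one direct-stream batch by Kafka topic.
// Assumed Spark-side counterpart (not compiled here):
//   OffsetRange[] ranges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
//   then rdd.mapPartitionsWithIndex(...) pairs each value in partition i
//   with ranges[i].topic().
final class TopicRouting {

    // partitions.get(i) holds the message values of Spark partition i;
    // topicOfPartition[i] is the topic of the Kafka topic-partition it came from.
    static Map<String, List<String>> groupByTopic(List<List<String>> partitions,
                                                  String[] topicOfPartition) {
        if (partitions.size() != topicOfPartition.length) {
            throw new IllegalArgumentException("need exactly one topic per partition");
        }
        Map<String, List<String>> byTopic = new LinkedHashMap<>();
        for (int i = 0; i < partitions.size(); i++) {
            // All messages in partition i share one topic, so look it up once per partition.
            List<String> bucket = byTopic.computeIfAbsent(topicOfPartition[i], t -> new ArrayList<>());
            bucket.addAll(partitions.get(i));
        }
        return byTopic;
    }
}
```

Once messages are tagged or grouped by topic, each parser only needs to see its own topic: for example, only the Type1 topic's messages would go through new ParseEvents<Type1>(Type1.class), so it never receives Type2 messages.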
