You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Nipun Arora <ni...@gmail.com> on 2017/06/05 14:59:36 UTC

Kafka + Spark Streaming consumer API offsets

I need some clarification for Kafka consumers in Spark or otherwise. I have
the following Kafka Consumer. The consumer is reading from a topic, and I
have a mechanism which blocks the consumer from time to time.

The producer is a separate thread which is continuously sending data. I
want to ensure that the consumer does not drop/not read data sent during
the period when the consumer was "blocked".

*In case the "blocked" part is confusing - we have a modified Spark
scheduler where we take a lock on the scheduler.*

public static JavaDStream<String> getKafkaDStream(String inputTopics,
String broker, int kafkaPort, JavaStreamingContext ssc){
        HashSet<String> inputTopicsSet = new
HashSet<String>(Arrays.asList(inputTopics.split(",")));
        HashMap<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", broker + ":" + kafkaPort);

        JavaPairInputDStream<String, String> messages =
KafkaUtils.createDirectStream(
                ssc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParams,
                inputTopicsSet
        );

        JavaDStream<String> lines = messages.map(new
Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });

        return lines;
    }

Thanks
Nipun