Posted to user@spark.apache.org by Nipun Arora <ni...@gmail.com> on 2017/06/05 14:59:36 UTC
Kafka + Spark Streaming consumer API offsets
I need some clarification on Kafka consumer behavior, in Spark or otherwise. I have
the following Kafka consumer, which reads from a topic; we have a mechanism
that blocks the consumer from time to time.
The producer is a separate thread which continuously sends data. I want to
ensure that the consumer does not drop (i.e. never reads) data sent during
the period when the consumer was "blocked".
*In case the "blocked" part is confusing - we have a modified Spark
scheduler where we take a lock on the scheduler.*
public static JavaDStream<String> getKafkaDStream(String inputTopics,
        String broker, int kafkaPort, JavaStreamingContext ssc) {
    HashSet<String> inputTopicsSet =
            new HashSet<String>(Arrays.asList(inputTopics.split(",")));

    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("metadata.broker.list", broker + ":" + kafkaPort);

    // Direct stream: Spark itself tracks the offsets consumed per batch
    // (no receiver, no ZooKeeper-committed consumer offsets)
    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
            ssc,
            String.class,
            String.class,
            StringDecoder.class,
            StringDecoder.class,
            kafkaParams,
            inputTopicsSet);

    // Keep only the message values, dropping the keys
    JavaDStream<String> lines = messages.map(
            new Function<Tuple2<String, String>, String>() {
                @Override
                public String call(Tuple2<String, String> tuple2) {
                    return tuple2._2();
                }
            });
    return lines;
}
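For what it's worth, my understanding of the direct stream is that each batch's
range is computed from the last consumed offset, not from a broker-side consumer
position, so a blocked consumer should simply resume at the next unread offset.
A toy plain-Java sketch of that offset bookkeeping (the names DirectTracker and
nextBatch are mine for illustration, not Spark API):

    /** Toy model of the direct stream's per-partition offset bookkeeping. */
    class DirectTracker {
        private long fromOffset = 0;  // next offset to read

        /** Returns [from, until) for the next batch, given the latest broker offset. */
        long[] nextBatch(long latestOffset) {
            long[] range = {fromOffset, latestOffset};
            fromOffset = latestOffset;  // next batch starts exactly where this one ended
            return range;
        }
    }

    public class OffsetDemo {
        public static void main(String[] args) {
            DirectTracker tracker = new DirectTracker();
            long[] b1 = tracker.nextBatch(100);  // batch 1 covers [0, 100)
            // ... consumer blocked while the producer writes up to offset 250 ...
            long[] b2 = tracker.nextBatch(250);  // batch 2 covers [100, 250): nothing skipped
            System.out.println(b1[0] + "-" + b1[1] + " then " + b2[0] + "-" + b2[1]);
        }
    }

To actually verify this on the real stream, I believe you can cast each batch's
rdd.rdd() to HasOffsetRanges inside foreachRDD and log offsetRanges(), then check
that consecutive ranges are contiguous across a blocked period.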
Thanks
Nipun