Posted to user@spark.apache.org by Mohammad Kargar <mk...@phemi.com> on 2017/03/14 19:58:38 UTC
OffsetOutOfRangeException
To work around an out-of-space issue in a Direct Kafka Streaming
application, we create topics with a low retention policy (retention.ms=300000,
i.e. 5 minutes), which works fine from Kafka's perspective. However, this
results in an OffsetOutOfRangeException in the Spark job, thrown at the
iterator.hasNext() call in the code below. Is there any Spark configuration
setting that avoids receiving expired messages?
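import java.util.Iterator;

import kafka.serializer.DefaultDecoder;
import kafka.serializer.StringDecoder;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;

// ssc, kafkaParams, fromOffsets and messageHandler are defined elsewhere
JavaInputDStream<MyTuple> dStream = KafkaUtils.createDirectStream(ssc,
    String.class, byte[].class,
    StringDecoder.class, DefaultDecoder.class, MyTuple.class,
    kafkaParams, fromOffsets, messageHandler);

dStream.foreachRDD(
    new VoidFunction<JavaRDD<MyTuple>>() {
        @Override
        public void call(JavaRDD<MyTuple> rdd) {
            // Offset range of this batch, per Kafka partition
            final OffsetRange[] offSetRanges =
                ((HasOffsetRanges) rdd.rdd()).offsetRanges();
            rdd.foreachPartition(new VoidFunction<Iterator<MyTuple>>() {
                @Override
                public void call(Iterator<MyTuple> iterator) throws Exception {
                    // init some data
                    // OffsetOutOfRangeException is thrown by hasNext()
                    while (iterator.hasNext()) {
                        MyTuple tuple = iterator.next();
                        // handle next tuple
                    }
                }
            });
            // ...
        }
    });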
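One workaround, offered here as an assumption rather than a Spark setting: with the Kafka 0.8 direct API, `auto.offset.reset` in `kafkaParams` only decides where to start when no explicit `fromOffsets` are passed, so stored starting offsets that retention has already deleted are used as-is and fail on the first fetch. A minimal sketch, assuming you can look up each partition's earliest and latest retained offsets (for example with `KafkaConsumer.beginningOffsets`/`endOffsets` in newer kafka-clients versions), is to clamp the stored offsets into that range before calling `createDirectStream`. Partitions are keyed by a hypothetical "topic-partition" string instead of `TopicAndPartition` so the example runs without Kafka on the classpath; `OffsetClamp` and `clampOffsets` are illustrative names.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch: clamp stored starting offsets into the range Kafka still retains.
 * Keys are hypothetical "topic-partition" strings standing in for
 * kafka.common.TopicAndPartition.
 */
class OffsetClamp {

    /**
     * Returns a copy of storedOffsets in which every offset lies within
     * [earliest, latest] for its partition. Offsets already deleted by
     * retention move up to the earliest retained offset; offsets past the
     * log end move down to the latest offset.
     */
    static Map<String, Long> clampOffsets(Map<String, Long> storedOffsets,
                                          Map<String, Long> earliestOffsets,
                                          Map<String, Long> latestOffsets) {
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Long> e : storedOffsets.entrySet()) {
            String partition = e.getKey();
            Long earliest = earliestOffsets.get(partition);
            Long latest = latestOffsets.get(partition);
            if (earliest == null || latest == null) {
                throw new IllegalArgumentException(
                    "No retained offset range for " + partition);
            }
            long clamped = Math.max(earliest, Math.min(e.getValue(), latest));
            result.put(partition, clamped);
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical values: partition 0's stored offset has expired,
        // partition 1's is still inside the retained range.
        Map<String, Long> stored = new HashMap<>();
        stored.put("my-topic-0", 100L);
        stored.put("my-topic-1", 950L);

        Map<String, Long> earliest = new HashMap<>();
        earliest.put("my-topic-0", 500L);
        earliest.put("my-topic-1", 900L);

        Map<String, Long> latest = new HashMap<>();
        latest.put("my-topic-0", 1200L);
        latest.put("my-topic-1", 1000L);

        Map<String, Long> clamped = clampOffsets(stored, earliest, latest);
        System.out.println(clamped.get("my-topic-0")); // prints 500
        System.out.println(clamped.get("my-topic-1")); // prints 950
    }
}
```

Clamping only fixes the starting point, and it silently skips the expired messages. With retention.ms=300000, a batch whose offset range has been computed but not yet read by the executors can still hit the same exception if its segments are deleted in the meantime (for example when earlier batches are backed up), so retention has to stay comfortably above the worst-case processing delay.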