Posted to user@spark.apache.org by Mohammad Kargar <mk...@phemi.com> on 2017/03/14 19:58:38 UTC

OffsetOutOfRangeException

To work around an out-of-space issue in a Direct Kafka Streaming
application, we create topics with a low retention policy
(retention.ms=300000), which works fine from the Kafka perspective.
However, this results in an OffsetOutOfRangeException in the Spark job
(see the marked line in the code below). Is there any configuration we
can set in Spark to avoid receiving expired messages?


        JavaInputDStream<MyTuple> dStream = KafkaUtils.createDirectStream(ssc,
                String.class, byte[].class,
                StringDecoder.class, DefaultDecoder.class, MyTuple.class,
                kafkaParams, fromOffsets, messageHandler);

        dStream.foreachRDD(new VoidFunction<JavaRDD<MyTuple>>() {
            @Override
            public void call(JavaRDD<MyTuple> rdd) {
                // capture the Kafka offset ranges for this batch
                final OffsetRange[] offSetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

                rdd.foreachPartition(new VoidFunction<Iterator<MyTuple>>() {
                    @Override
                    public void call(Iterator<MyTuple> iterator) throws Exception {
                        // init some data
                        while (iterator.hasNext()) { // <-- OffsetOutOfRangeException is raised here
                            // handle next tuple
                        }
                    }
                    // ...
                });
            }
        });
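
For completeness, the kafkaParams, fromOffsets and messageHandler passed to
createDirectStream above are built roughly as follows; the broker address,
topic name, starting offsets and the way MyTuple is constructed are
placeholders rather than our real values:

import java.util.HashMap;
import java.util.Map;

import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import org.apache.spark.api.java.function.Function;

// Kafka connection settings for the direct stream (placeholder broker list).
Map<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", "broker1:9092");

// Explicit starting offset for each partition we consume (placeholder values).
Map<TopicAndPartition, Long> fromOffsets = new HashMap<TopicAndPartition, Long>();
fromOffsets.put(new TopicAndPartition("my-topic", 0), 0L);

// Converts each raw Kafka record into our MyTuple value type.
// The MyTuple constructor used here is illustrative only.
Function<MessageAndMetadata<String, byte[]>, MyTuple> messageHandler =
        new Function<MessageAndMetadata<String, byte[]>, MyTuple>() {
            @Override
            public MyTuple call(MessageAndMetadata<String, byte[]> record) throws Exception {
                return new MyTuple(record.key(), record.message());
            }
        };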