Posted to user@spark.apache.org by Shushant Arora <sh...@gmail.com> on 2015/08/17 13:13:35 UTC

Re: spark streaming 1.3 doubts(force it to not consume anything)

How to create a ClassTag in Java? Also, the constructor of DirectKafkaInputDStream
takes a Function1, not a Function, whereas KafkaUtils.createDirectStream accepts a
Function.

I have the following as an overridden DirectKafkaInputDStream:


public class CustomDirectKafkaInputDstream extends
    DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
        kafka.serializer.DefaultDecoder, byte[][]> {

  public CustomDirectKafkaInputDstream(
      StreamingContext ssc_,
      Map<String, String> kafkaParams,
      Map<TopicAndPartition, Object> fromOffsets,
      Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> messageHandler,
      ClassTag<byte[]> evidence$1, ClassTag<byte[]> evidence$2,
      ClassTag<DefaultDecoder> evidence$3,
      ClassTag<DefaultDecoder> evidence$4, ClassTag<byte[][]> evidence$5) {
    super(ssc_, kafkaParams, fromOffsets, messageHandler, evidence$1, evidence$2,
        evidence$3, evidence$4, evidence$5);
  }

  @Override
  public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
      byte[][]>> compute(Time validTime) {
    // processedCounter / failedProcessingsCounter are counters defined elsewhere (not shown).
    int processed = processedCounter.value();
    int failed = failedProcessingsCounter.value();
    if (processed == failed) {
      // Back off: every processed batch so far has failed.
      System.out.println("backing off since its 100 % failure");
      return Option.empty();
    } else {
      System.out.println("starting the stream ");
      return super.compute(validTime);
    }
  }
}



To create this stream I am using:

scala.collection.immutable.Map<String, String> scalakafkaParams =
    JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala()
        .toMap(Predef.<Tuple2<String, String>>conforms());
scala.collection.immutable.Map<TopicAndPartition, Long> scalaktopicOffsetMap =
    JavaConverters.mapAsScalaMapConverter(topicOffsetMap).asScala()
        .toMap(Predef.<Tuple2<TopicAndPartition, Long>>conforms());

scala.Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> handler =
    new Function<MessageAndMetadata<byte[], byte[]>, byte[][]>() {
        ..});
JavaDStream<byte[][]> directKafkaStream = new CustomDirectKafkaInputDstream(
    jssc, scalakafkaParams, scalaktopicOffsetMap, handler,
    byte[].class, byte[].class, kafka.serializer.DefaultDecoder.class,
    kafka.serializer.DefaultDecoder.class, byte[][].class);



How do I pass a ClassTag to the CustomDirectKafkaInputDstream constructor? And
how can I use a Function instead of a Function1?
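
For reference, below is a minimal, untested sketch (assuming the Scala 2.10 library and
the Kafka 0.8 client that ship with Spark 1.3 are on the classpath) of how the ClassTag
and scala.Function1 arguments could be built from plain Java. The helper class, field
names, and the handler body are placeholders, not part of the original code.

import java.io.Serializable;

import kafka.message.MessageAndMetadata;
import kafka.serializer.DefaultDecoder;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import scala.runtime.AbstractFunction1;

public class DirectStreamHelpers {

  // ClassTags can be obtained from Java through the ClassTag companion object.
  static final ClassTag<byte[]> BYTES_TAG =
      ClassTag$.MODULE$.<byte[]>apply(byte[].class);
  static final ClassTag<DefaultDecoder> DECODER_TAG =
      ClassTag$.MODULE$.<DefaultDecoder>apply(DefaultDecoder.class);
  static final ClassTag<byte[][]> PAYLOAD_TAG =
      ClassTag$.MODULE$.<byte[][]>apply(byte[][].class);

  // A scala.Function1 can be written in Java by extending AbstractFunction1,
  // which fills in andThen/compose so only apply() has to be implemented.
  // It is also made Serializable because Spark ships the handler to executors.
  public static class Handler
      extends AbstractFunction1<MessageAndMetadata<byte[], byte[]>, byte[][]>
      implements Serializable {
    @Override
    public byte[][] apply(MessageAndMetadata<byte[], byte[]> mmd) {
      return new byte[][] { mmd.key(), mmd.message() };
    }
  }
}

The tags and a new Handler() could then be passed where the constructor expects the
evidence$1..evidence$5 and messageHandler parameters, instead of the raw Class objects
used above.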



On Thu, Aug 13, 2015 at 12:16 AM, Cody Koeninger <co...@koeninger.org> wrote:

> I'm not aware of an existing api per se, but you could create your own
> subclass of the DStream that returns None for compute() under certain
> conditions.
>
>
>
> On Wed, Aug 12, 2015 at 1:03 PM, Shushant Arora <shushantarora09@gmail.com
> > wrote:
>
>> Hi Cody
>>
>> Can you help here if streaming 1.3 has any api for not consuming any
>> message in next few runs?
>>
>> Thanks
>>
>> ---------- Forwarded message ----------
>> From: Shushant Arora <sh...@gmail.com>
>> Date: Wed, Aug 12, 2015 at 11:23 PM
>> Subject: spark streaming 1.3 doubts(force it to not consume anything)
>> To: user <us...@spark.apache.org>
>>
>>
>> I Can't make my stream application batch interval to change at run time .
>> Its always fixed and it always creates jobs at specified batch inetval and
>> enqueue them if earleir batch is not finished.
>>
>> My requirement is to process the events and post them to some external
>> server and if external server is down I want to increase the batch time -
>> that is not possible but can I make it not to consume any messages in say
>> next 5 successive runs ?
>>
>>
>>
>>
>

Re: spark streaming 1.3 doubts(force it to not consume anything)

Posted by Shushant Arora <sh...@gmail.com>.
Will try Scala.
The only reason for not using Scala is that I have never worked with it.

On Wed, Aug 19, 2015 at 7:34 PM, Cody Koeninger <co...@koeninger.org> wrote:

> Is there a reason not to just use scala?  It's not a lot of code... and
> it'll be even less code in scala ;)
>
> On Wed, Aug 19, 2015 at 4:14 AM, Shushant Arora <shushantarora09@gmail.com
> > wrote:
>
>> To correct myself - KafkaRDD[K, V, U, T, R] is subclass of RDD[R] but
>> Option<KafkaRDD[K, V, U, T, R] >  is not subclass of Option<RDD[R]>;
>>
>> In scala C[T’] is a subclass of C[T] as per
>> https://twitter.github.io/scala_school/type-basics.html
>> but this is not allowed in java.
>>
>> So is there any workaround to achieve this in java for overriding DirectKafkaInputDStream
>> ?
>>
>>
>> On Wed, Aug 19, 2015 at 12:45 AM, Shushant Arora <
>> shushantarora09@gmail.com> wrote:
>>
>>> But KafkaRDD[K, V, U, T, R] is not subclass of RDD[R] as per java
>>> generic inheritance is not supported so derived class cannot return
>>>  different genric typed subclass from overriden method.
>>>
>>> On Wed, Aug 19, 2015 at 12:02 AM, Cody Koeninger <co...@koeninger.org>
>>> wrote:
>>>
>>>> Option is covariant and KafkaRDD is a subclass of RDD
>>>>
>>>> On Tue, Aug 18, 2015 at 1:12 PM, Shushant Arora <
>>>> shushantarora09@gmail.com> wrote:
>>>>
>>>>> Is it that in scala its allowed for derived class to have any return
>>>>> type ?
>>>>>
>>>>>  And streaming jar is originally created in scala so its allowed for
>>>>> DirectKafkaInputDStream  to return Option[KafkaRDD[K, V, U, T, R]]
>>>>> compute method ?
>>>>>
>>>>> On Tue, Aug 18, 2015 at 8:36 PM, Shushant Arora <
>>>>> shushantarora09@gmail.com> wrote:
>>>>>
>>>>>> looking at source code of
>>>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream
>>>>>>
>>>>>> override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T,
>>>>>> R]] = {
>>>>>>     val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
>>>>>>     val rdd = KafkaRDD[K, V, U, T, R](
>>>>>>       context.sparkContext, kafkaParams, currentOffsets,
>>>>>> untilOffsets, messageHandler)
>>>>>>
>>>>>>     currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
>>>>>>     Some(rdd)
>>>>>>   }
>>>>>>
>>>>>>
>>>>>> But in DStream its def compute (validTime: Time): Option[RDD[T]]  ,
>>>>>>
>>>>>> So what should  be the return type of custom DStream extends
>>>>>> DirectKafkaInputDStream .
>>>>>> Since I want the behaviour to be same as of DirectKafkaInputDStream
>>>>>>  in normal scenarios and return none in specific scenario.
>>>>>>
>>>>>> And why the same error did not come while extending
>>>>>> DirectKafkaInputDStream from InputDStream ? Since new return type Option[KafkaRDD[K,
>>>>>> V, U, T, R]] is not subclass of Option[RDD[T] so it should have been
>>>>>> failed?
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Aug 18, 2015 at 7:28 PM, Cody Koeninger <co...@koeninger.org>
>>>>>> wrote:
>>>>>>
>>>>>>> The superclass method in DStream is defined as returning an
>>>>>>> Option[RDD[T]]
>>>>>>>
>>>>>>> On Tue, Aug 18, 2015 at 8:07 AM, Shushant Arora <
>>>>>>> shushantarora09@gmail.com> wrote:
>>>>>>>
>>>>>>>> Getting compilation error while overriding compute method of
>>>>>>>> DirectKafkaInputDStream.
>>>>>>>>
>>>>>>>>
>>>>>>>> [ERROR] CustomDirectKafkaInputDstream.java:[51,83]
>>>>>>>> compute(org.apache.spark.streaming.Time) in CustomDirectKafkaInputDstream
>>>>>>>> cannot override compute(org.apache.spark.streaming.Time) in
>>>>>>>> org.apache.spark.streaming.dstream.DStream; attempting to use incompatible
>>>>>>>> return type
>>>>>>>>
>>>>>>>> [ERROR] found   :
>>>>>>>> scala.Option<org.apache.spark.streaming.kafka.KafkaRDD<byte[],byte[],kafka.serializer.DefaultDecoder,kafka.serializer.DefaultDecoder,byte[][]>>
>>>>>>>>
>>>>>>>> [ERROR] required: scala.Option<org.apache.spark.rdd.RDD<byte[][]>>
>>>>>>>>
>>>>>>>>
>>>>>>>> class :
>>>>>>>>
>>>>>>>> public class CustomDirectKafkaInputDstream extends
>>>>>>>> DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
>>>>>>>> kafka.serializer.DefaultDecoder, byte[][]>{
>>>>>>>>
>>>>>>>> @Override
>>>>>>>> public Option<KafkaRDD<byte[], byte[], DefaultDecoder,
>>>>>>>> DefaultDecoder, byte[][]>> compute(
>>>>>>>> Time validTime) {
>>>>>>>>
>>>>>>>> int processed=processedCounter.value();
>>>>>>>> int failed = failedProcessingsCounter.value();
>>>>>>>> if((processed==failed)){
>>>>>>>> System.out.println("backing off since its 100 % failure");
>>>>>>>> return Option.empty();
>>>>>>>> }else{
>>>>>>>> System.out.println("starting the stream ");
>>>>>>>>
>>>>>>>> return super.compute(validTime);
>>>>>>>> }
>>>>>>>> }
>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>> What should be the return type of compute method ? super class is
>>>>>>>> returning Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
>>>>>>>> byte[][]>>  but its expecting
>>>>>>>>  scala.Option<org.apache.spark.rdd.RDD<byte[][]>> from derived  class . Is
>>>>>>>> there something wring with code?
>>>>>>>>
>>>>>>>> On Mon, Aug 17, 2015 at 7:08 PM, Cody Koeninger <cody@koeninger.org
>>>>>>>> > wrote:
>>>>>>>>
>>>>>>>>> Look at the definitions of the java-specific
>>>>>>>>> KafkaUtils.createDirectStream methods (the ones that take a
>>>>>>>>> JavaStreamingContext)
>>>>>>>>>
>>>>>>>>> On Mon, Aug 17, 2015 at 5:13 AM, Shushant Arora <
>>>>>>>>> shushantarora09@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> How to create classtag in java ?Also Constructor
>>>>>>>>>> of DirectKafkaInputDStream takes Function1 not Function but
>>>>>>>>>> kafkautils.createDirectStream allows function.
>>>>>>>>>>
>>>>>>>>>> I have below as overriden DirectKafkaInputDStream.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> public class CustomDirectKafkaInputDstream extends
>>>>>>>>>> DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
>>>>>>>>>> kafka.serializer.DefaultDecoder, byte[][]>{
>>>>>>>>>>
>>>>>>>>>> public CustomDirectKafkaInputDstream(
>>>>>>>>>> StreamingContext ssc_,
>>>>>>>>>> Map<String, String> kafkaParams,
>>>>>>>>>> Map<TopicAndPartition, Object> fromOffsets,
>>>>>>>>>> Function1<MessageAndMetadata<byte[], byte[]>, byte[][]>
>>>>>>>>>> messageHandler,
>>>>>>>>>> ClassTag<byte[]> evidence$1, ClassTag<byte[]> evidence$2,
>>>>>>>>>> ClassTag<DefaultDecoder> evidence$3,
>>>>>>>>>> ClassTag<DefaultDecoder> evidence$4, ClassTag<byte[][]>
>>>>>>>>>> evidence$5) {
>>>>>>>>>> super(ssc_, kafkaParams, fromOffsets, messageHandler, evidence$1,
>>>>>>>>>> evidence$2,
>>>>>>>>>> evidence$3, evidence$4, evidence$5);
>>>>>>>>>> }
>>>>>>>>>> @Override
>>>>>>>>>> public Option<KafkaRDD<byte[], byte[], DefaultDecoder,
>>>>>>>>>> DefaultDecoder, byte[][]>> compute(
>>>>>>>>>> Time validTime) {
>>>>>>>>>> int processe=processedCounter.value();
>>>>>>>>>> int failed = failedProcessingsCounter.value();
>>>>>>>>>> if((processed==failed)){
>>>>>>>>>> System.out.println("backing off since its 100 % failure");
>>>>>>>>>> return Option.empty();
>>>>>>>>>> }else{
>>>>>>>>>> System.out.println("starting the stream ");
>>>>>>>>>>
>>>>>>>>>> return super.compute(validTime);
>>>>>>>>>> }
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> To create this stream
>>>>>>>>>> I am using
>>>>>>>>>> scala.collection.immutable.Map<String, String> scalakafkaParams =
>>>>>>>>>> JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala().toMap(Predef.<Tuple2<String,
>>>>>>>>>> String>>conforms());
>>>>>>>>>> scala.collection.immutable.Map<TopicAndPartition, Long>
>>>>>>>>>> scalaktopicOffsetMap=
>>>>>>>>>> JavaConverters.mapAsScalaMapConverter(topicOffsetMap).asScala().toMap(Predef.<Tuple2<TopicAndPartition,
>>>>>>>>>> Long>>conforms());
>>>>>>>>>>
>>>>>>>>>> scala.Function1<MessageAndMetadata<byte[], byte[]>, byte[][]>
>>>>>>>>>> handler = new Function<MessageAndMetadata<byte[], byte[]>, byte[][]>() {
>>>>>>>>>>         ..});
>>>>>>>>>> JavaDStream<byte[][]> directKafkaStream = new
>>>>>>>>>> CustomDirectKafkaInputDstream(jssc,scalakafkaParams ,scalaktopicOffsetMap,
>>>>>>>>>> handler,byte[].class,byte[].class, kafka.serializer.DefaultDecoder.class,
>>>>>>>>>> kafka.serializer.DefaultDecoder.class,byte[][].class);
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> How to pass classTag to constructor in
>>>>>>>>>> CustomDirectKafkaInputDstream ? And how to use Function instead of
>>>>>>>>>> Function1 ?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Aug 13, 2015 at 12:16 AM, Cody Koeninger <
>>>>>>>>>> cody@koeninger.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> I'm not aware of an existing api per se, but you could create
>>>>>>>>>>> your own subclass of the DStream that returns None for compute() under
>>>>>>>>>>> certain conditions.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Aug 12, 2015 at 1:03 PM, Shushant Arora <
>>>>>>>>>>> shushantarora09@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Cody
>>>>>>>>>>>>
>>>>>>>>>>>> Can you help here if streaming 1.3 has any api for not
>>>>>>>>>>>> consuming any message in next few runs?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>>
>>>>>>>>>>>> ---------- Forwarded message ----------
>>>>>>>>>>>> From: Shushant Arora <sh...@gmail.com>
>>>>>>>>>>>> Date: Wed, Aug 12, 2015 at 11:23 PM
>>>>>>>>>>>> Subject: spark streaming 1.3 doubts(force it to not consume
>>>>>>>>>>>> anything)
>>>>>>>>>>>> To: user <us...@spark.apache.org>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> I Can't make my stream application batch interval to change at
>>>>>>>>>>>> run time . Its always fixed and it always creates jobs at specified batch
>>>>>>>>>>>> inetval and enqueue them if earleir batch is not finished.
>>>>>>>>>>>>
>>>>>>>>>>>> My requirement is to process the events and post them to some
>>>>>>>>>>>> external server and if external server is down I want to increase the batch
>>>>>>>>>>>> time - that is not possible but can I make it not to consume any messages
>>>>>>>>>>>> in say next 5 successive runs ?
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: spark streaming 1.3 doubts(force it to not consume anything)

Posted by Cody Koeninger <co...@koeninger.org>.
Is there a reason not to just use Scala?  It's not a lot of code... and
it'll be even less code in Scala ;)

On Wed, Aug 19, 2015 at 4:14 AM, Shushant Arora <sh...@gmail.com>
wrote:

> To correct myself - KafkaRDD[K, V, U, T, R] is subclass of RDD[R] but
> Option<KafkaRDD[K, V, U, T, R] >  is not subclass of Option<RDD[R]>;
>
> In scala C[T’] is a subclass of C[T] as per
> https://twitter.github.io/scala_school/type-basics.html
> but this is not allowed in java.
>
> So is there any workaround to achieve this in java for overriding DirectKafkaInputDStream
> ?
>
>
> On Wed, Aug 19, 2015 at 12:45 AM, Shushant Arora <
> shushantarora09@gmail.com> wrote:
>
>> But KafkaRDD[K, V, U, T, R] is not subclass of RDD[R] as per java
>> generic inheritance is not supported so derived class cannot return
>>  different genric typed subclass from overriden method.
>>
>> On Wed, Aug 19, 2015 at 12:02 AM, Cody Koeninger <co...@koeninger.org>
>> wrote:
>>
>>> Option is covariant and KafkaRDD is a subclass of RDD
>>>
>>> On Tue, Aug 18, 2015 at 1:12 PM, Shushant Arora <
>>> shushantarora09@gmail.com> wrote:
>>>
>>>> Is it that in scala its allowed for derived class to have any return
>>>> type ?
>>>>
>>>>  And streaming jar is originally created in scala so its allowed for
>>>> DirectKafkaInputDStream  to return Option[KafkaRDD[K, V, U, T, R]]
>>>> compute method ?
>>>>
>>>> On Tue, Aug 18, 2015 at 8:36 PM, Shushant Arora <
>>>> shushantarora09@gmail.com> wrote:
>>>>
>>>>> looking at source code of
>>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream
>>>>>
>>>>> override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]]
>>>>> = {
>>>>>     val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
>>>>>     val rdd = KafkaRDD[K, V, U, T, R](
>>>>>       context.sparkContext, kafkaParams, currentOffsets, untilOffsets,
>>>>> messageHandler)
>>>>>
>>>>>     currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
>>>>>     Some(rdd)
>>>>>   }
>>>>>
>>>>>
>>>>> But in DStream its def compute (validTime: Time): Option[RDD[T]]  ,
>>>>>
>>>>> So what should  be the return type of custom DStream extends
>>>>> DirectKafkaInputDStream .
>>>>> Since I want the behaviour to be same as of DirectKafkaInputDStream
>>>>>  in normal scenarios and return none in specific scenario.
>>>>>
>>>>> And why the same error did not come while extending
>>>>> DirectKafkaInputDStream from InputDStream ? Since new return type Option[KafkaRDD[K,
>>>>> V, U, T, R]] is not subclass of Option[RDD[T] so it should have been
>>>>> failed?
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Aug 18, 2015 at 7:28 PM, Cody Koeninger <co...@koeninger.org>
>>>>> wrote:
>>>>>
>>>>>> The superclass method in DStream is defined as returning an
>>>>>> Option[RDD[T]]
>>>>>>
>>>>>> On Tue, Aug 18, 2015 at 8:07 AM, Shushant Arora <
>>>>>> shushantarora09@gmail.com> wrote:
>>>>>>
>>>>>>> Getting compilation error while overriding compute method of
>>>>>>> DirectKafkaInputDStream.
>>>>>>>
>>>>>>>
>>>>>>> [ERROR] CustomDirectKafkaInputDstream.java:[51,83]
>>>>>>> compute(org.apache.spark.streaming.Time) in CustomDirectKafkaInputDstream
>>>>>>> cannot override compute(org.apache.spark.streaming.Time) in
>>>>>>> org.apache.spark.streaming.dstream.DStream; attempting to use incompatible
>>>>>>> return type
>>>>>>>
>>>>>>> [ERROR] found   :
>>>>>>> scala.Option<org.apache.spark.streaming.kafka.KafkaRDD<byte[],byte[],kafka.serializer.DefaultDecoder,kafka.serializer.DefaultDecoder,byte[][]>>
>>>>>>>
>>>>>>> [ERROR] required: scala.Option<org.apache.spark.rdd.RDD<byte[][]>>
>>>>>>>
>>>>>>>
>>>>>>> class :
>>>>>>>
>>>>>>> public class CustomDirectKafkaInputDstream extends
>>>>>>> DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
>>>>>>> kafka.serializer.DefaultDecoder, byte[][]>{
>>>>>>>
>>>>>>> @Override
>>>>>>> public Option<KafkaRDD<byte[], byte[], DefaultDecoder,
>>>>>>> DefaultDecoder, byte[][]>> compute(
>>>>>>> Time validTime) {
>>>>>>>
>>>>>>> int processed=processedCounter.value();
>>>>>>> int failed = failedProcessingsCounter.value();
>>>>>>> if((processed==failed)){
>>>>>>> System.out.println("backing off since its 100 % failure");
>>>>>>> return Option.empty();
>>>>>>> }else{
>>>>>>> System.out.println("starting the stream ");
>>>>>>>
>>>>>>> return super.compute(validTime);
>>>>>>> }
>>>>>>> }
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>> What should be the return type of compute method ? super class is
>>>>>>> returning Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
>>>>>>> byte[][]>>  but its expecting
>>>>>>>  scala.Option<org.apache.spark.rdd.RDD<byte[][]>> from derived  class . Is
>>>>>>> there something wring with code?
>>>>>>>
>>>>>>> On Mon, Aug 17, 2015 at 7:08 PM, Cody Koeninger <co...@koeninger.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Look at the definitions of the java-specific
>>>>>>>> KafkaUtils.createDirectStream methods (the ones that take a
>>>>>>>> JavaStreamingContext)
>>>>>>>>
>>>>>>>> On Mon, Aug 17, 2015 at 5:13 AM, Shushant Arora <
>>>>>>>> shushantarora09@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> How to create classtag in java ?Also Constructor
>>>>>>>>> of DirectKafkaInputDStream takes Function1 not Function but
>>>>>>>>> kafkautils.createDirectStream allows function.
>>>>>>>>>
>>>>>>>>> I have below as overriden DirectKafkaInputDStream.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> public class CustomDirectKafkaInputDstream extends
>>>>>>>>> DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
>>>>>>>>> kafka.serializer.DefaultDecoder, byte[][]>{
>>>>>>>>>
>>>>>>>>> public CustomDirectKafkaInputDstream(
>>>>>>>>> StreamingContext ssc_,
>>>>>>>>> Map<String, String> kafkaParams,
>>>>>>>>> Map<TopicAndPartition, Object> fromOffsets,
>>>>>>>>> Function1<MessageAndMetadata<byte[], byte[]>, byte[][]>
>>>>>>>>> messageHandler,
>>>>>>>>> ClassTag<byte[]> evidence$1, ClassTag<byte[]> evidence$2,
>>>>>>>>> ClassTag<DefaultDecoder> evidence$3,
>>>>>>>>> ClassTag<DefaultDecoder> evidence$4, ClassTag<byte[][]>
>>>>>>>>> evidence$5) {
>>>>>>>>> super(ssc_, kafkaParams, fromOffsets, messageHandler, evidence$1,
>>>>>>>>> evidence$2,
>>>>>>>>> evidence$3, evidence$4, evidence$5);
>>>>>>>>> }
>>>>>>>>> @Override
>>>>>>>>> public Option<KafkaRDD<byte[], byte[], DefaultDecoder,
>>>>>>>>> DefaultDecoder, byte[][]>> compute(
>>>>>>>>> Time validTime) {
>>>>>>>>> int processe=processedCounter.value();
>>>>>>>>> int failed = failedProcessingsCounter.value();
>>>>>>>>> if((processed==failed)){
>>>>>>>>> System.out.println("backing off since its 100 % failure");
>>>>>>>>> return Option.empty();
>>>>>>>>> }else{
>>>>>>>>> System.out.println("starting the stream ");
>>>>>>>>>
>>>>>>>>> return super.compute(validTime);
>>>>>>>>> }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> To create this stream
>>>>>>>>> I am using
>>>>>>>>> scala.collection.immutable.Map<String, String> scalakafkaParams =
>>>>>>>>> JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala().toMap(Predef.<Tuple2<String,
>>>>>>>>> String>>conforms());
>>>>>>>>> scala.collection.immutable.Map<TopicAndPartition, Long>
>>>>>>>>> scalaktopicOffsetMap=
>>>>>>>>> JavaConverters.mapAsScalaMapConverter(topicOffsetMap).asScala().toMap(Predef.<Tuple2<TopicAndPartition,
>>>>>>>>> Long>>conforms());
>>>>>>>>>
>>>>>>>>> scala.Function1<MessageAndMetadata<byte[], byte[]>, byte[][]>
>>>>>>>>> handler = new Function<MessageAndMetadata<byte[], byte[]>, byte[][]>() {
>>>>>>>>>         ..});
>>>>>>>>> JavaDStream<byte[][]> directKafkaStream = new
>>>>>>>>> CustomDirectKafkaInputDstream(jssc,scalakafkaParams ,scalaktopicOffsetMap,
>>>>>>>>> handler,byte[].class,byte[].class, kafka.serializer.DefaultDecoder.class,
>>>>>>>>> kafka.serializer.DefaultDecoder.class,byte[][].class);
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> How to pass classTag to constructor in
>>>>>>>>> CustomDirectKafkaInputDstream ? And how to use Function instead of
>>>>>>>>> Function1 ?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Aug 13, 2015 at 12:16 AM, Cody Koeninger <
>>>>>>>>> cody@koeninger.org> wrote:
>>>>>>>>>
>>>>>>>>>> I'm not aware of an existing api per se, but you could create
>>>>>>>>>> your own subclass of the DStream that returns None for compute() under
>>>>>>>>>> certain conditions.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Aug 12, 2015 at 1:03 PM, Shushant Arora <
>>>>>>>>>> shushantarora09@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Cody
>>>>>>>>>>>
>>>>>>>>>>> Can you help here if streaming 1.3 has any api for not consuming
>>>>>>>>>>> any message in next few runs?
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>>
>>>>>>>>>>> ---------- Forwarded message ----------
>>>>>>>>>>> From: Shushant Arora <sh...@gmail.com>
>>>>>>>>>>> Date: Wed, Aug 12, 2015 at 11:23 PM
>>>>>>>>>>> Subject: spark streaming 1.3 doubts(force it to not consume
>>>>>>>>>>> anything)
>>>>>>>>>>> To: user <us...@spark.apache.org>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> I Can't make my stream application batch interval to change at
>>>>>>>>>>> run time . Its always fixed and it always creates jobs at specified batch
>>>>>>>>>>> inetval and enqueue them if earleir batch is not finished.
>>>>>>>>>>>
>>>>>>>>>>> My requirement is to process the events and post them to some
>>>>>>>>>>> external server and if external server is down I want to increase the batch
>>>>>>>>>>> time - that is not possible but can I make it not to consume any messages
>>>>>>>>>>> in say next 5 successive runs ?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: spark streaming 1.3 doubts(force it to not consume anything)

Posted by Shushant Arora <sh...@gmail.com>.
To correct myself - KafkaRDD[K, V, U, T, R] is a subclass of RDD[R], but
Option<KafkaRDD[K, V, U, T, R]> is not a subclass of Option<RDD[R]>.

In Scala, C[T'] is a subtype of C[T] (when C is covariant and T' is a subtype of T), as per
https://twitter.github.io/scala_school/type-basics.html,
but this is not allowed in Java.

So is there any workaround to achieve this in Java when overriding
DirectKafkaInputDStream?
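
As a concrete illustration of that restriction, here is a small Java-only example
(hypothetical List types, nothing Spark-specific): the element type being a subtype does
not make the parameterized types related, which is exactly why Option<KafkaRDD<...>>
cannot stand in for Option<RDD<...>> from Java.

import java.util.ArrayList;
import java.util.List;

public class InvarianceDemo {
  public static void main(String[] args) {
    List<Integer> ints = new ArrayList<Integer>();

    // Integer is a subtype of Number, but List<Integer> is NOT a subtype of
    // List<Number>: uncommenting the next line is a compile error in Java.
    // List<Number> nums = ints;

    // Java only relates the two via use-site wildcards, whereas Scala can
    // declare covariance at the definition site (e.g. Option[+A]).
    List<? extends Number> nums = ints;
    System.out.println(nums.isEmpty());
  }
}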


On Wed, Aug 19, 2015 at 12:45 AM, Shushant Arora <sh...@gmail.com>
wrote:

> But KafkaRDD[K, V, U, T, R] is not subclass of RDD[R] as per java generic
> inheritance is not supported so derived class cannot return  different
> genric typed subclass from overriden method.
>
> On Wed, Aug 19, 2015 at 12:02 AM, Cody Koeninger <co...@koeninger.org>
> wrote:
>
>> Option is covariant and KafkaRDD is a subclass of RDD
>>
>> On Tue, Aug 18, 2015 at 1:12 PM, Shushant Arora <
>> shushantarora09@gmail.com> wrote:
>>
>>> Is it that in scala its allowed for derived class to have any return
>>> type ?
>>>
>>>  And streaming jar is originally created in scala so its allowed for
>>> DirectKafkaInputDStream  to return Option[KafkaRDD[K, V, U, T, R]]
>>> compute method ?
>>>
>>> On Tue, Aug 18, 2015 at 8:36 PM, Shushant Arora <
>>> shushantarora09@gmail.com> wrote:
>>>
>>>> looking at source code of
>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream
>>>>
>>>> override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]]
>>>> = {
>>>>     val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
>>>>     val rdd = KafkaRDD[K, V, U, T, R](
>>>>       context.sparkContext, kafkaParams, currentOffsets, untilOffsets,
>>>> messageHandler)
>>>>
>>>>     currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
>>>>     Some(rdd)
>>>>   }
>>>>
>>>>
>>>> But in DStream its def compute (validTime: Time): Option[RDD[T]]  ,
>>>>
>>>> So what should  be the return type of custom DStream extends
>>>> DirectKafkaInputDStream .
>>>> Since I want the behaviour to be same as of DirectKafkaInputDStream  in
>>>> normal scenarios and return none in specific scenario.
>>>>
>>>> And why the same error did not come while extending
>>>> DirectKafkaInputDStream from InputDStream ? Since new return type Option[KafkaRDD[K,
>>>> V, U, T, R]] is not subclass of Option[RDD[T] so it should have been
>>>> failed?
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, Aug 18, 2015 at 7:28 PM, Cody Koeninger <co...@koeninger.org>
>>>> wrote:
>>>>
>>>>> The superclass method in DStream is defined as returning an
>>>>> Option[RDD[T]]
>>>>>
>>>>> On Tue, Aug 18, 2015 at 8:07 AM, Shushant Arora <
>>>>> shushantarora09@gmail.com> wrote:
>>>>>
>>>>>> Getting compilation error while overriding compute method of
>>>>>> DirectKafkaInputDStream.
>>>>>>
>>>>>>
>>>>>> [ERROR] CustomDirectKafkaInputDstream.java:[51,83]
>>>>>> compute(org.apache.spark.streaming.Time) in CustomDirectKafkaInputDstream
>>>>>> cannot override compute(org.apache.spark.streaming.Time) in
>>>>>> org.apache.spark.streaming.dstream.DStream; attempting to use incompatible
>>>>>> return type
>>>>>>
>>>>>> [ERROR] found   :
>>>>>> scala.Option<org.apache.spark.streaming.kafka.KafkaRDD<byte[],byte[],kafka.serializer.DefaultDecoder,kafka.serializer.DefaultDecoder,byte[][]>>
>>>>>>
>>>>>> [ERROR] required: scala.Option<org.apache.spark.rdd.RDD<byte[][]>>
>>>>>>
>>>>>>
>>>>>> class :
>>>>>>
>>>>>> public class CustomDirectKafkaInputDstream extends
>>>>>> DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
>>>>>> kafka.serializer.DefaultDecoder, byte[][]>{
>>>>>>
>>>>>> @Override
>>>>>> public Option<KafkaRDD<byte[], byte[], DefaultDecoder,
>>>>>> DefaultDecoder, byte[][]>> compute(
>>>>>> Time validTime) {
>>>>>>
>>>>>> int processed=processedCounter.value();
>>>>>> int failed = failedProcessingsCounter.value();
>>>>>> if((processed==failed)){
>>>>>> System.out.println("backing off since its 100 % failure");
>>>>>> return Option.empty();
>>>>>> }else{
>>>>>> System.out.println("starting the stream ");
>>>>>>
>>>>>> return super.compute(validTime);
>>>>>> }
>>>>>> }
>>>>>> }
>>>>>>
>>>>>>
>>>>>> What should be the return type of compute method ? super class is
>>>>>> returning Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
>>>>>> byte[][]>>  but its expecting
>>>>>>  scala.Option<org.apache.spark.rdd.RDD<byte[][]>> from derived  class . Is
>>>>>> there something wring with code?
>>>>>>
>>>>>> On Mon, Aug 17, 2015 at 7:08 PM, Cody Koeninger <co...@koeninger.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Look at the definitions of the java-specific
>>>>>>> KafkaUtils.createDirectStream methods (the ones that take a
>>>>>>> JavaStreamingContext)
>>>>>>>
>>>>>>> On Mon, Aug 17, 2015 at 5:13 AM, Shushant Arora <
>>>>>>> shushantarora09@gmail.com> wrote:
>>>>>>>
>>>>>>>> How to create classtag in java ?Also Constructor
>>>>>>>> of DirectKafkaInputDStream takes Function1 not Function but
>>>>>>>> kafkautils.createDirectStream allows function.
>>>>>>>>
>>>>>>>> I have below as overriden DirectKafkaInputDStream.
>>>>>>>>
>>>>>>>>
>>>>>>>> public class CustomDirectKafkaInputDstream extends
>>>>>>>> DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
>>>>>>>> kafka.serializer.DefaultDecoder, byte[][]>{
>>>>>>>>
>>>>>>>> public CustomDirectKafkaInputDstream(
>>>>>>>> StreamingContext ssc_,
>>>>>>>> Map<String, String> kafkaParams,
>>>>>>>> Map<TopicAndPartition, Object> fromOffsets,
>>>>>>>> Function1<MessageAndMetadata<byte[], byte[]>, byte[][]>
>>>>>>>> messageHandler,
>>>>>>>> ClassTag<byte[]> evidence$1, ClassTag<byte[]> evidence$2,
>>>>>>>> ClassTag<DefaultDecoder> evidence$3,
>>>>>>>> ClassTag<DefaultDecoder> evidence$4, ClassTag<byte[][]> evidence$5)
>>>>>>>> {
>>>>>>>> super(ssc_, kafkaParams, fromOffsets, messageHandler, evidence$1,
>>>>>>>> evidence$2,
>>>>>>>> evidence$3, evidence$4, evidence$5);
>>>>>>>> }
>>>>>>>> @Override
>>>>>>>> public Option<KafkaRDD<byte[], byte[], DefaultDecoder,
>>>>>>>> DefaultDecoder, byte[][]>> compute(
>>>>>>>> Time validTime) {
>>>>>>>> int processe=processedCounter.value();
>>>>>>>> int failed = failedProcessingsCounter.value();
>>>>>>>> if((processed==failed)){
>>>>>>>> System.out.println("backing off since its 100 % failure");
>>>>>>>> return Option.empty();
>>>>>>>> }else{
>>>>>>>> System.out.println("starting the stream ");
>>>>>>>>
>>>>>>>> return super.compute(validTime);
>>>>>>>> }
>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> To create this stream
>>>>>>>> I am using
>>>>>>>> scala.collection.immutable.Map<String, String> scalakafkaParams =
>>>>>>>> JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala().toMap(Predef.<Tuple2<String,
>>>>>>>> String>>conforms());
>>>>>>>> scala.collection.immutable.Map<TopicAndPartition, Long>
>>>>>>>> scalaktopicOffsetMap=
>>>>>>>> JavaConverters.mapAsScalaMapConverter(topicOffsetMap).asScala().toMap(Predef.<Tuple2<TopicAndPartition,
>>>>>>>> Long>>conforms());
>>>>>>>>
>>>>>>>> scala.Function1<MessageAndMetadata<byte[], byte[]>, byte[][]>
>>>>>>>> handler = new Function<MessageAndMetadata<byte[], byte[]>, byte[][]>() {
>>>>>>>>         ..});
>>>>>>>> JavaDStream<byte[][]> directKafkaStream = new
>>>>>>>> CustomDirectKafkaInputDstream(jssc,scalakafkaParams ,scalaktopicOffsetMap,
>>>>>>>> handler,byte[].class,byte[].class, kafka.serializer.DefaultDecoder.class,
>>>>>>>> kafka.serializer.DefaultDecoder.class,byte[][].class);
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> How to pass classTag to constructor in
>>>>>>>> CustomDirectKafkaInputDstream ? And how to use Function instead of
>>>>>>>> Function1 ?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Aug 13, 2015 at 12:16 AM, Cody Koeninger <
>>>>>>>> cody@koeninger.org> wrote:
>>>>>>>>
>>>>>>>>> I'm not aware of an existing api per se, but you could create your
>>>>>>>>> own subclass of the DStream that returns None for compute() under certain
>>>>>>>>> conditions.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Aug 12, 2015 at 1:03 PM, Shushant Arora <
>>>>>>>>> shushantarora09@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Cody
>>>>>>>>>>
>>>>>>>>>> Can you help here if streaming 1.3 has any api for not consuming
>>>>>>>>>> any message in next few runs?
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>>
>>>>>>>>>> ---------- Forwarded message ----------
>>>>>>>>>> From: Shushant Arora <sh...@gmail.com>
>>>>>>>>>> Date: Wed, Aug 12, 2015 at 11:23 PM
>>>>>>>>>> Subject: spark streaming 1.3 doubts(force it to not consume
>>>>>>>>>> anything)
>>>>>>>>>> To: user <us...@spark.apache.org>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> I Can't make my stream application batch interval to change at
>>>>>>>>>> run time . Its always fixed and it always creates jobs at specified batch
>>>>>>>>>> inetval and enqueue them if earleir batch is not finished.
>>>>>>>>>>
>>>>>>>>>> My requirement is to process the events and post them to some
>>>>>>>>>> external server and if external server is down I want to increase the batch
>>>>>>>>>> time - that is not possible but can I make it not to consume any messages
>>>>>>>>>> in say next 5 successive runs ?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: spark streaming 1.3 doubts(force it to not consume anything)

Posted by Shushant Arora <sh...@gmail.com>.
But KafkaRDD[K, V, U, T, R] is not a subclass of RDD[R]: since Java does not
support generic inheritance, a derived class cannot return a differently
parameterized subclass from an overridden method.

On Wed, Aug 19, 2015 at 12:02 AM, Cody Koeninger <co...@koeninger.org> wrote:

> Option is covariant and KafkaRDD is a subclass of RDD
>
> On Tue, Aug 18, 2015 at 1:12 PM, Shushant Arora <shushantarora09@gmail.com
> > wrote:
>
>> Is it that in scala its allowed for derived class to have any return type
>> ?
>>
>>  And streaming jar is originally created in scala so its allowed for
>> DirectKafkaInputDStream  to return Option[KafkaRDD[K, V, U, T, R]]
>> compute method ?
>>
>> On Tue, Aug 18, 2015 at 8:36 PM, Shushant Arora <
>> shushantarora09@gmail.com> wrote:
>>
>>> looking at source code of
>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream
>>>
>>> override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]]
>>> = {
>>>     val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
>>>     val rdd = KafkaRDD[K, V, U, T, R](
>>>       context.sparkContext, kafkaParams, currentOffsets, untilOffsets,
>>> messageHandler)
>>>
>>>     currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
>>>     Some(rdd)
>>>   }
>>>
>>>
>>> But in DStream its def compute (validTime: Time): Option[RDD[T]]  ,
>>>
>>> So what should  be the return type of custom DStream extends
>>> DirectKafkaInputDStream .
>>> Since I want the behaviour to be same as of DirectKafkaInputDStream  in
>>> normal scenarios and return none in specific scenario.
>>>
>>> And why the same error did not come while extending
>>> DirectKafkaInputDStream from InputDStream ? Since new return type Option[KafkaRDD[K,
>>> V, U, T, R]] is not subclass of Option[RDD[T] so it should have been
>>> failed?
>>>
>>>
>>>
>>>
>>> On Tue, Aug 18, 2015 at 7:28 PM, Cody Koeninger <co...@koeninger.org>
>>> wrote:
>>>
>>>> The superclass method in DStream is defined as returning an
>>>> Option[RDD[T]]
>>>>
>>>> On Tue, Aug 18, 2015 at 8:07 AM, Shushant Arora <
>>>> shushantarora09@gmail.com> wrote:
>>>>
>>>>> Getting compilation error while overriding compute method of
>>>>> DirectKafkaInputDStream.
>>>>>
>>>>>
>>>>> [ERROR] CustomDirectKafkaInputDstream.java:[51,83]
>>>>> compute(org.apache.spark.streaming.Time) in CustomDirectKafkaInputDstream
>>>>> cannot override compute(org.apache.spark.streaming.Time) in
>>>>> org.apache.spark.streaming.dstream.DStream; attempting to use incompatible
>>>>> return type
>>>>>
>>>>> [ERROR] found   :
>>>>> scala.Option<org.apache.spark.streaming.kafka.KafkaRDD<byte[],byte[],kafka.serializer.DefaultDecoder,kafka.serializer.DefaultDecoder,byte[][]>>
>>>>>
>>>>> [ERROR] required: scala.Option<org.apache.spark.rdd.RDD<byte[][]>>
>>>>>
>>>>>
>>>>> class :
>>>>>
>>>>> public class CustomDirectKafkaInputDstream extends
>>>>> DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
>>>>> kafka.serializer.DefaultDecoder, byte[][]>{
>>>>>
>>>>> @Override
>>>>> public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
>>>>> byte[][]>> compute(
>>>>> Time validTime) {
>>>>>
>>>>> int processed=processedCounter.value();
>>>>> int failed = failedProcessingsCounter.value();
>>>>> if((processed==failed)){
>>>>> System.out.println("backing off since its 100 % failure");
>>>>> return Option.empty();
>>>>> }else{
>>>>> System.out.println("starting the stream ");
>>>>>
>>>>> return super.compute(validTime);
>>>>> }
>>>>> }
>>>>> }
>>>>>
>>>>>
>>>>> What should be the return type of compute method ? super class is
>>>>> returning Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
>>>>> byte[][]>>  but its expecting
>>>>>  scala.Option<org.apache.spark.rdd.RDD<byte[][]>> from derived  class . Is
>>>>> there something wring with code?
>>>>>
>>>>> On Mon, Aug 17, 2015 at 7:08 PM, Cody Koeninger <co...@koeninger.org>
>>>>> wrote:
>>>>>
>>>>>> Look at the definitions of the java-specific
>>>>>> KafkaUtils.createDirectStream methods (the ones that take a
>>>>>> JavaStreamingContext)
>>>>>>
>>>>>> On Mon, Aug 17, 2015 at 5:13 AM, Shushant Arora <
>>>>>> shushantarora09@gmail.com> wrote:
>>>>>>
>>>>>>> How to create classtag in java ?Also Constructor
>>>>>>> of DirectKafkaInputDStream takes Function1 not Function but
>>>>>>> kafkautils.createDirectStream allows function.
>>>>>>>
>>>>>>> I have below as overriden DirectKafkaInputDStream.
>>>>>>>
>>>>>>>
>>>>>>> public class CustomDirectKafkaInputDstream extends
>>>>>>> DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
>>>>>>> kafka.serializer.DefaultDecoder, byte[][]>{
>>>>>>>
>>>>>>> public CustomDirectKafkaInputDstream(
>>>>>>> StreamingContext ssc_,
>>>>>>> Map<String, String> kafkaParams,
>>>>>>> Map<TopicAndPartition, Object> fromOffsets,
>>>>>>> Function1<MessageAndMetadata<byte[], byte[]>, byte[][]>
>>>>>>> messageHandler,
>>>>>>> ClassTag<byte[]> evidence$1, ClassTag<byte[]> evidence$2,
>>>>>>> ClassTag<DefaultDecoder> evidence$3,
>>>>>>> ClassTag<DefaultDecoder> evidence$4, ClassTag<byte[][]> evidence$5) {
>>>>>>> super(ssc_, kafkaParams, fromOffsets, messageHandler, evidence$1,
>>>>>>> evidence$2,
>>>>>>> evidence$3, evidence$4, evidence$5);
>>>>>>> }
>>>>>>> @Override
>>>>>>> public Option<KafkaRDD<byte[], byte[], DefaultDecoder,
>>>>>>> DefaultDecoder, byte[][]>> compute(
>>>>>>> Time validTime) {
>>>>>>> int processe=processedCounter.value();
>>>>>>> int failed = failedProcessingsCounter.value();
>>>>>>> if((processed==failed)){
>>>>>>> System.out.println("backing off since its 100 % failure");
>>>>>>> return Option.empty();
>>>>>>> }else{
>>>>>>> System.out.println("starting the stream ");
>>>>>>>
>>>>>>> return super.compute(validTime);
>>>>>>> }
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> To create this stream
>>>>>>> I am using
>>>>>>> scala.collection.immutable.Map<String, String> scalakafkaParams =
>>>>>>> JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala().toMap(Predef.<Tuple2<String,
>>>>>>> String>>conforms());
>>>>>>> scala.collection.immutable.Map<TopicAndPartition, Long>
>>>>>>> scalaktopicOffsetMap=
>>>>>>> JavaConverters.mapAsScalaMapConverter(topicOffsetMap).asScala().toMap(Predef.<Tuple2<TopicAndPartition,
>>>>>>> Long>>conforms());
>>>>>>>
>>>>>>> scala.Function1<MessageAndMetadata<byte[], byte[]>, byte[][]>
>>>>>>> handler = new Function<MessageAndMetadata<byte[], byte[]>, byte[][]>() {
>>>>>>>         ..});
>>>>>>> JavaDStream<byte[][]> directKafkaStream = new
>>>>>>> CustomDirectKafkaInputDstream(jssc,scalakafkaParams ,scalaktopicOffsetMap,
>>>>>>> handler,byte[].class,byte[].class, kafka.serializer.DefaultDecoder.class,
>>>>>>> kafka.serializer.DefaultDecoder.class,byte[][].class);
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> How to pass classTag to constructor in CustomDirectKafkaInputDstream
>>>>>>> ? And how to use Function instead of Function1 ?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Aug 13, 2015 at 12:16 AM, Cody Koeninger <cody@koeninger.org
>>>>>>> > wrote:
>>>>>>>
>>>>>>>> I'm not aware of an existing api per se, but you could create your
>>>>>>>> own subclass of the DStream that returns None for compute() under certain
>>>>>>>> conditions.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Aug 12, 2015 at 1:03 PM, Shushant Arora <
>>>>>>>> shushantarora09@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Cody
>>>>>>>>>
>>>>>>>>> Can you help here if streaming 1.3 has any api for not consuming
>>>>>>>>> any message in next few runs?
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>>
>>>>>>>>> ---------- Forwarded message ----------
>>>>>>>>> From: Shushant Arora <sh...@gmail.com>
>>>>>>>>> Date: Wed, Aug 12, 2015 at 11:23 PM
>>>>>>>>> Subject: spark streaming 1.3 doubts(force it to not consume
>>>>>>>>> anything)
>>>>>>>>> To: user <us...@spark.apache.org>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I Can't make my stream application batch interval to change at run
>>>>>>>>> time . Its always fixed and it always creates jobs at specified batch
>>>>>>>>> inetval and enqueue them if earleir batch is not finished.
>>>>>>>>>
>>>>>>>>> My requirement is to process the events and post them to some
>>>>>>>>> external server and if external server is down I want to increase the batch
>>>>>>>>> time - that is not possible but can I make it not to consume any messages
>>>>>>>>> in say next 5 successive runs ?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: spark streaming 1.3 doubts(force it to not consume anything)

Posted by Cody Koeninger <co...@koeninger.org>.
Option is covariant and KafkaRDD is a subclass of RDD

On Tue, Aug 18, 2015 at 1:12 PM, Shushant Arora <sh...@gmail.com>
wrote:

> Is it that in scala its allowed for derived class to have any return type ?
>
>  And streaming jar is originally created in scala so its allowed for
> DirectKafkaInputDStream  to return Option[KafkaRDD[K, V, U, T, R]]
> compute method ?
>
> On Tue, Aug 18, 2015 at 8:36 PM, Shushant Arora <shushantarora09@gmail.com
> > wrote:
>
>> looking at source code of
>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream
>>
>> override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] =
>> {
>>     val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
>>     val rdd = KafkaRDD[K, V, U, T, R](
>>       context.sparkContext, kafkaParams, currentOffsets, untilOffsets,
>> messageHandler)
>>
>>     currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
>>     Some(rdd)
>>   }
>>
>>
>> But in DStream its def compute (validTime: Time): Option[RDD[T]]  ,
>>
>> So what should  be the return type of custom DStream extends
>> DirectKafkaInputDStream .
>> Since I want the behaviour to be same as of DirectKafkaInputDStream  in
>> normal scenarios and return none in specific scenario.
>>
>> And why the same error did not come while extending
>> DirectKafkaInputDStream from InputDStream ? Since new return type Option[KafkaRDD[K,
>> V, U, T, R]] is not subclass of Option[RDD[T] so it should have been
>> failed?
>>
>>
>>
>>
>> On Tue, Aug 18, 2015 at 7:28 PM, Cody Koeninger <co...@koeninger.org>
>> wrote:
>>
>>> The superclass method in DStream is defined as returning an
>>> Option[RDD[T]]
>>>
>>> On Tue, Aug 18, 2015 at 8:07 AM, Shushant Arora <
>>> shushantarora09@gmail.com> wrote:
>>>
>>>> Getting compilation error while overriding compute method of
>>>> DirectKafkaInputDStream.
>>>>
>>>>
>>>> [ERROR] CustomDirectKafkaInputDstream.java:[51,83]
>>>> compute(org.apache.spark.streaming.Time) in CustomDirectKafkaInputDstream
>>>> cannot override compute(org.apache.spark.streaming.Time) in
>>>> org.apache.spark.streaming.dstream.DStream; attempting to use incompatible
>>>> return type
>>>>
>>>> [ERROR] found   :
>>>> scala.Option<org.apache.spark.streaming.kafka.KafkaRDD<byte[],byte[],kafka.serializer.DefaultDecoder,kafka.serializer.DefaultDecoder,byte[][]>>
>>>>
>>>> [ERROR] required: scala.Option<org.apache.spark.rdd.RDD<byte[][]>>
>>>>
>>>>
>>>> class :
>>>>
>>>> public class CustomDirectKafkaInputDstream extends
>>>> DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
>>>> kafka.serializer.DefaultDecoder, byte[][]>{
>>>>
>>>> @Override
>>>> public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
>>>> byte[][]>> compute(
>>>> Time validTime) {
>>>>
>>>> int processed=processedCounter.value();
>>>> int failed = failedProcessingsCounter.value();
>>>> if((processed==failed)){
>>>> System.out.println("backing off since its 100 % failure");
>>>> return Option.empty();
>>>> }else{
>>>> System.out.println("starting the stream ");
>>>>
>>>> return super.compute(validTime);
>>>> }
>>>> }
>>>> }
>>>>
>>>>
>>>> What should be the return type of compute method ? super class is
>>>> returning Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
>>>> byte[][]>>  but its expecting
>>>>  scala.Option<org.apache.spark.rdd.RDD<byte[][]>> from derived  class . Is
>>>> there something wring with code?
>>>>
>>>> On Mon, Aug 17, 2015 at 7:08 PM, Cody Koeninger <co...@koeninger.org>
>>>> wrote:
>>>>
>>>>> Look at the definitions of the java-specific
>>>>> KafkaUtils.createDirectStream methods (the ones that take a
>>>>> JavaStreamingContext)
>>>>>
>>>>> On Mon, Aug 17, 2015 at 5:13 AM, Shushant Arora <
>>>>> shushantarora09@gmail.com> wrote:
>>>>>
>>>>>> How to create classtag in java ?Also Constructor
>>>>>> of DirectKafkaInputDStream takes Function1 not Function but
>>>>>> kafkautils.createDirectStream allows function.
>>>>>>
>>>>>> I have below as overriden DirectKafkaInputDStream.
>>>>>>
>>>>>>
>>>>>> public class CustomDirectKafkaInputDstream extends
>>>>>> DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
>>>>>> kafka.serializer.DefaultDecoder, byte[][]>{
>>>>>>
>>>>>> public CustomDirectKafkaInputDstream(
>>>>>> StreamingContext ssc_,
>>>>>> Map<String, String> kafkaParams,
>>>>>> Map<TopicAndPartition, Object> fromOffsets,
>>>>>> Function1<MessageAndMetadata<byte[], byte[]>, byte[][]>
>>>>>> messageHandler,
>>>>>> ClassTag<byte[]> evidence$1, ClassTag<byte[]> evidence$2,
>>>>>> ClassTag<DefaultDecoder> evidence$3,
>>>>>> ClassTag<DefaultDecoder> evidence$4, ClassTag<byte[][]> evidence$5) {
>>>>>> super(ssc_, kafkaParams, fromOffsets, messageHandler, evidence$1,
>>>>>> evidence$2,
>>>>>> evidence$3, evidence$4, evidence$5);
>>>>>> }
>>>>>> @Override
>>>>>> public Option<KafkaRDD<byte[], byte[], DefaultDecoder,
>>>>>> DefaultDecoder, byte[][]>> compute(
>>>>>> Time validTime) {
>>>>>> int processe=processedCounter.value();
>>>>>> int failed = failedProcessingsCounter.value();
>>>>>> if((processed==failed)){
>>>>>> System.out.println("backing off since its 100 % failure");
>>>>>> return Option.empty();
>>>>>> }else{
>>>>>> System.out.println("starting the stream ");
>>>>>>
>>>>>> return super.compute(validTime);
>>>>>> }
>>>>>> }
>>>>>>
>>>>>>
>>>>>>
>>>>>> To create this stream
>>>>>> I am using
>>>>>> scala.collection.immutable.Map<String, String> scalakafkaParams =
>>>>>> JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala().toMap(Predef.<Tuple2<String,
>>>>>> String>>conforms());
>>>>>> scala.collection.immutable.Map<TopicAndPartition, Long>
>>>>>> scalaktopicOffsetMap=
>>>>>> JavaConverters.mapAsScalaMapConverter(topicOffsetMap).asScala().toMap(Predef.<Tuple2<TopicAndPartition,
>>>>>> Long>>conforms());
>>>>>>
>>>>>> scala.Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> handler
>>>>>> = new Function<MessageAndMetadata<byte[], byte[]>, byte[][]>() {
>>>>>>         ..});
>>>>>> JavaDStream<byte[][]> directKafkaStream = new
>>>>>> CustomDirectKafkaInputDstream(jssc,scalakafkaParams ,scalaktopicOffsetMap,
>>>>>> handler,byte[].class,byte[].class, kafka.serializer.DefaultDecoder.class,
>>>>>> kafka.serializer.DefaultDecoder.class,byte[][].class);
>>>>>>
>>>>>>
>>>>>>
>>>>>> How to pass classTag to constructor in CustomDirectKafkaInputDstream
>>>>>> ? And how to use Function instead of Function1 ?
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Aug 13, 2015 at 12:16 AM, Cody Koeninger <co...@koeninger.org>
>>>>>> wrote:
>>>>>>
>>>>>>> I'm not aware of an existing api per se, but you could create your
>>>>>>> own subclass of the DStream that returns None for compute() under certain
>>>>>>> conditions.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Aug 12, 2015 at 1:03 PM, Shushant Arora <
>>>>>>> shushantarora09@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Cody
>>>>>>>>
>>>>>>>> Can you help here if streaming 1.3 has any api for not consuming
>>>>>>>> any message in next few runs?
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>>
>>>>>>>> ---------- Forwarded message ----------
>>>>>>>> From: Shushant Arora <sh...@gmail.com>
>>>>>>>> Date: Wed, Aug 12, 2015 at 11:23 PM
>>>>>>>> Subject: spark streaming 1.3 doubts(force it to not consume
>>>>>>>> anything)
>>>>>>>> To: user <us...@spark.apache.org>
>>>>>>>>
>>>>>>>>
>>>>>>>> I Can't make my stream application batch interval to change at run
>>>>>>>> time . Its always fixed and it always creates jobs at specified batch
>>>>>>>> inetval and enqueue them if earleir batch is not finished.
>>>>>>>>
>>>>>>>> My requirement is to process the events and post them to some
>>>>>>>> external server and if external server is down I want to increase the batch
>>>>>>>> time - that is not possible but can I make it not to consume any messages
>>>>>>>> in say next 5 successive runs ?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: spark streaming 1.3 doubts(force it to not consume anything)

Posted by Shushant Arora <sh...@gmail.com>.
Is it that in Scala a derived class is allowed to have any return type?

And since the streaming jar is originally written in Scala, is that why
DirectKafkaInputDStream is allowed to return Option[KafkaRDD[K, V, U, T, R]]
from its compute method?

On Tue, Aug 18, 2015 at 8:36 PM, Shushant Arora <sh...@gmail.com>
wrote:

> looking at source code of
> org.apache.spark.streaming.kafka.DirectKafkaInputDStream
>
> override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
>     val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
>     val rdd = KafkaRDD[K, V, U, T, R](
>       context.sparkContext, kafkaParams, currentOffsets, untilOffsets,
> messageHandler)
>
>     currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
>     Some(rdd)
>   }
>
>
> But in DStream its def compute (validTime: Time): Option[RDD[T]]  ,
>
> So what should  be the return type of custom DStream extends
> DirectKafkaInputDStream .
> Since I want the behaviour to be same as of DirectKafkaInputDStream  in
> normal scenarios and return none in specific scenario.
>
> And why the same error did not come while extending
> DirectKafkaInputDStream from InputDStream ? Since new return type Option[KafkaRDD[K,
> V, U, T, R]] is not subclass of Option[RDD[T] so it should have been
> failed?
>
>
>
>
> On Tue, Aug 18, 2015 at 7:28 PM, Cody Koeninger <co...@koeninger.org>
> wrote:
>
>> The superclass method in DStream is defined as returning an Option[RDD[T]]
>>
>> On Tue, Aug 18, 2015 at 8:07 AM, Shushant Arora <
>> shushantarora09@gmail.com> wrote:
>>
>>> Getting compilation error while overriding compute method of
>>> DirectKafkaInputDStream.
>>>
>>>
>>> [ERROR] CustomDirectKafkaInputDstream.java:[51,83]
>>> compute(org.apache.spark.streaming.Time) in CustomDirectKafkaInputDstream
>>> cannot override compute(org.apache.spark.streaming.Time) in
>>> org.apache.spark.streaming.dstream.DStream; attempting to use incompatible
>>> return type
>>>
>>> [ERROR] found   :
>>> scala.Option<org.apache.spark.streaming.kafka.KafkaRDD<byte[],byte[],kafka.serializer.DefaultDecoder,kafka.serializer.DefaultDecoder,byte[][]>>
>>>
>>> [ERROR] required: scala.Option<org.apache.spark.rdd.RDD<byte[][]>>
>>>
>>>
>>> class :
>>>
>>> public class CustomDirectKafkaInputDstream extends
>>> DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
>>> kafka.serializer.DefaultDecoder, byte[][]>{
>>>
>>> @Override
>>> public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
>>> byte[][]>> compute(
>>> Time validTime) {
>>>
>>> int processed=processedCounter.value();
>>> int failed = failedProcessingsCounter.value();
>>> if((processed==failed)){
>>> System.out.println("backing off since its 100 % failure");
>>> return Option.empty();
>>> }else{
>>> System.out.println("starting the stream ");
>>>
>>> return super.compute(validTime);
>>> }
>>> }
>>> }
>>>
>>>
>>> What should be the return type of compute method ? super class is
>>> returning Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
>>> byte[][]>>  but its expecting
>>>  scala.Option<org.apache.spark.rdd.RDD<byte[][]>> from derived  class . Is
>>> there something wring with code?
>>>
>>> On Mon, Aug 17, 2015 at 7:08 PM, Cody Koeninger <co...@koeninger.org>
>>> wrote:
>>>
>>>> Look at the definitions of the java-specific
>>>> KafkaUtils.createDirectStream methods (the ones that take a
>>>> JavaStreamingContext)
>>>>
>>>> On Mon, Aug 17, 2015 at 5:13 AM, Shushant Arora <
>>>> shushantarora09@gmail.com> wrote:
>>>>
>>>>> How to create classtag in java ?Also Constructor
>>>>> of DirectKafkaInputDStream takes Function1 not Function but
>>>>> kafkautils.createDirectStream allows function.
>>>>>
>>>>> I have below as overriden DirectKafkaInputDStream.
>>>>>
>>>>>
>>>>> public class CustomDirectKafkaInputDstream extends
>>>>> DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
>>>>> kafka.serializer.DefaultDecoder, byte[][]>{
>>>>>
>>>>> public CustomDirectKafkaInputDstream(
>>>>> StreamingContext ssc_,
>>>>> Map<String, String> kafkaParams,
>>>>> Map<TopicAndPartition, Object> fromOffsets,
>>>>> Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> messageHandler,
>>>>> ClassTag<byte[]> evidence$1, ClassTag<byte[]> evidence$2,
>>>>> ClassTag<DefaultDecoder> evidence$3,
>>>>> ClassTag<DefaultDecoder> evidence$4, ClassTag<byte[][]> evidence$5) {
>>>>> super(ssc_, kafkaParams, fromOffsets, messageHandler, evidence$1,
>>>>> evidence$2,
>>>>> evidence$3, evidence$4, evidence$5);
>>>>> }
>>>>> @Override
>>>>> public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
>>>>> byte[][]>> compute(
>>>>> Time validTime) {
>>>>> int processe=processedCounter.value();
>>>>> int failed = failedProcessingsCounter.value();
>>>>> if((processed==failed)){
>>>>> System.out.println("backing off since its 100 % failure");
>>>>> return Option.empty();
>>>>> }else{
>>>>> System.out.println("starting the stream ");
>>>>>
>>>>> return super.compute(validTime);
>>>>> }
>>>>> }
>>>>>
>>>>>
>>>>>
>>>>> To create this stream
>>>>> I am using
>>>>> scala.collection.immutable.Map<String, String> scalakafkaParams =
>>>>> JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala().toMap(Predef.<Tuple2<String,
>>>>> String>>conforms());
>>>>> scala.collection.immutable.Map<TopicAndPartition, Long>
>>>>> scalaktopicOffsetMap=
>>>>> JavaConverters.mapAsScalaMapConverter(topicOffsetMap).asScala().toMap(Predef.<Tuple2<TopicAndPartition,
>>>>> Long>>conforms());
>>>>>
>>>>> scala.Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> handler
>>>>> = new Function<MessageAndMetadata<byte[], byte[]>, byte[][]>() {
>>>>>         ..});
>>>>> JavaDStream<byte[][]> directKafkaStream = new
>>>>> CustomDirectKafkaInputDstream(jssc,scalakafkaParams ,scalaktopicOffsetMap,
>>>>> handler,byte[].class,byte[].class, kafka.serializer.DefaultDecoder.class,
>>>>> kafka.serializer.DefaultDecoder.class,byte[][].class);
>>>>>
>>>>>
>>>>>
>>>>> How to pass classTag to constructor in CustomDirectKafkaInputDstream ?
>>>>> And how to use Function instead of Function1 ?
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Aug 13, 2015 at 12:16 AM, Cody Koeninger <co...@koeninger.org>
>>>>> wrote:
>>>>>
>>>>>> I'm not aware of an existing api per se, but you could create your
>>>>>> own subclass of the DStream that returns None for compute() under certain
>>>>>> conditions.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Aug 12, 2015 at 1:03 PM, Shushant Arora <
>>>>>> shushantarora09@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Cody
>>>>>>>
>>>>>>> Can you help here if streaming 1.3 has any api for not consuming any
>>>>>>> message in next few runs?
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>> ---------- Forwarded message ----------
>>>>>>> From: Shushant Arora <sh...@gmail.com>
>>>>>>> Date: Wed, Aug 12, 2015 at 11:23 PM
>>>>>>> Subject: spark streaming 1.3 doubts(force it to not consume anything)
>>>>>>> To: user <us...@spark.apache.org>
>>>>>>>
>>>>>>>
>>>>>>> I Can't make my stream application batch interval to change at run
>>>>>>> time . Its always fixed and it always creates jobs at specified batch
>>>>>>> inetval and enqueue them if earleir batch is not finished.
>>>>>>>
>>>>>>> My requirement is to process the events and post them to some
>>>>>>> external server and if external server is down I want to increase the batch
>>>>>>> time - that is not possible but can I make it not to consume any messages
>>>>>>> in say next 5 successive runs ?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: spark streaming 1.3 doubts(force it to not consume anything)

Posted by Shushant Arora <sh...@gmail.com>.
Looking at the source code of
org.apache.spark.streaming.kafka.DirectKafkaInputDStream:

override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
    val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
    val rdd = KafkaRDD[K, V, U, T, R](
      context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)

    currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
    Some(rdd)
  }


But in DStream the signature is def compute(validTime: Time): Option[RDD[T]].

So what should the return type of a custom DStream that extends
DirectKafkaInputDStream be? I want the behaviour to be the same as
DirectKafkaInputDStream in normal scenarios, and to return None only in the
specific back-off scenario.

And why did the same error not come up when DirectKafkaInputDStream itself
extended InputDStream? Its return type Option[KafkaRDD[K, V, U, T, R]] is not
a subclass of Option[RDD[T]], so shouldn't that override have failed as well?




On Tue, Aug 18, 2015 at 7:28 PM, Cody Koeninger <co...@koeninger.org> wrote:

> The superclass method in DStream is defined as returning an Option[RDD[T]]
>
> On Tue, Aug 18, 2015 at 8:07 AM, Shushant Arora <shushantarora09@gmail.com
> > wrote:
>
>> Getting compilation error while overriding compute method of
>> DirectKafkaInputDStream.
>>
>>
>> [ERROR] CustomDirectKafkaInputDstream.java:[51,83]
>> compute(org.apache.spark.streaming.Time) in CustomDirectKafkaInputDstream
>> cannot override compute(org.apache.spark.streaming.Time) in
>> org.apache.spark.streaming.dstream.DStream; attempting to use incompatible
>> return type
>>
>> [ERROR] found   :
>> scala.Option<org.apache.spark.streaming.kafka.KafkaRDD<byte[],byte[],kafka.serializer.DefaultDecoder,kafka.serializer.DefaultDecoder,byte[][]>>
>>
>> [ERROR] required: scala.Option<org.apache.spark.rdd.RDD<byte[][]>>
>>
>>
>> class :
>>
>> public class CustomDirectKafkaInputDstream extends
>> DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
>> kafka.serializer.DefaultDecoder, byte[][]>{
>>
>> @Override
>> public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
>> byte[][]>> compute(
>> Time validTime) {
>>
>> int processed=processedCounter.value();
>> int failed = failedProcessingsCounter.value();
>> if((processed==failed)){
>> System.out.println("backing off since its 100 % failure");
>> return Option.empty();
>> }else{
>> System.out.println("starting the stream ");
>>
>> return super.compute(validTime);
>> }
>> }
>> }
>>
>>
>> What should be the return type of compute method ? super class is
>> returning Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
>> byte[][]>>  but its expecting
>>  scala.Option<org.apache.spark.rdd.RDD<byte[][]>> from derived  class . Is
>> there something wring with code?
>>
>> On Mon, Aug 17, 2015 at 7:08 PM, Cody Koeninger <co...@koeninger.org>
>> wrote:
>>
>>> Look at the definitions of the java-specific
>>> KafkaUtils.createDirectStream methods (the ones that take a
>>> JavaStreamingContext)
>>>
>>> On Mon, Aug 17, 2015 at 5:13 AM, Shushant Arora <
>>> shushantarora09@gmail.com> wrote:
>>>
>>>> How to create classtag in java ?Also Constructor
>>>> of DirectKafkaInputDStream takes Function1 not Function but
>>>> kafkautils.createDirectStream allows function.
>>>>
>>>> I have below as overriden DirectKafkaInputDStream.
>>>>
>>>>
>>>> public class CustomDirectKafkaInputDstream extends
>>>> DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
>>>> kafka.serializer.DefaultDecoder, byte[][]>{
>>>>
>>>> public CustomDirectKafkaInputDstream(
>>>> StreamingContext ssc_,
>>>> Map<String, String> kafkaParams,
>>>> Map<TopicAndPartition, Object> fromOffsets,
>>>> Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> messageHandler,
>>>> ClassTag<byte[]> evidence$1, ClassTag<byte[]> evidence$2,
>>>> ClassTag<DefaultDecoder> evidence$3,
>>>> ClassTag<DefaultDecoder> evidence$4, ClassTag<byte[][]> evidence$5) {
>>>> super(ssc_, kafkaParams, fromOffsets, messageHandler, evidence$1,
>>>> evidence$2,
>>>> evidence$3, evidence$4, evidence$5);
>>>> }
>>>> @Override
>>>> public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
>>>> byte[][]>> compute(
>>>> Time validTime) {
>>>> int processe=processedCounter.value();
>>>> int failed = failedProcessingsCounter.value();
>>>> if((processed==failed)){
>>>> System.out.println("backing off since its 100 % failure");
>>>> return Option.empty();
>>>> }else{
>>>> System.out.println("starting the stream ");
>>>>
>>>> return super.compute(validTime);
>>>> }
>>>> }
>>>>
>>>>
>>>>
>>>> To create this stream
>>>> I am using
>>>> scala.collection.immutable.Map<String, String> scalakafkaParams =
>>>> JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala().toMap(Predef.<Tuple2<String,
>>>> String>>conforms());
>>>> scala.collection.immutable.Map<TopicAndPartition, Long>
>>>> scalaktopicOffsetMap=
>>>> JavaConverters.mapAsScalaMapConverter(topicOffsetMap).asScala().toMap(Predef.<Tuple2<TopicAndPartition,
>>>> Long>>conforms());
>>>>
>>>> scala.Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> handler =
>>>> new Function<MessageAndMetadata<byte[], byte[]>, byte[][]>() {
>>>>         ..});
>>>> JavaDStream<byte[][]> directKafkaStream = new
>>>> CustomDirectKafkaInputDstream(jssc,scalakafkaParams ,scalaktopicOffsetMap,
>>>> handler,byte[].class,byte[].class, kafka.serializer.DefaultDecoder.class,
>>>> kafka.serializer.DefaultDecoder.class,byte[][].class);
>>>>
>>>>
>>>>
>>>> How to pass classTag to constructor in CustomDirectKafkaInputDstream ?
>>>> And how to use Function instead of Function1 ?
>>>>
>>>>
>>>>
>>>> On Thu, Aug 13, 2015 at 12:16 AM, Cody Koeninger <co...@koeninger.org>
>>>> wrote:
>>>>
>>>>> I'm not aware of an existing api per se, but you could create your own
>>>>> subclass of the DStream that returns None for compute() under certain
>>>>> conditions.
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Aug 12, 2015 at 1:03 PM, Shushant Arora <
>>>>> shushantarora09@gmail.com> wrote:
>>>>>
>>>>>> Hi Cody
>>>>>>
>>>>>> Can you help here if streaming 1.3 has any api for not consuming any
>>>>>> message in next few runs?
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> ---------- Forwarded message ----------
>>>>>> From: Shushant Arora <sh...@gmail.com>
>>>>>> Date: Wed, Aug 12, 2015 at 11:23 PM
>>>>>> Subject: spark streaming 1.3 doubts(force it to not consume anything)
>>>>>> To: user <us...@spark.apache.org>
>>>>>>
>>>>>>
>>>>>> I Can't make my stream application batch interval to change at run
>>>>>> time . Its always fixed and it always creates jobs at specified batch
>>>>>> inetval and enqueue them if earleir batch is not finished.
>>>>>>
>>>>>> My requirement is to process the events and post them to some
>>>>>> external server and if external server is down I want to increase the batch
>>>>>> time - that is not possible but can I make it not to consume any messages
>>>>>> in say next 5 successive runs ?
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: spark streaming 1.3 doubts(force it to not consume anything)

Posted by Cody Koeninger <co...@koeninger.org>.
The superclass method in DStream is defined as returning an Option[RDD[T]]
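
For illustration, here is a minimal sketch (added for clarity, not part of the original reply) of how the Java override could be declared so that it matches what javac expects. Scala accepts the narrower Option[KafkaRDD[...]] because Option is covariant there, but Java generics are invariant, so the Java subclass has to declare the widened element type and re-wrap the result. The counter names mirror the code quoted below; the rest of the class (constructor, fields) is assumed to exist as in that code.

import kafka.serializer.DefaultDecoder;
import org.apache.spark.rdd.RDD;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.kafka.DirectKafkaInputDStream;
import org.apache.spark.streaming.kafka.KafkaRDD;
import scala.Option;

public class CustomDirectKafkaInputDstream extends
    DirectKafkaInputDStream<byte[], byte[], DefaultDecoder, DefaultDecoder, byte[][]> {

  // constructor and accumulator fields as in the code quoted below

  @Override
  public Option<RDD<byte[][]>> compute(Time validTime) {
    int processed = processedCounter.value();          // accumulators defined in this class
    int failed = failedProcessingsCounter.value();
    if (processed == failed) {
      // 100% failure so far: produce no batch for this interval
      return Option.<RDD<byte[][]>>empty();
    }
    Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder, byte[][]>> batch =
        super.compute(validTime);
    // A KafkaRDD is an RDD, so re-wrapping only widens the static element type.
    if (batch.isDefined()) {
      return Option.<RDD<byte[][]>>apply(batch.get());
    } else {
      return Option.<RDD<byte[][]>>empty();
    }
  }
}

With the widened signature the override matches DStream.compute exactly, so the "incompatible return type" error goes away.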

On Tue, Aug 18, 2015 at 8:07 AM, Shushant Arora <sh...@gmail.com>
wrote:

> Getting compilation error while overriding compute method of
> DirectKafkaInputDStream.
>
>
> [ERROR] CustomDirectKafkaInputDstream.java:[51,83]
> compute(org.apache.spark.streaming.Time) in CustomDirectKafkaInputDstream
> cannot override compute(org.apache.spark.streaming.Time) in
> org.apache.spark.streaming.dstream.DStream; attempting to use incompatible
> return type
>
> [ERROR] found   :
> scala.Option<org.apache.spark.streaming.kafka.KafkaRDD<byte[],byte[],kafka.serializer.DefaultDecoder,kafka.serializer.DefaultDecoder,byte[][]>>
>
> [ERROR] required: scala.Option<org.apache.spark.rdd.RDD<byte[][]>>
>
>
> class :
>
> public class CustomDirectKafkaInputDstream extends
> DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
> kafka.serializer.DefaultDecoder, byte[][]>{
>
> @Override
> public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
> byte[][]>> compute(
> Time validTime) {
>
> int processed=processedCounter.value();
> int failed = failedProcessingsCounter.value();
> if((processed==failed)){
> System.out.println("backing off since its 100 % failure");
> return Option.empty();
> }else{
> System.out.println("starting the stream ");
>
> return super.compute(validTime);
> }
> }
> }
>
>
> What should be the return type of compute method ? super class is
> returning Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
> byte[][]>>  but its expecting
>  scala.Option<org.apache.spark.rdd.RDD<byte[][]>> from derived  class . Is
> there something wring with code?
>
> On Mon, Aug 17, 2015 at 7:08 PM, Cody Koeninger <co...@koeninger.org>
> wrote:
>
>> Look at the definitions of the java-specific
>> KafkaUtils.createDirectStream methods (the ones that take a
>> JavaStreamingContext)
>>
>> On Mon, Aug 17, 2015 at 5:13 AM, Shushant Arora <
>> shushantarora09@gmail.com> wrote:
>>
>>> How to create classtag in java ?Also Constructor
>>> of DirectKafkaInputDStream takes Function1 not Function but
>>> kafkautils.createDirectStream allows function.
>>>
>>> I have below as overriden DirectKafkaInputDStream.
>>>
>>>
>>> public class CustomDirectKafkaInputDstream extends
>>> DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
>>> kafka.serializer.DefaultDecoder, byte[][]>{
>>>
>>> public CustomDirectKafkaInputDstream(
>>> StreamingContext ssc_,
>>> Map<String, String> kafkaParams,
>>> Map<TopicAndPartition, Object> fromOffsets,
>>> Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> messageHandler,
>>> ClassTag<byte[]> evidence$1, ClassTag<byte[]> evidence$2,
>>> ClassTag<DefaultDecoder> evidence$3,
>>> ClassTag<DefaultDecoder> evidence$4, ClassTag<byte[][]> evidence$5) {
>>> super(ssc_, kafkaParams, fromOffsets, messageHandler, evidence$1,
>>> evidence$2,
>>> evidence$3, evidence$4, evidence$5);
>>> }
>>> @Override
>>> public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
>>> byte[][]>> compute(
>>> Time validTime) {
>>> int processe=processedCounter.value();
>>> int failed = failedProcessingsCounter.value();
>>> if((processed==failed)){
>>> System.out.println("backing off since its 100 % failure");
>>> return Option.empty();
>>> }else{
>>> System.out.println("starting the stream ");
>>>
>>> return super.compute(validTime);
>>> }
>>> }
>>>
>>>
>>>
>>> To create this stream
>>> I am using
>>> scala.collection.immutable.Map<String, String> scalakafkaParams =
>>> JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala().toMap(Predef.<Tuple2<String,
>>> String>>conforms());
>>> scala.collection.immutable.Map<TopicAndPartition, Long>
>>> scalaktopicOffsetMap=
>>> JavaConverters.mapAsScalaMapConverter(topicOffsetMap).asScala().toMap(Predef.<Tuple2<TopicAndPartition,
>>> Long>>conforms());
>>>
>>> scala.Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> handler =
>>> new Function<MessageAndMetadata<byte[], byte[]>, byte[][]>() {
>>>         ..});
>>> JavaDStream<byte[][]> directKafkaStream = new
>>> CustomDirectKafkaInputDstream(jssc,scalakafkaParams ,scalaktopicOffsetMap,
>>> handler,byte[].class,byte[].class, kafka.serializer.DefaultDecoder.class,
>>> kafka.serializer.DefaultDecoder.class,byte[][].class);
>>>
>>>
>>>
>>> How to pass classTag to constructor in CustomDirectKafkaInputDstream ?
>>> And how to use Function instead of Function1 ?
>>>
>>>
>>>
>>> On Thu, Aug 13, 2015 at 12:16 AM, Cody Koeninger <co...@koeninger.org>
>>> wrote:
>>>
>>>> I'm not aware of an existing api per se, but you could create your own
>>>> subclass of the DStream that returns None for compute() under certain
>>>> conditions.
>>>>
>>>>
>>>>
>>>> On Wed, Aug 12, 2015 at 1:03 PM, Shushant Arora <
>>>> shushantarora09@gmail.com> wrote:
>>>>
>>>>> Hi Cody
>>>>>
>>>>> Can you help here if streaming 1.3 has any api for not consuming any
>>>>> message in next few runs?
>>>>>
>>>>> Thanks
>>>>>
>>>>> ---------- Forwarded message ----------
>>>>> From: Shushant Arora <sh...@gmail.com>
>>>>> Date: Wed, Aug 12, 2015 at 11:23 PM
>>>>> Subject: spark streaming 1.3 doubts(force it to not consume anything)
>>>>> To: user <us...@spark.apache.org>
>>>>>
>>>>>
>>>>> I Can't make my stream application batch interval to change at run
>>>>> time . Its always fixed and it always creates jobs at specified batch
>>>>> inetval and enqueue them if earleir batch is not finished.
>>>>>
>>>>> My requirement is to process the events and post them to some external
>>>>> server and if external server is down I want to increase the batch time -
>>>>> that is not possible but can I make it not to consume any messages in say
>>>>> next 5 successive runs ?
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>

Re: spark streaming 1.3 doubts(force it to not consume anything)

Posted by Shushant Arora <sh...@gmail.com>.
Getting a compilation error while overriding the compute method of
DirectKafkaInputDStream.


[ERROR] CustomDirectKafkaInputDstream.java:[51,83]
compute(org.apache.spark.streaming.Time) in CustomDirectKafkaInputDstream
cannot override compute(org.apache.spark.streaming.Time) in
org.apache.spark.streaming.dstream.DStream; attempting to use incompatible
return type

[ERROR] found   :
scala.Option<org.apache.spark.streaming.kafka.KafkaRDD<byte[],byte[],kafka.serializer.DefaultDecoder,kafka.serializer.DefaultDecoder,byte[][]>>

[ERROR] required: scala.Option<org.apache.spark.rdd.RDD<byte[][]>>


class :

public class CustomDirectKafkaInputDstream extends
    DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
    kafka.serializer.DefaultDecoder, byte[][]> {

  @Override
  public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder, byte[][]>> compute(
      Time validTime) {

    int processed = processedCounter.value();
    int failed = failedProcessingsCounter.value();
    if (processed == failed) {
      System.out.println("backing off since its 100 % failure");
      return Option.empty();
    } else {
      System.out.println("starting the stream ");

      return super.compute(validTime);
    }
  }
}


What should the return type of the compute method be? The superclass returns
Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder, byte[][]>>,
but the compiler expects scala.Option<org.apache.spark.rdd.RDD<byte[][]>> from
the derived class. Is there something wrong with the code?

On Mon, Aug 17, 2015 at 7:08 PM, Cody Koeninger <co...@koeninger.org> wrote:

> Look at the definitions of the java-specific KafkaUtils.createDirectStream
> methods (the ones that take a JavaStreamingContext)
>
> On Mon, Aug 17, 2015 at 5:13 AM, Shushant Arora <shushantarora09@gmail.com
> > wrote:
>
>> How to create classtag in java ?Also Constructor
>> of DirectKafkaInputDStream takes Function1 not Function but
>> kafkautils.createDirectStream allows function.
>>
>> I have below as overriden DirectKafkaInputDStream.
>>
>>
>> public class CustomDirectKafkaInputDstream extends
>> DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
>> kafka.serializer.DefaultDecoder, byte[][]>{
>>
>> public CustomDirectKafkaInputDstream(
>> StreamingContext ssc_,
>> Map<String, String> kafkaParams,
>> Map<TopicAndPartition, Object> fromOffsets,
>> Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> messageHandler,
>> ClassTag<byte[]> evidence$1, ClassTag<byte[]> evidence$2,
>> ClassTag<DefaultDecoder> evidence$3,
>> ClassTag<DefaultDecoder> evidence$4, ClassTag<byte[][]> evidence$5) {
>> super(ssc_, kafkaParams, fromOffsets, messageHandler, evidence$1,
>> evidence$2,
>> evidence$3, evidence$4, evidence$5);
>> }
>> @Override
>> public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
>> byte[][]>> compute(
>> Time validTime) {
>> int processe=processedCounter.value();
>> int failed = failedProcessingsCounter.value();
>> if((processed==failed)){
>> System.out.println("backing off since its 100 % failure");
>> return Option.empty();
>> }else{
>> System.out.println("starting the stream ");
>>
>> return super.compute(validTime);
>> }
>> }
>>
>>
>>
>> To create this stream
>> I am using
>> scala.collection.immutable.Map<String, String> scalakafkaParams =
>> JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala().toMap(Predef.<Tuple2<String,
>> String>>conforms());
>> scala.collection.immutable.Map<TopicAndPartition, Long>
>> scalaktopicOffsetMap=
>> JavaConverters.mapAsScalaMapConverter(topicOffsetMap).asScala().toMap(Predef.<Tuple2<TopicAndPartition,
>> Long>>conforms());
>>
>> scala.Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> handler =
>> new Function<MessageAndMetadata<byte[], byte[]>, byte[][]>() {
>>         ..});
>> JavaDStream<byte[][]> directKafkaStream = new
>> CustomDirectKafkaInputDstream(jssc,scalakafkaParams ,scalaktopicOffsetMap,
>> handler,byte[].class,byte[].class, kafka.serializer.DefaultDecoder.class,
>> kafka.serializer.DefaultDecoder.class,byte[][].class);
>>
>>
>>
>> How to pass classTag to constructor in CustomDirectKafkaInputDstream ?
>> And how to use Function instead of Function1 ?
>>
>>
>>
>> On Thu, Aug 13, 2015 at 12:16 AM, Cody Koeninger <co...@koeninger.org>
>> wrote:
>>
>>> I'm not aware of an existing api per se, but you could create your own
>>> subclass of the DStream that returns None for compute() under certain
>>> conditions.
>>>
>>>
>>>
>>> On Wed, Aug 12, 2015 at 1:03 PM, Shushant Arora <
>>> shushantarora09@gmail.com> wrote:
>>>
>>>> Hi Cody
>>>>
>>>> Can you help here if streaming 1.3 has any api for not consuming any
>>>> message in next few runs?
>>>>
>>>> Thanks
>>>>
>>>> ---------- Forwarded message ----------
>>>> From: Shushant Arora <sh...@gmail.com>
>>>> Date: Wed, Aug 12, 2015 at 11:23 PM
>>>> Subject: spark streaming 1.3 doubts(force it to not consume anything)
>>>> To: user <us...@spark.apache.org>
>>>>
>>>>
>>>> I Can't make my stream application batch interval to change at run time
>>>> . Its always fixed and it always creates jobs at specified batch inetval
>>>> and enqueue them if earleir batch is not finished.
>>>>
>>>> My requirement is to process the events and post them to some external
>>>> server and if external server is down I want to increase the batch time -
>>>> that is not possible but can I make it not to consume any messages in say
>>>> next 5 successive runs ?
>>>>
>>>>
>>>>
>>>>
>>>
>>
>

Re: spark streaming 1.3 doubts(force it to not consume anything)

Posted by Cody Koeninger <co...@koeninger.org>.
Look at the definitions of the java-specific KafkaUtils.createDirectStream
methods (the ones that take a JavaStreamingContext)
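
For reference, a rough sketch (my own illustration, not from the original reply) of one way to build the ClassTag and Function1 arguments directly in Java. Creating ClassTags from plain Class objects via the ClassTag companion mirrors what the Java-specific createDirectStream overloads do internally; writing the handler as an AbstractFunction1 subclass is just one way to get a scala.Function1 out of Java, and the class and field names here are only examples.

import java.io.Serializable;

import kafka.message.MessageAndMetadata;
import kafka.serializer.DefaultDecoder;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import scala.runtime.AbstractFunction1;

public class DirectStreamHelpers {

  // ClassTags can be built from plain Class objects via the ClassTag companion object.
  static final ClassTag<byte[]> BYTES_TAG = ClassTag$.MODULE$.apply(byte[].class);
  static final ClassTag<byte[][]> RECORD_TAG = ClassTag$.MODULE$.apply(byte[][].class);
  static final ClassTag<DefaultDecoder> DECODER_TAG =
      ClassTag$.MODULE$.apply(DefaultDecoder.class);

  // A scala.Function1 can be written in Java by extending AbstractFunction1; marking it
  // Serializable matters because the message handler is captured by the DStream.
  public static class MessageHandler
      extends AbstractFunction1<MessageAndMetadata<byte[], byte[]>, byte[][]>
      implements Serializable {
    @Override
    public byte[][] apply(MessageAndMetadata<byte[], byte[]> mmd) {
      // example payload: key bytes and message bytes
      return new byte[][] { mmd.key(), mmd.message() };
    }
  }
}

These tags and a MessageHandler instance could then be passed to the CustomDirectKafkaInputDstream constructor in place of the bare Class objects, together with the converted Scala maps shown in the quoted code below.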

On Mon, Aug 17, 2015 at 5:13 AM, Shushant Arora <sh...@gmail.com>
wrote:

> How to create classtag in java ?Also Constructor
> of DirectKafkaInputDStream takes Function1 not Function but
> kafkautils.createDirectStream allows function.
>
> I have below as overriden DirectKafkaInputDStream.
>
>
> public class CustomDirectKafkaInputDstream extends
> DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
> kafka.serializer.DefaultDecoder, byte[][]>{
>
> public CustomDirectKafkaInputDstream(
> StreamingContext ssc_,
> Map<String, String> kafkaParams,
> Map<TopicAndPartition, Object> fromOffsets,
> Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> messageHandler,
> ClassTag<byte[]> evidence$1, ClassTag<byte[]> evidence$2,
> ClassTag<DefaultDecoder> evidence$3,
> ClassTag<DefaultDecoder> evidence$4, ClassTag<byte[][]> evidence$5) {
> super(ssc_, kafkaParams, fromOffsets, messageHandler, evidence$1,
> evidence$2,
> evidence$3, evidence$4, evidence$5);
> }
> @Override
> public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
> byte[][]>> compute(
> Time validTime) {
> int processe=processedCounter.value();
> int failed = failedProcessingsCounter.value();
> if((processed==failed)){
> System.out.println("backing off since its 100 % failure");
> return Option.empty();
> }else{
> System.out.println("starting the stream ");
>
> return super.compute(validTime);
> }
> }
>
>
>
> To create this stream
> I am using
> scala.collection.immutable.Map<String, String> scalakafkaParams =
> JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala().toMap(Predef.<Tuple2<String,
> String>>conforms());
> scala.collection.immutable.Map<TopicAndPartition, Long>
> scalaktopicOffsetMap=
> JavaConverters.mapAsScalaMapConverter(topicOffsetMap).asScala().toMap(Predef.<Tuple2<TopicAndPartition,
> Long>>conforms());
>
> scala.Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> handler =
> new Function<MessageAndMetadata<byte[], byte[]>, byte[][]>() {
>         ..});
> JavaDStream<byte[][]> directKafkaStream = new
> CustomDirectKafkaInputDstream(jssc,scalakafkaParams ,scalaktopicOffsetMap,
> handler,byte[].class,byte[].class, kafka.serializer.DefaultDecoder.class,
> kafka.serializer.DefaultDecoder.class,byte[][].class);
>
>
>
> How to pass classTag to constructor in CustomDirectKafkaInputDstream ? And
> how to use Function instead of Function1 ?
>
>
>
> On Thu, Aug 13, 2015 at 12:16 AM, Cody Koeninger <co...@koeninger.org>
> wrote:
>
>> I'm not aware of an existing api per se, but you could create your own
>> subclass of the DStream that returns None for compute() under certain
>> conditions.
>>
>>
>>
>> On Wed, Aug 12, 2015 at 1:03 PM, Shushant Arora <
>> shushantarora09@gmail.com> wrote:
>>
>>> Hi Cody
>>>
>>> Can you help here if streaming 1.3 has any api for not consuming any
>>> message in next few runs?
>>>
>>> Thanks
>>>
>>> ---------- Forwarded message ----------
>>> From: Shushant Arora <sh...@gmail.com>
>>> Date: Wed, Aug 12, 2015 at 11:23 PM
>>> Subject: spark streaming 1.3 doubts(force it to not consume anything)
>>> To: user <us...@spark.apache.org>
>>>
>>>
>>> I Can't make my stream application batch interval to change at run time
>>> . Its always fixed and it always creates jobs at specified batch inetval
>>> and enqueue them if earleir batch is not finished.
>>>
>>> My requirement is to process the events and post them to some external
>>> server and if external server is down I want to increase the batch time -
>>> that is not possible but can I make it not to consume any messages in say
>>> next 5 successive runs ?
>>>
>>>
>>>
>>>
>>
>