Posted to user@spark.apache.org by Abhishek Anand <ab...@gmail.com> on 2016/02/11 21:40:38 UTC

Stateful Operation on JavaPairDStream Help Needed !!

Hi All,

I have a use case like the following in my production environment, where I
am listening to Kafka with a slideInterval of 1 min and a windowLength of 2
hours.

I have a JavaPairDStream in which each key arrives twice, with a different
value each time; the second record might appear in the same batch or in a
later batch.

When a key appears for the second time, I need to update a field in the
earlier value with a field from the later value. Keys whose second record
does not arrive should be discarded after 2 hours.

At the end of each second I need to output the result to an external database.

For example:

Suppose valueX is an object of MyClass with fields int a and String b.

At t=1 sec I receive:
key0, value0 (0, "prev0")
key1, value1 (1, "prev1")
key2, value2 (2, "prev2")
key2, value3 (3, "next2")

Output to database after 1 sec:
key2, newValue (2, "next2")

At t=2 sec I receive:
key3, value4 (4, "prev3")
key1, value5 (5, "next1")

Output to database after 2 sec:
key1, newValue (1, "next1")

At t=3 sec I receive:
key4, value6 (6, "prev4")
key3, value7 (7, "next3")
key5, value8 (8, "prev5")
key5, value9 (9, "next5")
key0, value10 (10, "next0")

Output to database after 3 sec:
key0, newValue (0, "next0")
key3, newValue (4, "next3")
key5, newValue (8, "next5")


Please suggest how this can be achieved.


Thanks a lot !!!!
Abhi
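A minimal, Spark-free sketch of the semantics described above, for illustration only. It assumes each key arrives at most twice, that the result keeps `a` from the first value and takes `b` from the second (as in the expected output), and that the 2-hour limit is measured from the first record's arrival. The map of pending keys plays the role that per-key `State` plays inside mapWithState; the names `PairingSketch`, `Value`, `rec`, and `onBatch` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Spark-free model: pair the two records of a key, drop unpaired keys after a timeout. */
public class PairingSketch {
    /** Hypothetical stand-in for MyClass (fields int a, String b). */
    static final class Value {
        final int a;
        final String b;

        Value(int a, String b) {
            this.a = a;
            this.b = b;
        }

        @Override
        public String toString() {
            return "(" + a + ", \"" + b + "\")";
        }
    }

    /** The first value seen for a key and the time (in seconds) it arrived. */
    private static final class Pending {
        final Value first;
        final long arrivedAt;

        Pending(Value first, long arrivedAt) {
            this.first = first;
            this.arrivedAt = arrivedAt;
        }
    }

    private final Map<String, Pending> pending = new LinkedHashMap<>();
    private final long timeoutSeconds;

    PairingSketch(long timeoutSeconds) {
        this.timeoutSeconds = timeoutSeconds;
    }

    static Map.Entry<String, Value> rec(String key, int a, String b) {
        return Map.entry(key, new Value(a, b));
    }

    int pendingCount() {
        return pending.size();
    }

    /** Processes one batch arriving at time 'now'; returns the merged records to write out. */
    List<String> onBatch(long now, List<Map.Entry<String, Value>> batch) {
        // Reject keys whose second record did not arrive within the timeout.
        pending.values().removeIf(p -> now - p.arrivedAt > timeoutSeconds);

        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Value> r : batch) {
            Pending p = pending.remove(r.getKey());
            if (p == null) {
                // First occurrence: hold it until the second record shows up.
                pending.put(r.getKey(), new Pending(r.getValue(), now));
            } else {
                // Second occurrence: keep 'a' from the first value, take 'b' from the later one.
                out.add(r.getKey() + ", newValue " + new Value(p.first.a, r.getValue().b));
            }
        }
        Collections.sort(out); // only to make the output order deterministic
        return out;
    }

    public static void main(String[] args) {
        PairingSketch s = new PairingSketch(2 * 60 * 60); // 2 hours, in seconds
        System.out.println(s.onBatch(1, List.of(rec("key0", 0, "prev0"), rec("key1", 1, "prev1"),
                rec("key2", 2, "prev2"), rec("key2", 3, "next2"))));
        System.out.println(s.onBatch(2, List.of(rec("key3", 4, "prev3"), rec("key1", 5, "next1"))));
        System.out.println(s.onBatch(3, List.of(rec("key4", 6, "prev4"), rec("key3", 7, "next3"),
                rec("key5", 8, "prev5"), rec("key5", 9, "next5"), rec("key0", 10, "next0"))));
        // key4 never gets its second record; just over two hours later it is dropped.
        s.onBatch(3 + 2 * 60 * 60 + 1, List.of());
        System.out.println(s.pendingCount());
    }
}
```

Running `main` prints the three outputs listed in the example, then `0` once the unpaired key4 has aged out. In a real job this per-key bookkeeping is what mapWithState or updateStateByKey would hold, checkpointed so that it survives a restart.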

Re: Stateful Operation on JavaPairDStream Help Needed !!

Posted by "Shixiong(Ryan) Zhu" <sh...@databricks.com>.
Could you post a screenshot of the Streaming DAG and also the driver log?
It would be great if you could share a simple producer we can use to debug.

On Mon, Feb 29, 2016 at 1:39 AM, Abhishek Anand <ab...@gmail.com>
wrote:

> Hi Ryan,
>
> It's not working even after removing the reduceByKey.
>
> So, basically I am doing the following:
> - reading from Kafka
> - flatMap inside transform
> - mapWithState
> - rdd.count on the output of mapWithState
>
> But to my surprise, I still don't see checkpointing taking place.
>
> Is there any restriction on the type of operations that we can perform
> inside mapWithState?
>
> I really need to resolve this, as currently if my application is
> restarted from the checkpoint it has to repartition 120 previous stages,
> which takes a hell of a lot of time.
>
> Thanks !!
> Abhi
>
> On Mon, Feb 29, 2016 at 3:42 AM, Shixiong(Ryan) Zhu <
> shixiong@databricks.com> wrote:
>
>> Sorry, I forgot to tell you that you should call `rdd.count()` on the
>> "reduceByKey" output as well. Could you try it and see if it works?
>>
>> On Sat, Feb 27, 2016 at 1:17 PM, Abhishek Anand <ab...@gmail.com>
>> wrote:
>>
>>> Hi Ryan,
>>>
>>> I am using mapWithState after doing reduceByKey.
>>>
>>> I am now using mapWithState as you suggested and triggering the
>>> count manually.
>>>
>>> But I am still unable to see any checkpointing taking place. In the DAG I
>>> can see that the reduceByKey operations for the previous batches are also
>>> being computed.
>>>
>>>
>>> Thanks
>>> Abhi
>>>
>>>
>>> On Tue, Feb 23, 2016 at 2:36 AM, Shixiong(Ryan) Zhu <
>>> shixiong@databricks.com> wrote:
>>>
>>>> Hey Abhi,
>>>>
>>>> Using reduceByKeyAndWindow together with mapWithState will trigger the bug
>>>> in SPARK-6847. Here is a workaround that triggers the checkpoint manually:
>>>>
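>>>>     JavaMapWithStateDStream<...> stateDStream =
>>>>         myPairDstream.mapWithState(StateSpec.function(mappingFunc));
>>>>     // Run an action on every batch so the state RDD is materialized
>>>>     // and its periodic checkpoint actually gets written.
>>>>     stateDStream.foreachRDD(new Function<JavaRDD<...>, Void>() {
>>>>       @Override
>>>>       public Void call(JavaRDD<...> rdd) throws Exception {
>>>>         rdd.count();
>>>>         return null; // Function<..., Void> must return a value
>>>>       }
>>>>     });
>>>>     return stateDStream.stateSnapshots();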
>>>>
>>>>
>>>> On Mon, Feb 22, 2016 at 12:25 PM, Abhishek Anand <
>>>> abhis.anan007@gmail.com> wrote:
>>>>
>>>>> Hi Ryan,
>>>>>
>>>>> Reposting the code.
>>>>>
>>>>> Basically, my use case is this: I am receiving web impression logs
>>>>> (listening to Kafka) and may receive the notify for those impressions in
>>>>> the same interval (for me it's 1 min) or in any later interval (up to 2
>>>>> hours). When I receive the notify for a particular impression, I need to
>>>>> swap the date field in the impression with the date field in the notify
>>>>> log. The notify for an impression has the same key as the impression.
>>>>>
>>>>> import com.google.common.base.Optional; // Spark 1.6 Java API uses Guava's Optional
>>>>> import org.apache.spark.api.java.function.Function3;
>>>>> import org.apache.spark.streaming.State;
>>>>> import org.apache.spark.streaming.StateSpec;
>>>>> import scala.Tuple2;
>>>>>
>>>>> static Function3<String, Optional<MyClass>, State<MyClass>, Tuple2<String, MyClass>> mappingFunc =
>>>>>     new Function3<String, Optional<MyClass>, State<MyClass>, Tuple2<String, MyClass>>() {
>>>>>       @Override
>>>>>       public Tuple2<String, MyClass> call(String key, Optional<MyClass> one,
>>>>>                                           State<MyClass> state) {
>>>>>         MyClass nullObj = new MyClass();
>>>>>         nullObj.setImprLog(null);
>>>>>         nullObj.setNotifyLog(null);
>>>>>         MyClass current = one.or(nullObj); // never null, so no null check needed below
>>>>>
>>>>>         if (current.getImprLog() != null && current.getMyClassType() == 1 /*this is impression*/) {
>>>>>           // Store the impression; without this, state.exists() is never true later.
>>>>>           state.update(current);
>>>>>           return new Tuple2<>(key, null);
>>>>>         } else if (current.getNotifyLog() != null
>>>>>             && current.getMyClassType() == 3 /*notify for the impression received*/) {
>>>>>           // The stored impression must carry an ImprLog, since it is dereferenced below.
>>>>>           if (state.exists() && state.get().getImprLog() != null) {
>>>>>             MyClass oldState = state.get();
>>>>>             oldState.getImprLog().setCrtd(current.getNotifyLog().getCrtd()); //swapping the dates
>>>>>             state.update(oldState); // write the updated impression back into the state
>>>>>             return new Tuple2<>(key, oldState);
>>>>>           } else {
>>>>>             return new Tuple2<>(key, null);
>>>>>           }
>>>>>         } else {
>>>>>           return new Tuple2<>(key, null);
>>>>>         }
>>>>>       }
>>>>>     };
>>>>>
>>>>> return myPairDstream.mapWithState(StateSpec.function(mappingFunc)).stateSnapshots();
>>>>>
>>>>>
>>>>> Currently I am using reduceByKeyAndWindow without the inverse function,
>>>>> and I get the correct data. The problem arises when I have to restart my
>>>>> application from the checkpoint: it repartitions and recomputes the
>>>>> previous 120 partitions, which delays the incoming batches.
>>>>>
>>>>>
>>>>> Thanks !!
>>>>> Abhi
>>>>>
>>>>> On Tue, Feb 23, 2016 at 1:25 AM, Shixiong(Ryan) Zhu <
>>>>> shixiong@databricks.com> wrote:
>>>>>
>>>>>> Hey Abhi,
>>>>>>
>>>>>> Could you post how you use mapWithState? By default, it should do
>>>>>> checkpointing every 10 batches.
>>>>>> However, there is a known issue that prevents mapWithState from
>>>>>> checkpointing in some special cases:
>>>>>> https://issues.apache.org/jira/browse/SPARK-6847
>>>>>>
>>>>>> On Mon, Feb 22, 2016 at 5:47 AM, Abhishek Anand <
>>>>>> abhis.anan007@gmail.com> wrote:
>>>>>>
>>>>>>> Any insights on this one?
>>>>>>>
>>>>>>>
>>>>>>> Thanks !!!
>>>>>>> Abhi
>>>>>>>
>>>>>>> On Mon, Feb 15, 2016 at 11:08 PM, Abhishek Anand <
>>>>>>> abhis.anan007@gmail.com> wrote:
>>>>>>>
>>>>>>>> I am now trying to use mapWithState in the following way, based on some
>>>>>>>> example code. However, judging by the DAG, it does not seem to checkpoint
>>>>>>>> the state, and when I restart the application from the checkpoint, it
>>>>>>>> re-partitions all the previous batches' data from Kafka.
>>>>>>>>
>>>>>>>> static Function3<String, Optional<MyClass>, State<MyClass>, Tuple2<String, MyClass>> mappingFunc =
>>>>>>>>     new Function3<String, Optional<MyClass>, State<MyClass>, Tuple2<String, MyClass>>() {
>>>>>>>>       @Override
>>>>>>>>       public Tuple2<String, MyClass> call(String key, Optional<MyClass> one,
>>>>>>>>                                           State<MyClass> state) {
>>>>>>>>         MyClass nullObj = new MyClass();
>>>>>>>>         nullObj.setImprLog(null);
>>>>>>>>         nullObj.setNotifyLog(null);
>>>>>>>>         MyClass current = one.or(nullObj);
>>>>>>>>
>>>>>>>>         if (current.getImprLog() != null && current.getMyClassType() == 1) {
>>>>>>>>           state.update(current); // remember the impression until its notify arrives
>>>>>>>>           return new Tuple2<>(key, null);
>>>>>>>>         } else if (current.getNotifyLog() != null && current.getMyClassType() == 3) {
>>>>>>>>           // check the stored impression's ImprLog, which is the field used below
>>>>>>>>           if (state.exists() && state.get().getImprLog() != null) {
>>>>>>>>             MyClass oldState = state.get();
>>>>>>>>             oldState.getImprLog().setCrtd(current.getNotifyLog().getCrtd());
>>>>>>>>             state.update(oldState);
>>>>>>>>             return new Tuple2<>(key, oldState);
>>>>>>>>           } else {
>>>>>>>>             return new Tuple2<>(key, null);
>>>>>>>>           }
>>>>>>>>         } else {
>>>>>>>>           return new Tuple2<>(key, null);
>>>>>>>>         }
>>>>>>>>       }
>>>>>>>>     };
>>>>>>>>
>>>>>>>>
>>>>>>>> Please let me know whether this is the proper way or whether I am doing
>>>>>>>> something wrong.
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks !!
>>>>>>>> Abhi
>>>>>>>>
>>>>>>>> On Sun, Feb 14, 2016 at 2:26 AM, Sebastian Piu <
>>>>>>>> sebastian.piu@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> If you don't want to upgrade, your only option will be
>>>>>>>>> updateStateByKey then.
>>>>>>>>> On 13 Feb 2016 8:48 p.m., "Ted Yu" <yu...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> mapWithState supports checkpointing.
>>>>>>>>>>
>>>>>>>>>> There have been some bug fixes since the release of 1.6.0,
>>>>>>>>>> e.g.
>>>>>>>>>>   SPARK-12591 NullPointerException using checkpointed
>>>>>>>>>> mapWithState with KryoSerializer
>>>>>>>>>>
>>>>>>>>>> which is in the upcoming 1.6.1.
>>>>>>>>>>
>>>>>>>>>> Cheers
>>>>>>>>>>
>>>>>>>>>> On Sat, Feb 13, 2016 at 12:05 PM, Abhishek Anand <
>>>>>>>>>> abhis.anan007@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Does mapWithState checkpoint the data?
>>>>>>>>>>>
>>>>>>>>>>> When my application goes down and is restarted from the checkpoint,
>>>>>>>>>>> will mapWithState need to recompute the previous batches' data?
>>>>>>>>>>>
>>>>>>>>>>> Also, to use mapWithState I will need to upgrade my application,
>>>>>>>>>>> as I am using version 1.4.0 and mapWithState isn't supported there. Is there
>>>>>>>>>>> any other workaround?
>>>>>>>>>>>
>>>>>>>>>>> Cheers!!
>>>>>>>>>>> Abhi
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Feb 12, 2016 at 3:33 AM, Sebastian Piu <
>>>>>>>>>>> sebastian.piu@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Looks like mapWithState could help you?
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
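Sebastian's suggestion for staying on Spark 1.4, updateStateByKey, can be modeled without Spark as below. This is a sketch under stated assumptions, not Spark code: in the Spark 1.x Java API the update function is a `Function2<List<V>, Optional<S>, Optional<S>>` built on Guava's `Optional`, whereas this version uses `java.util.Optional` so it runs on a plain JDK. It relies on two documented properties of updateStateByKey: the function runs every batch for every key that already has state, even when there are no new values, and returning an absent value removes the key. `PairState`, `MAX_BATCHES`, and expiring by counting batches (the basic update function is not given the batch time) are hypothetical choices for illustration.

```java
import java.util.List;
import java.util.Optional;

/** Spark-free model of an updateStateByKey-style update function for the pairing use case. */
public class UpdateStateSketch {
    static final int MAX_BATCHES = 120; // assumed: 2-hour timeout / 1-minute batches

    /** Per-key state: first value, batches waited so far, and the merged value once paired. */
    static final class PairState {
        final String first;
        final int batchesWaited;
        final String merged; // non-null only in the batch in which the pair completes

        PairState(String first, int batchesWaited, String merged) {
            this.first = first;
            this.batchesWaited = batchesWaited;
            this.merged = merged;
        }
    }

    /**
     * (new values for this key in this batch, previous state) -> new state.
     * Modeled as being called once per batch for every key with state, even with
     * no new values; returning Optional.empty() drops the key.
     */
    static Optional<PairState> update(List<String> newValues, Optional<PairState> prev) {
        // A pair completed in the previous batch has already been written out: forget the key.
        if (prev.isPresent() && prev.get().merged != null) {
            return Optional.empty();
        }
        PairState s = prev.map(p -> new PairState(p.first, p.batchesWaited + 1, null)).orElse(null);
        for (String v : newValues) { // assumes the earlier record comes first in the list
            if (s == null) {
                s = new PairState(v, 0, null);
            } else if (s.merged == null) {
                s = new PairState(s.first, s.batchesWaited, s.first + " -> " + v);
            }
        }
        if (s == null || (s.merged == null && s.batchesWaited >= MAX_BATCHES)) {
            return Optional.empty(); // nothing to track, or the second record never came
        }
        return Optional.of(s);
    }

    public static void main(String[] args) {
        Optional<PairState> st = update(List.of("prev1"), Optional.empty());
        System.out.println(st.get().merged);                    // null: still waiting
        st = update(List.of("next1"), st);
        System.out.println(st.get().merged);                    // prev1 -> next1
        System.out.println(update(List.of(), st).isPresent());  // false: dropped after output

        Optional<PairState> lonely = update(List.of("prev4"), Optional.empty());
        for (int i = 0; i < MAX_BATCHES; i++) {
            lonely = update(List.of(), lonely);
        }
        System.out.println(lonely.isPresent());                 // false: expired
    }
}
```

The records to write to the database in a given batch would be the state entries whose `merged` field is non-null. Because this function visits every tracked key on every batch, its cost grows with the number of pending keys; avoiding that is one of the motivations behind mapWithState.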

Re: Stateful Operation on JavaPairDStream Help Needed !!

Posted by Abhishek Anand <ab...@gmail.com>.
Hi Ryan,

It's not working even after removing the reduceByKey.

So, basically I am doing the following:
- reading from Kafka
- flatMap inside transform
- mapWithState
- rdd.count on the output of mapWithState

But to my surprise, I still don't see checkpointing taking place.

Is there any restriction on the type of operations that we can perform
inside mapWithState?

I really need to resolve this, as currently if my application is restarted
from the checkpoint it has to repartition 120 previous stages, which takes a
hell of a lot of time.

Thanks !!
Abhi
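For context, the "120 previous stages" appears to follow from the window settings in the original post: a 2-hour window advancing in 1-minute steps spans 120 batches. By comparison, Ryan's note that mapWithState checkpoints every 10 batches by default would mean a checkpoint every 10 minutes at that interval. A quick check of the arithmetic, assuming the batch interval equals the 1-minute slide interval (the thread never states the batch interval directly); `WindowArithmetic` and its methods are hypothetical names:

```java
import java.time.Duration;

public class WindowArithmetic {
    /** How many batches a window spans (assumes the window is a whole multiple of the batch). */
    static long batchesPerWindow(Duration window, Duration batch) {
        return window.toMillis() / batch.toMillis();
    }

    /** mapWithState's default checkpoint interval as described in the thread: 10 x the batch interval. */
    static Duration defaultStateCheckpointInterval(Duration batch) {
        return batch.multipliedBy(10);
    }

    public static void main(String[] args) {
        Duration batch = Duration.ofMinutes(1); // assumed batch/slide interval
        Duration window = Duration.ofHours(2);  // windowLength from the original post
        System.out.println(batchesPerWindow(window, batch));       // 120
        System.out.println(defaultStateCheckpointInterval(batch)); // PT10M
    }
}
```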


Re: Stateful Operation on JavaPairDStream Help Needed !!

Posted by "Shixiong(Ryan) Zhu" <sh...@databricks.com>.
Sorry, I forgot to tell you that you should call `rdd.count()` on the
"reduceByKey" output as well. Could you try it and see if it works?


Re: Stateful Operation on JavaPairDStream Help Needed !!

Posted by Abhishek Anand <ab...@gmail.com>.
Hi Ryan,

I am using mapWithState after doing reduceByKey.

I am now using mapWithState as you suggested and triggering the count
manually.

But I am still unable to see any checkpointing taking place. In the DAG I
can see that the reduceByKey operations for the previous batches are also
being computed.


Thanks
Abhi


On Tue, Feb 23, 2016 at 2:36 AM, Shixiong(Ryan) Zhu <shixiong@databricks.com
> wrote:

> Hey Abhi,
>
> Using reduceByKeyAndWindow together with mapWithState will trigger the bug
> in SPARK-6847. Here is a workaround to trigger the checkpoint manually:
>
>     JavaMapWithStateDStream<...> stateDStream =
>         myPairDstream.mapWithState(StateSpec.function(mappingFunc));
>     stateDStream.foreachRDD(new VoidFunction<JavaRDD<...>>() {
>       @Override
>       public void call(JavaRDD<...> rdd) throws Exception {
>         // Running an action on every batch materializes the RDD, so a
>         // pending checkpoint is actually written.
>         rdd.count();
>       }
>     });
>     return stateDStream.stateSnapshots();
>
>
> On Mon, Feb 22, 2016 at 12:25 PM, Abhishek Anand <ab...@gmail.com>
> wrote:
>
>> Hi Ryan,
>>
>> Reposting the code.
>>
>> Basically my use case is something like this: I am receiving web
>> impression logs and may get the notify (listening from Kafka) for those
>> impressions in the same interval (for me it's 1 min) or any later interval
>> (up to 2 hours). When I receive the notify for a particular impression, I
>> need to swap the date field in the impression with the date field in the
>> notify log. The notify for an impression has the same key as the impression.
>>
>> static Function3<String, Optional<MyClass>, State<MyClass>,
>> Tuple2<String, MyClass>> mappingFunc =
>> new Function3<String, Optional<MyClass>, State<MyClass>, Tuple2<String,
>> MyClass>>() {
>> @Override
>> public Tuple2<String, MyClass> call(String key, Optional<MyClass> one,
>> State<MyClass> state) {
>> MyClass nullObj = new MyClass();
>> nullObj.setImprLog(null);
>> nullObj.setNotifyLog(null);
>> MyClass current = one.or(nullObj);
>>
>> if(current!= null && current.getImprLog() != null &&
>> current.getMyClassType() == 1 /*this is impression*/){
>> return new Tuple2<>(key, null);
>> }
>> else if (current.getNotifyLog() != null  && current.getMyClassType() == 3
>> /*notify for the impression received*/){
>> MyClass oldState = (state.exists() ? state.get() : nullObj);
>> if(oldState!= null && oldState.getNotifyLog() != null){
>> oldState.getImprLog().setCrtd(current.getNotifyLog().getCrtd());
>>  //swapping the dates
>> return new Tuple2<>(key, oldState);
>> }
>> else{
>> return new Tuple2<>(key, null);
>> }
>> }
>> else{
>> return new Tuple2<>(key, null);
>> }
>>
>> }
>> };
>>
>>
>> return
>> myPairDstream.mapWithState(StateSpec.function(mappingFunc)).stateSnapshots();
>>
>>
>> Currently I am using reduceByKeyAndWindow without the inverse function
>> and I am able to get the correct data. But the issue that might arise is when
>> I have to restart my application from the checkpoint: it repartitions and
>> recomputes the previous 120 partitions, which delays the incoming batches.
>>
>>
>> Thanks !!
>> Abhi
>>
>> On Tue, Feb 23, 2016 at 1:25 AM, Shixiong(Ryan) Zhu <
>> shixiong@databricks.com> wrote:
>>
>>> Hey Abhi,
>>>
>>> Could you post how you use mapWithState? By default, it should do
>>> checkpointing every 10 batches.
>>> However, there is a known issue that prevents mapWithState from
>>> checkpointing in some special cases:
>>> https://issues.apache.org/jira/browse/SPARK-6847
>>>
>>> On Mon, Feb 22, 2016 at 5:47 AM, Abhishek Anand <abhis.anan007@gmail.com> wrote:
>>>
>>>> Any insights on this one?
>>>>
>>>>
>>>> Thanks !!!
>>>> Abhi
>>>>
>>>> On Mon, Feb 15, 2016 at 11:08 PM, Abhishek Anand <
>>>> abhis.anan007@gmail.com> wrote:
>>>>
>>>>> I am now trying to use mapWithState in the following way, based on some
>>>>> example code. But, looking at the DAG, it does not seem to checkpoint
>>>>> the state, and when restarting the application from the checkpoint it
>>>>> re-partitions all the previous batches' data from Kafka.
>>>>>
>>>>> static Function3<String, Optional<MyClass>, State<MyClass>,
>>>>> Tuple2<String, MyClass>> mappingFunc =
>>>>> new Function3<String, Optional<MyClass>, State<MyClass>,
>>>>> Tuple2<String, MyClass>>() {
>>>>> @Override
>>>>> public Tuple2<String, MyClass> call(String key, Optional<MyClass> one,
>>>>> State<MyClass> state) {
>>>>> MyClass nullObj = new MyClass();
>>>>> nullObj.setImprLog(null);
>>>>> nullObj.setNotifyLog(null);
>>>>> MyClass current = one.or(nullObj);
>>>>>
>>>>> if(current!= null && current.getImprLog() != null &&
>>>>> current.getMyClassType() == 1){
>>>>> return new Tuple2<>(key, null);
>>>>> }
>>>>> else if (current.getNotifyLog() != null  && current.getMyClassType()
>>>>> == 3){
>>>>> MyClass oldState = (state.exists() ? state.get() : nullObj);
>>>>> if(oldState!= null && oldState.getNotifyLog() != null){
>>>>> oldState.getImprLog().setCrtd(current.getNotifyLog().getCrtd());
>>>>> return new Tuple2<>(key, oldState);
>>>>> }
>>>>> else{
>>>>> return new Tuple2<>(key, null);
>>>>> }
>>>>> }
>>>>> else{
>>>>> return new Tuple2<>(key, null);
>>>>> }
>>>>>
>>>>> }
>>>>> };
>>>>>
>>>>>
>>>>> Please suggest whether this is the proper way or I am doing something wrong.
>>>>>
>>>>>
>>>>> Thanks !!
>>>>> Abhi
>>>>>
>>>>> On Sun, Feb 14, 2016 at 2:26 AM, Sebastian Piu <
>>>>> sebastian.piu@gmail.com> wrote:
>>>>>
>>>>>> If you don't want to upgrade, your only option will be updateStateByKey
>>>>>> then.
>>>>>> On 13 Feb 2016 8:48 p.m., "Ted Yu" <yu...@gmail.com> wrote:
>>>>>>
>>>>>>> mapWithState supports checkpoint.
>>>>>>>
>>>>>>> There have been some bug fixes since the release of 1.6.0,
>>>>>>> e.g.
>>>>>>>   SPARK-12591 NullPointerException using checkpointed mapWithState
>>>>>>> with KryoSerializer
>>>>>>>
>>>>>>> which is in the upcoming 1.6.1.
>>>>>>>
>>>>>>> Cheers
>>>>>>>
>>>>>>> On Sat, Feb 13, 2016 at 12:05 PM, Abhishek Anand <
>>>>>>> abhis.anan007@gmail.com> wrote:
>>>>>>>
>>>>>>>> Does mapWithState checkpoint the data?
>>>>>>>>
>>>>>>>> When my application goes down and is restarted from the checkpoint,
>>>>>>>> will mapWithState need to recompute the previous batches' data?
>>>>>>>>
>>>>>>>> Also, to use mapWithState I will need to upgrade my application, as
>>>>>>>> I am using version 1.4.0 and mapWithState isn't supported there. Is there
>>>>>>>> any other workaround?
>>>>>>>>
>>>>>>>> Cheers!!
>>>>>>>> Abhi
>>>>>>>>
>>>>>>>> On Fri, Feb 12, 2016 at 3:33 AM, Sebastian Piu <
>>>>>>>> sebastian.piu@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Looks like mapWithState could help you?
>>>>>>>>> On 11 Feb 2016 8:40 p.m., "Abhishek Anand" <
>>>>>>>>> abhis.anan007@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi All,
>>>>>>>>>>
>>>>>>>>>> I have a use case like the following in my production environment,
>>>>>>>>>> where I am listening to Kafka with a slideInterval of 1 min and a
>>>>>>>>>> windowLength of 2 hours.
>>>>>>>>>>
>>>>>>>>>> I have a JavaPairDStream in which each key arrives again with a
>>>>>>>>>> different value, which might appear in the same batch or in a later
>>>>>>>>>> batch.
>>>>>>>>>>
>>>>>>>>>> When the key appears the second time, I need to update a field in the
>>>>>>>>>> value of the earlier record with a field from the later record. Keys whose
>>>>>>>>>> matching second record never arrives should be rejected after 2 hours.
>>>>>>>>>>
>>>>>>>>>> At the end of each second I need to output the result to an external
>>>>>>>>>> database.
>>>>>>>>>>
>>>>>>>>>> For example:
>>>>>>>>>>
>>>>>>>>>> Suppose valueX is an object of MyClass with fields int a, String b.
>>>>>>>>>> At t=1 sec I get:
>>>>>>>>>> key0,value0(0,"prev0")
>>>>>>>>>> key1,value1 (1, "prev1")
>>>>>>>>>> key2,value2 (2,"prev2")
>>>>>>>>>> key2,value3 (3, "next2")
>>>>>>>>>>
>>>>>>>>>> Output to database after 1 sec
>>>>>>>>>> key2, newValue (2,"next2")
>>>>>>>>>>
>>>>>>>>>> At t=2 sec I get:
>>>>>>>>>> key3,value4(4,"prev3")
>>>>>>>>>> key1,value5(5,"next1")
>>>>>>>>>>
>>>>>>>>>> Output to database after 2 sec
>>>>>>>>>> key1,newValue(1,"next1")
>>>>>>>>>>
>>>>>>>>>> At t=3 sec I get:
>>>>>>>>>> key4,value6(6,"prev4")
>>>>>>>>>> key3,value7(7,"next3")
>>>>>>>>>> key5,value5(8,"prev5")
>>>>>>>>>> key5,value5(9,"next5")
>>>>>>>>>> key0,value0(10,"next0")
>>>>>>>>>>
>>>>>>>>>> Output to database after 3 sec
>>>>>>>>>> key0,newValue(0,"next0")
>>>>>>>>>> key3,newValue(4,"next3")
>>>>>>>>>> key5,newValue(8,"next5")
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Please suggest how this can be achieved.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Thanks a lot !!!!
>>>>>>>>>> Abhi
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>>
>>>
>>
>
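Sebastian's suggestion of updateStateByKey for staying on 1.4.0 (quoted above) fits this use case because, per the Spark Streaming programming guide, the update function is applied to every key that has state in each batch, even keys without new data, and returning an absent Optional removes the key. That makes a batch-counting timeout possible. Below is a minimal plain-Java sketch of such an update function (no Spark dependency): it has the same shape as the function passed to updateStateByKey (the batch's new values for a key plus the previous state, returning the new state), but it uses java.util.Optional where the Spark 1.x Java API expects Guava's Optional. Value, PairState and TIMEOUT_BATCHES are hypothetical names, and the 120-batch timeout assumes 1-minute batches.

```java
import java.util.List;
import java.util.Optional;

// Plain-Java sketch of an updateStateByKey-style update function; it does not use Spark.
// Value, PairState and TIMEOUT_BATCHES are hypothetical names. The Spark 1.x Java API
// would use Guava's Optional here instead of java.util.Optional.
class UpdateStateSketch {

    // Assumption: 2 hours of 1-minute batches.
    static final int TIMEOUT_BATCHES = 120;

    /** Stand-in for MyClass with fields int a, String b. */
    static final class Value {
        final int a;
        final String b;

        Value(int a, String b) {
            this.a = a;
            this.b = b;
        }
    }

    /** Per-key state: merged fields, whether the pair is complete, and its age in batches. */
    static final class PairState {
        final int a;
        final String b;
        final boolean complete;
        final int ageInBatches;

        PairState(int a, String b, boolean complete, int ageInBatches) {
            this.a = a;
            this.b = b;
            this.complete = complete;
            this.ageInBatches = ageInBatches;
        }
    }

    /** Invoked once per key per batch with that batch's new values (possibly none). */
    static Optional<PairState> update(List<Value> newValues, Optional<PairState> previous) {
        PairState state = previous.orElse(null);

        // A pair completed in the previous batch has already been emitted, so drop it.
        if (state != null && state.complete) {
            return Optional.empty();
        }
        // A waiting key ages by one batch on every call.
        if (state != null) {
            state = new PairState(state.a, state.b, false, state.ageInBatches + 1);
        }
        for (Value v : newValues) {
            if (state == null) {
                state = new PairState(v.a, v.b, false, 0); // first record: wait for the partner
            } else {
                // Partner arrived: keep 'a' from the first record, take 'b' from this one.
                state = new PairState(state.a, v.b, true, state.ageInBatches);
                break; // this sketch ignores any further values in the same batch
            }
        }
        if (state == null) {
            return Optional.empty();
        }
        // Reject keys whose partner has not arrived within the timeout.
        if (!state.complete && state.ageInBatches >= TIMEOUT_BATCHES) {
            return Optional.empty();
        }
        return Optional.of(state);
    }
}
```

Because updateStateByKey emits the state itself, a downstream filter on complete == true would yield the pairs to write to the database; the completed entry is then dropped on the following batch.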

Re: Stateful Operation on JavaPairDStream Help Needed !!

Posted by "Shixiong(Ryan) Zhu" <sh...@databricks.com>.
Hey Abhi,

Using reduceByKeyAndWindow together with mapWithState will trigger the bug
in SPARK-6847. Here is a workaround to trigger the checkpoint manually:

    JavaMapWithStateDStream<...> stateDStream =
        myPairDstream.mapWithState(StateSpec.function(mappingFunc));
    stateDStream.foreachRDD(new VoidFunction<JavaRDD<...>>() {
      @Override
      public void call(JavaRDD<...> rdd) throws Exception {
        // Running an action on every batch materializes the RDD, so a
        // pending checkpoint is actually written.
        rdd.count();
      }
    });
    return stateDStream.stateSnapshots();



Re: Stateful Operation on JavaPairDStream Help Needed !!

Posted by Abhishek Anand <ab...@gmail.com>.
Hi Ryan,

Reposting the code.

Basically my use case is something like this: I am receiving web impression
logs and may get the notify (listening from Kafka) for those impressions in
the same interval (for me it's 1 min) or any later interval (up to 2 hours).
When I receive the notify for a particular impression, I need to swap the
date field in the impression with the date field in the notify log. The
notify for an impression has the same key as the impression.

static Function3<String, Optional<MyClass>, State<MyClass>,
    Tuple2<String, MyClass>> mappingFunc =
    new Function3<String, Optional<MyClass>, State<MyClass>,
        Tuple2<String, MyClass>>() {
      @Override
      public Tuple2<String, MyClass> call(String key, Optional<MyClass> one,
          State<MyClass> state) {
        MyClass nullObj = new MyClass();
        nullObj.setImprLog(null);
        nullObj.setNotifyLog(null);
        MyClass current = one.or(nullObj);

        if (current != null && current.getImprLog() != null
            && current.getMyClassType() == 1 /* this is impression */) {
          return new Tuple2<>(key, null);
        } else if (current.getNotifyLog() != null
            && current.getMyClassType() == 3 /* notify for the impression received */) {
          MyClass oldState = (state.exists() ? state.get() : nullObj);
          if (oldState != null && oldState.getNotifyLog() != null) {
            // swapping the dates
            oldState.getImprLog().setCrtd(current.getNotifyLog().getCrtd());
            return new Tuple2<>(key, oldState);
          } else {
            return new Tuple2<>(key, null);
          }
        } else {
          return new Tuple2<>(key, null);
        }
      }
    };

return myPairDstream.mapWithState(StateSpec.function(mappingFunc)).stateSnapshots();
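Two details in the function above look worth checking, independent of the checkpointing problem. It never calls state.update(...), so as far as the posted code shows the impression is never stored and state.exists() stays false when the notify arrives; and the notify branch tests oldState.getNotifyLog(), although the stored value, being an impression, would presumably carry getImprLog(). Also, stateSnapshots() emits every key held in state on each batch, whereas the JavaMapWithStateDStream returned by mapWithState(...) emits only what the function returns. The minimal plain-Java sketch below shows the intended branching with hypothetical stand-ins: Event plays the role of MyClass (type 1 = impression, type 3 = notify, as in the post), SimpleState mimics the exists/get/update/remove methods of Spark's State, and java.util.Optional replaces the Optional type used by the Spark 1.6 Java API. For the 2-hour rejection, StateSpec also offers timeout(Durations.minutes(120)), in which case the function is additionally invoked for expiring keys; this sketch does not model that.

```java
import java.util.Optional;

// Plain-Java sketch (no Spark) of the impression/notify pairing logic.
// Event and SimpleState are hypothetical stand-ins for MyClass and Spark's State<MyClass>.
class MappingSketch {

    static final class Event {
        final int type;  // 1 = impression, 3 = notify (codes taken from the post)
        String crtd;     // the date field that gets swapped

        Event(int type, String crtd) {
            this.type = type;
            this.crtd = crtd;
        }
    }

    /** Mimics exists/get/update/remove from Spark's State<S>. */
    static final class SimpleState<S> {
        private S value;

        boolean exists() { return value != null; }
        S get() { return value; }
        void update(S newValue) { value = newValue; }
        void remove() { value = null; }
    }

    /** Returns the completed impression, or Optional.empty() when nothing should be emitted. */
    static Optional<Event> call(Optional<Event> one, SimpleState<Event> state) {
        if (!one.isPresent()) {
            return Optional.empty();
        }
        Event current = one.get();
        if (current.type == 1) {
            state.update(current);           // keep the impression until its notify arrives
            return Optional.empty();
        }
        if (current.type == 3 && state.exists()) {
            Event impression = state.get();
            impression.crtd = current.crtd;  // swap in the date from the notify log
            state.remove();                  // pair completed, so free the state
            return Optional.of(impression);
        }
        return Optional.empty();             // notify with no stored impression, or another type
    }

    public static void main(String[] args) {
        SimpleState<Event> state = new SimpleState<>();
        call(Optional.of(new Event(1, "impression-date")), state);
        Optional<Event> out = call(Optional.of(new Event(3, "notify-date")), state);
        System.out.println(out.map(e -> e.crtd).orElse("nothing emitted"));
    }
}
```

Running main prints notify-date: the stored impression is emitted once, carrying the date taken from its notify.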


Currently I am using reduceByKeyAndWindow without the inverse function and
I am able to get the correct data. But the issue that might arise is when I
have to restart my application from the checkpoint: it repartitions and
recomputes the previous 120 partitions, which delays the incoming batches.
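The two recovery costs discussed in this thread come down to simple arithmetic. With 1-minute batches, a 2-hour window spans 120 batches, which is consistent with the 120 partitions being recomputed above. For mapWithState, Ryan's reply quoted below says state is checkpointed every 10 batches by default, i.e. every 10 minutes here. A minimal sketch with java.time.Duration; StreamingArithmetic and its methods are hypothetical helpers, and the multiplier of 10 is taken from that reply rather than read from Spark.

```java
import java.time.Duration;

// Hypothetical helper reproducing the numbers discussed in this thread (no Spark involved).
class StreamingArithmetic {

    /** Number of batches covered by a window: window length / batch interval. */
    static long batchesPerWindow(Duration window, Duration batchInterval) {
        return window.toMillis() / batchInterval.toMillis();
    }

    /** Default mapWithState checkpoint interval as described in the thread: 10 batch intervals. */
    static Duration defaultStateCheckpointInterval(Duration batchInterval) {
        return batchInterval.multipliedBy(10);
    }

    public static void main(String[] args) {
        Duration batch = Duration.ofMinutes(1);
        System.out.println(batchesPerWindow(Duration.ofHours(2), batch)); // 120
        System.out.println(defaultStateCheckpointInterval(batch));        // PT10M
    }
}
```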


Thanks !!
Abhi

On Tue, Feb 23, 2016 at 1:25 AM, Shixiong(Ryan) Zhu <shixiong@databricks.com> wrote:

> Hey Abhi,
>
> Could you post how you use mapWithState? By default, it should do
> checkpointing every 10 batches.
> However, there is a known issue that prevents mapWithState from
> checkpointing in some special cases:
> https://issues.apache.org/jira/browse/SPARK-6847

Re: Stateful Operation on JavaPairDStream Help Needed !!

Posted by "Shixiong(Ryan) Zhu" <sh...@databricks.com>.
Hey Ted,

As the fix for SPARK-6847 changes the semantics of Streaming checkpointing,
it won't go into branch-1.6.

A workaround is to call `count` to trigger the checkpoint manually, for example:

val dstream = ... // dstream is an operator needing to be checkpointed.
dstream.foreachRDD(rdd => rdd.count())


On Mon, Feb 22, 2016 at 12:25 PM, Ted Yu <yu...@gmail.com> wrote:

> The fix for SPARK-6847 is not in branch-1.6.
>
> Should the fix be ported to branch-1.6?
>
> Thanks

Re: Stateful Operation on JavaPairDStream Help Needed !!

Posted by Ted Yu <yu...@gmail.com>.
The fix for SPARK-6847 is not in branch-1.6.

Should the fix be ported to branch-1.6?

Thanks

> On Feb 22, 2016, at 11:55 AM, Shixiong(Ryan) Zhu <sh...@databricks.com> wrote:
> 
> Hey Abhi,
> 
> Could you post how you use mapWithState? By default, it should do checkpointing every 10 batches.
> However, there is a known issue that prevents mapWithState from checkpointing in some special cases: https://issues.apache.org/jira/browse/SPARK-6847

Re: Stateful Operation on JavaPairDStream Help Needed !!

Posted by "Shixiong(Ryan) Zhu" <sh...@databricks.com>.
Hey Abhi,

Could you post how you use mapWithState? By default, it should do
checkpointing every 10 batches.
However, there is a known issue that prevents mapWithState from
checkpointing in some special cases:
https://issues.apache.org/jira/browse/SPARK-6847
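
As a rough illustration of what "every 10 batches" means for this job: assuming a 1-minute batch interval (the question mentions a 1-minute slide interval; the batch interval itself is an assumption here), the state would be checkpointed about every 10 minutes, so after a restart roughly that many batches may have to be recomputed from Kafka. The plain-Java sketch below only does this arithmetic, and its class and method names are hypothetical. It also encodes the rule that a DStream checkpoint interval must be a whole multiple of its slide interval, which matters if you override the default by calling checkpoint(...) on the stream returned by mapWithState.

```java
import java.time.Duration;

public class CheckpointIntervalSketch {
    // Default multiplier taken from the reply above: state is checkpointed every 10 batches.
    static final int DEFAULT_MULTIPLIER = 10;

    static Duration defaultStateCheckpointInterval(Duration batchInterval) {
        return batchInterval.multipliedBy(DEFAULT_MULTIPLIER);
    }

    // A DStream checkpoint interval has to be a non-zero whole multiple of the batch (slide) interval.
    static boolean isValidCheckpointInterval(Duration checkpoint, Duration batchInterval) {
        return !checkpoint.isZero() && checkpoint.toMillis() % batchInterval.toMillis() == 0;
    }

    // Rough upper bound on batches that may need recomputing after restarting
    // from the most recent state checkpoint.
    static long maxBatchesToReplay(Duration checkpoint, Duration batchInterval) {
        return checkpoint.toMillis() / batchInterval.toMillis();
    }

    public static void main(String[] args) {
        Duration batch = Duration.ofMinutes(1); // assumed batch interval
        Duration checkpoint = defaultStateCheckpointInterval(batch);
        System.out.println("State checkpoint every " + checkpoint.toMinutes() + " min");
        System.out.println("Batches possibly replayed: " + maxBatchesToReplay(checkpoint, batch));
        System.out.println("90s valid? " + isValidCheckpointInterval(Duration.ofSeconds(90), batch));
    }
}
```

With a 1-minute batch this prints a 10-minute checkpoint interval, up to 10 batches to replay, and rejects 90 seconds as a checkpoint interval because it is not a whole number of batches.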

On Mon, Feb 22, 2016 at 5:47 AM, Abhishek Anand <ab...@gmail.com>
wrote:

> Any insights on this one?
>
>
> Thanks !!!
> Abhi

Re: Stateful Operation on JavaPairDStream Help Needed !!

Posted by Abhishek Anand <ab...@gmail.com>.
Any insights on this one?


Thanks !!!
Abhi

On Mon, Feb 15, 2016 at 11:08 PM, Abhishek Anand <ab...@gmail.com>
wrote:

> I am now trying to use mapWithState in the following way, based on some
> example code. But looking at the DAG, it does not seem to checkpoint the
> state, and when I restart the application from the checkpoint, it
> re-partitions all of the previous batches' data from Kafka.
>
> static Function3<String, Optional<MyClass>, State<MyClass>, Tuple2<String, MyClass>> mappingFunc =
>     new Function3<String, Optional<MyClass>, State<MyClass>, Tuple2<String, MyClass>>() {
>         @Override
>         public Tuple2<String, MyClass> call(String key, Optional<MyClass> one, State<MyClass> state) {
>             MyClass nullObj = new MyClass();
>             nullObj.setImprLog(null);
>             nullObj.setNotifyLog(null);
>             MyClass current = one.or(nullObj);
>
>             if (current != null && current.getImprLog() != null && current.getMyClassType() == 1) {
>                 return new Tuple2<>(key, null);
>             }
>             else if (current.getNotifyLog() != null && current.getMyClassType() == 3) {
>                 MyClass oldState = (state.exists() ? state.get() : nullObj);
>                 if (oldState != null && oldState.getNotifyLog() != null) {
>                     oldState.getImprLog().setCrtd(current.getNotifyLog().getCrtd());
>                     return new Tuple2<>(key, oldState);
>                 }
>                 else {
>                     return new Tuple2<>(key, null);
>                 }
>             }
>             else {
>                 return new Tuple2<>(key, null);
>             }
>
>         }
>     };
>
>
> Please suggest whether this is the proper way, or whether I am doing something wrong.
>
>
> Thanks !!
> Abhi

Re: Stateful Operation on JavaPairDStream Help Needed !!

Posted by Abhishek Anand <ab...@gmail.com>.
I am now trying to use mapWithState in the following way, based on some example
code. But looking at the DAG, it does not seem to checkpoint the state, and
when I restart the application from the checkpoint, it re-partitions all of
the previous batches' data from Kafka.

static Function3<String, Optional<MyClass>, State<MyClass>, Tuple2<String, MyClass>> mappingFunc =
    new Function3<String, Optional<MyClass>, State<MyClass>, Tuple2<String, MyClass>>() {
        @Override
        public Tuple2<String, MyClass> call(String key, Optional<MyClass> one, State<MyClass> state) {
            MyClass nullObj = new MyClass();
            nullObj.setImprLog(null);
            nullObj.setNotifyLog(null);
            MyClass current = one.or(nullObj);

            if (current != null && current.getImprLog() != null && current.getMyClassType() == 1) {
                return new Tuple2<>(key, null);
            }
            else if (current.getNotifyLog() != null && current.getMyClassType() == 3) {
                MyClass oldState = (state.exists() ? state.get() : nullObj);
                if (oldState != null && oldState.getNotifyLog() != null) {
                    oldState.getImprLog().setCrtd(current.getNotifyLog().getCrtd());
                    return new Tuple2<>(key, oldState);
                }
                else {
                    return new Tuple2<>(key, null);
                }
            }
            else {
                return new Tuple2<>(key, null);
            }

        }
    };


Please suggest whether this is the proper way, or whether I am doing something wrong.


Thanks !!
Abhi
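
Two things stand out in the function above: it never calls state.update(...), so presumably no impression record (type 1) is ever stored for a later notification record (type 3) to pair with, and the inner branch checks oldState.getNotifyLog() but then dereferences oldState.getImprLog(). The plain-Java sketch below shows a store-then-merge flow so it can run without Spark. Event is a hypothetical simplification of MyClass using the fields a and b from the original question, and KeyState is a hypothetical stand-in that mirrors the exists/get/update/remove/isTimingOut methods of Spark's State. In a real job this logic would sit in the call(...) method of the Function3 passed to StateSpec.function(...), with something like .timeout(Durations.minutes(120)) for the two-hour expiry; that Spark wiring is not shown or tested here.

```java
import java.util.NoSuchElementException;
import java.util.Optional;

public class PairingMappingSketch {

    // Hypothetical simplification of MyClass, using the fields a and b from the original question.
    static final class Event {
        final int a;
        final String b;

        Event(int a, String b) {
            this.a = a;
            this.b = b;
        }

        @Override
        public String toString() {
            return "(" + a + ",\"" + b + "\")";
        }
    }

    // Hypothetical stand-in mirroring the exists/get/update/remove/isTimingOut methods of Spark's State.
    static final class KeyState<S> {
        private S value;
        private boolean timingOut;

        boolean exists() { return value != null; }

        S get() {
            if (value == null) {
                throw new NoSuchElementException("state not set");
            }
            return value;
        }

        void update(S newValue) { value = newValue; }

        void remove() { value = null; }

        boolean isTimingOut() { return timingOut; }

        void markTimingOut() { timingOut = true; } // test hook; Spark sets this flag itself
    }

    // Same shape as Function3.call(key, value, state); the key is unused but kept for parity.
    static Optional<Event> mapPair(String key, Optional<Event> incoming, KeyState<Event> state) {
        if (state.isTimingOut() || !incoming.isPresent()) {
            // Key expired without a partner (or no data arrived): nothing to write out.
            return Optional.empty();
        }
        Event current = incoming.get();
        if (!state.exists()) {
            // First occurrence: store it so a later record for the same key can find it.
            state.update(current);
            return Optional.empty();
        }
        // Second occurrence: keep a from the earlier record, take b from the later one.
        Event merged = new Event(state.get().a, current.b);
        state.remove(); // the pair is complete, so free the state
        return Optional.of(merged);
    }

    public static void main(String[] args) {
        KeyState<Event> key2 = new KeyState<>();
        System.out.println(mapPair("key2", Optional.of(new Event(2, "prev2")), key2)); // Optional.empty
        System.out.println(mapPair("key2", Optional.of(new Event(3, "next2")), key2)); // Optional[(2,"next2")]
    }
}
```

Returning an empty result rather than a Tuple2 with a null value also makes it straightforward to filter the mapped stream before the database write.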

On Sun, Feb 14, 2016 at 2:26 AM, Sebastian Piu <se...@gmail.com>
wrote:

> If you don't want to upgrade, then your only option will be updateStateByKey.

Re: Stateful Operation on JavaPairDStream Help Needed !!

Posted by Sebastian Piu <se...@gmail.com>.
If you don't want to upgrade, then your only option will be updateStateByKey.
On 13 Feb 2016 8:48 p.m., "Ted Yu" <yu...@gmail.com> wrote:

> mapWithState supports checkpointing.
>
> There have been some bug fixes since the release of 1.6.0, e.g.
>   SPARK-12591 NullPointerException using checkpointed mapWithState with KryoSerializer
>
> The fix is included in the upcoming 1.6.1.
>
> Cheers
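
On Spark 1.4, where mapWithState is unavailable, the updateStateByKey route amounts to an update function of the form (new values for the key in this batch, previous state) -> next state, where returning an absent state removes the key. updateStateByKey has no built-in timeout, calls the function for every key that has state in every batch, and requires a checkpoint directory to be set, so the two-hour expiry has to be tracked by hand. The plain-Java model below sketches such a function. PairState, the explicit nowMillis batch time, and the "matched for exactly one batch, then dropped" design are illustrative assumptions, and it uses java.util.Optional where the Spark 1.x Java API uses Guava's Optional.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

public class UpdateStateByKeySketch {
    static final long TWO_HOURS_MS = 2L * 60 * 60 * 1000;

    // Hypothetical simplification of MyClass (fields a and b from the original question).
    static final class Event {
        final int a;
        final String b;

        Event(int a, String b) { this.a = a; this.b = b; }
    }

    // Hypothetical per-key state: waiting for a partner, or matched and due to be written once.
    static final class PairState {
        final Event first;
        final Event merged; // null while still waiting
        final long firstSeenMillis;

        PairState(Event first, Event merged, long firstSeenMillis) {
            this.first = first;
            this.merged = merged;
            this.firstSeenMillis = firstSeenMillis;
        }

        boolean isMatched() { return merged != null; }
    }

    // Models the update function given to updateStateByKey; nowMillis is an assumed batch time.
    // Assumes at most two records per key and that newValues are in arrival order.
    static Optional<PairState> update(List<Event> newValues, Optional<PairState> previous, long nowMillis) {
        List<Event> pending = new ArrayList<>();
        long firstSeen = nowMillis;
        PairState prev = previous.orElse(null);
        // Carry over only a waiting entry younger than two hours; matched entries were
        // written out in the previous batch and expired ones are rejected.
        if (prev != null && !prev.isMatched() && nowMillis - prev.firstSeenMillis < TWO_HOURS_MS) {
            pending.add(prev.first);
            firstSeen = prev.firstSeenMillis;
        }
        pending.addAll(newValues);
        if (pending.isEmpty()) {
            return Optional.empty(); // an empty result removes the key from the state
        }
        if (pending.size() >= 2) {
            // Keep a from the earlier record, take b from the later one.
            Event merged = new Event(pending.get(0).a, pending.get(1).b);
            return Optional.of(new PairState(pending.get(0), merged, firstSeen));
        }
        return Optional.of(new PairState(pending.get(0), null, firstSeen));
    }

    public static void main(String[] args) {
        Optional<PairState> s = update(Arrays.asList(new Event(1, "prev1")), Optional.empty(), 0L);
        s = update(Arrays.asList(new Event(5, "next1")), s, 60_000L);
        System.out.println(s.get().isMatched() + " " + s.get().merged.a + " " + s.get().merged.b); // true 1 next1
        s = update(Collections.emptyList(), s, 120_000L);
        System.out.println(s.isPresent()); // false: written in the last batch, now dropped
    }
}
```

In each batch only the states with isMatched() true would be written to the database, for example by filtering the state stream before the output operation. Because the function runs over every stored key in every batch, its cost grows with the number of keys still waiting for a partner.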

Re: Stateful Operation on JavaPairDStream Help Needed !!

Posted by Ted Yu <yu...@gmail.com>.
mapWithState supports checkpointing.

There have been some bug fixes since the release of 1.6.0, e.g.
  SPARK-12591 NullPointerException using checkpointed mapWithState with KryoSerializer

The fix is included in the upcoming 1.6.1.

Cheers

On Sat, Feb 13, 2016 at 12:05 PM, Abhishek Anand <ab...@gmail.com>
wrote:

> Does mapWithState checkpoint the data?
>
> When my application goes down and is restarted from the checkpoint, will
> mapWithState need to recompute the previous batches' data?
>
> Also, to use mapWithState I will need to upgrade my application, as I am
> using version 1.4.0 and mapWithState isn't supported there. Is there any
> other workaround?
>
> Cheers!!
> Abhi

Re: Stateful Operation on JavaPairDStream Help Needed !!

Posted by Abhishek Anand <ab...@gmail.com>.
Does mapWithState checkpoint the data?

When my application goes down and is restarted from the checkpoint, will
mapWithState need to recompute the previous batches' data?

Also, to use mapWithState I will need to upgrade my application, as I am
using version 1.4.0 and mapWithState isn't supported there. Is there any
other workaround?

Cheers!!
Abhi

On Fri, Feb 12, 2016 at 3:33 AM, Sebastian Piu <se...@gmail.com>
wrote:

> Looks like mapWithState could help you?

Re: Stateful Operation on JavaPairDStream Help Needed !!

Posted by Sebastian Piu <se...@gmail.com>.
Looks like mapWithState could help you?
On 11 Feb 2016 8:40 p.m., "Abhishek Anand" <ab...@gmail.com> wrote:

> Hi All,
>
> I have a use case like the following in my production environment, where I
> am listening to Kafka with a slideInterval of 1 min and a windowLength of 2
> hours.
>
> I have a JavaPairDStream in which each key arrives a second time with a
> different value; the second record might appear in the same batch or in a
> later batch.
>
> When a key appears the second time, I need to update a field in the value of
> the earlier record with a field from the later record. Keys whose matching
> second record does not arrive should be rejected after 2 hours.
>
> At the end of each second I need to output the result to an external database.
>
> For example:
>
> Suppose valueX is an object of MyClass with fields int a and String b.
> At t=1sec I am getting
> key0,value0(0,"prev0")
> key1,value1 (1, "prev1")
> key2,value2 (2,"prev2")
> key2,value3 (3, "next2")
>
> Output to database after 1 sec
> key2, newValue (2,"next2")
>
> At t=2 sec getting
> key3,value4(4,"prev3")
> key1,value5(5,"next1")
>
> Output to database after 2 sec
> key1,newValue(1,"next1")
>
> At t=3 sec
> key4,value6(6,"prev4")
> key3,value7(7,"next3")
> key5,value8(8,"prev5")
> key5,value9(9,"next5")
> key0,value10(10,"next0")
>
> Output to database after 3 sec
> key0,newValue(0,"next0")
> key3,newValue(4,"next3")
> key5,newValue(8,"next5")
>
>
> Please suggest how this can be achieved.
>
>
> Thanks a lot !!!!
> Abhi
>
>
>
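
To check the intended semantics against the worked example above, the plain-Java simulation below replays the three batches and prints what would be written to the database after each one. It assumes records for the same key are processed in arrival order (after Spark's shuffle by key this may not be guaranteed within a batch, so a real job might compare the "prev"/"next" marker or an event timestamp instead), and it expires unmatched keys two hours after they arrive. PairingSimulation, Arrival and processBatch are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PairingSimulation {
    static final long EXPIRY_MS = 2L * 60 * 60 * 1000; // unmatched keys are rejected after 2 hours

    // One incoming record: the key plus the two MyClass fields from the question.
    static final class Arrival {
        final String key;
        final int a;
        final String b;

        Arrival(String key, int a, String b) { this.key = key; this.a = a; this.b = b; }
    }

    // First occurrence of a key, remembered until its partner arrives or it expires.
    static final class Waiting {
        final int a;
        final long arrivedMillis;

        Waiting(int a, long arrivedMillis) { this.a = a; this.arrivedMillis = arrivedMillis; }
    }

    // Per-key state carried across batches (what mapWithState would hold).
    private final Map<String, Waiting> state = new LinkedHashMap<>();

    // Processes one batch and returns the merged values to write out, sorted by key.
    TreeMap<String, String> processBatch(List<Arrival> batch, long batchTimeMillis) {
        // Drop keys that have waited two hours or more without a partner.
        Iterator<Map.Entry<String, Waiting>> it = state.entrySet().iterator();
        while (it.hasNext()) {
            if (batchTimeMillis - it.next().getValue().arrivedMillis >= EXPIRY_MS) {
                it.remove();
            }
        }
        TreeMap<String, String> out = new TreeMap<>();
        for (Arrival r : batch) {
            Waiting w = state.remove(r.key);
            if (w == null) {
                state.put(r.key, new Waiting(r.a, batchTimeMillis)); // first occurrence
            } else {
                out.put(r.key, "(" + w.a + ",\"" + r.b + "\")"); // keep a, take the later b
            }
        }
        return out;
    }

    int waitingKeys() { return state.size(); }

    public static void main(String[] args) {
        PairingSimulation sim = new PairingSimulation();
        List<List<Arrival>> batches = new ArrayList<>();
        batches.add(Arrays.asList(new Arrival("key0", 0, "prev0"), new Arrival("key1", 1, "prev1"),
                new Arrival("key2", 2, "prev2"), new Arrival("key2", 3, "next2")));
        batches.add(Arrays.asList(new Arrival("key3", 4, "prev3"), new Arrival("key1", 5, "next1")));
        batches.add(Arrays.asList(new Arrival("key4", 6, "prev4"), new Arrival("key3", 7, "next3"),
                new Arrival("key5", 8, "prev5"), new Arrival("key5", 9, "next5"),
                new Arrival("key0", 10, "next0")));
        for (int i = 0; i < batches.size(); i++) {
            System.out.println("t=" + (i + 1) + "s -> " + sim.processBatch(batches.get(i), (i + 1) * 1000L));
        }
    }
}
```

Running it prints {key2=(2,"next2")} for t=1s, {key1=(1,"next1")} for t=2s and {key0=(0,"next0"), key3=(4,"next3"), key5=(8,"next5")} for t=3s, which matches the rows the question expects; key4 is left waiting and would be dropped by the first batch processed two hours or more after it arrived.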