Posted to dev@spark.apache.org by Bin Wang <wb...@gmail.com> on 2015/09/24 07:45:01 UTC

Get only updated RDDs from or after updateStateByKey

I've read the source code and it seems to be impossible, but I'd like to
confirm it.

This would be a very useful feature. For example, I need to store the state
of a DStream in my database so that I can recover it after the next redeploy.
But I only need to save the entries that were updated. Saving all keys to the
database is very wasteful.

From reading the source code, I think it could be added easily: StateDStream
can access prevStateRDD, so it could compute a diff. Is there any chance of
adding this as an API on StateDStream? If so, I can work on this feature.

If not, is there any workaround or hack that would let me do this myself?
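
One possible workaround that needs no change to StateDStream is to carry an
"updated in this batch" flag inside the state itself and filter on it
downstream. The sketch below models the idea in plain Scala, with no Spark
dependency, so that it can be run as-is. The names Tracked, update, runBatch
and updatedOnly are hypothetical, and the running sum is only an example
state.

```scala
// State that remembers whether the key received new data in the current batch.
final case class Tracked(value: Int, updated: Boolean)

object UpdatedOnly {
  // Same shape as the (Seq[V], Option[S]) => Option[S] function that
  // updateStateByKey accepts, with V = Int and S = Tracked.
  def update(values: Seq[Int], state: Option[Tracked]): Option[Tracked] =
    if (values.isEmpty) state.map(_.copy(updated = false)) // untouched key
    else Some(Tracked(state.map(_.value).getOrElse(0) + values.sum, updated = true))

  // Simplified model of one batch: every key seen so far is visited.
  def runBatch(prev: Map[String, Tracked], batch: Seq[(String, Int)]): Map[String, Tracked] = {
    val incoming = batch.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    (prev.keySet ++ incoming.keySet).flatMap { k =>
      update(incoming.getOrElse(k, Seq.empty), prev.get(k)).map(s => k -> s)
    }.toMap
  }

  // Only the entries that changed in the latest batch: the ones worth saving.
  def updatedOnly(state: Map[String, Tracked]): Map[String, Int] =
    state.collect { case (k, Tracked(v, true)) => k -> v }
}
```

In a real job the same update function would presumably be passed to
updateStateByKey and the resulting DStream filtered on the flag, for example
with filter(_._2.updated), before writing to the database. Note that this only
reduces the writes: the full state is still visited on every batch.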

Re: Get only updated RDDs from or after updateStateByKey

Posted by Bin Wang <wb...@gmail.com>.
Thanks, it looks good, though it is a bit of a hack.

And here is another question. updateStateByKey computes over all the data
from the beginning, but in many situations we only need to update the keys
that have incoming data. That could be a big improvement in speed and
resource usage. Will this be supported in the future?
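
The cost difference described above can be made concrete with a small model.
This is a simplified plain-Scala sketch, not Spark's implementation: fullScan
applies the update function to every known key, as updateStateByKey is
described as doing, while incremental touches only the keys present in the
batch. FullScanModel and its members are hypothetical names.

```scala
object FullScanModel {
  // Counts how often the update function runs, to make the per-batch cost visible.
  var invocations = 0

  def update(values: Seq[Int], state: Option[Int]): Option[Int] = {
    invocations += 1
    Some(state.getOrElse(0) + values.sum)
  }

  // Full-scan step: the function is applied to every known key,
  // including keys that received nothing in this batch.
  def fullScan(prev: Map[String, Int], batch: Map[String, Seq[Int]]): Map[String, Int] =
    (prev.keySet ++ batch.keySet).flatMap { k =>
      update(batch.getOrElse(k, Seq.empty), prev.get(k)).map(v => k -> v)
    }.toMap

  // Incremental step: only keys present in the batch are touched.
  def incremental(prev: Map[String, Int], batch: Map[String, Seq[Int]]): Map[String, Int] =
    batch.foldLeft(prev) { case (acc, (k, vs)) =>
      update(vs, acc.get(k)).map(v => acc.updated(k, v)).getOrElse(acc - k)
    }
}
```

With three keys in the state and one key in the batch, fullScan calls the
update function three times and incremental calls it once, while both produce
the same resulting state.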

In reply to Shixiong Zhu <zs...@gmail.com>, Thu, Sep 24, 2015, 6:01 PM.

Re: Get only updated RDDs from or after updateStateByKey

Posted by Shixiong Zhu <zs...@gmail.com>.
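You can create the connection like this:

    import org.apache.spark.TaskContext

    val updateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => {
      // createDbConnection() stands for your own code that opens a connection;
      // it runs on the executor, once per partition
      val dbConnection = createDbConnection()
      // register the cleanup first: the iterator below is lazy, so the connection
      // must stay open until the task has finished consuming it
      TaskContext.get().addTaskCompletionListener(_ => dbConnection.disconnect())
      iterator.flatMap { case (key, values, stateOption) =>
        if (values.isEmpty) {
          // no new data for this key: don't access the database, keep the old state
          stateOption.map(state => (key, state))
        } else {
          // update to the new state and save it to the database
          // (the running sum and save() are placeholders for your own logic)
          val newState = stateOption.getOrElse(0) + values.sum
          dbConnection.save(key, newState)
          Some((key, newState))
        }
      }
    }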
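
The cleanup is tied to task completion rather than run inline because
Iterator.flatMap is lazy: the update function returns before any element has
been processed, so closing the connection at the end of the function body
would close it too early. Below is a minimal plain-Scala sketch of that
pitfall. FakeConnection is a hypothetical stand-in for a real database client,
and the onCompletion callback only imitates the role that
TaskContext.addTaskCompletionListener plays in Spark.

```scala
import scala.collection.mutable.ArrayBuffer

// Stand-in for a real database client; records writes and rejects use after close.
final class FakeConnection {
  val saved = ArrayBuffer.empty[(String, Int)]
  private var open = true
  def save(key: String, value: Int): Unit = {
    require(open, "connection already closed")
    saved += ((key, value))
  }
  def close(): Unit = { open = false }
}

object LazyIteratorDemo {
  type Row = (String, Seq[Int], Option[Int])

  private def step(conn: FakeConnection)(row: Row): Option[(String, Int)] = {
    val (key, values, state) = row
    if (values.isEmpty) state.map(s => key -> s) // unchanged key: no write
    else {
      val next = state.getOrElse(0) + values.sum
      conn.save(key, next)
      Some(key -> next)
    }
  }

  // Wrong: close() runs before the lazy iterator has produced anything.
  def closeTooEarly(conn: FakeConnection, rows: Iterator[Row]): Iterator[(String, Int)] = {
    val out = rows.flatMap(row => step(conn)(row))
    conn.close()
    out
  }

  // Right: cleanup is handed to a callback that the caller runs after consumption.
  def closeOnCompletion(conn: FakeConnection, rows: Iterator[Row],
                        onCompletion: (() => Unit) => Unit): Iterator[(String, Int)] = {
    onCompletion(() => conn.close())
    rows.flatMap(row => step(conn)(row))
  }
}
```

Consuming the iterator returned by closeTooEarly fails on the first key that
needs a write, whereas closeOnCompletion keeps the connection usable until the
caller invokes the registered cleanup.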


Best Regards,
Shixiong Zhu

In reply to Bin Wang <wb...@gmail.com>, 2015-09-24 17:42 GMT+08:00.

Re: Get only updated RDDs from or after updateStateByKey

Posted by Bin Wang <wb...@gmail.com>.
That looks like a workaround. But I don't know how to get a database
connection on the worker nodes.

In reply to Shixiong Zhu <zs...@gmail.com>, Thu, Sep 24, 2015, 5:37 PM.

Re: Get only updated RDDs from or after updateStateByKey

Posted by Shixiong Zhu <zs...@gmail.com>.
Could you write your update func like this?

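    val updateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => {
      iterator.flatMap { case (key, values, stateOption) =>
        if (values.isEmpty) {
          // no new data for this key: don't access the database, keep the old state
          stateOption.map(state => (key, state))
        } else {
          // update to the new state and save it to the database here
          // (the running sum is only an example state)
          val newState = stateOption.getOrElse(0) + values.sum
          Some((key, newState))
        }
      }
    }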

and use this overload:

def updateStateByKey[S: ClassTag](
      updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
      partitioner: Partitioner,
      rememberPartitioner: Boolean
    ): DStream[(K, S)]

There is a JIRA: https://issues.apache.org/jira/browse/SPARK-2629, but it
doesn't have a doc yet...


Best Regards,
Shixiong Zhu

In reply to Bin Wang <wb...@gmail.com>, 2015-09-24 17:26 GMT+08:00.

Re: Get only updated RDDs from or after updateStateByKey

Posted by Bin Wang <wb...@gmail.com>.
Data that was not updated has already been saved earlier: when data is first
added to the DStream, it counts as updated. So saving the same data again is
a waste.

What is the community working on? Is there any doc or discussion that I can
look at? Thanks.



In reply to Shixiong Zhu <zs...@gmail.com>, Thu, Sep 24, 2015, 4:27 PM.

Re: Get only updated RDDs from or after updateStateByKey

Posted by Shixiong Zhu <zs...@gmail.com>.
For data that is not updated, where do you save it? Or do you only want to
avoid accessing the database for keys that were not updated?

Besides, the community is working on optimizing the performance of
updateStateByKey. Hopefully it will be delivered soon.

Best Regards,
Shixiong Zhu

In reply to Bin Wang <wb...@gmail.com>, 2015-09-24 13:45 GMT+08:00.