You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Uthayan Suthakar <ut...@gmail.com> on 2015/10/22 17:06:49 UTC

[Spark Streaming] How do we reset the updateStateByKey values.

Hello guys,

I have a stream job that will carryout computations and update the state
(SUM the value). At some point, I would like to reset the state. I could
drop the state by setting 'None' but I don't want to drop it. I would like
to keep the state but update the state.


For example:

JavaPairDStream<String, String> updatedResultsState =
streamLogs.updateStateByKey(updateResultsStream);

At some condition, I would like to update the state by key but with the
different values, hence different update function.


e.g.

 updatedResultsState = newData.updateStateByKey(resetResultsStream);

But the  newData.updateStateByKeyvalues cannot be replaced with the value
in streamLogs.updateStateByKey. Do you know how I could replace the state
value in  streamLogs with newData.

Is this possible?

Re: [Spark Streaming] How do we reset the updateStateByKey values.

Posted by Uthayan Suthakar <ut...@gmail.com>.
Thank you Adrian for your reply. I've already managed to resolve this issue
and coincidently it is similar to the solution that you've proposed.

Cheers,

Uthay.

On 26 October 2015 at 10:41, Adrian Tanase <at...@adobe.com> wrote:

> Have you considered union-ing the 2 streams? Basically you can consider
> them as 2 “message types” that your update function can consume (e.g.
> implement a common interface):
>
>    - regularUpdate
>    - resetStateUpdate
>
> Inside your updateStateByKey you can check if any of the messages in the
> list of updates is an resetState message. If now, continue summing the
> others.
>
> I can provide scala samples, my java is beyond rusty :)
>
> -adrian
>
> From: Uthayan Suthakar
> Date: Friday, October 23, 2015 at 2:10 PM
> To: Sander van Dijk
> Cc: user
> Subject: Re: [Spark Streaming] How do we reset the updateStateByKey
> values.
>
> Hi Sander,
>
> Thank you for your very informative email. From your email, I've learned a
> quite a bit.
>
> >>>Is the condition determined somehow from the data coming through
> streamLogs, and is newData streamLogs again (rather than a whole data
> source?)
>
> No, they are two different Streams. I have two stream receivers, one of
> which sends event regularly and the other is not so regular (this data is
> computed by another application and stored into HDFS). What I'm trying to
> do is pick up the data from HDFS and overwrite the Stream's state. Hence
> the overwriting should only take place if there were new files in HDFS.
>
> So we have two different RDDs. If no file is found in HDFS, it will simply
> read the regular stream, compute and update the state(1) and output the
> result. If there is a file found in HDFS, then it should overwrite the
> state (1) with the data found from HDFS so the new events from the regular
> stream will carry on with the new overwritten state.
>
> I managed to get most of it done, but only having the issue with
> overwriting the state.
>
>
>
> On 22 October 2015 at 19:35, Sander van Dijk <sg...@gmail.com> wrote:
>
>> I don't think it is possible in the way you try to do it. It is important
>> to remember that the statements you mention only set up the stream stages,
>> before the stream is actually running. Once it's running, you cannot
>> change, remove or add stages.
>>
>> I am not sure how you determine your condition and what the actual change
>> should be when that condition is met: you say you want a different update
>> function but then give a statement with the same update function but a
>> different source stream). Is the condition determined somehow from the data
>> coming through streamLogs, and is newData basically streamLogs again
>> (rather than a whole data source?). In that case I can think of 3 things to
>> try:
>>
>> - if the condition you switch on can be determined independently from
>> every item in streamLogs, you can simply do an if/else inside
>> updateResultsStream to change the method that you determine your state
>> - if this is not the case, but you can determine when to switch your
>> condition for each key independently, you can extend your state type to
>> also keep track of your condition: rather than using
>> JavaPairDStream<String, String> you make updatedResultsState a
>> JavaPairDStream<String, Pair<String, Boolean>> (assuming you have some
>> class Pair), and you make updateResultsStream update and check the state of
>> the boolean.
>> - finally, you can have a separate state stream that keeps track of your
>> condition globally, then join that with you main stream and use that to
>> update state. Something like:
>>
>> // determineCondition should result in a reduction to a single item that
>> signals whether the condition is met in the current batch,
>> updateContitionState should remember that
>> conditionStateStream =
>> streamLogs.reduce(determineCondition).updateStateByKey(updateConditionState)
>>
>>
>> // addCondition gets RDDs from streamLogs and  single-item RDDs with the
>> condition state and should add that state to each item in the streamLogs RDD
>> joinedStream = streamLogs.transformWith(conditionStateStream,
>> addCondition)
>>
>> // This is similar to the extend state type of the previous idea, but now
>> your condition state is determined globally rather than per log entry
>> updatedResultsState = joinedStream.updateStateByKey(updateResultsStream)
>>
>> I hope this applies to your case and that it makes sense, my Java is a
>> bit rusty :) and perhaps others can suggest better spark streaming methods
>> that can be used, but hopefully the idea is clear.
>>
>> Sander
>>
>> On Thu, Oct 22, 2015 at 4:06 PM Uthayan Suthakar <
>> uthayan.suthakar@gmail.com> wrote:
>>
>>> Hello guys,
>>>
>>> I have a stream job that will carryout computations and update the state
>>> (SUM the value). At some point, I would like to reset the state. I could
>>> drop the state by setting 'None' but I don't want to drop it. I would like
>>> to keep the state but update the state.
>>>
>>>
>>> For example:
>>>
>>> JavaPairDStream<String, String> updatedResultsState =
>>> streamLogs.updateStateByKey(updateResultsStream);
>>>
>>> At some condition, I would like to update the state by key but with the
>>> different values, hence different update function.
>>>
>>>
>>> e.g.
>>>
>>>  updatedResultsState = newData.updateStateByKey(resetResultsStream);
>>>
>>> But the  newData.updateStateByKeyvalues cannot be replaced with the
>>> value in streamLogs.updateStateByKey. Do you know how I could replace the
>>> state value in  streamLogs with newData.
>>>
>>> Is this possible?
>>>
>>>
>>>
>>>
>>>
>>>
>

Re: [Spark Streaming] How do we reset the updateStateByKey values.

Posted by Adrian Tanase <at...@adobe.com>.
Have you considered union-ing the 2 streams? Basically you can consider them as 2 “message types” that your update function can consume (e.g. implement a common interface):

  *   regularUpdate
  *   resetStateUpdate

Inside your updateStateByKey you can check if any of the messages in the list of updates is an resetState message. If now, continue summing the others.

I can provide scala samples, my java is beyond rusty :)

-adrian

From: Uthayan Suthakar
Date: Friday, October 23, 2015 at 2:10 PM
To: Sander van Dijk
Cc: user
Subject: Re: [Spark Streaming] How do we reset the updateStateByKey values.

Hi Sander,

Thank you for your very informative email. From your email, I've learned a quite a bit.

>>>Is the condition determined somehow from the data coming through streamLogs, and is newData streamLogs again (rather than a whole data source?)

No, they are two different Streams. I have two stream receivers, one of which sends event regularly and the other is not so regular (this data is computed by another application and stored into HDFS). What I'm trying to do is pick up the data from HDFS and overwrite the Stream's state. Hence the overwriting should only take place if there were new files in HDFS.

So we have two different RDDs. If no file is found in HDFS, it will simply read the regular stream, compute and update the state(1) and output the result. If there is a file found in HDFS, then it should overwrite the state (1) with the data found from HDFS so the new events from the regular stream will carry on with the new overwritten state.

I managed to get most of it done, but only having the issue with overwriting the state.



On 22 October 2015 at 19:35, Sander van Dijk <sg...@gmail.com>> wrote:
I don't think it is possible in the way you try to do it. It is important to remember that the statements you mention only set up the stream stages, before the stream is actually running. Once it's running, you cannot change, remove or add stages.

I am not sure how you determine your condition and what the actual change should be when that condition is met: you say you want a different update function but then give a statement with the same update function but a different source stream). Is the condition determined somehow from the data coming through streamLogs, and is newData basically streamLogs again (rather than a whole data source?). In that case I can think of 3 things to try:

- if the condition you switch on can be determined independently from every item in streamLogs, you can simply do an if/else inside updateResultsStream to change the method that you determine your state
- if this is not the case, but you can determine when to switch your condition for each key independently, you can extend your state type to also keep track of your condition: rather than using JavaPairDStream<String, String> you make updatedResultsState a JavaPairDStream<String, Pair<String, Boolean>> (assuming you have some class Pair), and you make updateResultsStream update and check the state of the boolean.
- finally, you can have a separate state stream that keeps track of your condition globally, then join that with you main stream and use that to update state. Something like:

// determineCondition should result in a reduction to a single item that signals whether the condition is met in the current batch, updateContitionState should remember that
conditionStateStream = streamLogs.reduce(determineCondition).updateStateByKey(updateConditionState)

// addCondition gets RDDs from streamLogs and  single-item RDDs with the condition state and should add that state to each item in the streamLogs RDD
joinedStream = streamLogs.transformWith(conditionStateStream, addCondition)

// This is similar to the extend state type of the previous idea, but now your condition state is determined globally rather than per log entry
updatedResultsState = joinedStream.updateStateByKey(updateResultsStream)

I hope this applies to your case and that it makes sense, my Java is a bit rusty :) and perhaps others can suggest better spark streaming methods that can be used, but hopefully the idea is clear.

Sander

On Thu, Oct 22, 2015 at 4:06 PM Uthayan Suthakar <ut...@gmail.com>> wrote:
Hello guys,

I have a stream job that will carryout computations and update the state (SUM the value). At some point, I would like to reset the state. I could drop the state by setting 'None' but I don't want to drop it. I would like to keep the state but update the state.


For example:

JavaPairDStream<String, String> updatedResultsState = streamLogs.updateStateByKey(updateResultsStream);

At some condition, I would like to update the state by key but with the different values, hence different update function.


e.g.

 updatedResultsState = newData.updateStateByKey(resetResultsStream);

But the  newData.updateStateByKeyvalues cannot be replaced with the value in streamLogs.updateStateByKey. Do you know how I could replace the state value in  streamLogs with newData.

Is this possible?







Re: [Spark Streaming] How do we reset the updateStateByKey values.

Posted by Uthayan Suthakar <ut...@gmail.com>.
Hi Sander,

Thank you for your very informative email. From your email, I've learned a
quite a bit.

>>>Is the condition determined somehow from the data coming through
streamLogs, and is newData streamLogs again (rather than a whole data
source?)

No, they are two different Streams. I have two stream receivers, one of
which sends event regularly and the other is not so regular (this data is
computed by another application and stored into HDFS). What I'm trying to
do is pick up the data from HDFS and overwrite the Stream's state. Hence
the overwriting should only take place if there were new files in HDFS.

So we have two different RDDs. If no file is found in HDFS, it will simply
read the regular stream, compute and update the state(1) and output the
result. If there is a file found in HDFS, then it should overwrite the
state (1) with the data found from HDFS so the new events from the regular
stream will carry on with the new overwritten state.

I managed to get most of it done, but only having the issue with
overwriting the state.



On 22 October 2015 at 19:35, Sander van Dijk <sg...@gmail.com> wrote:

> I don't think it is possible in the way you try to do it. It is important
> to remember that the statements you mention only set up the stream stages,
> before the stream is actually running. Once it's running, you cannot
> change, remove or add stages.
>
> I am not sure how you determine your condition and what the actual change
> should be when that condition is met: you say you want a different update
> function but then give a statement with the same update function but a
> different source stream). Is the condition determined somehow from the data
> coming through streamLogs, and is newData basically streamLogs again
> (rather than a whole data source?). In that case I can think of 3 things to
> try:
>
> - if the condition you switch on can be determined independently from
> every item in streamLogs, you can simply do an if/else inside
> updateResultsStream to change the method that you determine your state
> - if this is not the case, but you can determine when to switch your
> condition for each key independently, you can extend your state type to
> also keep track of your condition: rather than using
> JavaPairDStream<String, String> you make updatedResultsState a
> JavaPairDStream<String, Pair<String, Boolean>> (assuming you have some
> class Pair), and you make updateResultsStream update and check the state of
> the boolean.
> - finally, you can have a separate state stream that keeps track of your
> condition globally, then join that with you main stream and use that to
> update state. Something like:
>
> // determineCondition should result in a reduction to a single item that
> signals whether the condition is met in the current batch,
> updateContitionState should remember that
> conditionStateStream =
> streamLogs.reduce(determineCondition).updateStateByKey(updateConditionState)
>
>
> // addCondition gets RDDs from streamLogs and  single-item RDDs with the
> condition state and should add that state to each item in the streamLogs RDD
> joinedStream = streamLogs.transformWith(conditionStateStream,
> addCondition)
>
> // This is similar to the extend state type of the previous idea, but now
> your condition state is determined globally rather than per log entry
> updatedResultsState = joinedStream.updateStateByKey(updateResultsStream)
>
> I hope this applies to your case and that it makes sense, my Java is a bit
> rusty :) and perhaps others can suggest better spark streaming methods that
> can be used, but hopefully the idea is clear.
>
> Sander
>
> On Thu, Oct 22, 2015 at 4:06 PM Uthayan Suthakar <
> uthayan.suthakar@gmail.com> wrote:
>
>> Hello guys,
>>
>> I have a stream job that will carryout computations and update the state
>> (SUM the value). At some point, I would like to reset the state. I could
>> drop the state by setting 'None' but I don't want to drop it. I would like
>> to keep the state but update the state.
>>
>>
>> For example:
>>
>> JavaPairDStream<String, String> updatedResultsState =
>> streamLogs.updateStateByKey(updateResultsStream);
>>
>> At some condition, I would like to update the state by key but with the
>> different values, hence different update function.
>>
>>
>> e.g.
>>
>>  updatedResultsState = newData.updateStateByKey(resetResultsStream);
>>
>> But the  newData.updateStateByKeyvalues cannot be replaced with the value
>> in streamLogs.updateStateByKey. Do you know how I could replace the state
>> value in  streamLogs with newData.
>>
>> Is this possible?
>>
>>
>>
>>
>>
>>

Re: [Spark Streaming] How do we reset the updateStateByKey values.

Posted by Sander van Dijk <sg...@gmail.com>.
I don't think it is possible in the way you try to do it. It is important
to remember that the statements you mention only set up the stream stages,
before the stream is actually running. Once it's running, you cannot
change, remove or add stages.

I am not sure how you determine your condition and what the actual change
should be when that condition is met: you say you want a different update
function but then give a statement with the same update function but a
different source stream). Is the condition determined somehow from the data
coming through streamLogs, and is newData basically streamLogs again
(rather than a whole data source?). In that case I can think of 3 things to
try:

- if the condition you switch on can be determined independently from every
item in streamLogs, you can simply do an if/else inside updateResultsStream
to change the method that you determine your state
- if this is not the case, but you can determine when to switch your
condition for each key independently, you can extend your state type to
also keep track of your condition: rather than using
JavaPairDStream<String, String> you make updatedResultsState a
JavaPairDStream<String, Pair<String, Boolean>> (assuming you have some
class Pair), and you make updateResultsStream update and check the state of
the boolean.
- finally, you can have a separate state stream that keeps track of your
condition globally, then join that with you main stream and use that to
update state. Something like:

// determineCondition should result in a reduction to a single item that
signals whether the condition is met in the current batch,
updateContitionState should remember that
conditionStateStream =
streamLogs.reduce(determineCondition).updateStateByKey(updateConditionState)


// addCondition gets RDDs from streamLogs and  single-item RDDs with the
condition state and should add that state to each item in the streamLogs RDD
joinedStream = streamLogs.transformWith(conditionStateStream, addCondition)

// This is similar to the extend state type of the previous idea, but now
your condition state is determined globally rather than per log entry
updatedResultsState = joinedStream.updateStateByKey(updateResultsStream)

I hope this applies to your case and that it makes sense, my Java is a bit
rusty :) and perhaps others can suggest better spark streaming methods that
can be used, but hopefully the idea is clear.

Sander

On Thu, Oct 22, 2015 at 4:06 PM Uthayan Suthakar <ut...@gmail.com>
wrote:

> Hello guys,
>
> I have a stream job that will carryout computations and update the state
> (SUM the value). At some point, I would like to reset the state. I could
> drop the state by setting 'None' but I don't want to drop it. I would like
> to keep the state but update the state.
>
>
> For example:
>
> JavaPairDStream<String, String> updatedResultsState =
> streamLogs.updateStateByKey(updateResultsStream);
>
> At some condition, I would like to update the state by key but with the
> different values, hence different update function.
>
>
> e.g.
>
>  updatedResultsState = newData.updateStateByKey(resetResultsStream);
>
> But the  newData.updateStateByKeyvalues cannot be replaced with the value
> in streamLogs.updateStateByKey. Do you know how I could replace the state
> value in  streamLogs with newData.
>
> Is this possible?
>
>
>
>
>
>

Re: [Spark Streaming] How do we reset the updateStateByKey values.

Posted by Uthayan Suthakar <ut...@gmail.com>.
I need to take the value from a RDD and update the state of the other RDD.
Is this possible?

On 22 October 2015 at 16:06, Uthayan Suthakar <ut...@gmail.com>
wrote:

> Hello guys,
>
> I have a stream job that will carryout computations and update the state
> (SUM the value). At some point, I would like to reset the state. I could
> drop the state by setting 'None' but I don't want to drop it. I would like
> to keep the state but update the state.
>
>
> For example:
>
> JavaPairDStream<String, String> updatedResultsState =
> streamLogs.updateStateByKey(updateResultsStream);
>
> At some condition, I would like to update the state by key but with the
> different values, hence different update function.
>
>
> e.g.
>
>  updatedResultsState = newData.updateStateByKey(resetResultsStream);
>
> But the  newData.updateStateByKeyvalues cannot be replaced with the value
> in streamLogs.updateStateByKey. Do you know how I could replace the state
> value in  streamLogs with newData.
>
> Is this possible?
>
>
>
>
>
>