Posted to user@flink.apache.org by Nirmalya Sengupta <se...@gmail.com> on 2016/05/13 03:28:00 UTC

Confusion about multiple use of one ValueState

Hello all,

Let's say I want to hold some state value derived during one
transformation, and then use that same state value in a subsequent
transformation. For example:

myStream
.keyBy(fieldID) // Some field ID, may be 0
.flatMap(new MyStatefulMapper())   // both classes extend RichFlatMapFunction
.flatMap(new MySubsequentMapper())
....

Now, I define MyStatefulMapper in the usual fashion:

public class MyStatefulMapper
        extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    /**
     * The ValueState handle. The first field is the count, the
     * second field a running sum.
     */
    private transient ValueState<Tuple2<Long, Long>> sum;

    @Override
    public void flatMap(Tuple2<Long, Long> input,
                        Collector<Tuple2<Long, Long>> out) throws Exception {

        // logic of accessing and updating the ValueState 'sum' above
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "mySum", // the state name
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
                        Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
        sum = getRuntimeContext().getState(descriptor);
    }
}


So, by now, RuntimeContext has registered a State holder named 'mySum'.

In the implementation of 'MySubsequentMapper', I need to access this State
holder named 'mySum', perhaps thus (my thinking, I may be wrong):

public class MySubsequentMapper
        extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    /**
     * The ValueState handle. The first field is the count, the
     * second field a running sum.
     */
    private transient ValueState<Tuple2<Long, Long>> aSubsequentSum;

    private transient ValueState<Tuple2<Long, Long>> sum; // defined earlier

    @Override
    public void flatMap(Tuple2<Long, Long> input,
                        Collector<Tuple2<Long, Long>> out) throws Exception {

        // logic of accessing and updating the ValueState 'aSubsequentSum' above

        // but this logic depends on the current contents of ValueState
        // 'sum' created earlier
    }

    @Override
    public void open(Configuration config) {
        // Logic to create ValueDescriptor for 'aSubsequentSum' which
        // is owned by this operator

        // ...

        // Question: now, how do I prepare for accessing 'sum' which
        // is a State holder, but created inside an earlier operator?
        sum = getRuntimeContext().getState(descriptor) // how can I
                // pass the name 'mySum' (used in StateDescriptor)?
    }
}

I have two questions:

1) Is what I am trying to achieve possible, and is it even advisable? If
not, then what is the alternative?
2) Is there a guarantee that Flink will always execute MyStatefulMapper.open()
before MySubsequentMapper.open() because of the lexical order of
appearance in the source code?

-- Nirmalya




-- 
Software Technologist
http://www.linkedin.com/in/nirmalyasengupta
"If you have built castles in the air, your work need not be lost. That is
where they should be.
Now put the foundation under them."

Re: Confusion about multiple use of one ValueState

Posted by Balaji Rajagopalan <ba...@olacabs.com>.
Even though there are multiple instances of the map object, the transient
ValueState field is accessible throughout each object, so as the stream
flows in, the value can be updated based on application logic.
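
To make the point about updating the value as the stream flows in more
concrete, here is a minimal plain-Java sketch. It is not Flink code: the
class name KeyedSumSketch is hypothetical, and a HashMap stands in for the
keyed ValueState (its get/put play the role of value()/update()). It only
illustrates that the per-key {count, sum} pair outlives each call and is
updated record by record.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a stateful mapper; the map models per-key ValueState.
final class KeyedSumSketch {

    // key -> {count, running sum}; plays the role of the 'mySum' state.
    private final Map<Long, long[]> state = new HashMap<>();

    // Processes one (key, value) record and returns the updated {count, sum}.
    long[] process(long key, long value) {
        // Missing entries start from the default {0, 0}.
        long[] current = state.getOrDefault(key, new long[] {0L, 0L});
        long[] updated = {current[0] + 1, current[1] + value};
        state.put(key, updated);
        return updated.clone(); // copy so callers cannot mutate the stored state
    }

    public static void main(String[] args) {
        KeyedSumSketch mapper = new KeyedSumSketch();
        mapper.process(1L, 10L);
        mapper.process(2L, 7L);
        long[] result = mapper.process(1L, 5L);
        // prints count=2, sum=15
        System.out.println("count=" + result[0] + ", sum=" + result[1]);
    }
}
```

Key 2 keeps its own pair, which mirrors how keyed state is held separately
for each key.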

Re: Confusion about multiple use of one ValueState

Posted by Balaji Rajagopalan <ba...@olacabs.com>.
As I understand it, a ValueState defined in one map function is not
accessible in another map function. Also be aware that Flink creates a
separate instance of the map function for each parallel subtask, so more
than one instance will be processing your stream. I had a similar use case,
where I had to pass some state from one map function to another, and I used
Redis for that.
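
One alternative that avoids an external store is to forward the value the
second function needs as part of the record the first function emits. The
sketch below is a plain-Java model of that idea under stated assumptions; it
is not Flink code and not the Redis approach described above. FirstStage,
SecondStage, Enriched and ForwardStateSketch are hypothetical names, and each
stage's HashMap stands in for keyed state that is private to that stage.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical record passed between stages: the key plus a snapshot of the first stage's state.
final class Enriched {
    final long key;
    final long count;
    final long sum;

    Enriched(long key, long count, long sum) {
        this.key = key;
        this.count = count;
        this.sum = sum;
    }
}

// Models the first stateful function; its map is private, like operator-scoped state.
final class FirstStage {
    private final Map<Long, long[]> state = new HashMap<>();

    Enriched process(long key, long value) {
        long[] s = state.computeIfAbsent(key, k -> new long[] {0L, 0L});
        s[0] += 1;      // count
        s[1] += value;  // running sum
        // Emit a snapshot of the state instead of exposing the state itself.
        return new Enriched(key, s[0], s[1]);
    }
}

// Models the subsequent function; it reads the forwarded snapshot and keeps its own state.
final class SecondStage {
    private final Map<Long, Long> maxSumSeen = new HashMap<>();

    long process(Enriched in) {
        // Own state: the largest running sum seen so far for this key.
        return maxSumSeen.merge(in.key, in.sum, Long::max);
    }
}

final class ForwardStateSketch {
    public static void main(String[] args) {
        FirstStage first = new FirstStage();
        SecondStage second = new SecondStage();
        second.process(first.process(1L, 10L));
        long result = second.process(first.process(1L, -4L));
        System.out.println(result); // prints 10
    }
}
```

In a Flink job the same idea would presumably mean widening the first
function's output type so that it carries the count and sum downstream.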

Re: Confusion about multiple use of one ValueState

Posted by nsengupta <se...@gmail.com>.
Sorry, Balaji! Somehow, I missed this particular post of yours. Please ignore
my last mail, where I am asking the same question.

--  Nirmalya


