You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Arpith P <ar...@gmail.com> on 2020/10/13 11:26:39 UTC

Is MapState tied to Operator Input & Output type?

Hi,


I’ve a *ProcessFunction* which initially was receiving input & output type
of String (1) & inside *processElement* I was updating MapState. Now I have
changed the Input & Output type to be Map, String (2), but if I restore
from the last checkpoint folder MapState is coming in as empty. I’ve
checked that checkpoint folder actually saves data (i.e. Files size > 1GB).
Does map state tied with ProcessFunction input & output type, if not why
doesn't mapstate get restored.



(1)

       public class TestProcess extends ProcessFunction<String,
String> implements
CheckpointedFunction

(2)

    public class TestProcess extends ProcessFunction<Map<String,
String>, String> implements CheckpointedFunction

Re: Is MapState tied to Operator Input & Output type?

Posted by Yun Tang <my...@live.com>.
Hi Arpith

I'm afraid that you're totally talking about the wrong thing in previous thread. The root cause is not restoring state from checkpoint but not access the state legally.
Have you ever add keyBy before process your function as doc's note [1] said: "If you want to access keyed state and timers you have to apply the ProcessFunction on a keyed stream"?


[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/process_function.html#the-processfunction

Best
Yun Tang
________________________________
From: Arpith P <ar...@gmail.com>
Sent: Tuesday, October 13, 2020 22:20
To: Yun Tang <my...@live.com>
Cc: user <us...@flink.apache.org>
Subject: Re: Is MapState tied to Operator Input & Output type?

Hi Yun,

Neither state descriptor type or name changed. I did assign an ID as well but it didn't help me. What I'm trying to do is I have two stream A & B which I want to connect/process in C; I eventually want values from stream A to be saved in C's MapState. What I've tried is I used ConnectedStream to connect both A(keyedstream) & B and processed in CoProcessFunction C, but looks like in CoprocessFunction doesn't have MapState as I'm getting functionInitializationContext.getKeyedStore as null. Is it possible to access MapState inside CoProcessFunction.

Arpith

On Tue, Oct 13, 2020 at 5:18 PM Yun Tang <my...@live.com>> wrote:
Hi

The type of map state is not directly related with input & output type, this is only related with how you define the state descriptor.

  *   Have you ever changed the state descriptor after changing the type of input/output type?
  *   Have you assigned the id [1] to the operator which using the 'TestProcess'? The state might not be restored if you change your code without id assigned.

[1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#assigning-operator-ids

Best
Yun Tang

________________________________
From: Arpith P <ar...@gmail.com>>
Sent: Tuesday, October 13, 2020 19:26
To: user <us...@flink.apache.org>>
Subject: Is MapState tied to Operator Input & Output type?


Hi,


I’ve a ProcessFunction which initially was receiving input & output type of String (1) & inside processElement I was updating MapState. Now I have changed the Input & Output type to be Map, String (2), but if I restore from the last checkpoint folder MapState is coming in as empty. I’ve checked that checkpoint folder actually saves data (i.e. Files size > 1GB). Does map state tied with ProcessFunction input & output type, if not why doesn't mapstate get restored.



(1)

       public class TestProcess extends ProcessFunction<String, String> implements CheckpointedFunction

(2)

    public class TestProcess extends ProcessFunction<Map<String, String>, String> implements CheckpointedFunction

Re: Is MapState tied to Operator Input & Output type?

Posted by Arpith P <ar...@gmail.com>.
Hi Yun,

Neither state descriptor type or name changed. I did assign an ID as well
but it didn't help me. What I'm trying to do is I have two stream A & B
which I want to connect/process in C; I eventually want values from stream
A to be saved in C's MapState. What I've tried is I used ConnectedStream to
connect both A(keyedstream) & B and processed in CoProcessFunction C, but
looks like in CoprocessFunction doesn't have MapState as I'm getting
functionInitializationContext.getKeyedStore as null. Is it possible to
access MapState inside CoProcessFunction.

Arpith

On Tue, Oct 13, 2020 at 5:18 PM Yun Tang <my...@live.com> wrote:

> Hi
>
> The type of map state is not directly related with input & output type,
> this is only related with how you define the state descriptor.
>
>    - Have you ever changed the state descriptor after changing the type
>    of input/output type?
>    - Have you assigned the id [1] to the operator which using the
>    'TestProcess'? The state might not be restored if you change your code
>    without id assigned.
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#assigning-operator-ids
>
> Best
> Yun Tang
>
> ------------------------------
> *From:* Arpith P <ar...@gmail.com>
> *Sent:* Tuesday, October 13, 2020 19:26
> *To:* user <us...@flink.apache.org>
> *Subject:* Is MapState tied to Operator Input & Output type?
>
>
> Hi,
>
>
> I’ve a *ProcessFunction* which initially was receiving input & output
> type of String (1) & inside *processElement* I was updating MapState. Now
> I have changed the Input & Output type to be Map, String (2), but if I
> restore from the last checkpoint folder MapState is coming in as empty.
> I’ve checked that checkpoint folder actually saves data (i.e. Files size >
> 1GB). Does map state tied with ProcessFunction input & output type, if not
> why doesn't mapstate get restored.
>
>
>
> (1)
>
>        public class TestProcess extends ProcessFunction<String, String> implements
> CheckpointedFunction
>
> (2)
>
>     public class TestProcess extends ProcessFunction<Map<String, String>, String> implements CheckpointedFunction
>
>

Re: Is MapState tied to Operator Input & Output type?

Posted by Yun Tang <my...@live.com>.
Hi

The type of map state is not directly related with input & output type, this is only related with how you define the state descriptor.

  *   Have you ever changed the state descriptor after changing the type of input/output type?
  *   Have you assigned the id [1] to the operator which using the 'TestProcess'? The state might not be restored if you change your code without id assigned.

[1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#assigning-operator-ids

Best
Yun Tang

________________________________
From: Arpith P <ar...@gmail.com>
Sent: Tuesday, October 13, 2020 19:26
To: user <us...@flink.apache.org>
Subject: Is MapState tied to Operator Input & Output type?


Hi,


I’ve a ProcessFunction which initially was receiving input & output type of String (1) & inside processElement I was updating MapState. Now I have changed the Input & Output type to be Map, String (2), but if I restore from the last checkpoint folder MapState is coming in as empty. I’ve checked that checkpoint folder actually saves data (i.e. Files size > 1GB). Does map state tied with ProcessFunction input & output type, if not why doesn't mapstate get restored.



(1)

       public class TestProcess extends ProcessFunction<String, String> implements CheckpointedFunction

(2)

    public class TestProcess extends ProcessFunction<Map<String, String>, String> implements CheckpointedFunction