Posted to user@flink.apache.org by Ahmad Hassan <ah...@gmail.com> on 2018/10/19 15:32:11 UTC

Initializing mapstate hangs

Hi,

Initializing MapState hangs in my window function. However, if I use
ValueState, it is initialized successfully. I am using RocksDB to store the
state.

public class MyWindowFunction extends RichWindowFunction<Event, Payload, Tuple, TimeWindow>
{
    private transient MapStateDescriptor<String, String> productsDescriptor =
            new MapStateDescriptor<>("mapState", String.class, String.class);

    @Override
    public void apply(Tuple key, TimeWindow window, final Iterable<Event> input,
            final Collector<Payload> out)
    {
        // do something
    }

    @Override
    public void open(Configuration parameters) throws Exception
    {
        System.out.println("## open init window state ");
        MapState<String, String> state =
                this.getRuntimeContext().getMapState(productsDescriptor); // <<< program hangs here
        System.out.println("## open window state " + state);
    }
}

Thanks for the help.

Re: Initializing mapstate hangs

Posted by Ahmad Hassan <ah...@gmail.com>.
Thank you. That worked.

Regards,


Re: Initializing mapstate hangs

Posted by Dawid Wysakowicz <dw...@apache.org>.
Hi Ahmad,

I think Alexander is right. You've declared the state descriptor
transient, which effectively makes it null on the worker node when the
state access happens: transient fields are skipped when the function is
serialized and shipped to the workers. Remove the transient modifier or
instantiate the descriptor in the open method. The common pattern is to
have the state itself as a transient field rather than the descriptor.

Best,

Dawid
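
The effect described above can be reproduced with plain Java serialization, without Flink. The following is a minimal sketch, not Flink code: Descriptor, BrokenFunction, FixedFunction and TransientDemo are hypothetical stand-ins introduced only for illustration, and the round trip through a byte array is an assumption standing in for shipping the function to a worker.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a state descriptor; this is not a Flink class.
class Descriptor implements Serializable {
    private static final long serialVersionUID = 1L;
    final String name;

    Descriptor(String name) {
        this.name = name;
    }
}

// Mirrors the original post: the descriptor is transient, so serialization
// skips it, and the field initializer is not re-run on deserialization.
class BrokenFunction implements Serializable {
    private static final long serialVersionUID = 1L;
    transient Descriptor descriptor = new Descriptor("mapState");
}

// Mirrors the suggested pattern: the descriptor is a regular field, and only
// the runtime handle is transient and created in open().
class FixedFunction implements Serializable {
    private static final long serialVersionUID = 1L;
    final Descriptor descriptor = new Descriptor("mapState");
    transient String stateHandle;

    void open() {
        stateHandle = "state for " + descriptor.name;
    }
}

class TransientDemo {
    // Serializes and deserializes an object in memory.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        BrokenFunction broken = roundTrip(new BrokenFunction());
        System.out.println("broken descriptor: " + broken.descriptor); // null

        FixedFunction fixed = roundTrip(new FixedFunction());
        fixed.open();
        System.out.println("fixed state handle: " + fixed.stateHandle);
    }
}
```

In the real window function, the analogous change would be to drop transient from productsDescriptor (or construct it inside open) and keep the MapState reference itself as the transient field.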


Re: Initializing mapstate hangs

Posted by Alexander Smirnov <al...@gmail.com>.
I think that's because you declared it as a transient field.

Move the declaration inside the "open" function to resolve that.


Re: Initializing mapstate hangs

Posted by Ahmad Hassan <ah...@gmail.com>.
2018-10-22 13:46:31,944 INFO  org.apache.flink.runtime.taskmanager.Task - Window(SlidingProcessingTimeWindows(180000, 180000), TimeTrigger, MetricWindowFunction) -> Map -> Sink: Unnamed (1/1) (5677190a0d292df3ad8f3521519cd980) switched from RUNNING to FAILED.
java.lang.NullPointerException: The state properties must not be null
    at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)
    at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.checkPreconditionsAndGetKeyedStateStore(StreamingRuntimeContext.java:174)
    at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:168)
    at com.sap.hybris.conversion.flink.processors.chain.MetricWindowFunction.open(MetricWindowFunction.java:62)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.api.java.operators.translation.WrappingFunction.open(WrappingFunction.java:45)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.open(WindowOperator.java:219)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:745)


Re: Initializing mapstate hangs

Posted by vino yang <ya...@gmail.com>.
Hi Ahmad,

Can you try to dump thread info from the Task Manager's JVM instance?

Thanks, vino.
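
A thread dump like the one requested above is usually taken from outside the process, for example with the JDK's jstack tool pointed at the Task Manager's process id. As an in-process alternative, here is a small sketch that formats the stack traces of all live threads in the current JVM. The class name ThreadDump and the idea of calling it from inside the job for debugging are assumptions made for illustration and are not part of Flink.

```java
import java.util.Map;

class ThreadDump {
    // Formats the name, state and stack trace of every live thread in this JVM.
    static String dump() {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<Thread, StackTraceElement[]> entry
                : Thread.getAllStackTraces().entrySet()) {
            Thread thread = entry.getKey();
            sb.append('"').append(thread.getName()).append('"')
              .append(" state=").append(thread.getState()).append('\n');
            for (StackTraceElement frame : entry.getValue()) {
                sb.append("    at ").append(frame).append('\n');
            }
            sb.append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(dump());
    }
}
```

A thread that is really stuck shows the same frames across repeated dumps, whereas a task that has failed, as in the log posted later in this thread, does not appear in the dump at all.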


Re: Initializing mapstate hangs

Posted by Ahmad Hassan <ah...@gmail.com>.
Flink 1.6.0. ValueState initialises successfully, but MapState hangs.

Regards


Re: Initializing mapstate hangs

Posted by vino yang <ya...@gmail.com>.
Hi Ahmad,

Which version of Flink do you use?

Thanks, vino.
