You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by yunfan123 <yu...@foxmail.com> on 2017/05/07 13:14:48 UTC
Can ValueState use generics?
My process function is like :
private static class MergeFunction extends
RichProcessFunction<Tuple2<Integer, ObjectNode>, Tuple2<Integer,
ObjectNode>> {
private ValueState<Tuple2<Integer, ObjectNode>> state;
@Override
@SuppressWarnings("unchecked")
public void open(Configuration parameters) throws Exception {
state = getRuntimeContext().getState(new
ValueStateDescriptor<>("mystate",
(Class<Tuple2<Integer,ObjectNode>>)
(Object)Tuple2.class));
}
}
When I running the code:
05/07/2017 21:17:47 Process -> (Sink: Unnamed, Sink: Unnamed)(1/1) switched
to FAILED
java.lang.RuntimeException: Cannot create full type information based on the
given class. If the type has generics, please
at
org.apache.flink.api.common.state.StateDescriptor.<init>(StateDescriptor.java:124)
at
org.apache.flink.api.common.state.ValueStateDescriptor.<init>(ValueStateDescriptor.java:101)
at
com.bytedance.flinkjob.activationSource.AppActivationSource$MergeFunction.open(AppActivationSource.java:134)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
at
org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:55)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:375)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:251)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:665)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.api.common.functions.InvalidTypesException:
Tuple needs to be parameterized by using generics.
at
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:673)
at
org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:607)
at
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:561)
at
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:557)
at
org.apache.flink.api.common.state.StateDescriptor.<init>(StateDescriptor.java:122)
... 9 more
Can I use generics with ValueState?
--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Can-ValueState-use-generics-tp13038.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
Re: Can ValueState use generics?
Posted by Stephan Ewen <se...@apache.org>.
Please use
"new ValueStateDescriptor<>("mystate", TypeInformation.of(new TypeHint
<Tuple2<Integer,ObjectNode>>(){}));
That should work...
On Mon, May 8, 2017 at 1:11 PM, Chesnay Schepler <ch...@apache.org> wrote:
> If you want to use generics you have to either provide a TypeInformation
> instead of a class or create a class that extends Tuple2(Integer,
> ObjectNode) and use it as the class argument.
>
>
> On 07.05.2017 15:14, yunfan123 wrote:
>
>> My process function is like :
>>
>> private static class MergeFunction extends
>> RichProcessFunction<Tuple2<Integer, ObjectNode>, Tuple2<Integer,
>> ObjectNode>> {
>>
>> private ValueState<Tuple2<Integer, ObjectNode>> state;
>>
>> @Override
>> @SuppressWarnings("unchecked")
>> public void open(Configuration parameters) throws Exception {
>> state = getRuntimeContext().getState(new
>> ValueStateDescriptor<>("mystate",
>> (Class<Tuple2<Integer,ObjectNode>>)
>> (Object)Tuple2.class));
>> }
>> }
>>
>>
>> When I running the code:
>> 05/07/2017 21:17:47 Process -> (Sink: Unnamed, Sink: Unnamed)(1/1)
>> switched
>> to FAILED
>> java.lang.RuntimeException: Cannot create full type information based on
>> the
>> given class. If the type has generics, please
>> at
>> org.apache.flink.api.common.state.StateDescriptor.<init>(Sta
>> teDescriptor.java:124)
>> at
>> org.apache.flink.api.common.state.ValueStateDescriptor.<init
>> >(ValueStateDescriptor.java:101)
>> at
>> com.bytedance.flinkjob.activationSource.AppActivationSource$
>> MergeFunction.open(AppActivationSource.java:134)
>> at
>> org.apache.flink.api.common.functions.util.FunctionUtils.ope
>> nFunction(FunctionUtils.java:36)
>> at
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOp
>> erator.open(AbstractUdfStreamOperator.java:112)
>> at
>> org.apache.flink.streaming.api.operators.ProcessOperator.ope
>> n(ProcessOperator.java:55)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllO
>> perators(StreamTask.java:375)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>> StreamTask.java:251)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:665)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by: org.apache.flink.api.common.functions.InvalidTypesException:
>> Tuple needs to be parameterized by using generics.
>> at
>> org.apache.flink.api.java.typeutils.TypeExtractor.createType
>> InfoWithTypeHierarchy(TypeExtractor.java:673)
>> at
>> org.apache.flink.api.java.typeutils.TypeExtractor.privateCre
>> ateTypeInfo(TypeExtractor.java:607)
>> at
>> org.apache.flink.api.java.typeutils.TypeExtractor.createType
>> Info(TypeExtractor.java:561)
>> at
>> org.apache.flink.api.java.typeutils.TypeExtractor.createType
>> Info(TypeExtractor.java:557)
>> at
>> org.apache.flink.api.common.state.StateDescriptor.<init>(Sta
>> teDescriptor.java:122)
>> ... 9 more
>>
>> Can I use generics with ValueState?
>>
>>
>>
>> --
>> View this message in context: http://apache-flink-user-maili
>> ng-list-archive.2336050.n4.nabble.com/Can-ValueState-use-
>> generics-tp13038.html
>> Sent from the Apache Flink User Mailing List archive. mailing list
>> archive at Nabble.com.
>>
>>
>
Re: Can ValueState use generics?
Posted by Chesnay Schepler <ch...@apache.org>.
If you want to use generics you have to either provide a TypeInformation
instead of a class or create a class that extends Tuple2(Integer,
ObjectNode) and use it as the class argument.
On 07.05.2017 15:14, yunfan123 wrote:
> My process function is like :
>
> private static class MergeFunction extends
> RichProcessFunction<Tuple2<Integer, ObjectNode>, Tuple2<Integer,
> ObjectNode>> {
>
> private ValueState<Tuple2<Integer, ObjectNode>> state;
>
> @Override
> @SuppressWarnings("unchecked")
> public void open(Configuration parameters) throws Exception {
> state = getRuntimeContext().getState(new
> ValueStateDescriptor<>("mystate",
> (Class<Tuple2<Integer,ObjectNode>>)
> (Object)Tuple2.class));
> }
> }
>
>
> When I running the code:
> 05/07/2017 21:17:47 Process -> (Sink: Unnamed, Sink: Unnamed)(1/1) switched
> to FAILED
> java.lang.RuntimeException: Cannot create full type information based on the
> given class. If the type has generics, please
> at
> org.apache.flink.api.common.state.StateDescriptor.<init>(StateDescriptor.java:124)
> at
> org.apache.flink.api.common.state.ValueStateDescriptor.<init>(ValueStateDescriptor.java:101)
> at
> com.bytedance.flinkjob.activationSource.AppActivationSource$MergeFunction.open(AppActivationSource.java:134)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
> at
> org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:55)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:375)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:251)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:665)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.api.common.functions.InvalidTypesException:
> Tuple needs to be parameterized by using generics.
> at
> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:673)
> at
> org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:607)
> at
> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:561)
> at
> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:557)
> at
> org.apache.flink.api.common.state.StateDescriptor.<init>(StateDescriptor.java:122)
> ... 9 more
>
> Can I use generics with ValueState?
>
>
>
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Can-ValueState-use-generics-tp13038.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
>