Posted to user-zh@flink.apache.org by "freeza1982@outlook.com" <fr...@outlook.com> on 2020/10/20 05:48:20 UTC
Error in a single job with multiple streams
Hi all:
I'm on Flink 1.10.2 and wrote a program that runs fine locally, but when I submit it to Flink as a job, it fails right at startup with the exception below. What could be the cause?
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:269)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:430)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:353)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:144)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:432)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.InvalidClassException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011; class invalid for deserialization
at java.io.ObjectStreamClass$ExceptionInfo.newInvalidClassException(ObjectStreamClass.java:150)
at java.io.ObjectStreamClass.checkDeserialize(ObjectStreamClass.java:790)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1782)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
The rough logic is as follows; I have two pipelines:
1. Multiple Kafka sources are unioned into a single stream, which then goes through two operators and finally sinks to Kafka.
2. A single Kafka source produces a stream that goes through one operator and sinks to Kafka.
The code is as follows:
StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
List<KafkaInfo> kafkaSourceConfiguration = this.kafkaConfiguration.getSource0();
KafkaInfo kafkaSinkConfiguration = this.kafkaConfiguration.getSink();

// Operators for the first pipeline: transform -> keyBy -> compute.
RecordTransformOperator transformOperator = new RecordTransformOperator(DefRecordTransform.CHAIN_TYPE_TRANSFORM_COMPUTE);
RecordKeySelector keySelector = new RecordKeySelector();
RecordComputeOperator computeOperator = new RecordComputeOperator();

// Kafka sink for the first pipeline.
Properties sinkProperties = new Properties();
sinkProperties.setProperty("bootstrap.servers", kafkaSinkConfiguration.getBootstrapServer());
FlinkKafkaProducer011<Tuple2<String, String>> flinkKafkaProducer =
        new FlinkKafkaProducer011<>(kafkaSinkConfiguration.getTopicName(), new KafkaSerializer(), sinkProperties);

// One Kafka source per configured topic; the resulting streams are unioned below.
List<SingleOutputStreamOperator<Tuple2<String, String>>> dataStreamList = new ArrayList<>();
for (KafkaInfo kafkaInfo : kafkaSourceConfiguration) {
    Properties sourceProperties = new Properties();
    sourceProperties.setProperty("bootstrap.servers", kafkaInfo.getBootstrapServer());
    sourceProperties.setProperty("group.id", kafkaInfo.getGroupId());
    sourceProperties.setProperty("max.poll.records", kafkaInfo.getMaxPollRecord());
    sourceProperties.put("max.poll.interval.ms", kafkaInfo.getMaxPollIntervalMs());
    String topicName = kafkaInfo.getTopicName();
    FlinkKafkaConsumer011<Tuple2<String, String>> flinkKafkaConsumer =
            new FlinkKafkaConsumer011<>(topicName, new KafkaDeserializer(), sourceProperties);
    SingleOutputStreamOperator<Tuple2<String, String>> singleOutputStreamOperator =
            streamExecutionEnvironment.addSource(flinkKafkaConsumer);
    dataStreamList.add(singleOutputStreamOperator);
}

// Union all source streams into one.
DataStream<Tuple2<String, String>> unionDataStream = dataStreamList.get(0);
for (int i = 1; i < dataStreamList.size(); i++) {
    unionDataStream = unionDataStream.union(dataStreamList.get(i));
}

// First pipeline: union -> transform -> keyBy -> compute -> Kafka sink.
unionDataStream.flatMap(transformOperator)
        .keyBy(keySelector)
        .flatMap(computeOperator)
        .addSink(flinkKafkaProducer);

// Second pipeline: single Kafka source -> transform -> Kafka sink.
RecordTransformOperator transformOperator1 = new RecordTransformOperator(DefRecordTransform.CHAIN_TYPE_TRANSFORM);
Properties sinkProperties1 = new Properties();
sinkProperties1.setProperty("bootstrap.servers", kafkaSinkConfiguration.getBootstrapServer());
FlinkKafkaProducer011<Tuple2<String, String>> flinkKafkaProducer1 =
        new FlinkKafkaProducer011<>(kafkaSinkConfiguration.getTopicName(), new KafkaSerializer(), sinkProperties1);

KafkaInfo kafkaInfo = this.kafkaConfiguration.getSource1().get(0);
Properties sourceProperties = new Properties();
sourceProperties.setProperty("bootstrap.servers", kafkaInfo.getBootstrapServer());
sourceProperties.setProperty("group.id", kafkaInfo.getGroupId());
sourceProperties.setProperty("max.poll.records", kafkaInfo.getMaxPollRecord());
sourceProperties.put("max.poll.interval.ms", kafkaInfo.getMaxPollIntervalMs());
String topicName = kafkaInfo.getTopicName();
FlinkKafkaConsumer011<Tuple2<String, String>> flinkKafkaConsumer =
        new FlinkKafkaConsumer011<>(topicName, new KafkaDeserializer(), sourceProperties);
streamExecutionEnvironment
        .addSource(flinkKafkaConsumer)
        .flatMap(transformOperator1)
        .addSink(flinkKafkaProducer1);

streamExecutionEnvironment.execute();
freeza1982@outlook.com
Re: Re: Error in a single job with multiple streams
Posted by "freeza1982@outlook.com" <fr...@outlook.com>.
I never explicitly call the InstantiationUtil class in my code. How should I go about pinpointing the root cause? Could you take a look and suggest a workable solution?
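A minimal first diagnostic, as a sketch: the InvalidClassException names FlinkKafkaProducer011, so one way to start is to check which jar that class is actually loaded from at runtime. If it resolves to a different connector jar on the cluster (for example an older one under flink/lib) than the one the job was built against, that would explain the failure. The class name below comes from the stack trace; the class WhichJarDiagnostic and everything else is illustrative.

    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

    // Hypothetical diagnostic: print which jar provides the producer class.
    // A mismatch between this location on the client and on the TaskManagers
    // (e.g. fat jar vs. an older jar in flink/lib) is a common cause of
    // "class invalid for deserialization".
    public class WhichJarDiagnostic {
        public static void main(String[] args) {
            Class<?> clazz = FlinkKafkaProducer011.class;
            // getCodeSource() may be null for bootstrap classes; for a
            // jar-loaded class it points at the jar file.
            System.out.println(clazz.getName() + " loaded from "
                    + clazz.getProtectionDomain().getCodeSource().getLocation());
            System.out.println("classloader: " + clazz.getClassLoader());
        }
    }

Running this once on the machine that builds the jar and once inside the cluster (e.g. as a trivial job) should show whether both sides see the same connector jar.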
freeza1982@outlook.com
From: Robin Zhang
Sent: 2020-10-20 14:39
To: user-zh
Subject: Re: Error in a single job with multiple streams
Hi,
Based on the error message, the failure in your code is located at:
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
The processing in the InstantiationUtil class fails, so this should be a deserialization problem. Local testing works because local execution does not involve serialization.
Best,
Robin
Re: Error in a single job with multiple streams
Posted by Robin Zhang <vi...@outlook.com>.
Hi,
Based on the error message, the failure in your code is located at:
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
The processing in the InstantiationUtil class fails, so this should be a deserialization problem. Local testing works because local execution does not involve serialization.
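A minimal sketch of that serialization step, assuming a placeholder topic and broker address: Flink serializes user functions (including the Kafka producer) on the client and deserializes them on the TaskManager via InstantiationUtil, which is exactly the class in the stack trace. The round trip below succeeds on a single classpath, which is why a local run passes; it only breaks when the deserializing side has a different version of FlinkKafkaProducer011 on its classpath.

    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
    import org.apache.flink.util.InstantiationUtil;

    public class RoundTripSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
            FlinkKafkaProducer011<String> producer =
                    new FlinkKafkaProducer011<>("placeholder-topic", new SimpleStringSchema(), props);

            // Serialize the way the client does, then deserialize the way the
            // TaskManager does. On a cluster the second step runs against the
            // TaskManager's classpath, where a different connector version
            // triggers InvalidClassException.
            byte[] bytes = InstantiationUtil.serializeObject(producer);
            Object copy = InstantiationUtil.deserializeObject(
                    bytes, Thread.currentThread().getContextClassLoader());
            System.out.println("round trip ok: " + copy.getClass().getName());
        }
    }

If this passes locally but the job still fails on the cluster, the fix is usually to align the connector version between the job jar and the cluster: rebuild against the cluster's Flink version, or remove a stale connector jar from flink/lib.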
Best,
Robin
--
Sent from: http://apache-flink.147419.n8.nabble.com/