You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "lafeier (Jira)" <ji...@apache.org> on 2021/01/05 02:39:00 UTC
[jira] [Closed] (FLINK-20801) Using asynchronous methods in
operators causes serialization problems
[ https://issues.apache.org/jira/browse/FLINK-20801?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
lafeier closed FLINK-20801.
---------------------------
> Using asynchronous methods in operators causes serialization problems
> ---------------------------------------------------------------------
>
> Key: FLINK-20801
> URL: https://issues.apache.org/jira/browse/FLINK-20801
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Network
> Affects Versions: 1.11.2
> Environment: * flink 1.11.2
> * java8
> * windows
> Reporter: lafeier
> Priority: Major
> Fix For: 1.11.2
>
>
> Using asynchronous methods in operators causes serialization problems.
> Exceptions are indeterminate, for example:
> {code:java}
> java.io.IOException: Corrupt stream, found tag: 21java.io.IOException: Corrupt stream, found tag: 21 at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:217) at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46) at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55) at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:335) at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:108) at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85) at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:146) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at java.lang.Thread.run(Thread.java:748)
> {code}
>
> {code:java}
> java.lang.RuntimeException: Cannot instantiate class.java.lang.RuntimeException: Cannot instantiate class. at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:385) at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:205) at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46) at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55) at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:335) at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:108) at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85) at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:146) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at java.lang.Thread.run(Thread.java:748)Caused by: java.lang.ClassNotFoundException: e11 name12
> {code}
> {code:java}
> org.apache.flink.types.NullKeyFieldException: Unable to access field java.lang.String cn.ubattery.Person.name on object nullorg.apache.flink.types.NullKeyFieldException: Unable to access field java.lang.String cn.ubattery.Person.name on object null at org.apache.flink.api.java.typeutils.runtime.PojoComparator.accessField(PojoComparator.java:181) at org.apache.flink.api.java.typeutils.runtime.PojoComparator.extractKeys(PojoComparator.java:329) at org.apache.flink.streaming.util.keys.KeySelectorUtil$ComparableKeySelector.getKey(KeySelectorUtil.java:185) at org.apache.flink.streaming.util.keys.KeySelectorUtil$ComparableKeySelector.getKey(KeySelectorUtil.java:162) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement(AbstractStreamOperator.java:465) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement1(AbstractStreamOperator.java:454) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:160) at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at java.lang.Thread.run(Thread.java:748)
> {code}
> It is only reproduced in the following order:
> * flatMap (Multithreaded asynchronous operations are used in this method)
> * keyBy ( Only the subsequent use of the Keyby operator will cause this problem)
> * flatMap
> {color:#00875a}*code*{color}
> {code:java}
> @Test
> public void testConcurrentKryoException() throws Exception {
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> List<Person> list = new ArrayList<>();
> for (int i = 0; i < 100; i++) {
> Person p = new Person("name" + i, i);
> list.add(p);
> }
> DataStreamSource<Person> ds = env.fromCollection(list);
> ds.flatMap(new FlatMapFunction<Person, Person>() {
> @Override
> public void flatMap(Person value, Collector<Person> out) throws Exception {
> CompletableFuture.supplyAsync(()->{
> return value;
> }).whenComplete((data,ex)->{
> out.collect(data);
> });
> }
> }).keyBy("name").flatMap(new FlatMapFunction<Person, Person>() {
> @Override
> public void flatMap(Person value, Collector<Person> out) throws Exception {
> }
> });
> env.execute("test");
> }{code}
> {code:java}
> @Data
> @AllArgsConstructor
> @NoArgsConstructor
> public class Person {
> String name;
> int age;
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)