Posted to issues@flink.apache.org by "Yun Gao (Jira)" <ji...@apache.org> on 2023/04/20 05:30:00 UTC

[jira] [Assigned] (FLINK-31708) RuntimeException/KryoException thrown when deserializing an empty protobuf record

     [ https://issues.apache.org/jira/browse/FLINK-31708?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yun Gao reassigned FLINK-31708:
-------------------------------

    Assignee: shen

> RuntimeException/KryoException thrown when deserializing an empty protobuf record
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-31708
>                 URL: https://issues.apache.org/jira/browse/FLINK-31708
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Type Serialization System
>    Affects Versions: 1.10.0, 1.17.0
>            Reporter: shen
>            Assignee: shen
>            Priority: Major
>              Labels: pull-request-available
>
> h1. Problem description
> I am using a protobuf-generated class in a Flink job. When the application runs in production, the job throws the following exception:
> {code:java}
> java.lang.RuntimeException: Could not create class com.MYClass <==== generated by protobuf
>         at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:76)
>         at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:40)
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
>         at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:205)
>         at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>         at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>         at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:141)
>         at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:121)
>         at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:185)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:319)
>         at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:494)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:478)
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: com.esotericsoftware.kryo.KryoException: java.io.EOFException: No more bytes left.
>         at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.readBytes(NoFetchingInput.java:127)
>         at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:332)
>         at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:73)
>         ... 16 common frames omitted
>  {code}
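> From the trace, the failure seems to originate in how chill's ProtobufSerializer reads a record: it reads a length prefix and then asks the Kryo Input for that many bytes. For an empty protobuf message the payload length is 0, but Flink's NoFetchingInput#readBytes still polls the underlying stream; when the record buffer is already exhausted, the stream signals end-of-input and the EOFException above is raised. A minimal sketch that isolates just this step (my reading of the code, assuming flink-core on the classpath; this is not part of the job itself):
> {code:java}
> import java.io.ByteArrayInputStream;
>
> import org.apache.flink.api.java.typeutils.runtime.NoFetchingInput;
>
> public class EmptyReadSketch {
>   public static void main(String[] args) {
>     // Model the moment right after the length prefix 0 has been consumed:
>     // the record buffer is exhausted, yet readBytes(0) is still invoked.
>     NoFetchingInput input = new NoFetchingInput(new ByteArrayInputStream(new byte[0]));
>     // With the current NoFetchingInput this throws
>     // com.esotericsoftware.kryo.KryoException: java.io.EOFException: No more bytes left.
>     byte[] bytes = input.readBytes(0);
>     System.out.println("read " + bytes.length + " bytes");
>   }
> }
> {code}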
> h1. How to reproduce
> I think this is similar to another issue: FLINK-29347.
> Following is an example that reproduces the problem:
> {code:java}
> package com.test;
> import com.test.ProtobufGeneratedClass;
> import com.google.protobuf.Message;
> import com.twitter.chill.protobuf.ProtobufSerializer;
> import lombok.extern.slf4j.Slf4j;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.common.restartstrategy.RestartStrategies;
> import org.apache.flink.api.common.state.MapStateDescriptor;
> import org.apache.flink.api.common.time.Time;
> import org.apache.flink.api.java.utils.MultipleParameterTool;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
> import org.apache.flink.streaming.api.CheckpointingMode;
> import org.apache.flink.streaming.api.datastream.BroadcastStream;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
> import org.apache.flink.streaming.api.functions.sink.SinkFunction;
> import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
> import org.apache.flink.util.Collector;
> import org.apache.flink.util.OutputTag;
> import java.util.Random;
> @Slf4j
> public class App {
>   public static final OutputTag<ProtobufGeneratedClass> OUTPUT_TAG_1 =
>       new OutputTag<ProtobufGeneratedClass>("output-tag-1") {
>   };
>   public static final OutputTag<ProtobufGeneratedClass> OUTPUT_TAG_2 =
>       new OutputTag<ProtobufGeneratedClass>("output-tag-2") {
>   };
>   public static final OutputTag<ProtobufGeneratedClass> OUTPUT_TAG_3 =
>       new OutputTag<ProtobufGeneratedClass>("output-tag-3") {
>   };
>   public static class MySourceFunction extends RichParallelSourceFunction<ProtobufGeneratedClass> {
>     Random rnd = new Random();
>     private final String name;
>     private volatile boolean running = true; // written from cancel(), read in run()
>     private MySourceFunction(String name) {
>       this.name = name;
>     }
>     @Override
>     public void run(SourceContext<ProtobufGeneratedClass> sourceContext) throws Exception {
>       final int index = getRuntimeContext().getIndexOfThisSubtask();
>       int counter = 0;
>       while (running) {
>         synchronized (sourceContext.getCheckpointLock()) {
>           ++counter;
>           ProtobufGeneratedClass.Builder builder = ProtobufGeneratedClass.newBuilder();
>           if (rnd.nextBoolean()) {
>             builder.addGraphIds(rnd.nextInt(10));
>             byte[] bytes;
>             if (rnd.nextInt(10) == 1) {
>               // make sure record is large enough to reproduce the problem
>               // in which case, SpillingAdaptiveSpanningRecordDeserializer#spanningWrapper may be activated
>               bytes = new byte[rnd.nextInt(5000000)];
>             } else if (rnd.nextInt(10) == 2) {
>               bytes = new byte[rnd.nextInt(50000)];
>             } else {
>               bytes = new byte[rnd.nextInt(50)];
>             }
>             builder.addUserTagNames(new String(bytes));
>           } else {
>             // create an empty record by doing nothing.
>           }
>           sourceContext.collect(builder.build());
>           Thread.sleep(5);
>         }
>       }
>     }
>     @Override
>     public void cancel() {
>       running = false;
>     }
>   }
>   public static void main(String[] args) throws Exception {
>     final int SHARD_NUM = 64;
>     final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
>     // set up the execution environment
>     Configuration config = new Configuration();
>     config.setInteger("state.checkpoints.num-retained", 5);
>     config.setInteger("taskmanager.numberOfTaskSlots", 1);
>     config.setInteger("local.number-taskmanager", 4);
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(3, config);
>     RocksDBStateBackend rocksDBStateBackend =
>         new RocksDBStateBackend("file:///Users/shenjiaqi/Workspace/state/checkpoints/", true);
>     env.setParallelism(3);
>     env.setStateBackend(rocksDBStateBackend);
>     env.getCheckpointConfig().setCheckpointTimeout(100000);
>     env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
>     env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1000, Time.seconds(10)));
>     env.addDefaultKryoSerializer(Message.class, ProtobufSerializer.class); // make sure Message subclasses are handled by chill's ProtobufSerializer.
>     // make parameters available in the web interface
>     env.getConfig().setGlobalJobParameters(params);
>     String[] words = new String[100000];
>     Random rnd = new Random();
>     for (int i = 0; i < words.length; ++i) {
>       words[i] = String.valueOf(rnd.nextInt(10));
>     }
>     DataStreamSource<ProtobufGeneratedClass> stream1 = env.addSource(new MySourceFunction("randomProtobufGeneratedClass1")).setParallelism(4);
>     BroadcastStream<ProtobufGeneratedClass> stream2 = env.addSource(new MySourceFunction("randomProtobufGeneratedClass2")).setParallelism(3)
>         .broadcast(new MapStateDescriptor[0]);
>     SingleOutputStreamOperator<ProtobufGeneratedClass> output = stream1.shuffle()
>         .map(new MapFunction<ProtobufGeneratedClass, ProtobufGeneratedClass>() {
>           @Override
>           public ProtobufGeneratedClass map(ProtobufGeneratedClass value) throws Exception {
>             return value;
>           }
>         }).setParallelism(2).disableChaining()
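>         // the lambda key below is an Integer, so MyProcessFunction uses Integer as its key type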
>         .keyBy(x -> x.hashCode() % 10)
>         .connect(stream2)
>         .process(new MyProcessFunction()).disableChaining();
>     output.getSideOutput(OUTPUT_TAG_1).rescale().
>         addSink(new SinkFunction<ProtobufGeneratedClass>() {
>           @Override
>           public void invoke(ProtobufGeneratedClass value) throws Exception {
>             log.info("blah 1");
>           }
>         }).setParallelism(1);
>     output.getSideOutput(OUTPUT_TAG_2).rescale().
>         addSink(new SinkFunction<ProtobufGeneratedClass>() {
>           @Override
>           public void invoke(ProtobufGeneratedClass value) throws Exception {
>             log.info("blah 2");
>           }
>         }).setParallelism(2);
>     output.getSideOutput(OUTPUT_TAG_3).rescale().
>         addSink(new SinkFunction<ProtobufGeneratedClass>() {
>           @Override
>           public void invoke(ProtobufGeneratedClass value) throws Exception {
>             log.info("blah 3");
>           }
>         }).setParallelism(3);
>     output.map(new MapFunction<ProtobufGeneratedClass, String>() {
>       @Override
>       public String map(ProtobufGeneratedClass value) throws Exception {
>         return "" + value.toString().length();
>       }
>     }).print();
>     env.execute("reproduce-the-problem");
>   }
>   public static class MyProcessFunction extends
>       KeyedBroadcastProcessFunction<Integer, ProtobufGeneratedClass, ProtobufGeneratedClass, ProtobufGeneratedClass> {
>     @Override
>     public void processElement(ProtobufGeneratedClass value,
>         KeyedBroadcastProcessFunction<Integer, ProtobufGeneratedClass, ProtobufGeneratedClass, ProtobufGeneratedClass>.ReadOnlyContext readOnlyContext,
>         Collector<ProtobufGeneratedClass> collector) throws Exception {
>       collector.collect(value);
>     }
>     @Override
>     public void processBroadcastElement(ProtobufGeneratedClass s,
>         KeyedBroadcastProcessFunction<Integer, ProtobufGeneratedClass, ProtobufGeneratedClass, ProtobufGeneratedClass>.Context context,
>         Collector<ProtobufGeneratedClass> collector) throws Exception {
>       context.output(OUTPUT_TAG_1, s);
>       context.output(OUTPUT_TAG_2, s);
>       context.output(OUTPUT_TAG_3, s);
>     }
>   }
> }
> {code}
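> Until NoFetchingInput tolerates zero-byte reads, a possible mitigation is to register a serializer that skips the read entirely for empty payloads. The following is only a sketch of that idea, not a committed fix; it assumes chill's length-prefixed wire layout (varint length, then the raw protobuf bytes) and relies on the static getDefaultInstance/parseFrom methods that protobuf-generated classes provide:
> {code:java}
> import com.esotericsoftware.kryo.Kryo;
> import com.esotericsoftware.kryo.Serializer;
> import com.esotericsoftware.kryo.io.Input;
> import com.esotericsoftware.kryo.io.Output;
> import com.google.protobuf.Message;
>
> public class EmptySafeProtobufSerializer extends Serializer<Message> {
>   @Override
>   public void write(Kryo kryo, Output output, Message message) {
>     byte[] bytes = message.toByteArray();
>     output.writeInt(bytes.length, true); // varint length prefix, like chill's serializer
>     output.writeBytes(bytes);
>   }
>
>   @Override
>   public Message read(Kryo kryo, Input input, Class<Message> pbClass) {
>     try {
>       int size = input.readInt(true);
>       if (size == 0) {
>         // An empty message serializes to zero bytes; returning the default
>         // instance avoids a zero-byte readBytes on an exhausted input.
>         return (Message) pbClass.getMethod("getDefaultInstance").invoke(null);
>       }
>       byte[] bytes = input.readBytes(size);
>       return (Message) pbClass.getMethod("parseFrom", byte[].class).invoke(null, (Object) bytes);
>     } catch (Exception e) {
>       throw new RuntimeException("Could not create class " + pbClass.getName(), e);
>     }
>   }
> }
> {code}
> Registering it replaces the corresponding line in the reproducer: env.addDefaultKryoSerializer(Message.class, EmptySafeProtobufSerializer.class);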


