You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2023/04/03 12:09:00 UTC
[jira] [Updated] (FLINK-31708) RuntimeException/KryoException thrown when deserializing an empty protobuf record
[ https://issues.apache.org/jira/browse/FLINK-31708?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated FLINK-31708:
-----------------------------------
Labels: pull-request-available (was: )
> RuntimeException/KryoException thrown when deserializing an empty protobuf record
> ---------------------------------------------------------------------------------
>
> Key: FLINK-31708
> URL: https://issues.apache.org/jira/browse/FLINK-31708
> Project: Flink
> Issue Type: Bug
> Components: API / Type Serialization System
> Affects Versions: 1.10.0, 1.17.0
> Reporter: shen
> Priority: Major
> Labels: pull-request-available
>
> h1. Problem description
> I am using a protobuf-generated class in a Flink job. When the application runs in production, the job throws the following exception:
> {code:java}
> java.lang.RuntimeException: Could not create class com.MYClass <==== generated by protobuf
> at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:76)
> at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:40)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
> at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:205)
> at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
> at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:141)
> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:121)
> at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:185)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:319)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:494)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:478)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: com.esotericsoftware.kryo.KryoException: java.io.EOFException: No more bytes left.
> at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.readBytes(NoFetchingInput.java:127)
> at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:332)
> at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:73)
> ... 16 common frames omitted
> {code}
> h1. How to reproduce
> I think this is similar to another issue, FLINK-29347.
> The following example reproduces the problem:
> {code:java}
> package com.test;
> import com.test.ProtobufGeneratedClass;
> import com.google.protobuf.Message;
> import com.twitter.chill.protobuf.ProtobufSerializer;
> import lombok.extern.slf4j.Slf4j;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.common.restartstrategy.RestartStrategies;
> import org.apache.flink.api.common.state.MapStateDescriptor;
> import org.apache.flink.api.common.time.Time;
> import org.apache.flink.api.java.utils.MultipleParameterTool;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
> import org.apache.flink.streaming.api.CheckpointingMode;
> import org.apache.flink.streaming.api.datastream.BroadcastStream;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
> import org.apache.flink.streaming.api.functions.sink.SinkFunction;
> import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
> import org.apache.flink.util.Collector;
> import org.apache.flink.util.OutputTag;
> import java.util.Random;
> /**
>  * Reproduction job for FLINK-31708. Two {@link MySourceFunction} instances emit protobuf records,
>  * about half of which are empty messages. One stream is shuffled through a map and keyed; the
>  * other is broadcast. {@link MyProcessFunction} forwards keyed elements and copies broadcast
>  * elements to three side outputs, each drained by a sink with a different parallelism. Protobuf
>  * messages are registered with Kryo's ProtobufSerializer, so records exchanged between the
>  * unchained / differently-parallel operators go through that serializer.
>  */
> @Slf4j
> public class app {
>     public static final OutputTag<ProtobufGeneratedClass> OUTPUT_TAG_1 =
>             new OutputTag<ProtobufGeneratedClass>("output-tag-1") {
>             };
>     public static final OutputTag<ProtobufGeneratedClass> OUTPUT_TAG_2 =
>             new OutputTag<ProtobufGeneratedClass>("output-tag-2") {
>             };
>     public static final OutputTag<ProtobufGeneratedClass> OUTPUT_TAG_3 =
>             new OutputTag<ProtobufGeneratedClass>("output-tag-3") {
>             };
>
>     /**
>      * Parallel source emitting a {@code ProtobufGeneratedClass} roughly every 5 ms. About half of
>      * the records are empty messages (the builder is left untouched); the rest carry one graph id
>      * and one user tag name whose length is usually small but occasionally close to 5,000,000
>      * characters.
>      */
>     public static class MySourceFunction extends RichParallelSourceFunction<ProtobufGeneratedClass> {
>         private final Random rnd = new Random();
>         private final String name;
>         // cancel() is invoked from a different thread than run(); without volatile the loop in
>         // run() is not guaranteed to observe the update and may never terminate.
>         private volatile boolean running = true;
>
>         private MySourceFunction(String name) {
>             this.name = name;
>         }
>
>         @Override
>         public void run(SourceContext<ProtobufGeneratedClass> sourceContext) throws Exception {
>             while (running) {
>                 ProtobufGeneratedClass next = nextRecord();
>                 // Only the emission needs the checkpoint lock; sleeping while holding it would
>                 // needlessly delay checkpoints.
>                 synchronized (sourceContext.getCheckpointLock()) {
>                     sourceContext.collect(next);
>                 }
>                 Thread.sleep(5);
>             }
>         }
>
>         /** Builds either an empty message or one with a graph id and a random-length tag name. */
>         private ProtobufGeneratedClass nextRecord() {
>             ProtobufGeneratedClass.Builder builder = ProtobufGeneratedClass.newBuilder();
>             if (rnd.nextBoolean()) {
>                 builder.addGraphIds(rnd.nextInt(10));
>                 builder.addUserTagNames(new String(new byte[randomTagLength()]));
>             }
>             // Otherwise leave the builder untouched so that an empty record is emitted.
>             return builder.build();
>         }
>
>         /**
>          * Picks a tag length using a single bucket draw: ~10% of records get fewer than 5,000,000
>          * bytes (large enough that SpillingAdaptiveSpanningRecordDeserializer's spanning wrapper
>          * may be used), ~10% fewer than 50,000 bytes, and the rest fewer than 50 bytes.
>          */
>         private int randomTagLength() {
>             int bucket = rnd.nextInt(10);
>             if (bucket == 1) {
>                 return rnd.nextInt(5000000);
>             } else if (bucket == 2) {
>                 return rnd.nextInt(50000);
>             }
>             return rnd.nextInt(50);
>         }
>
>         @Override
>         public void cancel() {
>             running = false;
>         }
>     }
>
>     public static void main(String[] args) throws Exception {
>         final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
>         // set up the execution environment
>         Configuration config = new Configuration();
>         config.setInteger("state.checkpoints.num-retained", 5);
>         config.setInteger("taskmanager.numberOfTaskSlots", 1);
>         config.setInteger("local.number-taskmanager", 4);
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(3, config);
>         // Overridable with --checkpoint-dir; defaults to the originally hard-coded local path.
>         String checkpointDir =
>                 params.get("checkpoint-dir", "file:///Users/shenjiaqi/Workspace/state/checkpoints/");
>         RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(checkpointDir, true);
>         env.setParallelism(3);
>         env.setStateBackend(rocksDBStateBackend);
>         env.getCheckpointConfig().setCheckpointTimeout(100000);
>         env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
>         env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1000, Time.seconds(10)));
>         // Make Kryo serialize every protobuf Message subclass with chill-protobuf's
>         // ProtobufSerializer (the serializer that fails in the stack trace above).
>         env.addDefaultKryoSerializer(Message.class, ProtobufSerializer.class);
>         // make parameters available in the web interface
>         env.getConfig().setGlobalJobParameters(params);
>
>         DataStreamSource<ProtobufGeneratedClass> stream1 =
>                 env.addSource(new MySourceFunction("randomProtobufGeneratedClass1")).setParallelism(4);
>         BroadcastStream<ProtobufGeneratedClass> stream2 =
>                 env.addSource(new MySourceFunction("randomProtobufGeneratedClass2"))
>                         .setParallelism(3)
>                         .broadcast(new MapStateDescriptor[0]);
>         SingleOutputStreamOperator<ProtobufGeneratedClass> output = stream1.shuffle()
>                 .map(new MapFunction<ProtobufGeneratedClass, ProtobufGeneratedClass>() {
>                     @Override
>                     public ProtobufGeneratedClass map(ProtobufGeneratedClass value) throws Exception {
>                         return value;
>                     }
>                 }).setParallelism(2).disableChaining()
>                 .keyBy(x -> x.hashCode() % 10)
>                 .connect(stream2)
>                 .process(new MyProcessFunction()).disableChaining();
>
>         output.getSideOutput(OUTPUT_TAG_1).rescale().addSink(loggingSink("blah 1")).setParallelism(1);
>         output.getSideOutput(OUTPUT_TAG_2).rescale().addSink(loggingSink("blah 2")).setParallelism(2);
>         output.getSideOutput(OUTPUT_TAG_3).rescale().addSink(loggingSink("blah 3")).setParallelism(3);
>         output.map(new MapFunction<ProtobufGeneratedClass, String>() {
>             @Override
>             public String map(ProtobufGeneratedClass value) throws Exception {
>                 return String.valueOf(value.toString().length());
>             }
>         }).print();
>         env.execute("reproduce-the-problem");
>     }
>
>     /** Returns a sink that logs {@code message} at INFO level for every record it receives. */
>     private static SinkFunction<ProtobufGeneratedClass> loggingSink(String message) {
>         return new SinkFunction<ProtobufGeneratedClass>() {
>             @Override
>             public void invoke(ProtobufGeneratedClass value) throws Exception {
>                 log.info(message);
>             }
>         };
>     }
>
>     /**
>      * Forwards keyed elements unchanged and copies every broadcast element to all three side
>      * outputs. The key type is {@code Integer} to match {@code keyBy(x -> x.hashCode() % 10)}.
>      */
>     public static class MyProcessFunction extends
>             KeyedBroadcastProcessFunction<Integer, ProtobufGeneratedClass, ProtobufGeneratedClass, ProtobufGeneratedClass> {
>         @Override
>         public void processElement(ProtobufGeneratedClass value,
>                 ReadOnlyContext readOnlyContext,
>                 Collector<ProtobufGeneratedClass> collector) throws Exception {
>             collector.collect(value);
>         }
>
>         @Override
>         public void processBroadcastElement(ProtobufGeneratedClass value,
>                 Context context,
>                 Collector<ProtobufGeneratedClass> collector) throws Exception {
>             context.output(OUTPUT_TAG_1, value);
>             context.output(OUTPUT_TAG_2, value);
>             context.output(OUTPUT_TAG_3, value);
>         }
>     }
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)