You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "shen (Jira)" <ji...@apache.org> on 2023/04/03 11:40:00 UTC
[jira] [Created] (FLINK-31708) RuntimeException/KryoException thrown when deserializing an empty protobuf record
shen created FLINK-31708:
----------------------------
Summary: RuntimeException/KryoException thrown when deserializing an empty protobuf record
Key: FLINK-31708
URL: https://issues.apache.org/jira/browse/FLINK-31708
Project: Flink
Issue Type: Bug
Components: API / Type Serialization System
Affects Versions: 1.17.0, 1.10.0
Reporter: shen
h1. Problem description
I am using a protobuf-generated class in a Flink job. When the application runs in production, the job throws the following exception:
{code:java}
java.lang.RuntimeException: Could not create class com.MYClass <==== generated by protobuf
at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:76)
at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:40)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:205)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:141)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:121)
at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:185)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:319)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:494)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:478)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.esotericsoftware.kryo.KryoException: java.io.EOFException: No more bytes left.
at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.readBytes(NoFetchingInput.java:127)
at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:332)
at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:73)
... 16 common frames omitted
{code}
h1. How to reproduce
I think this is similar to another issue: FLINK-29347.
The following example reproduces the problem:
{code:java}
package com.test;
import com.test.ProtobufGeneratedClass;
import com.google.protobuf.Message;
import com.twitter.chill.protobuf.ProtobufSerializer;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.util.Random;
@Slf4j
public class app {
public static final OutputTag<ProtobufGeneratedClass> OUTPUT_TAG_1 =
new OutputTag<ProtobufGeneratedClass>("output-tag-1") {
};
public static final OutputTag<ProtobufGeneratedClass> OUTPUT_TAG_2 =
new OutputTag<ProtobufGeneratedClass>("output-tag-2") {
};
public static final OutputTag<ProtobufGeneratedClass> OUTPUT_TAG_3 =
new OutputTag<ProtobufGeneratedClass>("output-tag-3") {
};
public static class MySourceFunction extends RichParallelSourceFunction<ProtobufGeneratedClass> {
Random rnd = new Random();
private final String name;
private boolean running = true;
private MySourceFunction(String name) {
this.name = name;
}
@Override
public void run(SourceContext<ProtobufGeneratedClass> sourceContext) throws Exception {
final int index = getRuntimeContext().getIndexOfThisSubtask();
int counter = 0;
while (running) {
synchronized (sourceContext.getCheckpointLock()) {
++counter;
ProtobufGeneratedClass.Builder builder = ProtobufGeneratedClass.newBuilder();
if (rnd.nextBoolean()) {
builder.addGraphIds(rnd.nextInt(10));
byte[] bytes;
if (rnd.nextInt(10) == 1) {
// make sure record is large enough to reproduce the problem
// in which case, SpillingAdaptiveSpanningRecordDeserializer#spanningWrapper may be activated
bytes = new byte[rnd.nextInt(5000000)];
} else if (rnd.nextInt(10) == 2) {
bytes = new byte[rnd.nextInt(50000)];
} else {
bytes = new byte[rnd.nextInt(50)];
}
builder.addUserTagNames(new String(bytes));
} else {
// create an empty record by do nothing.
}
sourceContext.collect(builder.build());
Thread.sleep(5);
}
}
}
@Override
public void cancel() {
running = false;
}
}
public static void main(String[] args) throws Exception {
final int SHARD_NUM = 64;
final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
// set up the execution environment
Configuration config = new Configuration();
config.setInteger("state.checkpoints.num-retained", 5);
config.setInteger("taskmanager.numberOfTaskSlots", 1);
config.setInteger("local.number-taskmanager", 4);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(3, config);
RocksDBStateBackend rocksDBStateBackend =
new RocksDBStateBackend("file:///Users/shenjiaqi/Workspace/state/checkpoints/", true);
env.setParallelism(3);
env.setStateBackend(rocksDBStateBackend);
env.getCheckpointConfig().setCheckpointTimeout(100000);
env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1000, Time.seconds(10)));
env.addDefaultKryoSerializer(Message.class, ProtobufSerializer.class); // make sure ProtobufSerializer is serialized/deserialized by protobuf.
// make parameters available in the web interface
env.getConfig().setGlobalJobParameters(params);
String[] words = new String[100000];
Random rnd = new Random();
for (int i = 0; i < words.length; ++i) {
words[i] = String.valueOf(rnd.nextInt(10));
}
DataStreamSource<ProtobufGeneratedClass> stream1 = env.addSource(new MySourceFunction("randomProtobufGeneratedClass1")).setParallelism(4);
BroadcastStream<ProtobufGeneratedClass> stream2 = env.addSource(new MySourceFunction("randomProtobufGeneratedClass2")).setParallelism(3)
.broadcast(new MapStateDescriptor[0]);
SingleOutputStreamOperator<ProtobufGeneratedClass> output = stream1.shuffle()
.map(new MapFunction<ProtobufGeneratedClass, ProtobufGeneratedClass>() {
@Override
public ProtobufGeneratedClass map(ProtobufGeneratedClass value) throws Exception {
return value;
}
}).setParallelism(2).disableChaining()
.keyBy(x -> x.hashCode() % 10)
.connect(stream2)
.process(new MyProcessFunction()).disableChaining();
output.getSideOutput(OUTPUT_TAG_1).rescale().
addSink(new SinkFunction<ProtobufGeneratedClass>() {
@Override
public void invoke(ProtobufGeneratedClass value) throws Exception {
log.info("blah 1");
}
}).setParallelism(1);
output.getSideOutput(OUTPUT_TAG_2).rescale().
addSink(new SinkFunction<ProtobufGeneratedClass>() {
@Override
public void invoke(ProtobufGeneratedClass value) throws Exception {
log.info("blah 2");
}
}).setParallelism(2);
output.getSideOutput(OUTPUT_TAG_3).rescale().
addSink(new SinkFunction<ProtobufGeneratedClass>() {
@Override
public void invoke(ProtobufGeneratedClass value) throws Exception {
log.info("blah 3");
}
}).setParallelism(3);
output.map(new MapFunction<ProtobufGeneratedClass, String>() {
@Override
public String map(ProtobufGeneratedClass value) throws Exception {
return "" + value.toString().length();
}
}).print();
env.execute("reproduce-the-problem");
}
public static class MyProcessFunction extends
KeyedBroadcastProcessFunction<ProtobufGeneratedClass, ProtobufGeneratedClass, ProtobufGeneratedClass, ProtobufGeneratedClass> {
@Override
public void processElement(ProtobufGeneratedClass ProtobufGeneratedClass,
KeyedBroadcastProcessFunction<ProtobufGeneratedClass, ProtobufGeneratedClass, ProtobufGeneratedClass, ProtobufGeneratedClass>.ReadOnlyContext readOnlyContext,
Collector<ProtobufGeneratedClass> collector) throws Exception {
collector.collect(ProtobufGeneratedClass);
}
@Override
public void processBroadcastElement(ProtobufGeneratedClass s,
KeyedBroadcastProcessFunction<ProtobufGeneratedClass, ProtobufGeneratedClass, ProtobufGeneratedClass, ProtobufGeneratedClass>.Context context,
Collector<ProtobufGeneratedClass> collector) throws Exception {
context.output(OUTPUT_TAG_1, s);
context.output(OUTPUT_TAG_2, s);
context.output(OUTPUT_TAG_3, s);
}
}
}
{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)