You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "shen (Jira)" <ji...@apache.org> on 2023/04/03 11:40:00 UTC

[jira] [Created] (FLINK-31708) RuntimeException/KryoException thrown when deserializing an empty protobuf record

shen created FLINK-31708:
----------------------------

             Summary: RuntimeException/KryoException thrown when deserializing an empty protobuf record
                 Key: FLINK-31708
                 URL: https://issues.apache.org/jira/browse/FLINK-31708
             Project: Flink
          Issue Type: Bug
          Components: API / Type Serialization System
    Affects Versions: 1.17.0, 1.10.0
            Reporter: shen


h1. Problem description

I am using a protobuf-generated class in a Flink job. When the application runs in production, the job throws the following exception:
{code:java}
java.lang.RuntimeException: Could not create class com.MYClass <==== generated by protobuf
        at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:76)
        at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:40)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
        at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:205)
        at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
        at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
        at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:141)
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:121)
        at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:185)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:319)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:494)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:478)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
        at java.lang.Thread.run(Thread.java:748)
Caused by: com.esotericsoftware.kryo.KryoException: java.io.EOFException: No more bytes left.
        at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.readBytes(NoFetchingInput.java:127)
        at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:332)
        at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:73)
        ... 16 common frames omitted
 {code}
h1. How to reproduce

I think this is similar to another issue: FLINK-29347.

The following is an example that reproduces the problem:
{code:java}
package com.test;

import com.test.ProtobufGeneratedClass;

import com.google.protobuf.Message;
import com.twitter.chill.protobuf.ProtobufSerializer;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.util.Random;
@Slf4j
public class app {
  // Side-output tags used by MyProcessFunction#processBroadcastElement; each one is consumed by
  // its own sink in main(). The trailing "{}" makes every tag an anonymous OutputTag subclass.

  /** Tag of the side output named "output-tag-1". */
  public static final OutputTag<ProtobufGeneratedClass> OUTPUT_TAG_1 =
      new OutputTag<ProtobufGeneratedClass>("output-tag-1") {};

  /** Tag of the side output named "output-tag-2". */
  public static final OutputTag<ProtobufGeneratedClass> OUTPUT_TAG_2 =
      new OutputTag<ProtobufGeneratedClass>("output-tag-2") {};

  /** Tag of the side output named "output-tag-3". */
  public static final OutputTag<ProtobufGeneratedClass> OUTPUT_TAG_3 =
      new OutputTag<ProtobufGeneratedClass>("output-tag-3") {};

  /**
   * Parallel test source that keeps emitting randomly generated {@code ProtobufGeneratedClass}
   * records until it is cancelled. Roughly half of the records are completely empty (no field
   * set); the others carry one graph id and one user tag name of random length.
   */
  public static class MySourceFunction extends RichParallelSourceFunction<ProtobufGeneratedClass> {
    private final Random rnd = new Random();
    private final String name;

    // cancel() flips this flag while run() is looping; volatile makes the write visible to the
    // looping thread (the pattern recommended by Flink's SourceFunction documentation).
    private volatile boolean running = true;

    private MySourceFunction(String name) {
      this.name = name;
    }

    /**
     * Emits one random record roughly every 5 ms until {@link #cancel()} is called.
     *
     * @param sourceContext context used to emit records and to obtain the checkpoint lock
     * @throws Exception if emitting fails or the thread is interrupted while sleeping
     */
    @Override
    public void run(SourceContext<ProtobufGeneratedClass> sourceContext) throws Exception {
      while (running) {
        ProtobufGeneratedClass record = nextRecord();
        synchronized (sourceContext.getCheckpointLock()) {
          sourceContext.collect(record);
        }
        // Sleep outside the checkpoint lock so the lock is not held while the source idles.
        Thread.sleep(5);
      }
    }

    /** Builds the next record: empty with probability 1/2, otherwise filled with random data. */
    private ProtobufGeneratedClass nextRecord() {
      ProtobufGeneratedClass.Builder builder = ProtobufGeneratedClass.newBuilder();
      if (rnd.nextBoolean()) {
        builder.addGraphIds(rnd.nextInt(10));
        byte[] bytes;
        if (rnd.nextInt(10) == 1) {
          // Make sure some records are large enough to reproduce the problem: for such records
          // SpillingAdaptiveSpanningRecordDeserializer#spanningWrapper may be activated.
          bytes = new byte[rnd.nextInt(5000000)];
        } else if (rnd.nextInt(10) == 2) {
          bytes = new byte[rnd.nextInt(50000)];
        } else {
          bytes = new byte[rnd.nextInt(50)];
        }
        // Decode with an explicit charset so the string does not depend on the platform default.
        builder.addUserTagNames(new String(bytes, java.nio.charset.StandardCharsets.UTF_8));
      }
      // Otherwise nothing is set on the builder, which yields an empty record.
      return builder.build();
    }

    /** Requests the emit loop in {@link #run} to stop. */
    @Override
    public void cancel() {
      running = false;
    }
  }

  /**
   * Builds and runs the local Flink job that reproduces the deserialization failure.
   *
   * <p>The optional argument {@code --checkpoint-dir <uri>} overrides the directory checkpoints
   * are written to; when absent, the original hard-coded location is used.
   *
   * @param args command-line arguments, parsed with {@link MultipleParameterTool}
   * @throws Exception if the job cannot be set up or fails during execution
   */
  public static void main(String[] args) throws Exception {
    final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);

    // set up the execution environment
    Configuration config = new Configuration();
    config.setInteger("state.checkpoints.num-retained", 5);
    config.setInteger("taskmanager.numberOfTaskSlots", 1);
    config.setInteger("local.number-taskmanager", 4);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(3, config);

    String checkpointDir =
        params.get("checkpoint-dir", "file:///Users/shenjiaqi/Workspace/state/checkpoints/");
    RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(checkpointDir, true);

    env.setParallelism(3);
    env.setStateBackend(rocksDBStateBackend);
    env.getCheckpointConfig().setCheckpointTimeout(100000);
    env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1000, Time.seconds(10)));
    // Register ProtobufSerializer as the default Kryo serializer for protobuf Message types, so
    // that the generated classes are serialized/deserialized by ProtobufSerializer.
    env.addDefaultKryoSerializer(Message.class, ProtobufSerializer.class);

    // make parameters available in the web interface
    env.getConfig().setGlobalJobParameters(params);

    DataStreamSource<ProtobufGeneratedClass> stream1 =
        env.addSource(new MySourceFunction("randomProtobufGeneratedClass1")).setParallelism(4);
    // An explicit (empty) descriptor array is passed so that the varargs overload returning a
    // BroadcastStream is chosen rather than the no-argument broadcast().
    BroadcastStream<ProtobufGeneratedClass> stream2 =
        env.addSource(new MySourceFunction("randomProtobufGeneratedClass2")).setParallelism(3)
            .broadcast(new MapStateDescriptor<?, ?>[0]);

    // NOTE(review): keying on hashCode() assumes the hash is identical in every task JVM; that
    // holds for this single-JVM local environment -- confirm before reusing on a real cluster.
    SingleOutputStreamOperator<ProtobufGeneratedClass> output = stream1.shuffle()
        .map(new MapFunction<ProtobufGeneratedClass, ProtobufGeneratedClass>() {

          @Override
          public ProtobufGeneratedClass map(ProtobufGeneratedClass value) throws Exception {
            return value;
          }
        }).setParallelism(2).disableChaining()
        .keyBy(x -> x.hashCode() % 10)
        .connect(stream2)
        .process(new MyProcessFunction()).disableChaining();

    // One sink per side output, each with a different parallelism.
    output.getSideOutput(OUTPUT_TAG_1).rescale()
        .addSink(new SinkFunction<ProtobufGeneratedClass>() {
          @Override
          public void invoke(ProtobufGeneratedClass value) throws Exception {
            log.info("blah 1");
          }
        }).setParallelism(1);

    output.getSideOutput(OUTPUT_TAG_2).rescale()
        .addSink(new SinkFunction<ProtobufGeneratedClass>() {
          @Override
          public void invoke(ProtobufGeneratedClass value) throws Exception {
            log.info("blah 2");
          }
        }).setParallelism(2);

    output.getSideOutput(OUTPUT_TAG_3).rescale()
        .addSink(new SinkFunction<ProtobufGeneratedClass>() {
          @Override
          public void invoke(ProtobufGeneratedClass value) throws Exception {
            log.info("blah 3");
          }
        }).setParallelism(3);

    // Print the length of each main-output record's text form.
    output.map(new MapFunction<ProtobufGeneratedClass, String>() {
      @Override
      public String map(ProtobufGeneratedClass value) throws Exception {
        return String.valueOf(value.toString().length());
      }
    }).print();
    env.execute("reproduce-the-problem");
  }

  /**
   * Process function for the keyed stream connected with the broadcast stream: keyed elements are
   * forwarded unchanged to the main output, broadcast elements are copied to all three side
   * outputs.
   *
   * <p>The first type argument is the key type; the stream in {@code main} is keyed by
   * {@code x.hashCode() % 10}, hence {@code Integer}.
   */
  public static class MyProcessFunction extends
      KeyedBroadcastProcessFunction<Integer, ProtobufGeneratedClass, ProtobufGeneratedClass, ProtobufGeneratedClass> {

    /**
     * Forwards the keyed element unchanged.
     *
     * @param value the element from the keyed stream
     * @param readOnlyContext context of the current element (unused)
     * @param collector receives {@code value}
     */
    @Override
    public void processElement(ProtobufGeneratedClass value,
        KeyedBroadcastProcessFunction<Integer, ProtobufGeneratedClass, ProtobufGeneratedClass, ProtobufGeneratedClass>.ReadOnlyContext readOnlyContext,
        Collector<ProtobufGeneratedClass> collector) throws Exception {
      collector.collect(value);
    }

    /**
     * Emits the broadcast element to each of the three side outputs.
     *
     * @param s the element from the broadcast stream
     * @param context context used to write to the side outputs
     * @param collector main-output collector (unused)
     */
    @Override
    public void processBroadcastElement(ProtobufGeneratedClass s,
        KeyedBroadcastProcessFunction<Integer, ProtobufGeneratedClass, ProtobufGeneratedClass, ProtobufGeneratedClass>.Context context,
        Collector<ProtobufGeneratedClass> collector) throws Exception {
      context.output(OUTPUT_TAG_1, s);
      context.output(OUTPUT_TAG_2, s);
      context.output(OUTPUT_TAG_3, s);
    }
  }
}

{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)