Posted to issues@flink.apache.org by "Haoze Wu (Jira)" <ji...@apache.org> on 2022/11/15 18:09:00 UTC

[jira] [Updated] (FLINK-30032) IOException during MAX_WATERMARK emission causes message missing

     [ https://issues.apache.org/jira/browse/FLINK-30032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Haoze Wu updated FLINK-30032:
-----------------------------
    Description: 
We are testing Flink (version 1.14.0). We launch one StandaloneSessionClusterEntrypoint and two TaskManagerRunners, then run a Flink client that submits a WordCount workload. The code is similar to [https://github.com/apache/flink/blob/release-1.14.0/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java]; the only addition is a Kafka topic output:
{code:java}
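import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    // Writes every element of `events` to `topic` as a plain string value.
    private static DataStreamSink<String> addKafkaSink(
            final DataStream<String> events, final String brokers, final String topic) {
        return events.sinkTo(KafkaSink.<String>builder()
                .setBootstrapServers(brokers)
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.<String>builder()
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .setTopic(topic)
                                .build())
                .build());
    }

    public static void run(final String[] args) throws Exception {
        // Arguments: Kafka bootstrap servers, input text file, output Kafka topic.
        final String brokers = args[0];
        final String textFilePath = args[1];
        final String kafkaTopic = args[2];
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        final DataStream<String> text = env.readTextFile(textFilePath);
        // Tokenizer is the FlatMapFunction<String, Tuple2<String, Integer>> from the WordCount example.
        final DataStream<Tuple2<String, Integer>> counts =
                text.flatMap(new Tokenizer()).keyBy(value -> value.f0).sum(1);
        // Each element of `counts` becomes one Kafka message.
        addKafkaSink(counts.map(String::valueOf), brokers, kafkaTopic);
        final long nano = System.nanoTime();
        env.execute("WordCount");
        // FlinkGrayClientMain is not a Flink class; it belongs to our test harness.
        FlinkGrayClientMain.reply("success", nano);
    }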
 {code}
We found that the Kafka topic sometimes fails to receive a few messages. We reproduced the symptom multiple times: in each failing run, the topic received between 160 and 169 messages instead of the expected 170, and the missing messages were always the last few of the 170 expected ones.

We then inspected the logs and the code.

First, one of the TaskManagerRunners logged an IOException:
{code:java}
2021-11-02T17:43:41,070 WARN  source.ContinuousFileReaderOperator (ContinuousFileReaderOperator.java:finish(461)) - unable to emit watermark while closing
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
        at org.apache.flink.streaming.runtime.tasks.ChainingOutput.emitWatermark(ChainingOutput.java:114) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:40) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndEmitWatermark(StreamSourceContexts.java:428) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.emitWatermark(StreamSourceContexts.java:544) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.emitWatermark(StreamSourceContexts.java:113) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.finish(ContinuousFileReaderOperator.java:459) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finishOperator(StreamOperatorWrapper.java:211) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferFinishOperatorToMailbox$3(StreamOperatorWrapper.java:185) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:97) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndFinishOperator(StreamOperatorWrapper.java:162) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finish(StreamOperatorWrapper.java:130) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.finishOperators(RegularOperatorChain.java:117) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.endData(StreamTask.java:549) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:508) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) [flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) [flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) [flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) [flink-dist_2.11-1.14.0.jar:1.14.0]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
Caused by: java.lang.RuntimeException: McGray injected exception
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:121) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:40) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:605) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.runtime.tasks.ChainingOutput.emitWatermark(ChainingOutput.java:112) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        ... 24 more
Caused by: java.io.IOException {code}
The IOException originates at line 104 of `RecordWriter#emit`:
{code:java}
    protected void emit(T record, int targetSubpartition) throws IOException {
        checkErroneous();

        targetPartition.emitRecord(serializeRecord(serializer, record), targetSubpartition); // line 104

        if (flushAlways) {
            targetPartition.flush(targetSubpartition);
        }
    } {code}
Here, `targetPartition.emitRecord` eventually performs file I/O or memory-mapped I/O, which can fail with an IOException.

This exception is caught in `RecordWriterOutput#emitWatermark` and rethrown as a `RuntimeException`:
{code:java}
    @Override
    public void emitWatermark(Watermark mark) {
        if (announcedStatus.isIdle()) {
            return;
        }

        watermarkGauge.setCurrentWatermark(mark.getTimestamp());
        serializationDelegate.setInstance(mark);

        try {
            recordWriter.broadcastEmit(serializationDelegate);
        } catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    } {code}
It is then caught in `ChainingOutput#emitWatermark` and wrapped in an `ExceptionInChainedOperatorException`:
{code:java}
    @Override
    public void emitWatermark(Watermark mark) {
        if (announcedStatus.isIdle()) {
            return;
        }
        try {
            watermarkGauge.setCurrentWatermark(mark.getTimestamp());
            input.processWatermark(mark);
        } catch (Exception e) {
            throw new ExceptionInChainedOperatorException(e);
        }
    } {code}
Finally, it is caught in `ContinuousFileReaderOperator#finish`, which only logs a warning:
{code:java}
    @Override
    public void finish() throws Exception {
        LOG.debug("finishing");
        super.finish();

        switch (state) {
            case IDLE:
                switchState(ReaderState.FINISHED);
                break;
            case FINISHED:
                LOG.warn("operator is already closed, doing nothing");
                return;
            default:
                switchState(ReaderState.FINISHING);
                while (!state.isTerminal()) {
                    executor.yield();
                }
        }

        try {
            sourceContext.emitWatermark(Watermark.MAX_WATERMARK);
        } catch (Exception e) {
            LOG.warn("unable to emit watermark while closing", e);
        }
    } {code}
Here, `Watermark.MAX_WATERMARK` is emitted to signal the end of the input so that the computation can finish properly.
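To make the exception path easier to follow, here is a simplified, self-contained model in plain Java. The method names are illustrative stand-ins for the Flink methods quoted above, not Flink classes. It shows that the IOException is wrapped twice on its way up and then caught and logged in `finish()`, so the caller of `finish()` sees a normal return and the original cause is only visible deep inside the WARN log entry.

```java
import java.io.IOException;

// Simplified model of the exception path in this report. Method names are
// illustrative stand-ins for the Flink methods quoted above, not Flink code.
class SwallowedExceptionDemo {

    // Stand-in for RecordWriter#emit failing inside targetPartition.emitRecord.
    static void recordWriterEmit() throws IOException {
        throw new IOException("simulated emitRecord failure");
    }

    // Stand-in for RecordWriterOutput#emitWatermark: rethrows as RuntimeException.
    static void recordWriterOutputEmitWatermark() {
        try {
            recordWriterEmit();
        } catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    // Stand-in for ChainingOutput#emitWatermark (ExceptionInChainedOperatorException
    // is modeled here as a plain RuntimeException).
    static void chainingOutputEmitWatermark() {
        try {
            recordWriterOutputEmitWatermark();
        } catch (Exception e) {
            throw new RuntimeException("Could not forward element to next operator", e);
        }
    }

    // Stand-in for ContinuousFileReaderOperator#finish: logs and returns normally.
    // Returns the swallowed exception so the example can inspect it.
    static Throwable finish() {
        try {
            chainingOutputEmitWatermark();
            return null;
        } catch (Exception e) {
            System.out.println("WARN unable to emit watermark while closing: " + e);
            return e;
        }
    }

    // Follows the cause chain down to the original exception.
    static Throwable rootCause(Throwable t) {
        while (t.getCause() != null && t.getCause() != t) {
            t = t.getCause();
        }
        return t;
    }

    public static void main(String[] args) {
        Throwable logged = finish();
        // finish() returned normally; the IOException is only visible two causes deep.
        System.out.println(rootCause(logged).getClass().getSimpleName()); // IOException
    }
}
```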

In Flink 1.14.0, the full call stack of this workflow is:
{code:java}
org.apache.flink.runtime.io.network.api.writer.RecordWriter#emit:104
org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter#broadcastEmit:67
org.apache.flink.streaming.runtime.io.RecordWriterOutput#emitWatermark:119
org.apache.flink.streaming.api.operators.CountingOutput#emitWatermark:40
org.apache.flink.streaming.api.operators.AbstractStreamOperator#processWatermark:605
org.apache.flink.streaming.runtime.tasks.ChainingOutput#emitWatermark:112
org.apache.flink.streaming.api.operators.CountingOutput#emitWatermark:40
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext#processAndEmitWatermark:428
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext#emitWatermark:544
org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose#emitWatermark:113
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator#finish:459
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1#runThrowing:50
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper#finishOperator:211
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper#lambda$deferFinishOperatorToMailbox$3:185
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1#runThrowing:50
org.apache.flink.streaming.runtime.tasks.mailbox.Mail#run:90
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl#tryYield:97
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper#quiesceTimeServiceAndFinishOperator:162
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper#finish:130
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain#finishOperators:117
org.apache.flink.streaming.runtime.tasks.StreamTask#endData:549
org.apache.flink.streaming.runtime.tasks.StreamTask#processInput:508
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor#runMailboxLoop:203
org.apache.flink.streaming.runtime.tasks.StreamTask#runMailboxLoop:809
org.apache.flink.streaming.runtime.tasks.StreamTask#invoke:761
org.apache.flink.runtime.taskmanager.Task#runWithSystemExitMonitoring:958
org.apache.flink.runtime.taskmanager.Task#restoreAndInvoke:937
org.apache.flink.runtime.taskmanager.Task#doRun:766
org.apache.flink.runtime.taskmanager.Task#run:575
java.lang.Thread#run:748 {code}
We believe the last few messages go missing from the Kafka topic because of `ChannelSelectorRecordWriter#broadcastEmit` (which also appears in the call stack above):
{code:java}
    @Override
    public void broadcastEmit(T record) throws IOException {
        checkErroneous();

        // Emitting to all channels in a for loop can be better than calling
        // ResultPartitionWriter#broadcastRecord because the broadcastRecord
        // method incurs extra overhead.
        ByteBuffer serializedRecord = serializeRecord(serializer, record);
        for (int channelIndex = 0; channelIndex < numberOfChannels; channelIndex++) {
            serializedRecord.rewind();
            emit(record, channelIndex);   // line 67
        }

        if (flushAlways) {
            flushAll();
        }
    } {code}
Line 67 emits `Watermark.MAX_WATERMARK` (from `ContinuousFileReaderOperator#finish`) to each channel in turn. When the IOException is thrown for one channel, it propagates out of the loop, so line 67 never runs for the remaining channels; `ContinuousFileReaderOperator#finish` then swallows the exception. As a result, the failing channel and every channel after it never receive the watermark. Across multiple reproductions, we found that the number of missing messages is exactly equal to the number of affected channels.
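The effect of the aborted loop can be reproduced with a small plain-Java model (not Flink code). Channels are modeled as lists, and `failingChannel` is a hypothetical parameter marking where the per-channel emit throws, standing in for the IOException from `RecordWriter#emit`. As in `ContinuousFileReaderOperator#finish`, the caller only logs the exception.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Simplified model of the loop in ChannelSelectorRecordWriter#broadcastEmit (not Flink code).
class BroadcastAbortDemo {

    // Per-channel emit; throws on the hypothetical failing channel.
    static void emit(List<List<String>> channels, int channelIndex, String record, int failingChannel)
            throws IOException {
        if (channelIndex == failingChannel) {
            throw new IOException("simulated failure on channel " + channelIndex);
        }
        channels.get(channelIndex).add(record);
    }

    // Mirrors the original loop: the first exception aborts the whole broadcast.
    static void broadcastEmit(List<List<String>> channels, String record, int failingChannel)
            throws IOException {
        for (int channelIndex = 0; channelIndex < channels.size(); channelIndex++) {
            emit(channels, channelIndex, record, failingChannel);
        }
    }

    // Runs one broadcast and returns the indices of channels that received the record.
    static List<Integer> receivedAfterBroadcast(int numberOfChannels, int failingChannel) {
        List<List<String>> channels = new ArrayList<>();
        for (int i = 0; i < numberOfChannels; i++) {
            channels.add(new ArrayList<String>());
        }
        try {
            broadcastEmit(channels, "MAX_WATERMARK", failingChannel);
        } catch (IOException e) {
            // Like ContinuousFileReaderOperator#finish: log and carry on.
            System.out.println("WARN unable to emit watermark: " + e.getMessage());
        }
        List<Integer> received = new ArrayList<>();
        for (int i = 0; i < numberOfChannels; i++) {
            if (!channels.get(i).isEmpty()) {
                received.add(i);
            }
        }
        return received;
    }

    public static void main(String[] args) {
        // Channel 1 fails, so channels 1, 2 and 3 never see the record.
        System.out.println(receivedAfterBroadcast(4, 1)); // [0]
    }
}
```

With four channels and a failure on channel 1, only channel 0 receives the record.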

We therefore suspect that the potential IOException at line 67 is not handled properly: with the current behavior and logging, it is hard for users to notice or debug the issue. A user simply finds that the last few messages are missing, and the only clue is a WARN log somewhere saying that `Watermark.MAX_WATERMARK` could not be emitted. Nothing tells the user why or how those messages were lost.
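For context on why a single missing watermark can hold back output: in Flink, an operator with several input channels advances its event-time watermark to the minimum of the latest watermarks received on those channels. The sketch below is a simplified model of that rule only (it ignores idle channels and is not Flink's actual implementation), and we have not verified that this is exactly how results are held back in BATCH mode. It shows that a hypothetical end-of-input timer at `Long.MAX_VALUE` cannot fire until every channel has delivered `MAX_WATERMARK`.

```java
import java.util.Arrays;

// Simplified model (not Flink's implementation; idle channels are ignored):
// a task's event-time watermark is the minimum of the latest watermark per input channel.
class MinWatermarkDemo {
    // Timestamp carried by Flink's Watermark.MAX_WATERMARK.
    static final long MAX_WATERMARK = Long.MAX_VALUE;

    private final long[] channelWatermarks;

    MinWatermarkDemo(int numberOfInputChannels) {
        channelWatermarks = new long[numberOfInputChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE); // nothing received yet
    }

    // Records a watermark from one channel; watermarks never move backwards.
    void onWatermark(int channel, long timestamp) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], timestamp);
    }

    // Combined watermark = minimum over all input channels.
    long combinedWatermark() {
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }

    // A hypothetical end-of-input timer at Long.MAX_VALUE fires only when
    // every channel has delivered MAX_WATERMARK.
    boolean endOfInputTimerFires() {
        return combinedWatermark() >= MAX_WATERMARK;
    }

    public static void main(String[] args) {
        MinWatermarkDemo task = new MinWatermarkDemo(3);
        task.onWatermark(0, MAX_WATERMARK);
        task.onWatermark(1, MAX_WATERMARK);
        System.out.println(task.endOfInputTimerFires()); // false: channel 2 is still behind
        task.onWatermark(2, MAX_WATERMARK);
        System.out.println(task.endOfInputTimerFires()); // true
    }
}
```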

We would like to propose a fix. A simple solution is to catch the IOException at line 67, log it, and possibly retry the emission. We implemented this solution and the symptom disappeared. However, `broadcastEmit` is called from many places, so this fix would also affect the other callers, and we are not sure whether the new behavior is appropriate for them.
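Below is a sketch of one possible variant of this fix, under our own assumptions and not taken from Flink: each channel is retried a bounded number of times, the loop continues with the remaining channels if one still fails, and the first failure is rethrown at the end (later ones are attached as suppressed exceptions). `ChannelEmitter` and `maxAttempts` are hypothetical names.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Sketch of one possible variant of the proposed fix (not Flink code).
class RetryingBroadcastDemo {

    // Hypothetical per-channel emitter standing in for RecordWriter#emit.
    interface ChannelEmitter {
        void emit(int channelIndex) throws IOException;
    }

    static void broadcastEmit(int numberOfChannels, ChannelEmitter emitter, int maxAttempts)
            throws IOException {
        IOException firstFailure = null;
        for (int channelIndex = 0; channelIndex < numberOfChannels; channelIndex++) {
            for (int attempt = 1; ; attempt++) {
                try {
                    emitter.emit(channelIndex);
                    break; // delivered to this channel
                } catch (IOException e) {
                    System.out.println("WARN emit to channel " + channelIndex
                            + " failed (attempt " + attempt + "): " + e.getMessage());
                    if (attempt >= maxAttempts) {
                        if (firstFailure == null) {
                            firstFailure = e;
                        } else if (e != firstFailure) {
                            firstFailure.addSuppressed(e);
                        }
                        break; // give up on this channel, continue with the next one
                    }
                }
            }
        }
        // Surface the failure instead of silently losing it.
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    public static void main(String[] args) throws IOException {
        List<Integer> delivered = new ArrayList<>();
        int[] failuresLeft = {1}; // channel 1 fails once, then succeeds
        broadcastEmit(4, channelIndex -> {
            if (channelIndex == 1 && failuresLeft[0]-- > 0) {
                throw new IOException("transient failure");
            }
            delivered.add(channelIndex);
        }, 3);
        System.out.println(delivered); // [0, 1, 2, 3]
    }
}
```

Rethrowing after the loop keeps the existing `throws IOException` contract for the other callers of `broadcastEmit` while still delivering the record to the healthy channels. Retrying is only safe if a failed emit leaves the target subpartition unchanged, which we have not verified.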

We are looking for suggestions and feedback. Thanks!


> IOException during MAX_WATERMARK emission causes message missing
> ----------------------------------------------------------------
>
>                 Key: FLINK-30032
>                 URL: https://issues.apache.org/jira/browse/FLINK-30032
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.14.0
>            Reporter: Haoze Wu
>            Priority: Major
>
> We are testing Flink (version 1.14.0). We launch one StandaloneSessionClusterEntrypoint and two TaskManagerRunners, then run a Flink client that submits a WordCount workload. The code is similar to [https://github.com/apache/flink/blob/release-1.14.0/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java]; the only addition is a Kafka topic output:
> {code:java}
>     private static DataStreamSink<String> addKafkaSink(
>             final DataStream<String> events, final String brokers, final String topic) {
>         return events.sinkTo(KafkaSink.<String>builder()
>                 .setBootstrapServers(brokers)
>                 .setRecordSerializer(
>                         KafkaRecordSerializationSchema.<String>builder()
>                                 .setValueSerializationSchema(new SimpleStringSchema())
>                                 .setTopic(topic)
>                                 .build())
>                 .build());
>     }
>     public static void run(final String[] args) throws Exception {
>         final String brokers = args[0];
>         final String textFilePath = args[1];
>         final String kafkaTopic = args[2];
>         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
>         final DataStream<String> text = env.readTextFile(textFilePath);
>         final DataStream<Tuple2<String, Integer>> counts =
>                 text.flatMap(new Tokenizer()).keyBy(value -> value.f0).sum(1);
>         addKafkaSink(counts.map(String::valueOf), brokers, kafkaTopic);
>         final long nano = System.nanoTime();
>         env.execute("WordCount");
>         FlinkGrayClientMain.reply("success", nano);
>     }
>  {code}
> We found that the Kafka topic sometimes fails to receive a few messages. We reproduced the symptom multiple times: in each failing run, the topic received between 160 and 169 messages instead of the expected 170, and the missing messages were always the last few of the 170 expected ones.
> We then inspected the logs and the code.
> First, one of the TaskManagerRunners logged an IOException:
> {code:java}
> 2021-11-02T17:43:41,070 WARN  source.ContinuousFileReaderOperator (ContinuousFileReaderOperator.java:finish(461)) - unable to emit watermark while closing
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>         at org.apache.flink.streaming.runtime.tasks.ChainingOutput.emitWatermark(ChainingOutput.java:114) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:40) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndEmitWatermark(StreamSourceContexts.java:428) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.emitWatermark(StreamSourceContexts.java:544) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.emitWatermark(StreamSourceContexts.java:113) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.finish(ContinuousFileReaderOperator.java:459) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finishOperator(StreamOperatorWrapper.java:211) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferFinishOperatorToMailbox$3(StreamOperatorWrapper.java:185) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:97) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndFinishOperator(StreamOperatorWrapper.java:162) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finish(StreamOperatorWrapper.java:130) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.finishOperators(RegularOperatorChain.java:117) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.endData(StreamTask.java:549) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:508) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) [flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) [flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) [flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) [flink-dist_2.11-1.14.0.jar:1.14.0]
>         at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
> Caused by: java.lang.RuntimeException: McGray injected exception
>         at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:121) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:40) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:605) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.runtime.tasks.ChainingOutput.emitWatermark(ChainingOutput.java:112) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         ... 24 more
> Caused by: java.io.IOException {code}
> The IOException originates at line 104 of `RecordWriter#emit`:
> {code:java}
>     protected void emit(T record, int targetSubpartition) throws IOException {
>         checkErroneous();
>         targetPartition.emitRecord(serializeRecord(serializer, record), targetSubpartition); // line 104
>         if (flushAlways) {
>             targetPartition.flush(targetSubpartition);
>         }
>     } {code}
> Here, `targetPartition.emitRecord` eventually performs file I/O or memory-mapped I/O, which can throw the IOException.
> This exception is caught in `RecordWriterOutput#emitWatermark` and rethrown as a `RuntimeException`:
> {code:java}
>     @Override
>     public void emitWatermark(Watermark mark) {
>         if (announcedStatus.isIdle()) {
>             return;
>         }
>         watermarkGauge.setCurrentWatermark(mark.getTimestamp());
>         serializationDelegate.setInstance(mark);
>         try {
>             recordWriter.broadcastEmit(serializationDelegate);
>         } catch (Exception e) {
>             throw new RuntimeException(e.getMessage(), e);
>         }
>     } {code}
> It is then caught in `ChainingOutput#emitWatermark` and wrapped in an `ExceptionInChainedOperatorException`:
> {code:java}
>     @Override
>     public void emitWatermark(Watermark mark) {
>         if (announcedStatus.isIdle()) {
>             return;
>         }
>         try {
>             watermarkGauge.setCurrentWatermark(mark.getTimestamp());
>             input.processWatermark(mark);
>         } catch (Exception e) {
>             throw new ExceptionInChainedOperatorException(e);
>         }
>     } {code}
> Finally, it is caught in `ContinuousFileReaderOperator#finish`, which only logs a warning:
> {code:java}
>     @Override
>     public void finish() throws Exception {
>         LOG.debug("finishing");
>         super.finish();
>         switch (state) {
>             case IDLE:
>                 switchState(ReaderState.FINISHED);
>                 break;
>             case FINISHED:
>                 LOG.warn("operator is already closed, doing nothing");
>                 return;
>             default:
>                 switchState(ReaderState.FINISHING);
>                 while (!state.isTerminal()) {
>                     executor.yield();
>                 }
>         }
>         try {
>             sourceContext.emitWatermark(Watermark.MAX_WATERMARK);
>         } catch (Exception e) {
>             LOG.warn("unable to emit watermark while closing", e);
>         }
>     } {code}
> Here `Watermark.MAX_WATERMARK` is emitted to properly finish the computation.
> In Flink 1.14.0, the full call stack of the workflow described above is:
> {code:java}
> org.apache.flink.runtime.io.network.api.writer.RecordWriter#emit:104
> org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter#broadcastEmit:67
> org.apache.flink.streaming.runtime.io.RecordWriterOutput#emitWatermark:119
> org.apache.flink.streaming.api.operators.CountingOutput#emitWatermark:40
> org.apache.flink.streaming.api.operators.AbstractStreamOperator#processWatermark:605
> org.apache.flink.streaming.runtime.tasks.ChainingOutput#emitWatermark:112
> org.apache.flink.streaming.api.operators.CountingOutput#emitWatermark:40
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext#processAndEmitWatermark:428
> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext#emitWatermark:544
> org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose#emitWatermark:113
> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator#finish:459
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1#runThrowing:50
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper#finishOperator:211
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper#lambda$deferFinishOperatorToMailbox$3:185
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1#runThrowing:50
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail#run:90
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl#tryYield:97
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper#quiesceTimeServiceAndFinishOperator:162
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper#finish:130
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain#finishOperators:117
> org.apache.flink.streaming.runtime.tasks.StreamTask#endData:549
> org.apache.flink.streaming.runtime.tasks.StreamTask#processInput:508
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor#runMailboxLoop:203
> org.apache.flink.streaming.runtime.tasks.StreamTask#runMailboxLoop:809
> org.apache.flink.streaming.runtime.tasks.StreamTask#invoke:761
> org.apache.flink.runtime.taskmanager.Task#runWithSystemExitMonitoring:958
> org.apache.flink.runtime.taskmanager.Task#restoreAndInvoke:937
> org.apache.flink.runtime.taskmanager.Task#doRun:766
> org.apache.flink.runtime.taskmanager.Task#run:575
> java.lang.Thread#run:748 {code}
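> The wrap-and-swallow path traced above can be modeled in a few lines. This is a hypothetical, simplified sketch and not Flink code. The method names only mirror the roles of the Flink methods, and `ChainedOperatorException` stands in for `ExceptionInChainedOperatorException`. It shows the `IOException` being wrapped twice and then reduced to a single warning, so nothing propagates to the caller.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of the propagation path in the stack trace. None of these
 * are Flink classes; each method only mirrors the role named in its comment.
 */
class SwallowedWatermarkDemo {

    /** Hypothetical stand-in for Flink's ExceptionInChainedOperatorException. */
    static final class ChainedOperatorException extends RuntimeException {
        ChainedOperatorException(Throwable cause) {
            super("Could not forward element to next operator", cause);
        }
    }

    // Role of RecordWriter#emit: the low-level write fails.
    static void recordWriterEmit() throws IOException {
        throw new IOException("simulated I/O failure");
    }

    // Role of RecordWriterOutput#emitWatermark: wraps into a RuntimeException.
    static void recordWriterOutputEmitWatermark() {
        try {
            recordWriterEmit();
        } catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    // Role of ChainingOutput#emitWatermark: wraps a second time.
    static void chainingOutputEmitWatermark() {
        try {
            recordWriterOutputEmitWatermark();
        } catch (Exception e) {
            throw new ChainedOperatorException(e);
        }
    }

    // Role of ContinuousFileReaderOperator#finish: records a warning and returns normally.
    static void finish(List<String> warnings) {
        try {
            chainingOutputEmitWatermark();
        } catch (Exception e) {
            warnings.add("unable to emit watermark while closing: " + rootCause(e));
        }
    }

    // Walks the cause chain down to the original exception.
    static Throwable rootCause(Throwable t) {
        while (t.getCause() != null) {
            t = t.getCause();
        }
        return t;
    }

    public static void main(String[] args) {
        List<String> warnings = new ArrayList<>();
        finish(warnings); // no exception escapes, so the caller cannot tell the emit failed
        System.out.println(warnings);
    }
}
```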
> We think the last few messages go missing from the Kafka topic because of `ChannelSelectorRecordWriter#broadcastEmit`, which also appears in the call stack above:
> {code:java}
>     @Override
>     public void broadcastEmit(T record) throws IOException {
>         checkErroneous();
>         // Emitting to all channels in a for loop can be better than calling
>         // ResultPartitionWriter#broadcastRecord because the broadcastRecord
>         // method incurs extra overhead.
>         ByteBuffer serializedRecord = serializeRecord(serializer, record);
>         for (int channelIndex = 0; channelIndex < numberOfChannels; channelIndex++) {
>             serializedRecord.rewind();
>             emit(record, channelIndex);   // line 67
>         }
>         if (flushAlways) {
>             flushAll();
>         }
>     } {code}
> Line 67 tries to emit `Watermark.MAX_WATERMARK` (from `ContinuousFileReaderOperator#finish`) to every channel. When the IOException is thrown there, the loop stops, so the failing channel and every channel after it never receive the watermark. `ContinuousFileReaderOperator#finish` then swallows the exception. Across multiple reproductions, we found that the number of missing messages is exactly equal to the number of affected channels.
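> To illustrate why only some channels are affected, here is a hypothetical, simplified model of the loop above. It is not Flink code. Each channel is an `int` counter of received watermarks, and the channel at index `failingChannel` throws an `IOException` the way line 67 does in our runs. With no per-channel error handling, the loop skips the failing channel and every later one, and the caller swallows the error.

```java
import java.io.IOException;
import java.util.Arrays;

/**
 * Hypothetical model of the broadcast loop in ChannelSelectorRecordWriter#broadcastEmit.
 * received[i] counts how many watermarks channel i has received.
 */
class PartialBroadcastDemo {

    // Simulates emit(record, channelIndex): fails for one specific channel.
    static void emit(int[] received, int channelIndex, int failingChannel) throws IOException {
        if (channelIndex == failingChannel) {
            throw new IOException("simulated failure on channel " + channelIndex);
        }
        received[channelIndex]++;
    }

    // Same control flow as broadcastEmit: one exception aborts the whole loop.
    static void broadcastEmit(int[] received, int failingChannel) throws IOException {
        for (int channelIndex = 0; channelIndex < received.length; channelIndex++) {
            emit(received, channelIndex, failingChannel);
        }
    }

    // Mirrors ContinuousFileReaderOperator#finish, which only logs the failure.
    static int[] finish(int numberOfChannels, int failingChannel) {
        int[] received = new int[numberOfChannels];
        try {
            broadcastEmit(received, failingChannel);
        } catch (IOException e) {
            // Swallowed: the real operator logs a warning and carries on.
        }
        return received;
    }

    public static void main(String[] args) {
        // 4 channels, failure on channel 1: channels 1, 2 and 3 never get the watermark.
        System.out.println(Arrays.toString(finish(4, 1))); // prints [1, 0, 0, 0]
    }
}
```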
> We therefore suspect that the potential IOException at line 67 is not handled properly, because the resulting symptom and log output make the problem hard for users to notice or debug. A user may simply find that the last few messages are missing, and the only clue is a warning about an IOException while emitting `Watermark.MAX_WATERMARK`. That warning does not explain why or how the messages went missing.
> We would like to propose a fix for this issue. A simple solution is to catch the IOException at line 67, log it, and possibly retry the emit. We implemented this solution and the symptom disappeared. However, `broadcastEmit` is called from many places, so this fix would also affect the other callers, and we are not sure whether this behavior is appropriate for them.
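> The following is a minimal sketch of the kind of fix described above. It is written as a standalone model rather than a patch against Flink. `Channel`, `FlakyChannel`, and `broadcastWithRetry` are hypothetical names, the retry bound `maxAttempts` is an assumption, and the log is a plain list instead of a real logger. Each channel gets up to `maxAttempts` tries and every failure is logged. A channel that keeps failing no longer prevents later channels from receiving the watermark, and the error is rethrown at the end instead of being lost.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of a broadcast that catches, logs and retries per channel. */
class RetryingBroadcast {

    /** Hypothetical stand-in for writing one watermark to one output channel. */
    interface Channel {
        void write(long watermark) throws IOException;
    }

    static void broadcastWithRetry(List<Channel> channels, long watermark, int maxAttempts, List<String> log)
            throws IOException {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        IOException failure = null;
        for (int i = 0; i < channels.size(); i++) {
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                try {
                    channels.get(i).write(watermark);
                    break; // success: move on to the next channel
                } catch (IOException e) {
                    log.add("channel " + i + " attempt " + attempt + " failed: " + e.getMessage());
                    if (attempt == maxAttempts) {
                        // Give up on this channel but keep serving the remaining ones.
                        if (failure == null) {
                            failure = new IOException("broadcast failed on at least one channel");
                        }
                        failure.addSuppressed(e);
                    }
                }
            }
        }
        if (failure != null) {
            throw failure; // surface the problem instead of silently losing it
        }
    }

    /** Hypothetical test channel that fails its first `failures` writes, then succeeds. */
    static final class FlakyChannel implements Channel {
        private int remainingFailures;
        final List<Long> received = new ArrayList<>();

        FlakyChannel(int failures) {
            this.remainingFailures = failures;
        }

        @Override
        public void write(long watermark) throws IOException {
            if (remainingFailures > 0) {
                remainingFailures--;
                throw new IOException("transient failure");
            }
            received.add(watermark);
        }
    }

    public static void main(String[] args) throws IOException {
        FlakyChannel a = new FlakyChannel(0);
        FlakyChannel b = new FlakyChannel(1); // fails once, then recovers on retry
        FlakyChannel c = new FlakyChannel(0);
        List<String> log = new ArrayList<>();
        // Long.MAX_VALUE is the timestamp carried by Watermark.MAX_WATERMARK.
        broadcastWithRetry(Arrays.<Channel>asList(a, b, c), Long.MAX_VALUE, 3, log);
        System.out.println(log);        // the single logged failure of channel 1
        System.out.println(c.received); // the last channel still received the watermark
    }
}
```

> Continuing past a channel that keeps failing and then rethrowing is a design choice made only in this sketch. Whether a retry, an immediate rethrow, or something else is right for the other callers of `broadcastEmit` is exactly the open question raised above.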
> We are looking for suggestions and feedback. Thanks!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)