You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2021/11/03 13:57:39 UTC

[flink-ml] 01/08: [FLINK-24655][iteration] HeadOperator waits for MAX_WATERMARK iterates back before terminating.

This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-ml.git

commit 63a82c24d9aaae07b28d5043058199983da5cee4
Author: Yun Gao <ga...@gmail.com>
AuthorDate: Wed Oct 6 16:19:21 2021 +0800

    [FLINK-24655][iteration] HeadOperator waits for MAX_WATERMARK iterates back before terminating.
    
    This is a basis for the checkpoint since for checkpoints
    with feedback edges, we would need to also include the
    feedback records into snapshot, thus if we want to make
    sure all the checkpoints before the terminated globally
    aligned events get done, we have to wait for one more round.
---
 .../flink/iteration/operator/HeadOperator.java     | 158 ++++++++++++---------
 .../headprocessor/HeadOperatorRecordProcessor.java |  58 ++++++++
 .../operator/headprocessor/HeadOperatorState.java  |  22 +++
 .../RegularHeadOperatorRecordProcessor.java        | 122 ++++++++++++++++
 .../TerminatingHeadOperatorRecordProcessor.java    |  60 ++++++++
 .../flink/iteration/operator/HeadOperatorTest.java |  23 ++-
 6 files changed, 369 insertions(+), 74 deletions(-)

diff --git a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/HeadOperator.java b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/HeadOperator.java
index 0796897..d7a9a54 100644
--- a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/HeadOperator.java
+++ b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/HeadOperator.java
@@ -19,6 +19,7 @@
 package org.apache.flink.iteration.operator;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.iteration.IterationID;
@@ -27,6 +28,9 @@ import org.apache.flink.iteration.broadcast.BroadcastOutput;
 import org.apache.flink.iteration.broadcast.BroadcastOutputFactory;
 import org.apache.flink.iteration.operator.event.GloballyAlignedEvent;
 import org.apache.flink.iteration.operator.event.SubtaskAlignedEvent;
+import org.apache.flink.iteration.operator.headprocessor.HeadOperatorRecordProcessor;
+import org.apache.flink.iteration.operator.headprocessor.RegularHeadOperatorRecordProcessor;
+import org.apache.flink.iteration.operator.headprocessor.TerminatingHeadOperatorRecordProcessor;
 import org.apache.flink.iteration.typeinfo.IterationRecordTypeInfo;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
@@ -45,17 +49,18 @@ import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.OutputTag;
 
-import java.util.HashMap;
-import java.util.Map;
+import java.io.IOException;
 import java.util.Objects;
 import java.util.concurrent.Executor;
 
+import static org.apache.flink.util.Preconditions.checkState;
+
 /**
- * The head operators unions the initialized variable stream and the feedback stream, and
- * synchronize the epoch watermark (round).
+ * The head operator unions the initialized variable stream and the feedback stream, and synchronize
+ * the epoch watermark (round).
  */
 public class HeadOperator extends AbstractStreamOperator<IterationRecord<?>>
         implements OneInputStreamOperator<IterationRecord<?>, IterationRecord<?>>,
@@ -76,15 +81,15 @@ public class HeadOperator extends AbstractStreamOperator<IterationRecord<?>>
 
     private final MailboxExecutor mailboxExecutor;
 
-    private final Map<Integer, Long> numFeedbackRecordsPerEpoch;
+    private transient BroadcastOutput<?> eventBroadcastOutput;
 
-    private transient String uniqueSenderId;
+    private transient ContextImpl processorContext;
 
-    private transient BroadcastOutput<?> eventBroadcastOutput;
+    // ------------- runtime -------------------
 
-    private transient StreamRecord<IterationRecord<?>> reusable;
+    private HeadOperatorStatus status;
 
-    private transient boolean shouldTerminate;
+    private HeadOperatorRecordProcessor recordProcessor;
 
     public HeadOperator(
             IterationID iterationId,
@@ -98,7 +103,6 @@ public class HeadOperator extends AbstractStreamOperator<IterationRecord<?>>
         this.isCriteriaStream = isCriteriaStream;
         this.mailboxExecutor = Objects.requireNonNull(mailboxExecutor);
         this.operatorEventGateway = Objects.requireNonNull(operatorEventGateway);
-        this.numFeedbackRecordsPerEpoch = new HashMap<>();
 
         // Even though this operator does not use the processing
         // time service, AbstractStreamOperator requires this
@@ -112,9 +116,6 @@ public class HeadOperator extends AbstractStreamOperator<IterationRecord<?>>
             StreamConfig config,
             Output<StreamRecord<IterationRecord<?>>> output) {
         super.setup(containingTask, config, output);
-        uniqueSenderId =
-                OperatorUtils.getUniqueSenderId(
-                        getOperatorID(), getRuntimeContext().getIndexOfThisSubtask());
         eventBroadcastOutput =
                 BroadcastOutputFactory.createBroadcastOutput(
                         output, metrics.getIOMetricGroup().getNumRecordsOutCounter());
@@ -124,12 +125,14 @@ public class HeadOperator extends AbstractStreamOperator<IterationRecord<?>>
     public void initializeState(StateInitializationContext context) throws Exception {
         super.initializeState(context);
 
-        reusable = new StreamRecord<>(null);
+        processorContext = new ContextImpl();
+        status = HeadOperatorStatus.RUNNING;
+        recordProcessor = new RegularHeadOperatorRecordProcessor(processorContext);
 
         // Here we register a mail
         registerFeedbackConsumer(
                 (Runnable runnable) -> {
-                    if (!shouldTerminate) {
+                    if (status != HeadOperatorStatus.TERMINATED) {
                         mailboxExecutor.execute(runnable::run, "Head feedback");
                     }
                 });
@@ -137,73 +140,39 @@ public class HeadOperator extends AbstractStreamOperator<IterationRecord<?>>
 
     @Override
     public void processElement(StreamRecord<IterationRecord<?>> element) throws Exception {
-        processRecord(element);
+        recordProcessor.processElement(element);
     }
 
     @Override
     public void processFeedback(StreamRecord<IterationRecord<?>> iterationRecord) throws Exception {
-        if (iterationRecord.getValue().getType() == IterationRecord.Type.RECORD) {
-            numFeedbackRecordsPerEpoch.compute(
-                    iterationRecord.getValue().getEpoch(),
-                    (round, count) -> count == null ? 1 : count + 1);
-        }
-        processRecord(iterationRecord);
-    }
-
-    private void processRecord(StreamRecord<IterationRecord<?>> iterationRecord) {
-        switch (iterationRecord.getValue().getType()) {
-            case RECORD:
-                reusable.replace(iterationRecord.getValue(), iterationRecord.getTimestamp());
-                output.collect(reusable);
-                break;
-            case EPOCH_WATERMARK:
-                LOG.debug(
-                        "Head Received epoch watermark {}", iterationRecord.getValue().getEpoch());
-                sendEpochWatermarkToCoordinator(iterationRecord.getValue().getEpoch());
-                break;
+        boolean terminated = recordProcessor.processFeedbackElement(iterationRecord);
+        if (terminated) {
+            checkState(status == HeadOperatorStatus.TERMINATING);
+            status = HeadOperatorStatus.TERMINATED;
         }
     }
 
     @Override
-    @SuppressWarnings({"unchecked", "rawtypes"})
     public void handleOperatorEvent(OperatorEvent operatorEvent) {
         if (operatorEvent instanceof GloballyAlignedEvent) {
-            try {
-                GloballyAlignedEvent globallyAlignedEvent = (GloballyAlignedEvent) operatorEvent;
-                LOG.info("Received global event {}", globallyAlignedEvent);
-
-                shouldTerminate = globallyAlignedEvent.isTerminated();
-                reusable.replace(
-                        IterationRecord.newEpochWatermark(
-                                globallyAlignedEvent.isTerminated()
-                                        ? Integer.MAX_VALUE
-                                        : globallyAlignedEvent.getEpoch(),
-                                uniqueSenderId),
-                        0);
-                eventBroadcastOutput.broadcastEmit((StreamRecord) reusable);
-                numFeedbackRecordsPerEpoch.remove(globallyAlignedEvent.getEpoch());
-            } catch (Exception e) {
-                ExceptionUtils.rethrow(e);
+            boolean shouldTerminate =
+                    recordProcessor.onGloballyAligned((GloballyAlignedEvent) operatorEvent);
+            if (shouldTerminate) {
+                status = HeadOperatorStatus.TERMINATING;
+                recordProcessor = new TerminatingHeadOperatorRecordProcessor();
             }
         }
     }
 
     @Override
     public void endInput() throws Exception {
-        sendEpochWatermarkToCoordinator(0);
-        while (!shouldTerminate) {
+        recordProcessor.processElement(
+                new StreamRecord<>(IterationRecord.newEpochWatermark(0, "fake")));
+        while (status != HeadOperatorStatus.TERMINATED) {
             mailboxExecutor.yield();
         }
     }
 
-    private void sendEpochWatermarkToCoordinator(int round) {
-        operatorEventGateway.sendEventToCoordinator(
-                new SubtaskAlignedEvent(
-                        round,
-                        numFeedbackRecordsPerEpoch.getOrDefault(round, 0L),
-                        isCriteriaStream));
-    }
-
     private void registerFeedbackConsumer(Executor mailboxExecutor) {
         int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
         int attemptNum = getRuntimeContext().getAttemptNumber();
@@ -217,11 +186,6 @@ public class HeadOperator extends AbstractStreamOperator<IterationRecord<?>>
     }
 
     @VisibleForTesting
-    Map<Integer, Long> getNumFeedbackRecordsPerEpoch() {
-        return numFeedbackRecordsPerEpoch;
-    }
-
-    @VisibleForTesting
     public OperatorEventGateway getOperatorEventGateway() {
         return operatorEventGateway;
     }
@@ -230,4 +194,62 @@ public class HeadOperator extends AbstractStreamOperator<IterationRecord<?>>
     MailboxExecutor getMailboxExecutor() {
         return mailboxExecutor;
     }
+
+    @VisibleForTesting
+    HeadOperatorRecordProcessor getRecordProcessor() {
+        return recordProcessor;
+    }
+
+    @VisibleForTesting
+    public HeadOperatorStatus getStatus() {
+        return status;
+    }
+
+    @VisibleForTesting
+    enum HeadOperatorStatus {
+        RUNNING,
+
+        TERMINATING,
+
+        TERMINATED
+    }
+
+    private class ContextImpl implements HeadOperatorRecordProcessor.Context {
+
+        @Override
+        public StreamConfig getStreamConfig() {
+            return HeadOperator.this.config;
+        }
+
+        @Override
+        public TaskInfo getTaskInfo() {
+            return getContainingTask().getEnvironment().getTaskInfo();
+        }
+
+        @Override
+        public void output(StreamRecord<IterationRecord<?>> record) {
+            output.collect(record);
+        }
+
+        @Override
+        public void output(
+                OutputTag<IterationRecord<?>> outputTag, StreamRecord<IterationRecord<?>> record) {
+            output.collect(outputTag, record);
+        }
+
+        @Override
+        public void broadcastOutput(StreamRecord<IterationRecord<?>> record) {
+            try {
+                eventBroadcastOutput.broadcastEmit((StreamRecord) record);
+            } catch (IOException e) {
+                throw new FlinkRuntimeException("Failed to broadcast event", e);
+            }
+        }
+
+        @Override
+        public void updateEpochToCoordinator(int epoch, long numFeedbackRecords) {
+            operatorEventGateway.sendEventToCoordinator(
+                    new SubtaskAlignedEvent(epoch, numFeedbackRecords, isCriteriaStream));
+        }
+    }
 }
diff --git a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/headprocessor/HeadOperatorRecordProcessor.java b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/headprocessor/HeadOperatorRecordProcessor.java
new file mode 100644
index 0000000..78132c3
--- /dev/null
+++ b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/headprocessor/HeadOperatorRecordProcessor.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.operator.headprocessor;
+
+import org.apache.flink.api.common.TaskInfo;
+import org.apache.flink.iteration.IterationRecord;
+import org.apache.flink.iteration.operator.HeadOperator;
+import org.apache.flink.iteration.operator.event.GloballyAlignedEvent;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
+
+/** The component to actually deal with the event received in the {@link HeadOperator}. */
+public interface HeadOperatorRecordProcessor {
+
+    void initializeState(HeadOperatorState headOperatorState) throws Exception;
+
+    void processElement(StreamRecord<IterationRecord<?>> record);
+
+    boolean processFeedbackElement(StreamRecord<IterationRecord<?>> record);
+
+    boolean onGloballyAligned(GloballyAlignedEvent globallyAlignedEvent);
+
+    HeadOperatorState snapshotState();
+
+    /** The context for {@link HeadOperatorRecordProcessor}. */
+    interface Context {
+
+        StreamConfig getStreamConfig();
+
+        TaskInfo getTaskInfo();
+
+        void output(StreamRecord<IterationRecord<?>> record);
+
+        void output(
+                OutputTag<IterationRecord<?>> outputTag, StreamRecord<IterationRecord<?>> record);
+
+        void broadcastOutput(StreamRecord<IterationRecord<?>> record);
+
+        void updateEpochToCoordinator(int epoch, long numFeedbackRecords);
+    }
+}
diff --git a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/headprocessor/HeadOperatorState.java b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/headprocessor/HeadOperatorState.java
new file mode 100644
index 0000000..861f481
--- /dev/null
+++ b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/headprocessor/HeadOperatorState.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.operator.headprocessor;
+
+/** The state entry for the head operator. */
+public class HeadOperatorState {}
diff --git a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/headprocessor/RegularHeadOperatorRecordProcessor.java b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/headprocessor/RegularHeadOperatorRecordProcessor.java
new file mode 100644
index 0000000..f1a6b0f
--- /dev/null
+++ b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/headprocessor/RegularHeadOperatorRecordProcessor.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.operator.headprocessor;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.iteration.IterationRecord;
+import org.apache.flink.iteration.operator.OperatorUtils;
+import org.apache.flink.iteration.operator.event.GloballyAlignedEvent;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Processes the event before we received the terminated global aligned event from the coordinator.
+ */
+public class RegularHeadOperatorRecordProcessor implements HeadOperatorRecordProcessor {
+
+    protected static final Logger LOG =
+            LoggerFactory.getLogger(RegularHeadOperatorRecordProcessor.class);
+
+    private final Context headOperatorContext;
+
+    private final StreamRecord<IterationRecord<?>> reusable;
+
+    private final Map<Integer, Long> numFeedbackRecordsPerEpoch;
+
+    private final String senderId;
+
+    public RegularHeadOperatorRecordProcessor(Context headOperatorContext) {
+        this.headOperatorContext = headOperatorContext;
+
+        this.reusable = new StreamRecord<>(null);
+        this.numFeedbackRecordsPerEpoch = new HashMap<>();
+
+        this.senderId =
+                OperatorUtils.getUniqueSenderId(
+                        headOperatorContext.getStreamConfig().getOperatorID(),
+                        headOperatorContext.getTaskInfo().getIndexOfThisSubtask());
+    }
+
+    @Override
+    public void initializeState(HeadOperatorState headOperatorState) throws Exception {}
+
+    @Override
+    public void processElement(StreamRecord<IterationRecord<?>> element) {
+        processRecord(element);
+    }
+
+    @Override
+    public boolean processFeedbackElement(StreamRecord<IterationRecord<?>> element) {
+        if (element.getValue().getType() == IterationRecord.Type.RECORD) {
+            numFeedbackRecordsPerEpoch.compute(
+                    element.getValue().getEpoch(), (epoch, count) -> count == null ? 1 : count + 1);
+        }
+
+        processRecord(element);
+
+        return false;
+    }
+
+    @Override
+    public boolean onGloballyAligned(GloballyAlignedEvent globallyAlignedEvent) {
+        LOG.info("Received global event {}", globallyAlignedEvent);
+
+        reusable.replace(
+                IterationRecord.newEpochWatermark(
+                        globallyAlignedEvent.isTerminated()
+                                ? Integer.MAX_VALUE
+                                : globallyAlignedEvent.getEpoch(),
+                        senderId),
+                0);
+        headOperatorContext.broadcastOutput(reusable);
+
+        return globallyAlignedEvent.isTerminated();
+    }
+
+    @Override
+    public HeadOperatorState snapshotState() {
+        return new HeadOperatorState();
+    }
+
+    @VisibleForTesting
+    public Map<Integer, Long> getNumFeedbackRecordsPerEpoch() {
+        return numFeedbackRecordsPerEpoch;
+    }
+
+    private void processRecord(StreamRecord<IterationRecord<?>> iterationRecord) {
+        switch (iterationRecord.getValue().getType()) {
+            case RECORD:
+                reusable.replace(iterationRecord.getValue(), iterationRecord.getTimestamp());
+                headOperatorContext.output(reusable);
+                break;
+            case EPOCH_WATERMARK:
+                LOG.info("Head Received epoch watermark {}", iterationRecord.getValue().getEpoch());
+                headOperatorContext.updateEpochToCoordinator(
+                        iterationRecord.getValue().getEpoch(),
+                        numFeedbackRecordsPerEpoch.getOrDefault(
+                                iterationRecord.getValue().getEpoch(), 0L));
+                break;
+        }
+    }
+}
diff --git a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/headprocessor/TerminatingHeadOperatorRecordProcessor.java b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/headprocessor/TerminatingHeadOperatorRecordProcessor.java
new file mode 100644
index 0000000..c5377e5
--- /dev/null
+++ b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/headprocessor/TerminatingHeadOperatorRecordProcessor.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.operator.headprocessor;
+
+import org.apache.flink.iteration.IterationRecord;
+import org.apache.flink.iteration.operator.event.GloballyAlignedEvent;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.FlinkRuntimeException;
+
+/**
+ * Processor used after we received terminated globally aligned event from the coordinator, but
+ * before we received the (Integer.MAX_VALUE + 1) from the feedback channel again.
+ */
+public class TerminatingHeadOperatorRecordProcessor implements HeadOperatorRecordProcessor {
+
+    @Override
+    public void initializeState(HeadOperatorState headOperatorState) throws Exception {}
+
+    @Override
+    public void processElement(StreamRecord<IterationRecord<?>> record) {
+        throw new FlinkRuntimeException(
+                "It is not possible to receive the element from normal input during terminating.");
+    }
+
+    @Override
+    public boolean processFeedbackElement(StreamRecord<IterationRecord<?>> record) {
+        if (record.getValue().getType() == IterationRecord.Type.EPOCH_WATERMARK) {
+            return record.getValue().getEpoch() == Integer.MAX_VALUE + 1;
+        }
+
+        return false;
+    }
+
+    @Override
+    public boolean onGloballyAligned(GloballyAlignedEvent globallyAlignedEvent) {
+        throw new FlinkRuntimeException(
+                "It is not possible to receive the globally aligned event from normal input during terminating.");
+    }
+
+    @Override
+    public HeadOperatorState snapshotState() {
+        return new HeadOperatorState();
+    }
+}
diff --git a/flink-ml-iteration/src/test/java/org/apache/flink/iteration/operator/HeadOperatorTest.java b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/operator/HeadOperatorTest.java
index cc3ce31..f54422e 100644
--- a/flink-ml-iteration/src/test/java/org/apache/flink/iteration/operator/HeadOperatorTest.java
+++ b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/operator/HeadOperatorTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.iteration.IterationID;
 import org.apache.flink.iteration.IterationRecord;
 import org.apache.flink.iteration.operator.event.GloballyAlignedEvent;
 import org.apache.flink.iteration.operator.event.SubtaskAlignedEvent;
+import org.apache.flink.iteration.operator.headprocessor.RegularHeadOperatorRecordProcessor;
 import org.apache.flink.iteration.typeinfo.IterationRecordTypeInfo;
 import org.apache.flink.runtime.io.network.api.EndOfData;
 import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -86,12 +87,12 @@ public class HeadOperatorTest extends TestLogger {
                             new StreamRecord<>(IterationRecord.newRecord(2, 0), 3),
                             new StreamRecord<>(IterationRecord.newRecord(4, 1), 4));
             assertEquals(expectedOutput, new ArrayList<>(harness.getOutput()));
-            assertEquals(
-                    2,
-                    (long)
-                            RecordingHeadOperatorFactory.latestHeadOperator
-                                    .getNumFeedbackRecordsPerEpoch()
-                                    .get(1));
+
+            RegularHeadOperatorRecordProcessor recordProcessor =
+                    (RegularHeadOperatorRecordProcessor)
+                            RecordingHeadOperatorFactory.latestHeadOperator.getRecordProcessor();
+
+            assertEquals(2, (long) recordProcessor.getNumFeedbackRecordsPerEpoch().get(1));
         }
     }
 
@@ -153,6 +154,16 @@ public class HeadOperatorTest extends TestLogger {
                                                     new SerializedValue<>(
                                                             new GloballyAlignedEvent(1, true)));
 
+                                    while (RecordingHeadOperatorFactory.latestHeadOperator
+                                                    .getStatus()
+                                            == HeadOperator.HeadOperatorStatus.RUNNING) {}
+                                    putFeedbackRecords(
+                                            iterationId,
+                                            0,
+                                            new StreamRecord<>(
+                                                    IterationRecord.newEpochWatermark(
+                                                            Integer.MAX_VALUE + 1, "tail")));
+
                                     return null;
                                 } catch (Throwable e) {
                                     RecordingHeadOperatorFactory.latestHeadOperator