You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2021/02/01 17:01:29 UTC

[flink] 05/07: [FLINK-21104][network] Fix handling of obsolete CheckpointBarriers in UnalignedCheckpoints

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3bff7f03e1c1925008326caf0045b62159b250b7
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Wed Jan 27 18:57:35 2021 +0100

    [FLINK-21104][network] Fix handling of obsolete CheckpointBarriers in UnalignedCheckpoints
    
    If the previous checkpoint is declined, the task can receive both the older and the newer
    checkpoint barrier on two different channels before it processes any checkpoint cancellation
    message/RPC. If the newer checkpoint barrier happens to be processed before the obsolete one,
    an incorrect `checkState` in ChannelStatePersister would cause a job failure. This checkState
    assumed that the previous checkpoint would have been aborted/stopped before the new one was
    triggered, while in reality the previous checkpoint had never been triggered on this task,
    so it could not have been stopped either.
---
 .../partition/consumer/ChannelStatePersister.java  | 28 +++++--
 .../partition/consumer/LocalInputChannel.java      |  3 +-
 .../consumer/ChannelStatePersisterTest.java        | 95 +++++++++++++++-------
 3 files changed, 89 insertions(+), 37 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/ChannelStatePersister.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/ChannelStatePersister.java
index 06db030..db19f95 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/ChannelStatePersister.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/ChannelStatePersister.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.event.AbstractEvent;
@@ -36,7 +38,6 @@ import java.util.List;
 import java.util.Optional;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
 
 /** Helper class for persisting channel state via {@link ChannelStateWriter}. */
 @NotThreadSafe
@@ -66,16 +67,27 @@ public final class ChannelStatePersister {
         this.channelInfo = checkNotNull(channelInfo);
     }
 
-    protected void startPersisting(long barrierId, List<Buffer> knownBuffers) {
+    protected void startPersisting(long barrierId, List<Buffer> knownBuffers)
+            throws CheckpointException {
         logEvent("startPersisting", barrierId);
-        if (checkpointStatus != CheckpointStatus.BARRIER_RECEIVED && lastSeenBarrier < barrierId) {
+        if (checkpointStatus == CheckpointStatus.BARRIER_RECEIVED && lastSeenBarrier > barrierId) {
+            throw new CheckpointException(
+                    String.format(
+                            "Barrier for newer checkpoint %d has already been received compared to the requested checkpoint %d",
+                            lastSeenBarrier, barrierId),
+                    CheckpointFailureReason
+                            .CHECKPOINT_SUBSUMED); // currently, at most one active unaligned
+        }
+        if (lastSeenBarrier < barrierId) {
+            // Regardless of the current checkpointStatus, if we are notified about a more recent
+            // checkpoint than we have seen so far, always mark that this more recent barrier is
+            // pending.
+            // BARRIER_RECEIVED status can happen if we have seen an older barrier that probably
+            // has not yet been processed by the task, but the task is now notifying us that an
+            // even newer checkpoint has started. We should spill the knownBuffers and mark that
+            // we are waiting for that newer barrier to arrive.
             checkpointStatus = CheckpointStatus.BARRIER_PENDING;
             lastSeenBarrier = barrierId;
-        } else if (checkpointStatus == CheckpointStatus.BARRIER_RECEIVED) {
-            checkState(
-                    lastSeenBarrier >= barrierId,
-                    "Internal error, #stopPersisting for last checkpoint has not been called for "
-                            + channelInfo);
         }
         if (knownBuffers.size() > 0) {
             channelStateWriter.addInputData(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index fecfff9..dcd01e3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.execution.CancelTaskException;
@@ -123,7 +124,7 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit
     // Consume
     // ------------------------------------------------------------------------
 
-    public void checkpointStarted(CheckpointBarrier barrier) {
+    public void checkpointStarted(CheckpointBarrier barrier) throws CheckpointException {
         channelStatePersister.startPersisting(barrier.getId(), Collections.emptyList());
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/ChannelStatePersisterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/ChannelStatePersisterTest.java
index fe1a241..5e1fcfa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/ChannelStatePersisterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/ChannelStatePersisterTest.java
@@ -17,9 +17,11 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
+import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
+import org.apache.flink.runtime.checkpoint.channel.RecordingChannelStateWriter;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -27,8 +29,11 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collections;
 
+import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.buildSomeBuffer;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
@@ -36,23 +41,36 @@ import static org.junit.Assert.assertTrue;
 public class ChannelStatePersisterTest {
 
     @Test
-    public void testNewBarrierNotOverwrittenByStopPersisting() throws IOException {
+    public void testNewBarrierNotOverwrittenByStopPersisting() throws Exception {
+        RecordingChannelStateWriter channelStateWriter = new RecordingChannelStateWriter();
+        InputChannelInfo channelInfo = new InputChannelInfo(0, 0);
         ChannelStatePersister persister =
-                new ChannelStatePersister(ChannelStateWriter.NO_OP, new InputChannelInfo(0, 0));
+                new ChannelStatePersister(channelStateWriter, channelInfo);
 
-        persister.checkForBarrier(barrier(1L));
-        persister.startPersisting(1L, Collections.emptyList());
+        long checkpointId = 1L;
+        channelStateWriter.start(
+                checkpointId, CheckpointOptions.forCheckpointWithDefaultLocation(true, true, 0));
+
+        persister.checkForBarrier(barrier(checkpointId));
+        persister.startPersisting(checkpointId, Arrays.asList(buildSomeBuffer()));
+        assertEquals(1, channelStateWriter.getAddedInput().get(channelInfo).size());
+
+        persister.maybePersist(buildSomeBuffer());
+        assertEquals(1, channelStateWriter.getAddedInput().get(channelInfo).size());
 
         // meanwhile, checkpoint coordinator timed out the 1st checkpoint and started the 2nd
         // now task thread is picking up the barrier and aborts the 1st:
-        persister.checkForBarrier(barrier(2L));
-        persister.stopPersisting(1L);
+        persister.checkForBarrier(barrier(checkpointId + 1));
+        persister.maybePersist(buildSomeBuffer());
+        persister.stopPersisting(checkpointId);
+        persister.maybePersist(buildSomeBuffer());
+        assertEquals(1, channelStateWriter.getAddedInput().get(channelInfo).size());
 
         assertTrue(persister.hasBarrierReceived());
     }
 
     @Test
-    public void testNewBarrierNotOverwrittenByCheckForBarrier() throws IOException {
+    public void testNewBarrierNotOverwrittenByCheckForBarrier() throws Exception {
         ChannelStatePersister persister =
                 new ChannelStatePersister(ChannelStateWriter.NO_OP, new InputChannelInfo(0, 0));
 
@@ -65,38 +83,59 @@ public class ChannelStatePersisterTest {
     }
 
     @Test
-    public void testLateBarrierOnCancelledCheckpoint() throws IOException {
-        ChannelStatePersister persister =
-                new ChannelStatePersister(ChannelStateWriter.NO_OP, new InputChannelInfo(0, 0));
+    public void testLateBarrierOnStartedAndCancelledCheckpoint() throws Exception {
+        testLateBarrier(true, true);
+    }
 
-        persister.startPersisting(1L, Collections.emptyList());
-        // checkpoint aborted
-        persister.stopPersisting(1L);
+    @Test
+    public void testLateBarrierOnCancelledCheckpoint() throws Exception {
+        testLateBarrier(false, true);
+    }
 
-        // late barrier
-        persister.checkForBarrier(barrier(1L));
+    @Test
+    public void testLateBarrierOnNotYetCancelledCheckpoint() throws Exception {
+        testLateBarrier(false, false);
+    }
 
-        persister.startPersisting(2L, Collections.emptyList());
-        persister.checkForBarrier(barrier(2L));
+    private void testLateBarrier(
+            boolean startCheckpointOnLateBarrier, boolean cancelCheckpointBeforeLateBarrier)
+            throws Exception {
+        RecordingChannelStateWriter channelStateWriter = new RecordingChannelStateWriter();
+        InputChannelInfo channelInfo = new InputChannelInfo(0, 0);
+
+        ChannelStatePersister persister =
+                new ChannelStatePersister(channelStateWriter, channelInfo);
+
+        long lateCheckpointId = 1L;
+        long checkpointId = 2L;
+        if (startCheckpointOnLateBarrier) {
+            persister.startPersisting(lateCheckpointId, Collections.emptyList());
+        }
+        if (cancelCheckpointBeforeLateBarrier) {
+            persister.stopPersisting(lateCheckpointId);
+        }
+        persister.checkForBarrier(barrier(lateCheckpointId));
+        channelStateWriter.start(
+                checkpointId, CheckpointOptions.forCheckpointWithDefaultLocation(true, true, 0));
+        persister.startPersisting(checkpointId, Arrays.asList(buildSomeBuffer()));
+        persister.maybePersist(buildSomeBuffer());
+        persister.checkForBarrier(barrier(checkpointId));
+        persister.maybePersist(buildSomeBuffer());
 
         assertTrue(persister.hasBarrierReceived());
+        assertEquals(2, channelStateWriter.getAddedInput().get(channelInfo).size());
     }
 
-    @Test
-    public void testLateBarrierOnCancelledCheckpointAfterRecover() throws IOException {
+    @Test(expected = CheckpointException.class)
+    public void testLateBarrierTriggeringCheckpoint() throws Exception {
         ChannelStatePersister persister =
                 new ChannelStatePersister(ChannelStateWriter.NO_OP, new InputChannelInfo(0, 0));
 
-        // checkpoint aborted, stopPersisting called on recovered input channel without persister
-        persister.stopPersisting(1L);
-
-        // late barrier
-        persister.checkForBarrier(barrier(1L));
+        long lateCheckpointId = 1L;
+        long checkpointId = 2L;
 
-        persister.startPersisting(2L, Collections.emptyList());
-        persister.checkForBarrier(barrier(2L));
-
-        assertTrue(persister.hasBarrierReceived());
+        persister.checkForBarrier(barrier(checkpointId));
+        persister.startPersisting(lateCheckpointId, Collections.emptyList());
     }
 
     private static Buffer barrier(long id) throws IOException {