You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2021/02/01 17:01:24 UTC

[flink] branch release-1.12 updated (47c96e1 -> ec06587)

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a change to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 47c96e1  [FLINK-21226][table-common] Reintroduce TableColumn.of for backwards compatibility
     new d4da833  [FLINK-21104][network] Adding task name to logged network buffers
     new b9b6822  [FLINK-21104][task] Ease debugging.
     new 0a96201  [FLINK-21104][tests] Fix UnalignedCheckpointITCase not completing when there is a failure while finishing.
     new 1f29d03  [FLINK-21104][network] Do not enqueue released channels into the input gate.
     new 3bff7f0  [FLINK-21104][network] Fix handling of obsolete CheckpointBarriers in UnalignedCheckpoints
     new f460a57  [FLINK-20654][network] Fix incorrect spilling/persisting logic in RemoteInputChannel
     new ec06587  [FLINK-21104][network] Priority notification after cancellation.

The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../channel/RecoveredChannelStateHandler.java      |   2 -
 .../io/network/logger/NetworkActionsLogger.java    |  23 +-
 .../network/partition/PipelinedSubpartition.java   |   5 +-
 .../partition/consumer/ChannelStatePersister.java  |  28 ++-
 .../partition/consumer/LocalInputChannel.java      |   9 +-
 .../partition/consumer/RecoveredInputChannel.java  |   6 +
 .../partition/consumer/RemoteInputChannel.java     |  49 +++-
 .../partition/consumer/SingleInputGate.java        |   6 +
 .../consumer/ChannelStatePersisterTest.java        |  95 ++++---
 .../partition/consumer/SingleInputGateTest.java    |  17 ++
 .../partition/consumer/TestInputChannel.java       |   7 +-
 .../runtime/io/CheckpointedInputGate.java          |   4 +-
 .../runtime/tasks/mailbox/TaskMailboxImpl.java     |  15 +-
 .../runtime/io/AlternatingControllerTest.java      |  10 +-
 .../runtime/io/CheckpointedInputGateTest.java      | 277 ++++++++++++++++++++-
 .../checkpointing/UnalignedCheckpointITCase.java   |  13 +-
 .../checkpointing/UnalignedCheckpointTestBase.java |  68 ++++-
 17 files changed, 550 insertions(+), 84 deletions(-)


[flink] 05/07: [FLINK-21104][network] Fix handling of obsolete CheckpointBarriers in UnalignedCheckpoints

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3bff7f03e1c1925008326caf0045b62159b250b7
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Wed Jan 27 18:57:35 2021 +0100

    [FLINK-21104][network] Fix handling of obsolete CheckpointBarriers in UnalignedCheckpoints
    
    If a previous checkpoint is declined, a task can receive both the older and the newer
    checkpoint barrier on two different channels before processing any checkpoint cancellation
    message/RPC. If the newer checkpoint barrier happens to be processed before the obsolete one,
    an incorrect `checkState` in ChannelStatePersister would cause a job failure. This checkState
    assumed that the previous checkpoint would have been aborted/stopped before the new one was
    triggered, while in reality the previous checkpoint had never been triggered on this task,
    so it could not have been stopped either.
---
 .../partition/consumer/ChannelStatePersister.java  | 28 +++++--
 .../partition/consumer/LocalInputChannel.java      |  3 +-
 .../consumer/ChannelStatePersisterTest.java        | 95 +++++++++++++++-------
 3 files changed, 89 insertions(+), 37 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/ChannelStatePersister.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/ChannelStatePersister.java
index 06db030..db19f95 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/ChannelStatePersister.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/ChannelStatePersister.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.event.AbstractEvent;
@@ -36,7 +38,6 @@ import java.util.List;
 import java.util.Optional;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
 
 /** Helper class for persisting channel state via {@link ChannelStateWriter}. */
 @NotThreadSafe
@@ -66,16 +67,27 @@ public final class ChannelStatePersister {
         this.channelInfo = checkNotNull(channelInfo);
     }
 
-    protected void startPersisting(long barrierId, List<Buffer> knownBuffers) {
+    protected void startPersisting(long barrierId, List<Buffer> knownBuffers)
+            throws CheckpointException {
         logEvent("startPersisting", barrierId);
-        if (checkpointStatus != CheckpointStatus.BARRIER_RECEIVED && lastSeenBarrier < barrierId) {
+        if (checkpointStatus == CheckpointStatus.BARRIER_RECEIVED && lastSeenBarrier > barrierId) {
+            throw new CheckpointException(
+                    String.format(
+                            "Barrier for newer checkpoint %d has already been received compared to the requested checkpoint %d",
+                            lastSeenBarrier, barrierId),
+                    CheckpointFailureReason
+                            .CHECKPOINT_SUBSUMED); // currently, at most one active unaligned
+        }
+        if (lastSeenBarrier < barrierId) {
+            // Regardless of the current checkpointStatus, if we are notified about a more recent
+            // checkpoint then we have seen so far, always mark that this more recent barrier is
+            // pending.
+            // BARRIER_RECEIVED status can happen if we have seen an older barrier, that probably
+            // has not yet been processed by the task, but task is now notifying us that checkpoint
+            // has started for even newer checkpoint. We should spill the knownBuffers and mark that
+            // we are waiting for that newer barrier to arrive
             checkpointStatus = CheckpointStatus.BARRIER_PENDING;
             lastSeenBarrier = barrierId;
-        } else if (checkpointStatus == CheckpointStatus.BARRIER_RECEIVED) {
-            checkState(
-                    lastSeenBarrier >= barrierId,
-                    "Internal error, #stopPersisting for last checkpoint has not been called for "
-                            + channelInfo);
         }
         if (knownBuffers.size() > 0) {
             channelStateWriter.addInputData(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index fecfff9..dcd01e3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.execution.CancelTaskException;
@@ -123,7 +124,7 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit
     // Consume
     // ------------------------------------------------------------------------
 
-    public void checkpointStarted(CheckpointBarrier barrier) {
+    public void checkpointStarted(CheckpointBarrier barrier) throws CheckpointException {
         channelStatePersister.startPersisting(barrier.getId(), Collections.emptyList());
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/ChannelStatePersisterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/ChannelStatePersisterTest.java
index fe1a241..5e1fcfa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/ChannelStatePersisterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/ChannelStatePersisterTest.java
@@ -17,9 +17,11 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
+import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
+import org.apache.flink.runtime.checkpoint.channel.RecordingChannelStateWriter;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -27,8 +29,11 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collections;
 
+import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.buildSomeBuffer;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
@@ -36,23 +41,36 @@ import static org.junit.Assert.assertTrue;
 public class ChannelStatePersisterTest {
 
     @Test
-    public void testNewBarrierNotOverwrittenByStopPersisting() throws IOException {
+    public void testNewBarrierNotOverwrittenByStopPersisting() throws Exception {
+        RecordingChannelStateWriter channelStateWriter = new RecordingChannelStateWriter();
+        InputChannelInfo channelInfo = new InputChannelInfo(0, 0);
         ChannelStatePersister persister =
-                new ChannelStatePersister(ChannelStateWriter.NO_OP, new InputChannelInfo(0, 0));
+                new ChannelStatePersister(channelStateWriter, channelInfo);
 
-        persister.checkForBarrier(barrier(1L));
-        persister.startPersisting(1L, Collections.emptyList());
+        long checkpointId = 1L;
+        channelStateWriter.start(
+                checkpointId, CheckpointOptions.forCheckpointWithDefaultLocation(true, true, 0));
+
+        persister.checkForBarrier(barrier(checkpointId));
+        persister.startPersisting(checkpointId, Arrays.asList(buildSomeBuffer()));
+        assertEquals(1, channelStateWriter.getAddedInput().get(channelInfo).size());
+
+        persister.maybePersist(buildSomeBuffer());
+        assertEquals(1, channelStateWriter.getAddedInput().get(channelInfo).size());
 
         // meanwhile, checkpoint coordinator timed out the 1st checkpoint and started the 2nd
         // now task thread is picking up the barrier and aborts the 1st:
-        persister.checkForBarrier(barrier(2L));
-        persister.stopPersisting(1L);
+        persister.checkForBarrier(barrier(checkpointId + 1));
+        persister.maybePersist(buildSomeBuffer());
+        persister.stopPersisting(checkpointId);
+        persister.maybePersist(buildSomeBuffer());
+        assertEquals(1, channelStateWriter.getAddedInput().get(channelInfo).size());
 
         assertTrue(persister.hasBarrierReceived());
     }
 
     @Test
-    public void testNewBarrierNotOverwrittenByCheckForBarrier() throws IOException {
+    public void testNewBarrierNotOverwrittenByCheckForBarrier() throws Exception {
         ChannelStatePersister persister =
                 new ChannelStatePersister(ChannelStateWriter.NO_OP, new InputChannelInfo(0, 0));
 
@@ -65,38 +83,59 @@ public class ChannelStatePersisterTest {
     }
 
     @Test
-    public void testLateBarrierOnCancelledCheckpoint() throws IOException {
-        ChannelStatePersister persister =
-                new ChannelStatePersister(ChannelStateWriter.NO_OP, new InputChannelInfo(0, 0));
+    public void testLateBarrierOnStartedAndCancelledCheckpoint() throws Exception {
+        testLateBarrier(true, true);
+    }
 
-        persister.startPersisting(1L, Collections.emptyList());
-        // checkpoint aborted
-        persister.stopPersisting(1L);
+    @Test
+    public void testLateBarrierOnCancelledCheckpoint() throws Exception {
+        testLateBarrier(false, true);
+    }
 
-        // late barrier
-        persister.checkForBarrier(barrier(1L));
+    @Test
+    public void testLateBarrierOnNotYetCancelledCheckpoint() throws Exception {
+        testLateBarrier(false, false);
+    }
 
-        persister.startPersisting(2L, Collections.emptyList());
-        persister.checkForBarrier(barrier(2L));
+    private void testLateBarrier(
+            boolean startCheckpointOnLateBarrier, boolean cancelCheckpointBeforeLateBarrier)
+            throws Exception {
+        RecordingChannelStateWriter channelStateWriter = new RecordingChannelStateWriter();
+        InputChannelInfo channelInfo = new InputChannelInfo(0, 0);
+
+        ChannelStatePersister persister =
+                new ChannelStatePersister(channelStateWriter, channelInfo);
+
+        long lateCheckpointId = 1L;
+        long checkpointId = 2L;
+        if (startCheckpointOnLateBarrier) {
+            persister.startPersisting(lateCheckpointId, Collections.emptyList());
+        }
+        if (cancelCheckpointBeforeLateBarrier) {
+            persister.stopPersisting(lateCheckpointId);
+        }
+        persister.checkForBarrier(barrier(lateCheckpointId));
+        channelStateWriter.start(
+                checkpointId, CheckpointOptions.forCheckpointWithDefaultLocation(true, true, 0));
+        persister.startPersisting(checkpointId, Arrays.asList(buildSomeBuffer()));
+        persister.maybePersist(buildSomeBuffer());
+        persister.checkForBarrier(barrier(checkpointId));
+        persister.maybePersist(buildSomeBuffer());
 
         assertTrue(persister.hasBarrierReceived());
+        assertEquals(2, channelStateWriter.getAddedInput().get(channelInfo).size());
     }
 
-    @Test
-    public void testLateBarrierOnCancelledCheckpointAfterRecover() throws IOException {
+    @Test(expected = CheckpointException.class)
+    public void testLateBarrierTriggeringCheckpoint() throws Exception {
         ChannelStatePersister persister =
                 new ChannelStatePersister(ChannelStateWriter.NO_OP, new InputChannelInfo(0, 0));
 
-        // checkpoint aborted, stopPersisting called on recovered input channel without persister
-        persister.stopPersisting(1L);
-
-        // late barrier
-        persister.checkForBarrier(barrier(1L));
+        long lateCheckpointId = 1L;
+        long checkpointId = 2L;
 
-        persister.startPersisting(2L, Collections.emptyList());
-        persister.checkForBarrier(barrier(2L));
-
-        assertTrue(persister.hasBarrierReceived());
+        persister.checkForBarrier(barrier(checkpointId));
+        persister.startPersisting(lateCheckpointId, Collections.emptyList());
     }
 
     private static Buffer barrier(long id) throws IOException {


[flink] 04/07: [FLINK-21104][network] Do not enqueue released channels into the input gate.

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1f29d03cb3a47037a19bd01d626a83bb3f656a7a
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Tue Jan 26 15:55:40 2021 +0100

    [FLINK-21104][network] Do not enqueue released channels into the input gate.
---
 .../io/network/partition/consumer/SingleInputGate.java  |  6 ++++++
 .../network/partition/consumer/SingleInputGateTest.java | 17 +++++++++++++++++
 .../io/network/partition/consumer/TestInputChannel.java |  7 ++++++-
 3 files changed, 29 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index c49ff1c..12070a7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -853,6 +853,12 @@ public class SingleInputGate extends IndexedInputGate {
                     return;
                 }
 
+                if (channel.isReleased()) {
+                    // when channel is closed, EndOfPartitionEvent is send and a final notification
+                    // if EndOfPartitionEvent causes a release, we must ignore the notification
+                    return;
+                }
+
                 if (!queueChannelUnsafe(channel, priority)) {
                     return;
                 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 924211c..1e136ca 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.TaskEventPublisher;
 import org.apache.flink.runtime.io.network.TestingConnectionManager;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
 import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
@@ -283,6 +284,22 @@ public class SingleInputGateTest extends InputGateTestBase {
     }
 
     @Test
+    public void testNotifyAfterEndOfPartition() throws Exception {
+        final SingleInputGate inputGate = createInputGate(2);
+        TestInputChannel inputChannel = new TestInputChannel(inputGate, 0);
+        inputGate.setInputChannels(inputChannel, new TestInputChannel(inputGate, 1));
+
+        inputChannel.readEndOfPartitionEvent();
+        inputChannel.notifyChannelNonEmpty();
+        assertEquals(EndOfPartitionEvent.INSTANCE, inputGate.pollNext().get().getEvent());
+
+        // gate is still active because of secondary channel
+        // test if released channel is enqueued
+        inputChannel.notifyChannelNonEmpty();
+        assertFalse(inputGate.pollNext().isPresent());
+    }
+
+    @Test
     public void testIsAvailable() throws Exception {
         final SingleInputGate inputGate = createInputGate(1);
         TestInputChannel inputChannel = new TestInputChannel(inputGate, 0);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
index a17ac7b..5eafb27 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
@@ -37,6 +37,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import static org.apache.flink.runtime.io.network.util.TestBufferFactory.createBuffer;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 import static org.junit.Assert.fail;
 
 /** A mocked input channel. */
@@ -149,6 +150,8 @@ public class TestInputChannel extends InputChannel {
 
     @Override
     Optional<BufferAndAvailability> getNextBuffer() throws IOException, InterruptedException {
+        checkState(!isReleased);
+
         BufferAndAvailabilityProvider provider = buffers.poll();
 
         if (provider != null) {
@@ -178,7 +181,9 @@ public class TestInputChannel extends InputChannel {
     }
 
     @Override
-    void releaseAllResources() throws IOException {}
+    void releaseAllResources() throws IOException {
+        isReleased = true;
+    }
 
     @Override
     public void resumeConsumption() {


[flink] 07/07: [FLINK-21104][network] Priority notification after cancellation.

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ec065872defed72612aced84c2c330bd3d794f6b
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Tue Jan 26 15:57:13 2021 +0100

    [FLINK-21104][network] Priority notification after cancellation.
    
    During cancellation, CheckpointedInputGate may fail to poll a priority event if the corresponding channel has already been released. Until these race conditions are removed, it is safest to simply ignore an empty poll.
---
 .../runtime/io/CheckpointedInputGate.java          |  4 +-
 .../runtime/tasks/mailbox/TaskMailboxImpl.java     | 11 +++
 .../runtime/io/CheckpointedInputGateTest.java      | 87 ++++++++++++++++++++++
 3 files changed, 101 insertions(+), 1 deletion(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
index da2d7f8..f638c3e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
@@ -108,7 +108,9 @@ public class CheckpointedInputGate implements PullingAsyncDataInput<BufferOrEven
         while (hasPriorityEvent) {
             // process as many priority events as possible
             final Optional<BufferOrEvent> bufferOrEventOpt = pollNext();
-            checkState(bufferOrEventOpt.isPresent());
+            if (!bufferOrEventOpt.isPresent()) {
+                break;
+            }
             final BufferOrEvent bufferOrEvent = bufferOrEventOpt.get();
             checkState(bufferOrEvent.hasPriority(), "Should only poll priority events");
             hasPriorityEvent = bufferOrEvent.morePriorityEvents();
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImpl.java
index 1389ea3..5f98501 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImpl.java
@@ -96,6 +96,17 @@ public class TaskMailboxImpl implements TaskMailbox {
         return !batch.isEmpty() || hasNewMail;
     }
 
+    @VisibleForTesting
+    public int size() {
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            return batch.size() + queue.size();
+        } finally {
+            lock.unlock();
+        }
+    }
+
     @Override
     public Optional<Mail> tryTake(int priority) {
         checkIsMailboxThread();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGateTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGateTest.java
index f179a8b..abbf0dc 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGateTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGateTest.java
@@ -17,21 +17,27 @@
 
 package org.apache.flink.streaming.runtime.io;
 
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
+import org.apache.flink.runtime.checkpoint.channel.MockChannelStateWriter;
 import org.apache.flink.runtime.checkpoint.channel.RecordingChannelStateWriter;
 import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.PartitionRequestClient;
 import org.apache.flink.runtime.io.network.TestingConnectionManager;
 import org.apache.flink.runtime.io.network.TestingPartitionRequestClient;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.EndOfChannelStateEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
 import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
@@ -41,12 +47,16 @@ import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
 import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl;
 import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl;
 
+import org.apache.flink.shaded.guava18.com.google.common.io.Closer;
+
 import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
 
 import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.buildSomeBuffer;
 import static org.junit.Assert.assertEquals;
@@ -176,6 +186,78 @@ public class CheckpointedInputGateTest {
         }
     }
 
+    /**
+     * Tests a priority notification happening right before cancellation. The mail would be
+     * processed while draining mailbox but can't pull any data anymore.
+     */
+    @Test
+    public void testPriorityBeforeClose() throws IOException, InterruptedException {
+
+        NetworkBufferPool bufferPool = new NetworkBufferPool(10, 1024);
+        try (Closer closer = Closer.create()) {
+            closer.register(bufferPool::destroy);
+
+            for (int repeat = 0; repeat < 100; repeat++) {
+                setUp();
+
+                SingleInputGate singleInputGate =
+                        new SingleInputGateBuilder()
+                                .setNumberOfChannels(2)
+                                .setBufferPoolFactory(
+                                        bufferPool.createBufferPool(2, Integer.MAX_VALUE))
+                                .setSegmentProvider(bufferPool)
+                                .setChannelFactory(InputChannelBuilder::buildRemoteChannel)
+                                .build();
+                singleInputGate.setup();
+                ((RemoteInputChannel) singleInputGate.getChannel(0)).requestSubpartition(0);
+
+                final TaskMailboxImpl mailbox = new TaskMailboxImpl();
+                MailboxExecutorImpl mailboxExecutor =
+                        new MailboxExecutorImpl(mailbox, 0, StreamTaskActionExecutor.IMMEDIATE);
+
+                ValidatingCheckpointHandler validatingHandler = new ValidatingCheckpointHandler(1);
+                SingleCheckpointBarrierHandler barrierHandler =
+                        AlternatingControllerTest.barrierHandler(
+                                singleInputGate, validatingHandler, new MockChannelStateWriter());
+                CheckpointedInputGate checkpointedInputGate =
+                        new CheckpointedInputGate(
+                                singleInputGate,
+                                barrierHandler,
+                                mailboxExecutor,
+                                UpstreamRecoveryTracker.forInputGate(singleInputGate));
+
+                final int oldSize = mailbox.size();
+                enqueue(checkpointedInputGate, 0, barrier(1));
+                // wait for priority mail to be enqueued
+                Deadline deadline = Deadline.fromNow(Duration.ofMinutes(1));
+                while (deadline.hasTimeLeft() && oldSize >= mailbox.size()) {
+                    Thread.sleep(1);
+                }
+
+                // test the race condition
+                // either priority event could be handled, then we expect a checkpoint to be
+                // triggered or closing came first in which case we expect a CancelTaskException
+                CountDownLatch beforeLatch = new CountDownLatch(2);
+                final CheckedThread canceler =
+                        new CheckedThread("Canceler") {
+                            @Override
+                            public void go() throws IOException {
+                                beforeLatch.countDown();
+                                singleInputGate.close();
+                            }
+                        };
+                canceler.start();
+                beforeLatch.countDown();
+                try {
+                    while (mailboxExecutor.tryYield()) {}
+                    assertEquals(1L, validatingHandler.triggeredCheckpointCounter);
+                } catch (CancelTaskException e) {
+                }
+                canceler.join();
+            }
+        }
+    }
+
     private static CheckpointBarrier barrier(long barrierId) {
         return new CheckpointBarrier(
                 barrierId,
@@ -195,6 +277,11 @@ public class CheckpointedInputGateTest {
         enqueue(checkpointedInputGate, channelIndex, EndOfChannelStateEvent.INSTANCE);
     }
 
+    private void enqueueEndOfPartition(
+            CheckpointedInputGate checkpointedInputGate, int channelIndex) throws IOException {
+        enqueue(checkpointedInputGate, channelIndex, EndOfPartitionEvent.INSTANCE);
+    }
+
     private void enqueue(
             CheckpointedInputGate checkpointedInputGate, int channelIndex, AbstractEvent event)
             throws IOException {


[flink] 06/07: [FLINK-20654][network] Fix incorrect spilling/persisting logic in RemoteInputChannel

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f460a57c5e38715b911c4a6cf5803f703f6b8571
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Fri Jan 29 13:45:17 2021 +0100

    [FLINK-20654][network] Fix incorrect spilling/persisting logic in RemoteInputChannel
    
    This commit fixes a bug where RemoteInputChannel incorrectly decided which
    buffers should be spilled if it had received an obsolete CheckpointBarrier
    that had not (yet) been cancelled.
---
 .../partition/consumer/RemoteInputChannel.java     |  38 +++--
 .../runtime/io/AlternatingControllerTest.java      |  10 +-
 .../runtime/io/CheckpointedInputGateTest.java      | 190 ++++++++++++++++++++-
 3 files changed, 218 insertions(+), 20 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index ac437ef..edd39ca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -541,6 +541,22 @@ public class RemoteInputChannel extends InputChannel {
      */
     public void checkpointStarted(CheckpointBarrier barrier) throws CheckpointException {
         synchronized (receivedBuffers) {
+            if (barrier.getId() < lastBarrierId) {
+                throw new CheckpointException(
+                        String.format(
+                                "Sequence number for checkpoint %d is not known (it was likely been overwritten by a newer checkpoint %d)",
+                                barrier.getId(), lastBarrierId),
+                        CheckpointFailureReason
+                                .CHECKPOINT_SUBSUMED); // currently, at most one active unaligned
+                // checkpoint is possible
+            } else if (barrier.getId() > lastBarrierId) {
+                // This channel has received some obsolete barrier, older compared to the
+                // checkpointId
+                // which we are processing right now, and we should ignore that obsoleted checkpoint
+                // barrier sequence number.
+                resetLastBarrier();
+            }
+
             channelStatePersister.startPersisting(
                     barrier.getId(), getInflightBuffersUnsafe(barrier.getId()));
         }
@@ -550,14 +566,13 @@ public class RemoteInputChannel extends InputChannel {
         synchronized (receivedBuffers) {
             channelStatePersister.stopPersisting(checkpointId);
             if (lastBarrierId == checkpointId) {
-                lastBarrierId = NONE;
-                lastBarrierSequenceNumber = NONE;
+                resetLastBarrier();
             }
         }
     }
 
     @VisibleForTesting
-    List<Buffer> getInflightBuffers(long checkpointId) throws CheckpointException {
+    List<Buffer> getInflightBuffers(long checkpointId) {
         synchronized (receivedBuffers) {
             return getInflightBuffersUnsafe(checkpointId);
         }
@@ -567,18 +582,10 @@ public class RemoteInputChannel extends InputChannel {
      * Returns a list of buffers, checking the first n non-priority buffers, and skipping all
      * events.
      */
-    private List<Buffer> getInflightBuffersUnsafe(long checkpointId) throws CheckpointException {
+    private List<Buffer> getInflightBuffersUnsafe(long checkpointId) {
         assert Thread.holdsLock(receivedBuffers);
 
-        if (checkpointId < lastBarrierId) {
-            throw new CheckpointException(
-                    String.format(
-                            "Sequence number for checkpoint %d is not known (it was likely been overwritten by a newer checkpoint %d)",
-                            checkpointId, lastBarrierId),
-                    CheckpointFailureReason
-                            .CHECKPOINT_SUBSUMED); // currently, at most one active unaligned
-            // checkpoint is possible
-        }
+        checkState(checkpointId == lastBarrierId || lastBarrierId == NONE);
 
         final List<Buffer> inflightBuffers = new ArrayList<>();
         Iterator<SequenceBuffer> iterator = receivedBuffers.iterator();
@@ -599,6 +606,11 @@ public class RemoteInputChannel extends InputChannel {
         return inflightBuffers;
     }
 
+    private void resetLastBarrier() {
+        lastBarrierId = NONE;
+        lastBarrierSequenceNumber = NONE;
+    }
+
     /**
      * @return if given {@param sequenceNumber} should be spilled given {@link
      *     #lastBarrierSequenceNumber}. We might not have yet received {@link CheckpointBarrier} and
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
index 7b8faf7..671f6ff 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -377,8 +378,13 @@ public class AlternatingControllerTest {
         while (gate.pollNext().isPresent()) {}
     }
 
-    private static SingleCheckpointBarrierHandler barrierHandler(
+    public static SingleCheckpointBarrierHandler barrierHandler(
             SingleInputGate inputGate, AbstractInvokable target) {
+        return barrierHandler(inputGate, target, ChannelStateWriter.NO_OP);
+    }
+
+    public static SingleCheckpointBarrierHandler barrierHandler(
+            SingleInputGate inputGate, AbstractInvokable target, ChannelStateWriter stateWriter) {
         String taskName = "test";
         return new SingleCheckpointBarrierHandler(
                 taskName,
@@ -387,7 +393,7 @@ public class AlternatingControllerTest {
                 new AlternatingController(
                         new AlignedController(inputGate),
                         new UnalignedController(
-                                TestSubtaskCheckpointCoordinator.INSTANCE, inputGate)));
+                                new TestSubtaskCheckpointCoordinator(stateWriter), inputGate)));
     }
 
     private Buffer barrier(long barrierId, CheckpointType checkpointType) throws IOException {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGateTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGateTest.java
index ca1b8bf..f179a8b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGateTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGateTest.java
@@ -17,11 +17,18 @@
 
 package org.apache.flink.streaming.runtime.io;
 
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
+import org.apache.flink.runtime.checkpoint.channel.RecordingChannelStateWriter;
+import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.ConnectionID;
+import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.PartitionRequestClient;
 import org.apache.flink.runtime.io.network.TestingConnectionManager;
 import org.apache.flink.runtime.io.network.TestingPartitionRequestClient;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.EndOfChannelStateEvent;
@@ -34,17 +41,27 @@ import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
 import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl;
 import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl;
 
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Optional;
 
+import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.buildSomeBuffer;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 /** {@link CheckpointedInputGate} test. */
 public class CheckpointedInputGateTest {
+    private final HashMap<Integer, Integer> channelIndexToSequenceNumber = new HashMap<>();
+
+    @Before
+    public void setUp() {
+        channelIndexToSequenceNumber.clear();
+    }
+
     @Test
     public void testUpstreamResumedUponEndOfRecovery() throws Exception {
         int numberOfChannels = 11;
@@ -55,11 +72,11 @@ public class CheckpointedInputGateTest {
                     setupInputGate(numberOfChannels, bufferPool, resumeCounter);
             assertFalse(gate.pollNext().isPresent());
             for (int channelIndex = 0; channelIndex < numberOfChannels - 1; channelIndex++) {
-                emitEndOfState(gate, channelIndex);
+                enqueueEndOfState(gate, channelIndex);
                 assertFalse("should align (block all channels)", gate.pollNext().isPresent());
             }
 
-            emitEndOfState(gate, numberOfChannels - 1);
+            enqueueEndOfState(gate, numberOfChannels - 1);
             Optional<BufferOrEvent> polled = gate.pollNext();
             assertTrue(polled.isPresent());
             assertTrue(polled.get().isEvent());
@@ -73,17 +90,139 @@ public class CheckpointedInputGateTest {
         }
     }
 
-    private void emitEndOfState(CheckpointedInputGate checkpointedInputGate, int channelIndex)
+    @Test
+    public void testPersisting() throws Exception {
+        testPersisting(false);
+    }
+
+    @Test
+    public void testPersistingWithDrainingTheGate() throws Exception {
+        testPersisting(true);
+    }
+
+    /**
+     * This tests a scenario where an older triggered checkpoint, was cancelled and a newer
+     * checkpoint was triggered very quickly after the cancellation. It can happen that a task can
+     * receive first the more recent checkpoint barrier and later the obsoleted one. This can happen
+     * for many reasons (for example Source tasks not running, or just a race condition with
+     * notifyCheckpointAborted RPCs) and Task should be able to handle this properly. In FLINK-21104
+     * the problem was that this obsoleted checkpoint barrier was causing a checkState to fail.
+     */
+    public void testPersisting(boolean drainGate) throws Exception {
+
+        int numberOfChannels = 3;
+        NetworkBufferPool bufferPool = new NetworkBufferPool(numberOfChannels * 3, 1024);
+        try {
+            long checkpointId = 2L;
+            long obsoleteCheckpointId = 1L;
+            ValidatingCheckpointHandler validatingHandler =
+                    new ValidatingCheckpointHandler(checkpointId);
+            RecordingChannelStateWriter stateWriter = new RecordingChannelStateWriter();
+            CheckpointedInputGate gate =
+                    setupInputGateWithAlternatingController(
+                            numberOfChannels, bufferPool, validatingHandler, stateWriter);
+
+            // enqueue first checkpointId before obsoleteCheckpointId, so that we never trigger
+            // and also never cancel the obsoleteCheckpointId
+            enqueue(gate, 0, buildSomeBuffer());
+            enqueue(gate, 0, barrier(checkpointId));
+            enqueue(gate, 0, buildSomeBuffer());
+            enqueue(gate, 1, buildSomeBuffer());
+            enqueue(gate, 1, barrier(obsoleteCheckpointId));
+            enqueue(gate, 1, buildSomeBuffer());
+            enqueue(gate, 2, buildSomeBuffer());
+
+            assertEquals(0, validatingHandler.getTriggeredCheckpointCounter());
+            // trigger checkpoint
+            gate.pollNext();
+            assertEquals(1, validatingHandler.getTriggeredCheckpointCounter());
+
+            assertAddedInputSize(stateWriter, 0, 1);
+            assertAddedInputSize(stateWriter, 1, 2);
+            assertAddedInputSize(stateWriter, 2, 1);
+
+            enqueue(gate, 0, buildSomeBuffer());
+            enqueue(gate, 1, buildSomeBuffer());
+            enqueue(gate, 2, buildSomeBuffer());
+
+            while (drainGate && gate.pollNext().isPresent()) {}
+
+            assertAddedInputSize(stateWriter, 0, 1);
+            assertAddedInputSize(stateWriter, 1, 3);
+            assertAddedInputSize(stateWriter, 2, 2);
+
+            enqueue(gate, 1, barrier(checkpointId));
+            enqueue(gate, 1, buildSomeBuffer());
+            // Another obsoleted barrier that should be ignored
+            enqueue(gate, 2, barrier(obsoleteCheckpointId));
+            enqueue(gate, 2, buildSomeBuffer());
+
+            while (drainGate && gate.pollNext().isPresent()) {}
+
+            assertAddedInputSize(stateWriter, 0, 1);
+            assertAddedInputSize(stateWriter, 1, 3);
+            assertAddedInputSize(stateWriter, 2, 3);
+
+            enqueue(gate, 2, barrier(checkpointId));
+            enqueue(gate, 2, buildSomeBuffer());
+
+            while (drainGate && gate.pollNext().isPresent()) {}
+
+            assertAddedInputSize(stateWriter, 0, 1);
+            assertAddedInputSize(stateWriter, 1, 3);
+            assertAddedInputSize(stateWriter, 2, 3);
+        } finally {
+            bufferPool.destroy();
+        }
+    }
+
+    private static CheckpointBarrier barrier(long barrierId) {
+        return new CheckpointBarrier(
+                barrierId,
+                barrierId,
+                CheckpointOptions.forCheckpointWithDefaultLocation(true, true, 0));
+    }
+
+    private void assertAddedInputSize(
+            RecordingChannelStateWriter stateWriter, int channelIndex, int size) {
+        assertEquals(
+                size,
+                stateWriter.getAddedInput().get(new InputChannelInfo(0, channelIndex)).size());
+    }
+
+    private void enqueueEndOfState(CheckpointedInputGate checkpointedInputGate, int channelIndex)
             throws IOException {
+        enqueue(checkpointedInputGate, channelIndex, EndOfChannelStateEvent.INSTANCE);
+    }
+
+    private void enqueue(
+            CheckpointedInputGate checkpointedInputGate, int channelIndex, AbstractEvent event)
+            throws IOException {
+        boolean hasPriority = false;
+        if (event instanceof CheckpointBarrier) {
+            hasPriority =
+                    ((CheckpointBarrier) event).getCheckpointOptions().isUnalignedCheckpoint();
+        }
+        enqueue(checkpointedInputGate, channelIndex, EventSerializer.toBuffer(event, hasPriority));
+    }
+
+    private void enqueue(
+            CheckpointedInputGate checkpointedInputGate, int channelIndex, Buffer buffer)
+            throws IOException {
+        Integer sequenceNumber =
+                channelIndexToSequenceNumber.compute(
+                        channelIndex,
+                        (key, oldSequence) -> oldSequence == null ? 0 : oldSequence + 1);
         ((RemoteInputChannel) checkpointedInputGate.getChannel(channelIndex))
-                .onBuffer(EventSerializer.toBuffer(EndOfChannelStateEvent.INSTANCE, false), 0, 0);
+                .onBuffer(buffer, sequenceNumber, 0);
     }
 
     private CheckpointedInputGate setupInputGate(
             int numberOfChannels,
             NetworkBufferPool networkBufferPool,
-            ResumeCountingConnectionManager connectionManager)
+            ConnectionManager connectionManager)
             throws Exception {
+
         SingleInputGate singleInputGate =
                 new SingleInputGateBuilder()
                         .setBufferPoolFactory(
@@ -97,6 +236,10 @@ public class CheckpointedInputGateTest {
                         .setNumberOfChannels(numberOfChannels)
                         .build();
         singleInputGate.setup();
+        MailboxExecutorImpl mailboxExecutor =
+                new MailboxExecutorImpl(
+                        new TaskMailboxImpl(), 0, StreamTaskActionExecutor.IMMEDIATE);
+
         CheckpointBarrierTracker barrierHandler =
                 new CheckpointBarrierTracker(
                         numberOfChannels,
@@ -104,10 +247,47 @@ public class CheckpointedInputGateTest {
                             @Override
                             public void invoke() {}
                         });
+
+        CheckpointedInputGate checkpointedInputGate =
+                new CheckpointedInputGate(
+                        singleInputGate,
+                        barrierHandler,
+                        mailboxExecutor,
+                        UpstreamRecoveryTracker.forInputGate(singleInputGate));
+        for (int i = 0; i < numberOfChannels; i++) {
+            ((RemoteInputChannel) checkpointedInputGate.getChannel(i)).requestSubpartition(0);
+        }
+        return checkpointedInputGate;
+    }
+
+    private CheckpointedInputGate setupInputGateWithAlternatingController(
+            int numberOfChannels,
+            NetworkBufferPool networkBufferPool,
+            AbstractInvokable abstractInvokable,
+            RecordingChannelStateWriter stateWriter)
+            throws Exception {
+        ConnectionManager connectionManager = new TestingConnectionManager();
+        SingleInputGate singleInputGate =
+                new SingleInputGateBuilder()
+                        .setBufferPoolFactory(
+                                networkBufferPool.createBufferPool(
+                                        numberOfChannels, Integer.MAX_VALUE))
+                        .setSegmentProvider(networkBufferPool)
+                        .setChannelFactory(
+                                (builder, gate) ->
+                                        builder.setConnectionManager(connectionManager)
+                                                .buildRemoteChannel(gate))
+                        .setNumberOfChannels(numberOfChannels)
+                        .setChannelStateWriter(stateWriter)
+                        .build();
+        singleInputGate.setup();
         MailboxExecutorImpl mailboxExecutor =
                 new MailboxExecutorImpl(
                         new TaskMailboxImpl(), 0, StreamTaskActionExecutor.IMMEDIATE);
 
+        SingleCheckpointBarrierHandler barrierHandler =
+                AlternatingControllerTest.barrierHandler(
+                        singleInputGate, abstractInvokable, stateWriter);
         CheckpointedInputGate checkpointedInputGate =
                 new CheckpointedInputGate(
                         singleInputGate,


[flink] 01/07: [FLINK-21104][network] Adding task name to logged network buffers

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d4da833ab133ac1248bcf31acd5a5f8e5a6a3897
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Thu Jan 28 17:10:24 2021 +0100

    [FLINK-21104][network] Adding task name to logged network buffers
---
 .../channel/RecoveredChannelStateHandler.java      |  2 --
 .../io/network/logger/NetworkActionsLogger.java    | 23 +++++++++++++++++-----
 .../network/partition/PipelinedSubpartition.java   |  5 ++++-
 .../partition/consumer/LocalInputChannel.java      |  6 ++++--
 .../partition/consumer/RecoveredInputChannel.java  |  6 ++++++
 .../partition/consumer/RemoteInputChannel.java     | 11 +++++++++++
 6 files changed, 43 insertions(+), 10 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RecoveredChannelStateHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RecoveredChannelStateHandler.java
index eb72ac7..a307621 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RecoveredChannelStateHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RecoveredChannelStateHandler.java
@@ -67,8 +67,6 @@ class InputChannelRecoveredStateHandler
     @Override
     public void recover(InputChannelInfo channelInfo, Buffer buffer) {
         if (buffer.readableBytes() > 0) {
-            NetworkActionsLogger.traceRecover(
-                    "InputChannelRecoveredStateHandler#recover", buffer, channelInfo);
             getChannel(channelInfo).onRecoveredStateBuffer(buffer);
         } else {
             buffer.recycleBuffer();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/logger/NetworkActionsLogger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/logger/NetworkActionsLogger.java
index 4ecf18c..531a245 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/logger/NetworkActionsLogger.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/logger/NetworkActionsLogger.java
@@ -40,12 +40,14 @@ public class NetworkActionsLogger {
     public static void traceInput(
             String action,
             Buffer buffer,
+            String taskName,
             InputChannelInfo channelInfo,
             ChannelStatePersister channelStatePersister,
             int sequenceNumber) {
         if (ENABLED) {
             LOG.trace(
-                    "{} {}, seq {}, {} @ {}",
+                    "[{}] {} {}, seq {}, {} @ {}",
+                    taskName,
                     action,
                     buffer.toDebugString(INCLUDE_HASH),
                     sequenceNumber,
@@ -55,15 +57,26 @@ public class NetworkActionsLogger {
     }
 
     public static void traceOutput(
-            String action, Buffer buffer, ResultSubpartitionInfo channelInfo) {
+            String action, Buffer buffer, String taskName, ResultSubpartitionInfo channelInfo) {
         if (ENABLED) {
-            LOG.trace("{} {} @ {}", action, buffer.toDebugString(INCLUDE_HASH), channelInfo);
+            LOG.trace(
+                    "[{}] {} {} @ {}",
+                    taskName,
+                    action,
+                    buffer.toDebugString(INCLUDE_HASH),
+                    channelInfo);
         }
     }
 
-    public static void traceRecover(String action, Buffer buffer, InputChannelInfo channelInfo) {
+    public static void traceRecover(
+            String action, Buffer buffer, String taskName, InputChannelInfo channelInfo) {
         if (ENABLED) {
-            LOG.trace("{} {} @ {}", action, buffer.toDebugString(INCLUDE_HASH), channelInfo);
+            LOG.trace(
+                    "[{}] {} {} @ {}",
+                    taskName,
+                    action,
+                    buffer.toDebugString(INCLUDE_HASH),
+                    channelInfo);
         }
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index 422c832..01fbdd6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -325,7 +325,10 @@ public class PipelinedSubpartition extends ResultSubpartition
             // queue
             // will be 2 or more.
             NetworkActionsLogger.traceOutput(
-                    "PipelinedSubpartition#pollBuffer", buffer, subpartitionInfo);
+                    "PipelinedSubpartition#pollBuffer",
+                    buffer,
+                    parent.getOwningTaskName(),
+                    subpartitionInfo);
             return new BufferAndBacklog(
                     buffer,
                     getBuffersInBacklog(),
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index 21f415b..fecfff9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -143,10 +143,11 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit
 
             if (subpartitionView == null) {
                 LOG.debug(
-                        "{}: Requesting LOCAL subpartition {} of partition {}.",
+                        "{}: Requesting LOCAL subpartition {} of partition {}. {}",
                         this,
                         subpartitionIndex,
-                        partitionId);
+                        partitionId,
+                        channelStatePersister);
 
                 try {
                     ResultSubpartitionView subpartitionView =
@@ -262,6 +263,7 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit
         NetworkActionsLogger.traceInput(
                 "LocalInputChannel#getNextBuffer",
                 buffer,
+                inputGate.getOwningTaskName(),
                 channelInfo,
                 channelStatePersister,
                 next.getSequenceNumber());
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
index a70e74d..d163221 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.logger.NetworkActionsLogger;
 import org.apache.flink.runtime.io.network.partition.ChannelStateHolder;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.util.Preconditions;
@@ -120,6 +121,11 @@ public abstract class RecoveredInputChannel extends InputChannel implements Chan
 
     public void onRecoveredStateBuffer(Buffer buffer) {
         boolean recycleBuffer = true;
+        NetworkActionsLogger.traceRecover(
+                "InputChannelRecoveredStateHandler#recover",
+                buffer,
+                inputGate.getOwningTaskName(),
+                channelInfo);
         try {
             final boolean wasEmpty;
             synchronized (receivedBuffers) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 3090f47..ac437ef 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -42,6 +42,9 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Iterators;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 
@@ -60,6 +63,7 @@ import static org.apache.flink.util.Preconditions.checkState;
 
 /** An input channel, which requests a remote partition queue. */
 public class RemoteInputChannel extends InputChannel {
+    private static final Logger LOG = LoggerFactory.getLogger(RemoteInputChannel.class);
 
     private static final int NONE = -1;
 
@@ -163,6 +167,12 @@ public class RemoteInputChannel extends InputChannel {
     public void requestSubpartition(int subpartitionIndex)
             throws IOException, InterruptedException {
         if (partitionRequestClient == null) {
+            LOG.debug(
+                    "{}: Requesting REMOTE subpartition {} of partition {}. {}",
+                    this,
+                    subpartitionIndex,
+                    partitionId,
+                    channelStatePersister);
             // Create a client and request the partition
             try {
                 partitionRequestClient =
@@ -448,6 +458,7 @@ public class RemoteInputChannel extends InputChannel {
                 NetworkActionsLogger.traceInput(
                         "RemoteInputChannel#onBuffer",
                         buffer,
+                        inputGate.getOwningTaskName(),
                         channelInfo,
                         channelStatePersister,
                         sequenceNumber);


[flink] 02/07: [FLINK-21104][task] Ease debugging.

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b9b682227b67b15199ab21a466735d44b142d54d
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Tue Jan 26 15:56:11 2021 +0100

    [FLINK-21104][task] Ease debugging.
---
 .../apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImpl.java | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImpl.java
index 118af3f..1389ea3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImpl.java
@@ -32,6 +32,7 @@ import java.util.Deque;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -133,7 +134,8 @@ public class TaskMailboxImpl implements TaskMailbox {
         try {
             Mail headMail;
             while ((headMail = takeOrNull(queue, priority)) == null) {
-                notEmpty.await();
+                // to ease debugging
+                notEmpty.await(1, TimeUnit.SECONDS);
             }
             hasNewMail = !queue.isEmpty();
             return headMail;


[flink] 03/07: [FLINK-21104][tests] Fix UnalignedCheckpointITCase not completing when there is a failure while finishing.

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0a96201d8af758c6fcefacdd3af3823dae2e1c4e
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Mon Jan 25 20:33:24 2021 +0100

    [FLINK-21104][tests] Fix UnalignedCheckpointITCase not completing when there is a failure while finishing.
    
    Also improves logging and adds a check for data corruption.
---
 .../checkpointing/UnalignedCheckpointITCase.java   | 13 ++++-
 .../checkpointing/UnalignedCheckpointTestBase.java | 68 ++++++++++++++++++----
 2 files changed, 67 insertions(+), 14 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
index bad9208..32e17fc 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
@@ -196,11 +196,11 @@ public class UnalignedCheckpointITCase extends UnalignedCheckpointTestBase {
                                 "source")
                         .slotSharingGroup(slotSharing ? "default" : "source")
                         .disableChaining()
-                        .map(i -> i)
+                        .map(i -> checkHeader(i))
                         .name("forward")
                         .uid("forward")
                         .slotSharingGroup(slotSharing ? "default" : "forward")
-                        .keyBy(i -> i % parallelism * parallelism)
+                        .keyBy(i -> withoutHeader(i) % parallelism * parallelism)
                         .process(new KeyedIdentityFunction())
                         .name("keyed")
                         .uid("keyed");
@@ -332,12 +332,21 @@ public class UnalignedCheckpointITCase extends UnalignedCheckpointTestBase {
         public void initializeState(FunctionInitializationContext context) throws Exception {
             super.initializeState(context);
             backpressure = false;
+            LOG.info(
+                    "Inducing backpressure=false @ {} subtask ({} attempt)",
+                    getRuntimeContext().getIndexOfThisSubtask(),
+                    getRuntimeContext().getAttemptNumber());
         }
 
         @Override
         public void snapshotState(FunctionSnapshotContext context) throws Exception {
             super.snapshotState(context);
             backpressure = state.completedCheckpoints < minCheckpoints;
+            LOG.info(
+                    "Inducing backpressure={} @ {} subtask ({} attempt)",
+                    backpressure,
+                    getRuntimeContext().getIndexOfThisSubtask(),
+                    getRuntimeContext().getAttemptNumber());
         }
 
         @Override
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
index 068f3bf..523a5c5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
@@ -201,8 +201,15 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
             private final LongCounter numInputsCounter = new LongCounter();
             private LongSplit split;
             private int numAbortedCheckpoints;
-            private boolean throttle = true;
             private int numRestarts;
+            private int numCheckpointsInThisAttempt;
+            private PollingState pollingState = PollingState.THROTTLING;
+
+            enum PollingState {
+                THROTTLING,
+                PUMPING,
+                FINISHING;
+            }
 
             public LongSourceReader(final long minCheckpoints, int expectedRestarts) {
                 this.minCheckpoints = minCheckpoints;
@@ -221,15 +228,19 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
                 output.collect(withHeader(split.nextNumber), split.nextNumber);
                 split.nextNumber += split.increment;
 
-                if (throttle) {
-                    // throttle source as long as sink is not backpressuring (which it does only
-                    // after full recovery)
-                    Thread.sleep(1);
+                switch (pollingState) {
+                    case FINISHING:
+                        return InputStatus.END_OF_INPUT;
+                    case THROTTLING:
+                        // throttle source as long as sink is not backpressuring (which it does only
+                        // after full recovery)
+                        Thread.sleep(1);
+                        return InputStatus.MORE_AVAILABLE;
+                    case PUMPING:
+                        return InputStatus.MORE_AVAILABLE;
+                    default:
+                        throw new IllegalStateException("Unexpected state: " + pollingState);
                 }
-                return split.numCompletedCheckpoints >= minCheckpoints
-                                && numRestarts >= expectedRestarts
-                        ? InputStatus.END_OF_INPUT
-                        : InputStatus.MORE_AVAILABLE;
             }
 
             @Override
@@ -253,9 +264,15 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
                             split.numCompletedCheckpoints,
                             split.nextNumber % split.increment,
                             numRestarts);
+                    // Update polling state before final checkpoint such that if there is an issue
+                    // during finishing, after recovery the source immediately starts finishing
+                    // again. In this way, we avoid a deadlock where some tasks need another
+                    // checkpoint completed, while some tasks are finishing (and thus there are no
+                    // new checkpoint).
+                    updatePollingState();
                     split.numCompletedCheckpoints++;
+                    numCheckpointsInThisAttempt++;
                     numAbortedCheckpoints = 0;
-                    throttle = split.numCompletedCheckpoints >= minCheckpoints;
                 }
             }
 
@@ -267,6 +284,7 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
                     // here simply also advance completed checkpoints to avoid running into a live
                     // lock
                     split.numCompletedCheckpoints++;
+                    updatePollingState();
                 }
             }
 
@@ -282,9 +300,11 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
                             "Tried to add " + splits + " but already got " + split);
                 }
                 split = Iterables.getOnlyElement(splits);
+                updatePollingState();
                 LOG.info(
-                        "Added split {} @ {} subtask ({} attempt)",
+                        "Added split {}, pollingState={} @ {} subtask ({} attempt)",
                         split,
+                        pollingState,
                         split.nextNumber % split.increment,
                         numRestarts);
             }
@@ -292,10 +312,33 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
             @Override
             public void notifyNoMoreSplits() {}
 
+            private void updatePollingState() {
+                if (split == null) {
+                    return;
+                }
+                if (split.numCompletedCheckpoints >= minCheckpoints
+                        && numRestarts >= expectedRestarts) {
+                    pollingState = PollingState.FINISHING;
+                } else if (numCheckpointsInThisAttempt == 0) {
+                    // speed up recovery by throttling - use a successful checkpoint as a proxy
+                    // for a finished recovery
+                    pollingState = PollingState.THROTTLING;
+                } else {
+                    // cause backpressure
+                    pollingState = PollingState.PUMPING;
+                }
+            }
+
             @Override
             public void handleSourceEvents(SourceEvent sourceEvent) {
                 if (sourceEvent instanceof RestartEvent) {
                     numRestarts = ((RestartEvent) sourceEvent).numRestarts;
+                    updatePollingState();
+                    LOG.info(
+                            "Set restarts {}, pollingState={} ({} attempt)",
+                            split,
+                            pollingState,
+                            numRestarts);
                 }
             }
 
@@ -845,9 +888,10 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
         return value ^ HEADER;
     }
 
-    protected static void checkHeader(long value) {
+    protected static long checkHeader(long value) {
         if ((value & HEADER_MASK) != HEADER) {
             throw new IllegalArgumentException("Stream corrupted");
         }
+        return value;
     }
 }