You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ar...@apache.org on 2021/02/25 18:25:25 UTC

[flink] 02/02: [FLINK-21490][tests] Harden UnalignedCheckpointITCase.

This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0c1b20d2119463d4571d17de607aebfff1b4b17f
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Tue Feb 9 21:15:04 2021 +0100

    [FLINK-21490][tests] Harden UnalignedCheckpointITCase.
    
    Previously, each source reader inferred the number of checkpoints individually, which could cause the readers to drift apart. The number of checkpoints and restarts is now calculated in the coordinator and propagated to the readers, keeping them in sync. The mechanism to infer the number of restarts has also been improved: since the RuntimeContext is unavailable in sources, restarts are now detected by checking for the creation of readers.
    
    Further, if the payload of the test exceeds MAX_INT, then checkHeader fails, as it uses the upper 4 bytes of the long to check for the magic bytes. A checkState call ensures that no overflow happens and aborts the test with a meaningful message. checkHeader now also prints the expected and actual values.
    
    This commit includes some refactorings for the upcoming UnalignedCheckpointRescaleITCase, which ease the maintenance of the test in the 1.12 and master branches.
---
 .../checkpointing/UnalignedCheckpointITCase.java   | 132 ++------
 .../checkpointing/UnalignedCheckpointTestBase.java | 346 +++++++++++++--------
 2 files changed, 253 insertions(+), 225 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
index 32e17fc..1748675 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
@@ -19,7 +19,7 @@
 
 package org.apache.flink.test.checkpointing;
 
-import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -36,7 +36,6 @@ import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
-import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
 import org.apache.flink.util.Collector;
 
 import org.junit.Test;
@@ -48,6 +47,7 @@ import java.util.BitSet;
 
 import static org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermarks;
 import static org.apache.flink.shaded.guava18.com.google.common.collect.Iterables.getOnlyElement;
+import static org.hamcrest.Matchers.equalTo;
 
 /**
  * Integration test for performing the unaligned checkpoint.
@@ -135,7 +135,8 @@ public class UnalignedCheckpointITCase extends UnalignedCheckpointTestBase {
                 .setNumSlots(slotSharing ? parallelism : parallelism * numShuffles)
                 .setNumBuffers(getNumBuffers(parallelism, numShuffles))
                 .setSlotsPerTaskManager(slotsPerTaskManager)
-                .setExpectedFailures(5);
+                .setExpectedFailures(5)
+                .setFailuresAfterSourceFinishes(1);
     }
 
     private static UnalignedSettings createCogroupSettings(int parallelism) {
@@ -146,7 +147,8 @@ public class UnalignedCheckpointITCase extends UnalignedCheckpointTestBase {
                 .setNumSlots(parallelism * numShuffles)
                 .setNumBuffers(getNumBuffers(parallelism, numShuffles))
                 .setSlotsPerTaskManager(parallelism)
-                .setExpectedFailures(5);
+                .setExpectedFailures(5)
+                .setFailuresAfterSourceFinishes(1);
     }
 
     private static UnalignedSettings createUnionSettings(int parallelism) {
@@ -157,19 +159,8 @@ public class UnalignedCheckpointITCase extends UnalignedCheckpointTestBase {
                 .setNumSlots(parallelism * numShuffles)
                 .setNumBuffers(getNumBuffers(parallelism, numShuffles))
                 .setSlotsPerTaskManager(parallelism)
-                .setExpectedFailures(5);
-    }
-
-    private static int getNumBuffers(int parallelism, int numShuffles) {
-        int buffersPerSubtask =
-                parallelism
-                        + 1
-                        + // output side
-                        2
-                                * BUFFER_PER_CHANNEL
-                                * parallelism; // input side including recovery (=local channels
-        // count fully)
-        return buffersPerSubtask * parallelism * numShuffles;
+                .setExpectedFailures(5)
+                .setFailuresAfterSourceFinishes(1);
     }
 
     private final UnalignedSettings settings;
@@ -183,9 +174,23 @@ public class UnalignedCheckpointITCase extends UnalignedCheckpointTestBase {
         execute(settings);
     }
 
+    protected void checkCounters(JobExecutionResult result) {
+        collector.checkThat(
+                "NUM_OUT_OF_ORDER",
+                result.<Long>getAccumulatorResult(NUM_OUT_OF_ORDER),
+                equalTo(0L));
+        collector.checkThat(
+                "NUM_DUPLICATES", result.<Long>getAccumulatorResult(NUM_DUPLICATES), equalTo(0L));
+        collector.checkThat("NUM_LOST", result.<Long>getAccumulatorResult(NUM_LOST), equalTo(0L));
+        collector.checkThat(
+                "NUM_FAILURES",
+                result.<Integer>getAccumulatorResult(NUM_FAILURES),
+                equalTo(settings.expectedFailures));
+    }
+
     private static void createPipeline(
             StreamExecutionEnvironment env,
-            long minCheckpoints,
+            int minCheckpoints,
             boolean slotSharing,
             int expectedRestarts) {
         final int parallelism = env.getParallelism();
@@ -209,12 +214,12 @@ public class UnalignedCheckpointITCase extends UnalignedCheckpointTestBase {
 
     private static void createMultipleInputTopology(
             StreamExecutionEnvironment env,
-            long minCheckpoints,
+            int minCheckpoints,
             boolean slotSharing,
             int expectedRestarts) {
         final int parallelism = env.getParallelism();
         DataStream<Long> combinedSource = null;
-        for (int inputIndex = 0; inputIndex < 4; inputIndex++) {
+        for (int inputIndex = 0; inputIndex < NUM_SOURCES; inputIndex++) {
             final SingleOutputStreamOperator<Long> source =
                     env.fromSource(
                                     new LongSource(minCheckpoints, parallelism, expectedRestarts),
@@ -239,13 +244,12 @@ public class UnalignedCheckpointITCase extends UnalignedCheckpointTestBase {
 
     private static void createUnionTopology(
             StreamExecutionEnvironment env,
-            long minCheckpoints,
+            int minCheckpoints,
             boolean slotSharing,
             int expectedRestarts) {
         final int parallelism = env.getParallelism();
         DataStream<Long> combinedSource = null;
-        final int numSources = 4;
-        for (int inputIndex = 0; inputIndex < numSources; inputIndex++) {
+        for (int inputIndex = 0; inputIndex < NUM_SOURCES; inputIndex++) {
             final SingleOutputStreamOperator<Long> source =
                     env.fromSource(
                                     new LongSource(minCheckpoints, parallelism, expectedRestarts),
@@ -261,7 +265,7 @@ public class UnalignedCheckpointITCase extends UnalignedCheckpointTestBase {
                         .partitionCustom(
                                 (key, numPartitions) -> (int) (withoutHeader(key) % numPartitions),
                                 l -> l)
-                        .flatMap(new CountingMapFunction(numSources));
+                        .flatMap(new CountingMapFunction(NUM_SOURCES));
         addFailingPipeline(minCheckpoints, slotSharing, deduplicated);
     }
 
@@ -293,28 +297,11 @@ public class UnalignedCheckpointITCase extends UnalignedCheckpointTestBase {
                 .slotSharingGroup(slotSharing ? "default" : "sink");
     }
 
-    /** Shifts the partitions one up. */
-    protected static class ShiftingPartitioner implements Partitioner<Long> {
-        @Override
-        public int partition(Long key, int numPartitions) {
-            return (int) ((withoutHeader(key) + 1) % numPartitions);
-        }
-    }
-
-    /** Distributes chunks of the size of numPartitions in a round robin fashion. */
-    protected static class ChunkDistributingPartitioner implements Partitioner<Long> {
-        @Override
-        public int partition(Long key, int numPartitions) {
-            return (int) ((withoutHeader(key) / numPartitions) % numPartitions);
-        }
-    }
-
     /**
      * A sink that checks if the members arrive in the expected order without any missing values.
      */
     protected static class StrictOrderVerifyingSink
             extends VerifyingSinkBase<StrictOrderVerifyingSink.State> {
-        protected boolean backpressure;
         private boolean firstOutOfOrder = true;
         private boolean firstDuplicate = true;
         private boolean firstLostValue = true;
@@ -331,22 +318,6 @@ public class UnalignedCheckpointITCase extends UnalignedCheckpointTestBase {
         @Override
         public void initializeState(FunctionInitializationContext context) throws Exception {
             super.initializeState(context);
-            backpressure = false;
-            LOG.info(
-                    "Inducing backpressure=false @ {} subtask ({} attempt)",
-                    getRuntimeContext().getIndexOfThisSubtask(),
-                    getRuntimeContext().getAttemptNumber());
-        }
-
-        @Override
-        public void snapshotState(FunctionSnapshotContext context) throws Exception {
-            super.snapshotState(context);
-            backpressure = state.completedCheckpoints < minCheckpoints;
-            LOG.info(
-                    "Inducing backpressure={} @ {} subtask ({} attempt)",
-                    backpressure,
-                    getRuntimeContext().getIndexOfThisSubtask(),
-                    getRuntimeContext().getAttemptNumber());
         }
 
         @Override
@@ -413,49 +384,6 @@ public class UnalignedCheckpointITCase extends UnalignedCheckpointTestBase {
         }
     }
 
-    private static class MinEmittingFunction extends RichCoFlatMapFunction<Long, Long, Long>
-            implements CheckpointedFunction {
-        private ListState<State> stateList;
-        private State state;
-
-        @Override
-        public void snapshotState(FunctionSnapshotContext context) throws Exception {
-            stateList.clear();
-            stateList.add(state);
-        }
-
-        @Override
-        public void initializeState(FunctionInitializationContext context) throws Exception {
-            stateList =
-                    context.getOperatorStateStore()
-                            .getListState(new ListStateDescriptor<>("state", State.class));
-            this.state = getOnlyElement(stateList.get(), new State());
-        }
-
-        @Override
-        public void flatMap1(Long value, Collector<Long> out) {
-            long baseValue = withoutHeader(value);
-            state.lastLeft = baseValue;
-            if (state.lastRight >= baseValue) {
-                out.collect(value);
-            }
-        }
-
-        @Override
-        public void flatMap2(Long value, Collector<Long> out) {
-            long baseValue = withoutHeader(value);
-            state.lastRight = baseValue;
-            if (state.lastLeft >= baseValue) {
-                out.collect(value);
-            }
-        }
-
-        private static class State {
-            private long lastLeft = Long.MIN_VALUE;
-            private long lastRight = Long.MIN_VALUE;
-        }
-    }
-
     private static class KeyedIdentityFunction extends KeyedProcessFunction<Long, Long, Long> {
         ValueState<Long> state;
 
@@ -485,7 +413,7 @@ public class UnalignedCheckpointITCase extends UnalignedCheckpointTestBase {
         private ListState<BitSet> stateList;
 
         public CountingMapFunction(int numSources) {
-            this.withdrawnCount = numSources - 1;
+            withdrawnCount = numSources - 1;
         }
 
         @Override
@@ -514,7 +442,7 @@ public class UnalignedCheckpointITCase extends UnalignedCheckpointTestBase {
                             .getListState(
                                     new ListStateDescriptor<>(
                                             "state", new GenericTypeInfo<>(BitSet.class)));
-            this.seenRecords = getOnlyElement(stateList.get(), new BitSet());
+            seenRecords = getOnlyElement(stateList.get(), new BitSet());
         }
     }
 }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
index 523a5c5..394e674 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.accumulators.IntCounter;
 import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.state.CheckpointListener;
@@ -53,7 +54,9 @@ import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
@@ -72,27 +75,29 @@ import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-import static java.util.Collections.singletonList;
 import static org.apache.flink.shaded.guava18.com.google.common.collect.Iterables.getOnlyElement;
-import static org.hamcrest.Matchers.equalTo;
+import static org.apache.flink.util.Preconditions.checkState;
 import static org.junit.Assert.fail;
 
 /** Base class for tests related to unaligned checkpoints. */
 public abstract class UnalignedCheckpointTestBase extends TestLogger {
     protected static final Logger LOG = LoggerFactory.getLogger(UnalignedCheckpointTestBase.class);
+    protected static final String NUM_INPUTS = "inputs";
     protected static final String NUM_OUTPUTS = "outputs";
     protected static final String NUM_OUT_OF_ORDER = "outOfOrder";
     protected static final String NUM_FAILURES = "failures";
     protected static final String NUM_DUPLICATES = "duplicates";
     protected static final String NUM_LOST = "lost";
-    public static final int BUFFER_PER_CHANNEL = 1;
+    protected static final int BUFFER_PER_CHANNEL = 1;
+    /** For multi-gate tests. */
+    protected static final int NUM_SOURCES = 2;
 
     private static final long HEADER = 0xABCDEAFCL << 32;
     private static final long HEADER_MASK = 0xFFFFFFFFL << 32;
@@ -107,24 +112,14 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
         StreamExecutionEnvironment env = settings.createEnvironment(checkpointDir);
 
         settings.dagCreator.create(
-                env, settings.minCheckpoints, settings.slotSharing, settings.expectedFailures - 1);
+                env,
+                settings.minCheckpoints,
+                settings.slotSharing,
+                settings.expectedFailures - settings.failuresAfterSourceFinishes);
         try {
             final JobExecutionResult result = env.execute();
 
-            collector.checkThat(
-                    "NUM_OUT_OF_ORDER",
-                    result.<Long>getAccumulatorResult(NUM_OUT_OF_ORDER),
-                    equalTo(0L));
-            collector.checkThat(
-                    "NUM_DUPLICATES",
-                    result.<Long>getAccumulatorResult(NUM_DUPLICATES),
-                    equalTo(0L));
-            collector.checkThat(
-                    "NUM_LOST", result.<Long>getAccumulatorResult(NUM_LOST), equalTo(0L));
-            collector.checkThat(
-                    "NUM_FAILURES",
-                    result.<Integer>getAccumulatorResult(NUM_FAILURES),
-                    equalTo(settings.expectedFailures));
+            checkCounters(result);
         } catch (Exception e) {
             if (settings.generateCheckpoint) {
                 return Files.find(
@@ -133,7 +128,7 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
                                 (file, attr) ->
                                         attr.isDirectory()
                                                 && file.getFileName().toString().startsWith("chk"))
-                        .findFirst()
+                        .min(Comparator.comparing(Path::toString))
                         .map(Path::toFile)
                         .orElseThrow(
                                 () -> new IllegalStateException("Cannot generate checkpoint", e));
@@ -146,14 +141,22 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
         return null;
     }
 
+    protected abstract void checkCounters(JobExecutionResult result);
+
+    protected static int getNumBuffers(int parallelism, int numShuffles) {
+        // p + 1 buffer on output side + input side including recovery (=local channels count fully)
+        int buffersPerSubtask = parallelism + 1 + 2 * BUFFER_PER_CHANNEL * parallelism;
+        return buffersPerSubtask * parallelism * numShuffles;
+    }
+
     /** A source that generates longs in a fixed number of splits. */
     protected static class LongSource
             implements Source<Long, LongSource.LongSplit, LongSource.EnumeratorState> {
-        private final long minCheckpoints;
+        private final int minCheckpoints;
         private final int numSplits;
         private final int expectedRestarts;
 
-        protected LongSource(long minCheckpoints, int numSplits, int expectedRestarts) {
+        protected LongSource(int minCheckpoints, int numSplits, int expectedRestarts) {
             this.minCheckpoints = minCheckpoints;
             this.numSplits = numSplits;
             this.expectedRestarts = expectedRestarts;
@@ -166,7 +169,11 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
 
         @Override
         public SourceReader<Long, LongSplit> createReader(SourceReaderContext readerContext) {
-            return new LongSourceReader(minCheckpoints, expectedRestarts);
+            if (readerContext.getIndexOfSubtask() == 0) {
+                readerContext.sendSourceEventToCoordinator(new RestartEvent());
+            }
+            return new LongSourceReader(
+                    readerContext.getIndexOfSubtask(), minCheckpoints, expectedRestarts);
         }
 
         @Override
@@ -174,9 +181,9 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
                 SplitEnumeratorContext<LongSplit> enumContext) {
             List<LongSplit> splits =
                     IntStream.range(0, numSplits)
-                            .mapToObj(i -> new LongSplit(i, numSplits, 0))
+                            .mapToObj(i -> new LongSplit(i, numSplits))
                             .collect(Collectors.toList());
-            return new LongSplitSplitEnumerator(enumContext, new EnumeratorState(splits, 0));
+            return new LongSplitSplitEnumerator(enumContext, new EnumeratorState(splits, -1, 0));
         }
 
         @Override
@@ -196,22 +203,25 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
         }
 
         private static class LongSourceReader implements SourceReader<Long, LongSplit> {
+            private final int subtaskIndex;
             private final long minCheckpoints;
             private final int expectedRestarts;
             private final LongCounter numInputsCounter = new LongCounter();
-            private LongSplit split;
+            private List<LongSplit> splits = new ArrayList<>();
             private int numAbortedCheckpoints;
             private int numRestarts;
+            private int numCompletedCheckpoints;
             private int numCheckpointsInThisAttempt;
             private PollingState pollingState = PollingState.THROTTLING;
 
             enum PollingState {
                 THROTTLING,
                 PUMPING,
-                FINISHING;
+                FINISHING
             }
 
-            public LongSourceReader(final long minCheckpoints, int expectedRestarts) {
+            public LongSourceReader(int subtaskIndex, int minCheckpoints, int expectedRestarts) {
+                this.subtaskIndex = subtaskIndex;
                 this.minCheckpoints = minCheckpoints;
                 this.expectedRestarts = expectedRestarts;
             }
@@ -221,13 +231,11 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
 
             @Override
             public InputStatus pollNext(ReaderOutput<Long> output) throws InterruptedException {
-                if (split == null) {
-                    return InputStatus.NOTHING_AVAILABLE;
+                for (LongSplit split : splits) {
+                    output.collect(withHeader(split.nextNumber), split.nextNumber);
+                    split.nextNumber += split.increment;
                 }
 
-                output.collect(withHeader(split.nextNumber), split.nextNumber);
-                split.nextNumber += split.increment;
-
                 switch (pollingState) {
                     case FINISHING:
                         return InputStatus.END_OF_INPUT;
@@ -245,35 +253,30 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
 
             @Override
             public List<LongSplit> snapshotState(long checkpointId) {
-                if (split == null) {
-                    return Collections.emptyList();
-                }
                 LOG.info(
                         "Snapshotted {} @ {} subtask ({} attempt)",
-                        split,
-                        split.nextNumber % split.increment,
+                        splits,
+                        subtaskIndex,
                         numRestarts);
-                return singletonList(split);
+                return splits;
             }
 
             @Override
             public void notifyCheckpointComplete(long checkpointId) {
-                if (split != null) {
-                    LOG.info(
-                            "notifyCheckpointComplete {} @ {} subtask ({} attempt)",
-                            split.numCompletedCheckpoints,
-                            split.nextNumber % split.increment,
-                            numRestarts);
-                    // Update polling state before final checkpoint such that if there is an issue
-                    // during finishing, after recovery the source immediately starts finishing
-                    // again. In this way, we avoid a deadlock where some tasks need another
-                    // checkpoint completed, while some tasks are finishing (and thus there are no
-                    // new checkpoint).
-                    updatePollingState();
-                    split.numCompletedCheckpoints++;
-                    numCheckpointsInThisAttempt++;
-                    numAbortedCheckpoints = 0;
-                }
+                LOG.info(
+                        "notifyCheckpointComplete {} @ {} subtask ({} attempt)",
+                        numCompletedCheckpoints,
+                        subtaskIndex,
+                        numRestarts);
+                // Update polling state before final checkpoint such that if there is an issue
+                // during finishing, after recovery the source immediately starts finishing
+                // again. In this way, we avoid a deadlock where some tasks need another
+                // checkpoint completed, while some tasks are finishing (and thus there are no
+                // new checkpoint).
+                updatePollingState();
+                numCompletedCheckpoints++;
+                numCheckpointsInThisAttempt++;
+                numAbortedCheckpoints = 0;
             }
 
             @Override
@@ -283,7 +286,7 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
                     // the pipeline is already completed
                     // here simply also advance completed checkpoints to avoid running into a live
                     // lock
-                    split.numCompletedCheckpoints++;
+                    numCompletedCheckpoints++;
                     updatePollingState();
                 }
             }
@@ -295,29 +298,24 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
 
             @Override
             public void addSplits(List<LongSplit> splits) {
-                if (split != null) {
-                    throw new IllegalStateException(
-                            "Tried to add " + splits + " but already got " + split);
-                }
-                split = Iterables.getOnlyElement(splits);
+                this.splits.addAll(splits);
                 updatePollingState();
                 LOG.info(
-                        "Added split {}, pollingState={} @ {} subtask ({} attempt)",
-                        split,
+                        "Added splits {}, pollingState={} @ {} subtask ({} attempt)",
+                        splits,
                         pollingState,
-                        split.nextNumber % split.increment,
+                        subtaskIndex,
                         numRestarts);
             }
 
             @Override
-            public void notifyNoMoreSplits() {}
+            public void notifyNoMoreSplits() {
+                updatePollingState();
+            }
 
             private void updatePollingState() {
-                if (split == null) {
-                    return;
-                }
-                if (split.numCompletedCheckpoints >= minCheckpoints
-                        && numRestarts >= expectedRestarts) {
+                PollingState oldState = pollingState;
+                if (numCompletedCheckpoints >= minCheckpoints && numRestarts >= expectedRestarts) {
                     pollingState = PollingState.FINISHING;
                 } else if (numCheckpointsInThisAttempt == 0) {
                     // speed up recovery by throttling - use a successful checkpoint as a proxy
@@ -327,46 +325,62 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
                     // cause backpressure
                     pollingState = PollingState.PUMPING;
                 }
+                if (oldState != pollingState) {
+                    LOG.debug(
+                            "Switched from {} to {} @ {} subtask ({} attempt)",
+                            oldState,
+                            pollingState,
+                            subtaskIndex,
+                            numRestarts);
+                }
             }
 
             @Override
             public void handleSourceEvents(SourceEvent sourceEvent) {
-                if (sourceEvent instanceof RestartEvent) {
-                    numRestarts = ((RestartEvent) sourceEvent).numRestarts;
+                if (sourceEvent instanceof SyncEvent) {
+                    numRestarts = ((SyncEvent) sourceEvent).numRestarts;
+                    numCompletedCheckpoints = ((SyncEvent) sourceEvent).numCheckpoints;
                     updatePollingState();
                     LOG.info(
-                            "Set restarts {}, pollingState={} ({} attempt)",
-                            split,
+                            "Set restarts={}, numCompletedCheckpoints={}, pollingState={} @ {} subtask ({} attempt)",
+                            numRestarts,
+                            numCompletedCheckpoints,
                             pollingState,
+                            subtaskIndex,
                             numRestarts);
                 }
             }
 
             @Override
             public void close() throws Exception {
-                if (split != null) {
+                for (LongSplit split : splits) {
                     numInputsCounter.add(split.nextNumber / split.increment);
                 }
             }
         }
 
         private static class RestartEvent implements SourceEvent {
+
+            private RestartEvent() {}
+        }
+
+        private static class SyncEvent implements SourceEvent {
             final int numRestarts;
+            final int numCheckpoints;
 
-            private RestartEvent(int numRestarts) {
+            private SyncEvent(int numRestarts, int numCheckpoints) {
                 this.numRestarts = numRestarts;
+                this.numCheckpoints = numCheckpoints;
             }
         }
 
         private static class LongSplit implements SourceSplit {
             private final int increment;
             private long nextNumber;
-            private int numCompletedCheckpoints;
 
-            public LongSplit(long nextNumber, int increment, int numCompletedCheckpoints) {
+            public LongSplit(long nextNumber, int increment) {
                 this.nextNumber = nextNumber;
                 this.increment = increment;
-                this.numCompletedCheckpoints = numCompletedCheckpoints;
             }
 
             public int getBaseNumber() {
@@ -380,14 +394,7 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
 
             @Override
             public String toString() {
-                return "LongSplit{"
-                        + "increment="
-                        + increment
-                        + ", nextNumber="
-                        + nextNumber
-                        + ", numCompletedCheckpoints="
-                        + numCompletedCheckpoints
-                        + '}';
+                return "LongSplit{" + "increment=" + increment + ", nextNumber=" + nextNumber + '}';
             }
         }
 
@@ -406,7 +413,16 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
             public void start() {}
 
+            /**
+             * Reacts to a {@link RestartEvent} by incrementing the restart counter and broadcasting
+             * the current restart and completed-checkpoint counters as a {@link SyncEvent} to every
+             * registered reader. Any other event type is ignored.
+             *
+             * <p>NOTE(review): the counter is bumped once per received RestartEvent. If more than one
+             * reader reports the same restart, it is counted multiple times; confirm the sending side
+             * guarantees a single event per restart.
+             */
             @Override
-            public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {}
+            public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+                if (sourceEvent instanceof RestartEvent) {
+                    state.numRestarts++;
+                    final SyncEvent event =
+                            new SyncEvent(state.numRestarts, state.numCompletedCheckpoints);
+                    context.registeredReaders()
+                            .keySet()
+                            .forEach(index -> context.sendEventToSourceReader(index, event));
+                }
+            }
 
+            /** Intentionally a no-op: splits are pushed to readers from {@code addReader}. */
             @Override
             public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {}
@@ -417,29 +433,28 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
                     LOG.info("addSplitsBack {}", splits);
                     state.unassignedSplits.addAll(splits);
                 }
-                if (subtaskId == 0) {
-                    // currently always called on failure
-                    state.numRestarts++;
-                }
             }
 
+            /**
+             * Called when reader {@code subtaskId} registers. Once every subtask has a registered
+             * reader, pending splits are assigned (grouped by {@link LongSplit#getBaseNumber()}, which
+             * serves as the target subtask id) and all readers are told that no more splits follow.
+             * The newly added reader always receives a {@link SyncEvent} with the current restart
+             * and completed-checkpoint counters.
+             */
             @Override
             public void addReader(int subtaskId) {
-                if (context.registeredReaders().size() == context.currentParallelism()
-                        && !state.unassignedSplits.isEmpty()) {
-                    Map<Integer, List<LongSplit>> assignment =
-                            state.unassignedSplits.stream()
-                                    .collect(Collectors.groupingBy(LongSplit::getBaseNumber));
-                    LOG.info("Assigning splits {}", assignment);
-                    context.assignSplits(new SplitsAssignment<>(assignment));
-                    state.unassignedSplits.clear();
+                if (context.registeredReaders().size() == context.currentParallelism()) {
+                    // Pending splits (e.g. those returned through addSplitsBack) are handed out here.
+                    if (!state.unassignedSplits.isEmpty()) {
+                        Map<Integer, List<LongSplit>> assignment =
+                                state.unassignedSplits.stream()
+                                        .collect(Collectors.groupingBy(LongSplit::getBaseNumber));
+                        LOG.info("Assigning splits {}", assignment);
+                        context.assignSplits(new SplitsAssignment<>(assignment));
+                        state.unassignedSplits.clear();
+                    }
+                    // Sent even when there were no pending splits to assign.
+                    context.registeredReaders().keySet().forEach(context::signalNoMoreSplits);
                 }
-                context.sendEventToSourceReader(subtaskId, new RestartEvent(state.numRestarts));
+                context.sendEventToSourceReader(
+                        subtaskId, new SyncEvent(state.numRestarts, state.numCompletedCheckpoints));
             }
 
+            /**
+             * Counts completed checkpoints once in the enumerator state. That state is serialized
+             * with the enumerator and forwarded to readers through {@link SyncEvent}.
+             */
             @Override
             public void notifyCheckpointComplete(long checkpointId) {
-                state.unassignedSplits.forEach(s -> s.numCompletedCheckpoints++);
+                state.numCompletedCheckpoints++;
             }
 
             @Override
@@ -455,10 +470,15 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
         private static class EnumeratorState {
             private final List<LongSplit> unassignedSplits;
             private int numRestarts;
+            private int numCompletedCheckpoints;
 
-            public EnumeratorState(List<LongSplit> unassignedSplits, int numRestarts) {
+            /**
+             * Note that {@code unassignedSplits} is stored by reference, not copied: the enumerator
+             * mutates it directly (see {@code addSplitsBack} and {@code addReader}).
+             */
+            public EnumeratorState(
+                    List<LongSplit> unassignedSplits,
+                    int numRestarts,
+                    int numCompletedCheckpoints) {
                 this.unassignedSplits = unassignedSplits;
                 this.numRestarts = numRestarts;
+                this.numCompletedCheckpoints = numCompletedCheckpoints;
             }
 
             @Override
@@ -468,13 +488,15 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
                         + unassignedSplits
                         + ", numRestarts="
                         + numRestarts
+                        + ", numCompletedCheckpoints="
+                        + numCompletedCheckpoints
                         + '}';
             }
         }
 
         private static class EnumeratorVersionedSerializer
                 implements SimpleVersionedSerializer<EnumeratorState> {
-            private SplitVersionedSerializer splitVersionedSerializer =
+            private final SplitVersionedSerializer splitVersionedSerializer =
                     new SplitVersionedSerializer();
 
             @Override
@@ -487,8 +509,9 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
                 final ByteBuffer byteBuffer =
                         ByteBuffer.allocate(
                                 state.unassignedSplits.size() * SplitVersionedSerializer.LENGTH
-                                        + 4);
+                                        + 8);
                 byteBuffer.putInt(state.numRestarts);
+                byteBuffer.putInt(state.numCompletedCheckpoints);
                 for (final LongSplit unassignedSplit : state.unassignedSplits) {
                     byteBuffer.put(splitVersionedSerializer.serialize(unassignedSplit));
                 }
@@ -499,6 +522,7 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
             public EnumeratorState deserialize(int version, byte[] serialized) {
                 final ByteBuffer byteBuffer = ByteBuffer.wrap(serialized);
                 final int numRestarts = byteBuffer.getInt();
+                final int numCompletedCheckpoints = byteBuffer.getInt();
 
                 final List<LongSplit> splits =
                         new ArrayList<>(serialized.length / SplitVersionedSerializer.LENGTH);
@@ -508,7 +532,7 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
                     byteBuffer.get(serializedSplit);
                     splits.add(splitVersionedSerializer.deserialize(version, serializedSplit));
                 }
-                return new EnumeratorState(splits, numRestarts);
+                return new EnumeratorState(splits, numRestarts, numCompletedCheckpoints);
             }
         }
 
@@ -524,18 +548,14 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
+            /**
+             * Writes {@code nextNumber} (8 bytes) followed by {@code increment} (4 bytes) into a
+             * {@code LENGTH}-byte array.
+             *
+             * <p>NOTE(review): the per-split {@code numCompletedCheckpoints} int is no longer written.
+             * Confirm {@code LENGTH} (declared outside this hunk) was reduced to match; otherwise the
+             * trailing bytes are unused zero padding.
+             */
             @Override
             public byte[] serialize(LongSplit split) {
                 final byte[] bytes = new byte[LENGTH];
-                ByteBuffer.wrap(bytes)
-                        .putLong(split.nextNumber)
-                        .putInt(split.increment)
-                        .putInt(split.numCompletedCheckpoints);
+                ByteBuffer.wrap(bytes).putLong(split.nextNumber).putInt(split.increment);
                 return bytes;
             }
 
+            /**
+             * Reads a split written by {@code serialize}: an 8-byte {@code nextNumber} followed by a
+             * 4-byte {@code increment}. The {@code version} argument is not consulted.
+             */
             @Override
             public LongSplit deserialize(int version, byte[] serialized) {
                 final ByteBuffer byteBuffer = ByteBuffer.wrap(serialized);
-                return new LongSplit(
-                        byteBuffer.getLong(), byteBuffer.getInt(), byteBuffer.getInt());
+                // Java evaluates arguments left to right, so the reads follow the write order.
+                return new LongSplit(byteBuffer.getLong(), byteBuffer.getInt());
             }
         }
     }
@@ -545,21 +565,23 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
                 StreamExecutionEnvironment environment,
                 int minCheckpoints,
                 boolean slotSharing,
-                int expectedFailures);
+                int expectedFailuresUntilSourceFinishes);
     }
 
     /** Builder-like interface for all relevant unaligned settings. */
     protected static class UnalignedSettings {
         private int parallelism;
         private int slotsPerTaskManager = 1;
-        private int minCheckpoints = 10;
+        private final int minCheckpoints = 10;
         private boolean slotSharing = true;
         @Nullable private File restoreCheckpoint;
         private boolean generateCheckpoint = false;
         private int numSlots;
         private int numBuffers;
-        private int expectedFailures = 0;
+        int expectedFailures = 0;
         private final DagCreator dagCreator;
+        private int alignmentTimeout = 0;
+        private int failuresAfterSourceFinishes = 0;
 
         public UnalignedSettings(DagCreator dagCreator) {
             this.dagCreator = dagCreator;
@@ -605,6 +627,16 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
             return this;
         }
 
+        /**
+         * Stores {@code alignmentTimeout} in these settings. NOTE(review): its unit and the meaning
+         * of the default 0 are decided where the field is read (outside this hunk); confirm there.
+         *
+         * @return this settings object, for chaining
+         */
+        public UnalignedSettings setAlignmentTimeout(int alignmentTimeout) {
+            this.alignmentTimeout = alignmentTimeout;
+            return this;
+        }
+
+        /**
+         * Stores the number of failures to inject after the source finished (default 0).
+         * NOTE(review): how the field is consumed is not visible in this hunk; confirm there.
+         *
+         * @return this settings object, for chaining
+         */
+        public UnalignedSettings setFailuresAfterSourceFinishes(int failuresAfterSourceFinishes) {
+            this.failuresAfterSourceFinishes = failuresAfterSourceFinishes;
+            return this;
+        }
+
         public StreamExecutionEnvironment createEnvironment(File checkpointDir) {
             Configuration conf = new Configuration();
 
@@ -675,6 +707,22 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
         }
     }
 
+    /**
+     * Shifts the partitions one up: a record whose payload is {@code p} is routed to partition
+     * {@code (p + 1) % numPartitions}.
+     */
+    protected static class ShiftingPartitioner implements Partitioner<Long> {
+        @Override
+        public int partition(Long key, int numPartitions) {
+            final long shiftedPayload = withoutHeader(key) + 1;
+            return (int) (shiftedPayload % numPartitions);
+        }
+    }
+
+    /**
+     * Distributes chunks of the size of numPartitions in a round robin fashion: consecutive runs of
+     * {@code numPartitions} payloads share one partition, and successive runs rotate through the
+     * partitions.
+     */
+    protected static class ChunkDistributingPartitioner implements Partitioner<Long> {
+        @Override
+        public int partition(Long key, int numPartitions) {
+            final long chunkIndex = withoutHeader(key) / numPartitions;
+            return (int) (chunkIndex % numPartitions);
+        }
+    }
+
     /** A mapper that fails in particular situations/attempts. */
     protected static class FailingMapper extends RichMapFunction<Long, Long>
             implements CheckpointedFunction, CheckpointListener {
@@ -812,6 +860,7 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
         private ListState<State> stateList;
         protected transient State state;
         protected final long minCheckpoints;
+        protected boolean backpressure;
 
         protected VerifyingSinkBase(long minCheckpoints) {
             this.minCheckpoints = minCheckpoints;
@@ -836,9 +885,9 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
                                     new ListStateDescriptor<>(
                                             "state", (Class<State>) state.getClass()));
             this.state = getOnlyElement(stateList.get(), state);
+            backpressure = false;
             LOG.info(
-                    "Init state {} @ {} subtask ({} attempt)",
-                    this.state,
+                    "Inducing backpressure=false @ {} subtask ({} attempt)",
                     getRuntimeContext().getIndexOfThisSubtask(),
                     getRuntimeContext().getAttemptNumber());
         }
@@ -847,11 +896,6 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
 
         @Override
         public void snapshotState(FunctionSnapshotContext context) throws Exception {
-            LOG.info(
-                    "Snapshot state {} @ {} subtask ({} attempt)",
-                    state,
-                    getRuntimeContext().getIndexOfThisSubtask(),
-                    getRuntimeContext().getAttemptNumber());
+            // The operator list state always holds exactly one element: the current State.
             stateList.clear();
             stateList.add(state);
         }
@@ -859,6 +903,12 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
         @Override
         public void notifyCheckpointComplete(long checkpointId) {
             state.completedCheckpoints++;
+            // The flag is read outside this hunk (presumably by subclasses). It stays true only
+            // until minCheckpoints checkpoints have completed.
+            backpressure = state.completedCheckpoints < minCheckpoints;
+            LOG.info(
+                    "Inducing backpressure={} @ {} subtask ({} attempt)",
+                    backpressure,
+                    getRuntimeContext().getIndexOfThisSubtask(),
+                    getRuntimeContext().getAttemptNumber());
         }
 
         @Override
@@ -879,7 +929,53 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
         }
     }
 
+    static class MinEmittingFunction extends RichCoFlatMapFunction<Long, Long, Long>
+            implements CheckpointedFunction {
+        private ListState<State> stateList;
+        private State state;
+
+        @Override
+        public void snapshotState(FunctionSnapshotContext context) throws Exception {
+            stateList.clear();
+            stateList.add(state);
+        }
+
+        @Override
+        public void initializeState(FunctionInitializationContext context) throws Exception {
+            stateList =
+                    context.getOperatorStateStore()
+                            .getListState(new ListStateDescriptor<>("state", State.class));
+            state = getOnlyElement(stateList.get(), new State());
+        }
+
+        @Override
+        public void flatMap1(Long value, Collector<Long> out) {
+            long baseValue = withoutHeader(value);
+            state.lastLeft = baseValue;
+            if (state.lastRight >= baseValue) {
+                out.collect(value);
+            }
+        }
+
+        @Override
+        public void flatMap2(Long value, Collector<Long> out) {
+            long baseValue = withoutHeader(value);
+            state.lastRight = baseValue;
+            if (state.lastLeft >= baseValue) {
+                out.collect(value);
+            }
+        }
+
+        private static class State {
+            private long lastLeft = Long.MIN_VALUE;
+            private long lastRight = Long.MIN_VALUE;
+        }
+    }
+
+    /**
+     * Tags {@code value} with the stream header by XOR-ing it with {@code HEADER}.
+     *
+     * <p>Fails fast via {@code checkState} when {@code value} exceeds {@link Integer#MAX_VALUE},
+     * which indicates the test ran too long. NOTE(review): negative values are not rejected here.
+     * Confirm they cannot occur, since they would presumably flip the bits that checkHeader
+     * inspects through {@code HEADER_MASK}.
+     */
     protected static long withHeader(long value) {
+        checkState(
+                value <= Integer.MAX_VALUE,
+                "Value too large for header, this indicates that the test is running too long.");
         return value ^ HEADER;
     }
 
@@ -890,7 +986,11 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
 
+    /**
+     * Verifies that {@code value} carries {@code HEADER} in the bits selected by {@code HEADER_MASK}.
+     *
+     * @return {@code value}, unchanged
+     * @throws IllegalArgumentException if the header bits do not match; the message contains both
+     *     the expected header and the actual value in hexadecimal
+     */
     protected static long checkHeader(long value) {
         if ((value & HEADER_MASK) != HEADER) {
-            throw new IllegalArgumentException("Stream corrupted");
+            throw new IllegalArgumentException(
+                    "Stream corrupted. Cannot find the header "
+                            + Long.toHexString(HEADER)
+                            + " in the value "
+                            + Long.toHexString(value));
         }
         return value;
     }