Posted to commits@flink.apache.org by ar...@apache.org on 2021/02/25 18:27:11 UTC

[flink] 02/02: [FLINK-21452][connector/common] Stop snapshotting registered readers in source coordinator.

This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fb99ce2e22ca84dece1f7a431a92a4cecb6a71f2
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Tue Feb 23 09:40:31 2021 +0100

    [FLINK-21452][connector/common] Stop snapshotting registered readers in source coordinator.
    
    Sources used to store their registered readers in the snapshot. However, when downscaling, the restored snapshot contains reader entries for subtasks that no longer exist, which violates several invariants.
    The solution is to not store registered readers at all - they re-register themselves on restart anyway.
    To keep the change backward compatible, the snapshot always stores an empty list of readers, and any readers recovered from an old snapshot are discarded.
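    
    The checkpoint-format change boils down to the following pattern (a simplified sketch of the two context methods touched below; the assignment-tracker calls that follow in the real methods are omitted):
    
        void snapshotState(DataOutputStream out) throws Exception {
            // Keep the old binary format readable: always write an empty
            // reader list where the reader map used to be serialized.
            out.writeInt(0);
        }
    
        void restoreState(DataInputStream in) throws Exception {
            // Consume the reader list written by older snapshots, but
            // discard it; readers re-register themselves after restart.
            readRegisteredReaders(in);
        }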
---
 .../reader/CoordinatedSourceRescaleITCase.java     | 162 +++++++++++++++++++++
 .../coordinator/SourceCoordinatorContext.java      |  16 +-
 .../coordinator/SourceCoordinatorSerdeUtils.java   |  27 ----
 .../coordinator/SourceCoordinatorContextTest.java  |   3 +-
 .../coordinator/SourceCoordinatorProviderTest.java |   9 +-
 5 files changed, 177 insertions(+), 40 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceRescaleITCase.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceRescaleITCase.java
new file mode 100644
index 0000000..8ada12b
--- /dev/null
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceRescaleITCase.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.reader;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Comparator;
+
+import static org.junit.Assert.assertEquals;
+
+/** Tests whether the source coordinator handles upscaling and downscaling. */
+public class CoordinatedSourceRescaleITCase {
+    public static final String CREATED_CHECKPOINT = "successfully created checkpoint";
+    public static final String RESTORED_CHECKPOINT = "successfully restored checkpoint";
+    @Rule public final TemporaryFolder temp = new TemporaryFolder();
+
+    @Test
+    public void testDownscaling() throws Exception {
+        final File checkpointDir = temp.newFolder();
+        final File lastCheckpoint = generateCheckpoint(checkpointDir, 7);
+        resumeCheckpoint(checkpointDir, lastCheckpoint, 3);
+    }
+
+    @Test
+    public void testUpscaling() throws Exception {
+        final File checkpointDir = temp.newFolder();
+        final File lastCheckpoint = generateCheckpoint(checkpointDir, 3);
+        resumeCheckpoint(checkpointDir, lastCheckpoint, 7);
+    }
+
+    private File generateCheckpoint(File checkpointDir, int p) throws IOException {
+        final StreamExecutionEnvironment env = createEnv(checkpointDir, null, p);
+
+        try {
+            env.execute("create checkpoint");
+            throw new AssertionError("No checkpoint");
+        } catch (Exception e) {
+            assertEquals(CREATED_CHECKPOINT, ExceptionUtils.getRootCause(e).getMessage());
+            return Files.find(
+                            checkpointDir.toPath(),
+                            2,
+                            (file, attr) ->
+                                    attr.isDirectory()
+                                            && file.getFileName().toString().startsWith("chk"))
+                    .min(Comparator.comparing(Path::toString))
+                    .map(Path::toFile)
+                    .orElseThrow(() -> new IllegalStateException("Cannot generate checkpoint", e));
+        }
+    }
+
+    private void resumeCheckpoint(File checkpointDir, File restoreCheckpoint, int p) {
+        final StreamExecutionEnvironment env = createEnv(checkpointDir, restoreCheckpoint, p);
+
+        try {
+            env.execute("resume checkpoint");
+            throw new AssertionError("No success error");
+        } catch (Exception e) {
+            if (!RESTORED_CHECKPOINT.equals(ExceptionUtils.getRootCause(e).getMessage())) {
+                throw new AssertionError("Cannot resume", e);
+            }
+        }
+    }
+
+    private StreamExecutionEnvironment createEnv(
+            File checkpointDir, @Nullable File restoreCheckpoint, int p) {
+        Configuration conf = new Configuration();
+        conf.setString(
+                CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
+        conf.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, MemorySize.parse("4kb"));
+        if (restoreCheckpoint != null) {
+            conf.set(SavepointConfigOptions.SAVEPOINT_PATH, restoreCheckpoint.toURI().toString());
+        }
+        conf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, p);
+
+        final StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.createLocalEnvironment(p, conf);
+        env.enableCheckpointing(100);
+        env.getCheckpointConfig()
+                .enableExternalizedCheckpoints(
+                        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+
+        DataStream<Long> stream = env.fromSequence(0, Long.MAX_VALUE);
+        stream.map(new FailingMapFunction(restoreCheckpoint == null)).addSink(new SleepySink());
+
+        return env;
+    }
+
+    private static class FailingMapFunction extends RichMapFunction<Long, Long>
+            implements CheckpointListener {
+        private static final long serialVersionUID = 699621912578369378L;
+        private boolean generateCheckpoint;
+
+        FailingMapFunction(boolean generateCheckpoint) {
+            this.generateCheckpoint = generateCheckpoint;
+        }
+
+        @Override
+        public Long map(Long value) throws Exception {
+            // run a bit before failing
+            if (!generateCheckpoint && value % 100 == 42) {
+                throw new Exception(RESTORED_CHECKPOINT);
+            }
+            return value;
+        }
+
+        @Override
+        public void notifyCheckpointComplete(long checkpointId) throws Exception {
+            if (generateCheckpoint && checkpointId > 5) {
+                throw new Exception(CREATED_CHECKPOINT);
+            }
+        }
+    }
+
+    private static class SleepySink implements SinkFunction<Long> {
+        private static final long serialVersionUID = -3542950841846119765L;
+
+        @Override
+        public void invoke(Long value, Context context) throws Exception {
+            if (value % 1000 == 0) {
+                Thread.sleep(1);
+            }
+        }
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
index 493ce69..a262807 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
@@ -58,7 +58,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
 
 import static org.apache.flink.runtime.source.coordinator.SourceCoordinatorSerdeUtils.readRegisteredReaders;
-import static org.apache.flink.runtime.source.coordinator.SourceCoordinatorSerdeUtils.writeRegisteredReaders;
 
 /**
  * A context class for the {@link OperatorCoordinator}. Compared with {@link SplitEnumeratorContext}
@@ -288,7 +287,8 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
             SimpleVersionedSerializer<SplitT> splitSerializer,
             DataOutputStream out)
             throws Exception {
-        writeRegisteredReaders(registeredReaders, out);
+        // FLINK-21452: backward-compatible change: write an empty reader list instead of the registered readers
+        out.writeInt(0);
         assignmentTracker.snapshotState(checkpointId, splitSerializer, out);
     }
 
@@ -301,9 +301,8 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
      */
     void restoreState(SimpleVersionedSerializer<SplitT> splitSerializer, DataInputStream in)
             throws Exception {
-        Map<Integer, ReaderInfo> readers = readRegisteredReaders(in);
-        registeredReaders.clear();
-        registeredReaders.putAll(readers);
+        // FLINK-21452: discard recovered readers; they re-register themselves on restart
+        readRegisteredReaders(in);
         assignmentTracker.restoreState(splitSerializer, in);
     }
 
@@ -313,7 +312,12 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
      * @param readerInfo the reader information of the source reader.
      */
     void registerSourceReader(ReaderInfo readerInfo) {
-        registeredReaders.put(readerInfo.getSubtaskId(), readerInfo);
+        final ReaderInfo previousReader =
+                registeredReaders.put(readerInfo.getSubtaskId(), readerInfo);
+        if (previousReader != null) {
+            throw new IllegalStateException(
+                    "Overwriting " + previousReader + " with " + readerInfo);
+        }
     }
 
     /**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorSerdeUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorSerdeUtils.java
index 1295d4a..51c4ed7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorSerdeUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorSerdeUtils.java
@@ -49,20 +49,6 @@ public class SourceCoordinatorSerdeUtils {
         }
     }
 
-    /**
-     * Get serialized size of the registered readers map.
-     *
-     * <p>The binary format is following: 4 Bytes - num entries. N Bytes - entries 4 Bytes - subtask
-     * id N Bytes - reader info, see {@link #writeReaderInfo(ReaderInfo, DataOutputStream)}.
-     */
-    static void writeRegisteredReaders(
-            Map<Integer, ReaderInfo> registeredReaders, DataOutputStream out) throws IOException {
-        out.writeInt(registeredReaders.size());
-        for (ReaderInfo info : registeredReaders.values()) {
-            writeReaderInfo(info, out);
-        }
-    }
-
     static Map<Integer, ReaderInfo> readRegisteredReaders(DataInputStream in) throws IOException {
         int numReaders = in.readInt();
         Map<Integer, ReaderInfo> registeredReaders = new HashMap<>();
@@ -144,19 +130,6 @@ public class SourceCoordinatorSerdeUtils {
 
     // ----- private helper methods -----
 
-    /**
-     * Serialize {@link ReaderInfo}.
-     *
-     * <p>The binary format is following: 4 Bytes - subtask id N Bytes - location string
-     *
-     * @param readerInfo the given reader information to serialize.
-     */
-    private static void writeReaderInfo(ReaderInfo readerInfo, DataOutputStream out)
-            throws IOException {
-        out.writeInt(readerInfo.getSubtaskId());
-        out.writeUTF(readerInfo.getLocation());
-    }
-
     private static ReaderInfo readReaderInfo(DataInputStream in) throws IOException {
         int subtaskId = in.readInt();
         String location = in.readUTF();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
index 3fdcdae..389a6b9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
@@ -172,7 +172,8 @@ public class SourceCoordinatorContextTest extends SourceCoordinatorTestBase {
                             restoredTracker);
             restoredContext.restoreState(new MockSourceSplitSerializer(), in);
         }
-        assertEquals(context.registeredReaders(), restoredContext.registeredReaders());
+        // FLINK-21452: do not (re)store registered readers
+        assertEquals(0, restoredContext.registeredReaders().size());
         assertEquals(
                 splitSplitAssignmentTracker.uncheckpointedAssignments(),
                 restoredTracker.uncheckpointedAssignments());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java
index 966fc0d..33b085c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java
@@ -36,7 +36,6 @@ import java.util.concurrent.CompletableFuture;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 /** Unit tests for {@link SourceCoordinatorProvider}. */
@@ -96,13 +95,11 @@ public class SourceCoordinatorProviderTest {
                 "The restored source coordinator should be a different instance",
                 restoredSourceCoordinator,
                 sourceCoordinator);
+        // FLINK-21452: do not (re)store registered readers
         assertEquals(
-                "There should only be one registered reader.",
-                1,
+                "There should be no registered reader.",
+                0,
                 restoredSourceCoordinator.getContext().registeredReaders().size());
-        assertNotNull(
-                "The only registered reader should be reader 0",
-                restoredSourceCoordinator.getContext().registeredReaders().get(0));
     }
 
     @Test