You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/19 08:08:37 UTC

[flink-table-store] branch master updated: [FLINK-28605] Throw exception intentionally when new snapshots are committed during restore

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new d92a7f37 [FLINK-28605] Throw exception intentionally when new snapshots are committed during restore
d92a7f37 is described below

commit d92a7f37dc072a6d3178b28c2d6040667b9f96b1
Author: tsreaper <ts...@gmail.com>
AuthorDate: Tue Jul 19 16:08:32 2022 +0800

    [FLINK-28605] Throw exception intentionally when new snapshots are committed during restore
    
    This closes #225
---
 .../store/connector/sink/CommitterOperator.java    |  16 +-
 .../connector/sink/CommitterOperatorTest.java      | 172 +++++++++++++++++++++
 2 files changed, 185 insertions(+), 3 deletions(-)

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommitterOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommitterOperator.java
index 2ae924aa..fb7083ac 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommitterOperator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommitterOperator.java
@@ -136,14 +136,24 @@ public class CommitterOperator extends AbstractStreamOperator<Committable>
         commit(true, restored);
     }
 
-    public void commit(boolean isRecover, List<ManifestCommittable> committables) throws Exception {
+    private void commit(boolean isRecover, List<ManifestCommittable> committables)
+            throws Exception {
         if (isRecover) {
             committables = committer.filterRecoveredCommittables(committables);
+            if (!committables.isEmpty()) {
+                committer.commit(committables);
+                throw new RuntimeException(
+                        "This exception is intentionally thrown "
+                                + "after committing the restored checkpoints. "
+                                + "By restarting the job we hope that "
+                                + "writers can start writing based on these new commits.");
+            }
+        } else {
+            committer.commit(committables);
         }
-        committer.commit(committables);
     }
 
-    public ManifestCommittable toCommittables(long checkpoint, List<Committable> inputs)
+    private ManifestCommittable toCommittables(long checkpoint, List<Committable> inputs)
             throws Exception {
         return committer.combine(checkpoint, inputs);
     }
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommitterOperatorTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommitterOperatorTest.java
new file mode 100644
index 00000000..c9ea2710
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommitterOperatorTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.FileStoreTableFactory;
+import org.apache.flink.table.store.table.sink.FileCommittable;
+import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.TableRead;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/** Tests for {@link CommitterOperator}. */
+public class CommitterOperatorTest {
+
+    private static final RowType ROW_TYPE =
+            RowType.of(
+                    new LogicalType[] {
+                        DataTypes.INT().getLogicalType(), DataTypes.BIGINT().getLogicalType()
+                    },
+                    new String[] {"a", "b"});
+
+    @TempDir public java.nio.file.Path tempDir;
+    private Path tablePath;
+
+    @BeforeEach
+    public void before() {
+        tablePath = new Path(tempDir.toString());
+    }
+
+    @Test
+    public void testFailIntentionallyAfterRestore() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+
+        OneInputStreamOperatorTestHarness<Committable, Committable> testHarness =
+                createTestHarness(table);
+        testHarness.open();
+
+        TableWrite write = table.newWrite();
+        write.write(GenericRowData.of(1, 10L));
+        write.write(GenericRowData.of(2, 20L));
+
+        long timestamp = 1;
+        for (FileCommittable committable : write.prepareCommit(false)) {
+            testHarness.processElement(
+                    new Committable(Committable.Kind.FILE, committable), timestamp++);
+        }
+        // checkpoint is completed but not notified, so no snapshot is committed
+        OperatorSubtaskState snapshot = testHarness.snapshot(0, timestamp++);
+        assertThat(table.snapshotManager().latestSnapshotId()).isNull();
+
+        testHarness = createTestHarness(table);
+        try {
+            // commit snapshot from state, fail intentionally
+            testHarness.initializeState(snapshot);
+            testHarness.open();
+            fail("Expecting intentional exception");
+        } catch (Exception e) {
+            assertThat(e)
+                    .hasMessageContaining(
+                            "This exception is intentionally thrown "
+                                    + "after committing the restored checkpoints. "
+                                    + "By restarting the job we hope that "
+                                    + "writers can start writing based on these new commits.");
+        }
+        assertResults(table, "1, 10", "2, 20");
+
+        // snapshot is successfully committed, no failure is needed
+        testHarness = createTestHarness(table);
+        testHarness.initializeState(snapshot);
+        testHarness.open();
+        assertResults(table, "1, 10", "2, 20");
+    }
+
+    private void assertResults(FileStoreTable table, String... expected) {
+        TableRead read = table.newRead();
+        List<String> actual = new ArrayList<>();
+        table.newScan()
+                .plan()
+                .splits
+                .forEach(
+                        s -> {
+                            try {
+                                RecordReader<RowData> recordReader = read.createReader(s);
+                                CloseableIterator<RowData> it =
+                                        new RecordReaderIterator<>(recordReader);
+                                while (it.hasNext()) {
+                                    RowData row = it.next();
+                                    actual.add(row.getInt(0) + ", " + row.getLong(1));
+                                }
+                                it.close();
+                            } catch (Exception e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+        Collections.sort(actual);
+        assertThat(actual).isEqualTo(Arrays.asList(expected));
+    }
+
+    private FileStoreTable createFileStoreTable() throws Exception {
+        Configuration conf = new Configuration();
+        conf.set(CoreOptions.PATH, tablePath.toString());
+        SchemaManager schemaManager = new SchemaManager(tablePath);
+        schemaManager.commitNewVersion(
+                new UpdateSchema(
+                        ROW_TYPE,
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        conf.toMap(),
+                        ""));
+        return FileStoreTableFactory.create(conf);
+    }
+
+    private OneInputStreamOperatorTestHarness<Committable, Committable> createTestHarness(
+            FileStoreTable table) throws Exception {
+        CommitterOperator operator =
+                new CommitterOperator(
+                        true,
+                        user -> new StoreCommitter(table.newCommit(user)),
+                        ManifestCommittableSerializer::new);
+        TypeSerializer<Committable> serializer =
+                new CommittableTypeInfo().createSerializer(new ExecutionConfig());
+        OneInputStreamOperatorTestHarness<Committable, Committable> harness =
+                new OneInputStreamOperatorTestHarness<>(operator, serializer);
+        harness.setup(serializer);
+        return harness;
+    }
+}