Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/19 08:08:37 UTC
[flink-table-store] branch master updated: [FLINK-28605] Throw exception intentionally when new snapshots are committed during restore
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new d92a7f37 [FLINK-28605] Throw exception intentionally when new snapshots are committed during restore
d92a7f37 is described below
commit d92a7f37dc072a6d3178b28c2d6040667b9f96b1
Author: tsreaper <ts...@gmail.com>
AuthorDate: Tue Jul 19 16:08:32 2022 +0800
[FLINK-28605] Throw exception intentionally when new snapshots are committed during restore
This closes #225
---
.../store/connector/sink/CommitterOperator.java | 16 +-
.../connector/sink/CommitterOperatorTest.java | 172 +++++++++++++++++++++
2 files changed, 185 insertions(+), 3 deletions(-)
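For context, the change below makes the restore path commit any committables recovered from checkpoint state and then fail on purpose, so that the job restarts and writers resume on top of the freshly committed snapshots. The following is a minimal, self-contained sketch of that flow; the Committer interface and the class name here are simplified stand-ins for illustration, not the actual flink-table-store classes touched by the diff that follows.

import java.util.ArrayList;
import java.util.List;

// Simplified sketch of the restore-time commit flow introduced by this patch.
// "Committer" and the String committables are stand-ins, not the real
// org.apache.flink.table.store classes.
public class RestoreCommitSketch {

    interface Committer {
        // Drop committables whose snapshots already exist, keep the rest.
        List<String> filterRecoveredCommittables(List<String> committables);

        // Write the remaining committables as new snapshots.
        void commit(List<String> committables);
    }

    static void commit(Committer committer, boolean isRecover, List<String> committables) {
        if (isRecover) {
            committables = committer.filterRecoveredCommittables(committables);
            if (!committables.isEmpty()) {
                // New snapshots are produced while restoring: commit them, then
                // fail on purpose so the restarted writers start from these snapshots.
                committer.commit(committables);
                throw new RuntimeException(
                        "This exception is intentionally thrown after committing the restored "
                                + "checkpoints. By restarting the job we hope that writers can "
                                + "start writing based on these new commits.");
            }
        } else {
            // Normal path: commit whatever the completed checkpoint produced.
            committer.commit(committables);
        }
    }

    public static void main(String[] args) {
        Committer committer = new Committer() {
            @Override
            public List<String> filterRecoveredCommittables(List<String> committables) {
                // Pretend nothing has been committed yet, so everything is kept.
                return new ArrayList<>(committables);
            }

            @Override
            public void commit(List<String> committables) {
                System.out.println("committed " + committables);
            }
        };
        try {
            commit(committer, true, List.of("manifest-1"));
        } catch (RuntimeException e) {
            System.out.println("restart requested: " + e.getMessage());
        }
    }
}

The intentional RuntimeException relies on the job's restart strategy: after the restart, the recovered committables have already been committed, so filterRecoveredCommittables returns an empty list and the operator opens normally, which is what the second harness in the new test below checks.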
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommitterOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommitterOperator.java
index 2ae924aa..fb7083ac 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommitterOperator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommitterOperator.java
@@ -136,14 +136,24 @@ public class CommitterOperator extends AbstractStreamOperator<Committable>
commit(true, restored);
}
- public void commit(boolean isRecover, List<ManifestCommittable> committables) throws Exception {
+ private void commit(boolean isRecover, List<ManifestCommittable> committables)
+ throws Exception {
if (isRecover) {
committables = committer.filterRecoveredCommittables(committables);
+ if (!committables.isEmpty()) {
+ committer.commit(committables);
+ throw new RuntimeException(
+ "This exception is intentionally thrown "
+ + "after committing the restored checkpoints. "
+ + "By restarting the job we hope that "
+ + "writers can start writing based on these new commits.");
+ }
+ } else {
+ committer.commit(committables);
}
- committer.commit(committables);
}
- public ManifestCommittable toCommittables(long checkpoint, List<Committable> inputs)
+ private ManifestCommittable toCommittables(long checkpoint, List<Committable> inputs)
throws Exception {
return committer.combine(checkpoint, inputs);
}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommitterOperatorTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommitterOperatorTest.java
new file mode 100644
index 00000000..c9ea2710
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommitterOperatorTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.FileStoreTableFactory;
+import org.apache.flink.table.store.table.sink.FileCommittable;
+import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.TableRead;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/** Tests for {@link CommitterOperator}. */
+public class CommitterOperatorTest {
+
+ private static final RowType ROW_TYPE =
+ RowType.of(
+ new LogicalType[] {
+ DataTypes.INT().getLogicalType(), DataTypes.BIGINT().getLogicalType()
+ },
+ new String[] {"a", "b"});
+
+ @TempDir public java.nio.file.Path tempDir;
+ private Path tablePath;
+
+ @BeforeEach
+ public void before() {
+ tablePath = new Path(tempDir.toString());
+ }
+
+ @Test
+ public void testFailIntentionallyAfterRestore() throws Exception {
+ FileStoreTable table = createFileStoreTable();
+
+ OneInputStreamOperatorTestHarness<Committable, Committable> testHarness =
+ createTestHarness(table);
+ testHarness.open();
+
+ TableWrite write = table.newWrite();
+ write.write(GenericRowData.of(1, 10L));
+ write.write(GenericRowData.of(2, 20L));
+
+ long timestamp = 1;
+ for (FileCommittable committable : write.prepareCommit(false)) {
+ testHarness.processElement(
+ new Committable(Committable.Kind.FILE, committable), timestamp++);
+ }
+ // checkpoint is completed but not notified, so no snapshot is committed
+ OperatorSubtaskState snapshot = testHarness.snapshot(0, timestamp++);
+ assertThat(table.snapshotManager().latestSnapshotId()).isNull();
+
+ testHarness = createTestHarness(table);
+ try {
+ // commit snapshot from state, fail intentionally
+ testHarness.initializeState(snapshot);
+ testHarness.open();
+ fail("Expecting intentional exception");
+ } catch (Exception e) {
+ assertThat(e)
+ .hasMessageContaining(
+ "This exception is intentionally thrown "
+ + "after committing the restored checkpoints. "
+ + "By restarting the job we hope that "
+ + "writers can start writing based on these new commits.");
+ }
+ assertResults(table, "1, 10", "2, 20");
+
+ // snapshot is successfully committed, no failure is needed
+ testHarness = createTestHarness(table);
+ testHarness.initializeState(snapshot);
+ testHarness.open();
+ assertResults(table, "1, 10", "2, 20");
+ }
+
+ private void assertResults(FileStoreTable table, String... expected) {
+ TableRead read = table.newRead();
+ List<String> actual = new ArrayList<>();
+ table.newScan()
+ .plan()
+ .splits
+ .forEach(
+ s -> {
+ try {
+ RecordReader<RowData> recordReader = read.createReader(s);
+ CloseableIterator<RowData> it =
+ new RecordReaderIterator<>(recordReader);
+ while (it.hasNext()) {
+ RowData row = it.next();
+ actual.add(row.getInt(0) + ", " + row.getLong(1));
+ }
+ it.close();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ Collections.sort(actual);
+ assertThat(actual).isEqualTo(Arrays.asList(expected));
+ }
+
+ private FileStoreTable createFileStoreTable() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(CoreOptions.PATH, tablePath.toString());
+ SchemaManager schemaManager = new SchemaManager(tablePath);
+ schemaManager.commitNewVersion(
+ new UpdateSchema(
+ ROW_TYPE,
+ Collections.emptyList(),
+ Collections.emptyList(),
+ conf.toMap(),
+ ""));
+ return FileStoreTableFactory.create(conf);
+ }
+
+ private OneInputStreamOperatorTestHarness<Committable, Committable> createTestHarness(
+ FileStoreTable table) throws Exception {
+ CommitterOperator operator =
+ new CommitterOperator(
+ true,
+ user -> new StoreCommitter(table.newCommit(user)),
+ ManifestCommittableSerializer::new);
+ TypeSerializer<Committable> serializer =
+ new CommittableTypeInfo().createSerializer(new ExecutionConfig());
+ OneInputStreamOperatorTestHarness<Committable, Committable> harness =
+ new OneInputStreamOperatorTestHarness<>(operator, serializer);
+ harness.setup(serializer);
+ return harness;
+ }
+}