You are viewing a plain-text version of this content; the canonical (HTML) version is available via the mailing-list archive link.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/09/26 07:38:13 UTC

[flink-table-store] branch master updated: [FLINK-29369] Commit delete file failure due to Checkpoint aborted

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 2219fbad [FLINK-29369] Commit delete file failure due to Checkpoint aborted
2219fbad is described below

commit 2219fbad07e413a2961a1c806b0f9647ccf84bc8
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Mon Sep 26 15:38:08 2022 +0800

    [FLINK-29369] Commit delete file failure due to Checkpoint aborted
    
    This closes #299
---
 .../table/store/connector/sink/Committable.java    | 21 +++++++++-
 .../connector/sink/CommittableSerializer.java      | 16 +++++---
 .../store/connector/sink/CommitterOperator.java    | 45 ++++++++++++++--------
 .../connector/sink/PrepareCommitOperator.java      | 11 +++---
 .../store/connector/sink/StoreCompactOperator.java |  5 ++-
 .../store/connector/sink/StoreWriteOperator.java   |  6 ++-
 .../connector/sink/CommittableSerializerTest.java  | 10 +++--
 .../connector/sink/CommitterOperatorTest.java      | 34 +++++++++++++++-
 .../table/store/file/mergetree/Increment.java      | 12 ++++++
 .../table/store/table/sink/FileCommittable.java    | 12 ++++++
 .../flink/table/store/spark/SparkReadITCase.java   |  7 +++-
 11 files changed, 142 insertions(+), 37 deletions(-)

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/Committable.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/Committable.java
index 158287a7..57cbda8d 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/Committable.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/Committable.java
@@ -21,15 +21,22 @@ package org.apache.flink.table.store.connector.sink;
 /** Committable produced by {@link PrepareCommitOperator}. */
 public class Committable {
 
+    private final long checkpointId;
+
     private final Kind kind;
 
     private final Object wrappedCommittable;
 
-    public Committable(Kind kind, Object wrappedCommittable) {
+    public Committable(long checkpointId, Kind kind, Object wrappedCommittable) {
+        this.checkpointId = checkpointId;
         this.kind = kind;
         this.wrappedCommittable = wrappedCommittable;
     }
 
+    public long checkpointId() {
+        return checkpointId;
+    }
+
     public Kind kind() {
         return kind;
     }
@@ -38,6 +45,18 @@ public class Committable {
         return wrappedCommittable;
     }
 
+    @Override
+    public String toString() {
+        return "Committable{"
+                + "checkpointId="
+                + checkpointId
+                + ", kind="
+                + kind
+                + ", wrappedCommittable="
+                + wrappedCommittable
+                + '}';
+    }
+
     enum Kind {
         FILE((byte) 0),
 
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommittableSerializer.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommittableSerializer.java
index b5e2ed77..e4554232 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommittableSerializer.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommittableSerializer.java
@@ -35,7 +35,7 @@ public class CommittableSerializer implements SimpleVersionedSerializer<Committa
 
     @Override
     public int getVersion() {
-        return 1;
+        return 2;
     }
 
     @Override
@@ -57,7 +57,8 @@ public class CommittableSerializer implements SimpleVersionedSerializer<Committa
                 throw new UnsupportedOperationException("Unsupported kind: " + committable.kind());
         }
 
-        return ByteBuffer.allocate(1 + wrapped.length + 4)
+        return ByteBuffer.allocate(8 + 1 + wrapped.length + 4)
+                .putLong(committable.checkpointId())
                 .put(committable.kind().toByteValue())
                 .put(wrapped)
                 .putInt(version)
@@ -65,10 +66,15 @@ public class CommittableSerializer implements SimpleVersionedSerializer<Committa
     }
 
     @Override
-    public Committable deserialize(int i, byte[] bytes) throws IOException {
+    public Committable deserialize(int committableVersion, byte[] bytes) throws IOException {
+        if (committableVersion != getVersion()) {
+            throw new RuntimeException("Can not deserialize version: " + committableVersion);
+        }
+
         ByteBuffer buffer = ByteBuffer.wrap(bytes);
+        long checkpointId = buffer.getLong();
         Committable.Kind kind = Committable.Kind.fromByteValue(buffer.get());
-        byte[] wrapped = new byte[bytes.length - 5];
+        byte[] wrapped = new byte[bytes.length - 13];
         buffer.get(wrapped);
         int version = buffer.getInt();
 
@@ -83,6 +89,6 @@ public class CommittableSerializer implements SimpleVersionedSerializer<Committa
             default:
                 throw new UnsupportedOperationException("Unsupported kind: " + kind);
         }
-        return new Committable(kind, wrappedCommittable);
+        return new Committable(checkpointId, kind, wrappedCommittable);
     }
 }
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommitterOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommitterOperator.java
index fb7083ac..b5df8793 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommitterOperator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommitterOperator.java
@@ -37,7 +37,9 @@ import org.apache.flink.util.function.SerializableSupplier;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Deque;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.NavigableMap;
 import java.util.TreeMap;
 import java.util.UUID;
@@ -79,6 +81,8 @@ public class CommitterOperator extends AbstractStreamOperator<Committable>
      */
     private Committer committer;
 
+    private boolean endInput = false;
+
     public CommitterOperator(
             boolean streamingCheckpointEnabled,
             SerializableFunction<String, Committer> committerFactory,
@@ -161,11 +165,7 @@ public class CommitterOperator extends AbstractStreamOperator<Committable>
     @Override
     public void snapshotState(StateSnapshotContext context) throws Exception {
         super.snapshotState(context);
-        List<Committable> poll = pollInputs();
-        if (poll.size() > 0) {
-            committablesPerCheckpoint.put(
-                    context.getCheckpointId(), toCommittables(context.getCheckpointId(), poll));
-        }
+        pollInputs();
         streamingCommitterState.update(committables(committablesPerCheckpoint));
     }
 
@@ -175,22 +175,19 @@ public class CommitterOperator extends AbstractStreamOperator<Committable>
 
     @Override
     public void endInput() throws Exception {
+        endInput = true;
         if (streamingCheckpointEnabled) {
             return;
         }
 
-        long checkpointId = Long.MAX_VALUE;
-        List<Committable> poll = pollInputs();
-        if (!poll.isEmpty()) {
-            committablesPerCheckpoint.put(checkpointId, toCommittables(checkpointId, poll));
-        }
-        commitUpToCheckpoint(checkpointId);
+        pollInputs();
+        commitUpToCheckpoint(Long.MAX_VALUE);
     }
 
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
         super.notifyCheckpointComplete(checkpointId);
-        commitUpToCheckpoint(checkpointId);
+        commitUpToCheckpoint(endInput ? Long.MAX_VALUE : checkpointId);
     }
 
     private void commitUpToCheckpoint(long checkpointId) throws Exception {
@@ -213,9 +210,27 @@ public class CommitterOperator extends AbstractStreamOperator<Committable>
         super.close();
     }
 
-    private List<Committable> pollInputs() {
-        List<Committable> poll = new ArrayList<>(this.inputs);
+    private void pollInputs() throws Exception {
+        Map<Long, List<Committable>> grouped = new HashMap<>();
+        for (Committable c : inputs) {
+            grouped.computeIfAbsent(c.checkpointId(), k -> new ArrayList<>()).add(c);
+        }
+
+        for (Map.Entry<Long, List<Committable>> entry : grouped.entrySet()) {
+            Long cp = entry.getKey();
+            List<Committable> committables = entry.getValue();
+            if (committablesPerCheckpoint.containsKey(cp)) {
+                throw new RuntimeException(
+                        String.format(
+                                "Repeatedly commit the same checkpoint files. \n"
+                                        + "The previous files is %s, \n"
+                                        + "and the subsequent files is %s",
+                                committablesPerCheckpoint.get(cp), committables));
+            }
+
+            committablesPerCheckpoint.put(cp, toCommittables(cp, committables));
+        }
+
         this.inputs.clear();
-        return poll;
     }
 }
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/PrepareCommitOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/PrepareCommitOperator.java
index 052bbdb8..f873c135 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/PrepareCommitOperator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/PrepareCommitOperator.java
@@ -44,7 +44,7 @@ public abstract class PrepareCommitOperator extends AbstractStreamOperator<Commi
     @Override
     public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
         if (!endOfInput) {
-            emitCommittables(false);
+            emitCommittables(false, checkpointId);
         }
         // no records are expected to emit after endOfInput
     }
@@ -52,13 +52,14 @@ public abstract class PrepareCommitOperator extends AbstractStreamOperator<Commi
     @Override
     public void endInput() throws Exception {
         endOfInput = true;
-        emitCommittables(true);
+        emitCommittables(true, Long.MAX_VALUE);
     }
 
-    private void emitCommittables(boolean endOfInput) throws IOException {
-        prepareCommit(endOfInput)
+    private void emitCommittables(boolean endOfInput, long checkpointId) throws IOException {
+        prepareCommit(endOfInput, checkpointId)
                 .forEach(committable -> output.collect(new StreamRecord<>(committable)));
     }
 
-    protected abstract List<Committable> prepareCommit(boolean endOfInput) throws IOException;
+    protected abstract List<Committable> prepareCommit(boolean endOfInput, long checkpointId)
+            throws IOException;
 }
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
index f4329d6b..8b4ded82 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
@@ -58,9 +58,10 @@ public class StoreCompactOperator extends PrepareCommitOperator {
     }
 
     @Override
-    protected List<Committable> prepareCommit(boolean endOfInput) throws IOException {
+    protected List<Committable> prepareCommit(boolean endOfInput, long checkpointId)
+            throws IOException {
         return compact.compact().stream()
-                .map(c -> new Committable(Committable.Kind.FILE, c))
+                .map(c -> new Committable(checkpointId, Committable.Kind.FILE, c))
                 .collect(Collectors.toList());
     }
 }
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreWriteOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreWriteOperator.java
index dd332979..89697b07 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreWriteOperator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreWriteOperator.java
@@ -181,11 +181,12 @@ public class StoreWriteOperator extends PrepareCommitOperator {
     }
 
     @Override
-    protected List<Committable> prepareCommit(boolean endOfInput) throws IOException {
+    protected List<Committable> prepareCommit(boolean endOfInput, long checkpointId)
+            throws IOException {
         List<Committable> committables = new ArrayList<>();
         try {
             for (FileCommittable committable : write.prepareCommit(endOfInput)) {
-                committables.add(new Committable(Committable.Kind.FILE, committable));
+                committables.add(new Committable(checkpointId, Committable.Kind.FILE, committable));
             }
         } catch (Exception e) {
             throw new IOException(e);
@@ -203,6 +204,7 @@ public class StoreWriteOperator extends PrepareCommitOperator {
                             (k, v) ->
                                     committables.add(
                                             new Committable(
+                                                    checkpointId,
                                                     Committable.Kind.LOG_OFFSET,
                                                     new LogOffsetCommittable(k, v))));
         }
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommittableSerializerTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommittableSerializerTest.java
index 7752421c..a6ba8953 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommittableSerializerTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommittableSerializerTest.java
@@ -44,10 +44,10 @@ public class CommittableSerializerTest {
                 (FileCommittable)
                         serializer
                                 .deserialize(
-                                        1,
+                                        2,
                                         serializer.serialize(
                                                 new Committable(
-                                                        Committable.Kind.FILE, committable)))
+                                                        9, Committable.Kind.FILE, committable)))
                                 .wrappedCommittable();
         assertThat(newCommittable).isEqualTo(committable);
     }
@@ -59,10 +59,12 @@ public class CommittableSerializerTest {
                 (LogOffsetCommittable)
                         serializer
                                 .deserialize(
-                                        1,
+                                        2,
                                         serializer.serialize(
                                                 new Committable(
-                                                        Committable.Kind.LOG_OFFSET, committable)))
+                                                        8,
+                                                        Committable.Kind.LOG_OFFSET,
+                                                        committable)))
                                 .wrappedCommittable();
         assertThat(newCommittable).isEqualTo(committable);
     }
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommitterOperatorTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommitterOperatorTest.java
index c9ea2710..e8a2627b 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommitterOperatorTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommitterOperatorTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.schema.UpdateSchema;
 import org.apache.flink.table.store.file.utils.RecordReader;
 import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.store.table.FileStoreTable;
 import org.apache.flink.table.store.table.FileStoreTableFactory;
 import org.apache.flink.table.store.table.sink.FileCommittable;
@@ -87,7 +88,7 @@ public class CommitterOperatorTest {
         long timestamp = 1;
         for (FileCommittable committable : write.prepareCommit(false)) {
             testHarness.processElement(
-                    new Committable(Committable.Kind.FILE, committable), timestamp++);
+                    new Committable(8, Committable.Kind.FILE, committable), timestamp++);
         }
         // checkpoint is completed but not notified, so no snapshot is committed
         OperatorSubtaskState snapshot = testHarness.snapshot(0, timestamp++);
@@ -116,6 +117,37 @@ public class CommitterOperatorTest {
         assertResults(table, "1, 10", "2, 20");
     }
 
+    @Test
+    public void testCheckpointAbort() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        OneInputStreamOperatorTestHarness<Committable, Committable> testHarness =
+                createTestHarness(table);
+        testHarness.open();
+
+        // files from multiple checkpoint
+        // but no snapshot
+        long cpId = 0;
+        for (int i = 0; i < 10; i++) {
+            cpId++;
+            TableWrite write = table.newWrite();
+            write.write(GenericRowData.of(1, 10L));
+            write.write(GenericRowData.of(2, 20L));
+            for (FileCommittable committable : write.prepareCommit(false)) {
+                testHarness.processElement(
+                        new Committable(cpId, Committable.Kind.FILE, committable), 1);
+            }
+        }
+
+        // checkpoint is completed but not notified, so no snapshot is committed
+        testHarness.snapshot(cpId, 1);
+        testHarness.notifyOfCompletedCheckpoint(cpId);
+
+        SnapshotManager snapshotManager = new SnapshotManager(tablePath);
+
+        // should create 10 snapshots
+        assertThat(snapshotManager.latestSnapshotId()).isEqualTo(cpId);
+    }
+
     private void assertResults(FileStoreTable table, String... expected) {
         TableRead read = table.newRead();
         List<String> actual = new ArrayList<>();
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Increment.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Increment.java
index 2f65e7ef..ea784e97 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Increment.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Increment.java
@@ -97,4 +97,16 @@ public class Increment {
     public int hashCode() {
         return Objects.hash(newFiles, compactBefore, compactAfter);
     }
+
+    @Override
+    public String toString() {
+        return "Increment{"
+                + "newFiles="
+                + newFiles
+                + ", compactBefore="
+                + compactBefore
+                + ", compactAfter="
+                + compactAfter
+                + '}';
+    }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/FileCommittable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/FileCommittable.java
index e2a78286..a1da770c 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/FileCommittable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/FileCommittable.java
@@ -68,4 +68,16 @@ public class FileCommittable {
     public int hashCode() {
         return Objects.hash(partition, bucket, increment);
     }
+
+    @Override
+    public String toString() {
+        return "FileCommittable{"
+                + "partition="
+                + partition
+                + ", bucket="
+                + bucket
+                + ", increment="
+                + increment
+                + '}';
+    }
 }
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
index 356e97f6..522723b4 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
@@ -650,8 +650,11 @@ public class SparkReadITCase {
                 .isInstanceOf(NamespaceAlreadyExistsException.class)
                 .hasMessageContaining("Namespace 'bar' already exists");
 
-        assertThat(spark.sql("SHOW NAMESPACES").collectAsList().toString())
-                .isEqualTo("[[bar], [default]]");
+        assertThat(
+                        spark.sql("SHOW NAMESPACES").collectAsList().stream()
+                                .map(row -> row.getString(0))
+                                .collect(Collectors.toList()))
+                .containsExactlyInAnyOrder("bar", "default");
 
         Path nsPath = new Path(warehousePath, "bar.db");
         assertThat(new File(nsPath.toUri())).exists();