You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/09/26 07:38:13 UTC
[flink-table-store] branch master updated: [FLINK-29369] Commit delete file failure due to Checkpoint aborted
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 2219fbad [FLINK-29369] Commit delete file failure due to Checkpoint aborted
2219fbad is described below
commit 2219fbad07e413a2961a1c806b0f9647ccf84bc8
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Mon Sep 26 15:38:08 2022 +0800
[FLINK-29369] Commit delete file failure due to Checkpoint aborted
This closes #299
---
.../table/store/connector/sink/Committable.java | 21 +++++++++-
.../connector/sink/CommittableSerializer.java | 16 +++++---
.../store/connector/sink/CommitterOperator.java | 45 ++++++++++++++--------
.../connector/sink/PrepareCommitOperator.java | 11 +++---
.../store/connector/sink/StoreCompactOperator.java | 5 ++-
.../store/connector/sink/StoreWriteOperator.java | 6 ++-
.../connector/sink/CommittableSerializerTest.java | 10 +++--
.../connector/sink/CommitterOperatorTest.java | 34 +++++++++++++++-
.../table/store/file/mergetree/Increment.java | 12 ++++++
.../table/store/table/sink/FileCommittable.java | 12 ++++++
.../flink/table/store/spark/SparkReadITCase.java | 7 +++-
11 files changed, 142 insertions(+), 37 deletions(-)
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/Committable.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/Committable.java
index 158287a7..57cbda8d 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/Committable.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/Committable.java
@@ -21,15 +21,22 @@ package org.apache.flink.table.store.connector.sink;
/** Committable produced by {@link PrepareCommitOperator}. */
public class Committable {
+ private final long checkpointId;
+
private final Kind kind;
private final Object wrappedCommittable;
- public Committable(Kind kind, Object wrappedCommittable) {
+ public Committable(long checkpointId, Kind kind, Object wrappedCommittable) {
+ this.checkpointId = checkpointId;
this.kind = kind;
this.wrappedCommittable = wrappedCommittable;
}
+ public long checkpointId() {
+ return checkpointId;
+ }
+
public Kind kind() {
return kind;
}
@@ -38,6 +45,18 @@ public class Committable {
return wrappedCommittable;
}
+ @Override
+ public String toString() {
+ return "Committable{"
+ + "checkpointId="
+ + checkpointId
+ + ", kind="
+ + kind
+ + ", wrappedCommittable="
+ + wrappedCommittable
+ + '}';
+ }
+
enum Kind {
FILE((byte) 0),
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommittableSerializer.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommittableSerializer.java
index b5e2ed77..e4554232 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommittableSerializer.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommittableSerializer.java
@@ -35,7 +35,7 @@ public class CommittableSerializer implements SimpleVersionedSerializer<Committa
@Override
public int getVersion() {
- return 1;
+ return 2;
}
@Override
@@ -57,7 +57,8 @@ public class CommittableSerializer implements SimpleVersionedSerializer<Committa
throw new UnsupportedOperationException("Unsupported kind: " + committable.kind());
}
- return ByteBuffer.allocate(1 + wrapped.length + 4)
+ return ByteBuffer.allocate(8 + 1 + wrapped.length + 4)
+ .putLong(committable.checkpointId())
.put(committable.kind().toByteValue())
.put(wrapped)
.putInt(version)
@@ -65,10 +66,15 @@ public class CommittableSerializer implements SimpleVersionedSerializer<Committa
}
@Override
- public Committable deserialize(int i, byte[] bytes) throws IOException {
+ public Committable deserialize(int committableVersion, byte[] bytes) throws IOException {
+ if (committableVersion != getVersion()) {
+ throw new RuntimeException("Can not deserialize version: " + committableVersion);
+ }
+
ByteBuffer buffer = ByteBuffer.wrap(bytes);
+ long checkpointId = buffer.getLong();
Committable.Kind kind = Committable.Kind.fromByteValue(buffer.get());
- byte[] wrapped = new byte[bytes.length - 5];
+ byte[] wrapped = new byte[bytes.length - 13];
buffer.get(wrapped);
int version = buffer.getInt();
@@ -83,6 +89,6 @@ public class CommittableSerializer implements SimpleVersionedSerializer<Committa
default:
throw new UnsupportedOperationException("Unsupported kind: " + kind);
}
- return new Committable(kind, wrappedCommittable);
+ return new Committable(checkpointId, kind, wrappedCommittable);
}
}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommitterOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommitterOperator.java
index fb7083ac..b5df8793 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommitterOperator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommitterOperator.java
@@ -37,7 +37,9 @@ import org.apache.flink.util.function.SerializableSupplier;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.UUID;
@@ -79,6 +81,8 @@ public class CommitterOperator extends AbstractStreamOperator<Committable>
*/
private Committer committer;
+ private boolean endInput = false;
+
public CommitterOperator(
boolean streamingCheckpointEnabled,
SerializableFunction<String, Committer> committerFactory,
@@ -161,11 +165,7 @@ public class CommitterOperator extends AbstractStreamOperator<Committable>
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
super.snapshotState(context);
- List<Committable> poll = pollInputs();
- if (poll.size() > 0) {
- committablesPerCheckpoint.put(
- context.getCheckpointId(), toCommittables(context.getCheckpointId(), poll));
- }
+ pollInputs();
streamingCommitterState.update(committables(committablesPerCheckpoint));
}
@@ -175,22 +175,19 @@ public class CommitterOperator extends AbstractStreamOperator<Committable>
@Override
public void endInput() throws Exception {
+ endInput = true;
if (streamingCheckpointEnabled) {
return;
}
- long checkpointId = Long.MAX_VALUE;
- List<Committable> poll = pollInputs();
- if (!poll.isEmpty()) {
- committablesPerCheckpoint.put(checkpointId, toCommittables(checkpointId, poll));
- }
- commitUpToCheckpoint(checkpointId);
+ pollInputs();
+ commitUpToCheckpoint(Long.MAX_VALUE);
}
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
super.notifyCheckpointComplete(checkpointId);
- commitUpToCheckpoint(checkpointId);
+ commitUpToCheckpoint(endInput ? Long.MAX_VALUE : checkpointId);
}
private void commitUpToCheckpoint(long checkpointId) throws Exception {
@@ -213,9 +210,27 @@ public class CommitterOperator extends AbstractStreamOperator<Committable>
super.close();
}
- private List<Committable> pollInputs() {
- List<Committable> poll = new ArrayList<>(this.inputs);
+ private void pollInputs() throws Exception {
+ Map<Long, List<Committable>> grouped = new HashMap<>();
+ for (Committable c : inputs) {
+ grouped.computeIfAbsent(c.checkpointId(), k -> new ArrayList<>()).add(c);
+ }
+
+ for (Map.Entry<Long, List<Committable>> entry : grouped.entrySet()) {
+ Long cp = entry.getKey();
+ List<Committable> committables = entry.getValue();
+ if (committablesPerCheckpoint.containsKey(cp)) {
+ throw new RuntimeException(
+ String.format(
+ "Repeatedly commit the same checkpoint files. \n"
+ + "The previous files is %s, \n"
+ + "and the subsequent files is %s",
+ committablesPerCheckpoint.get(cp), committables));
+ }
+
+ committablesPerCheckpoint.put(cp, toCommittables(cp, committables));
+ }
+
this.inputs.clear();
- return poll;
}
}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/PrepareCommitOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/PrepareCommitOperator.java
index 052bbdb8..f873c135 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/PrepareCommitOperator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/PrepareCommitOperator.java
@@ -44,7 +44,7 @@ public abstract class PrepareCommitOperator extends AbstractStreamOperator<Commi
@Override
public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
if (!endOfInput) {
- emitCommittables(false);
+ emitCommittables(false, checkpointId);
}
// no records are expected to emit after endOfInput
}
@@ -52,13 +52,14 @@ public abstract class PrepareCommitOperator extends AbstractStreamOperator<Commi
@Override
public void endInput() throws Exception {
endOfInput = true;
- emitCommittables(true);
+ emitCommittables(true, Long.MAX_VALUE);
}
- private void emitCommittables(boolean endOfInput) throws IOException {
- prepareCommit(endOfInput)
+ private void emitCommittables(boolean endOfInput, long checkpointId) throws IOException {
+ prepareCommit(endOfInput, checkpointId)
.forEach(committable -> output.collect(new StreamRecord<>(committable)));
}
- protected abstract List<Committable> prepareCommit(boolean endOfInput) throws IOException;
+ protected abstract List<Committable> prepareCommit(boolean endOfInput, long checkpointId)
+ throws IOException;
}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
index f4329d6b..8b4ded82 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
@@ -58,9 +58,10 @@ public class StoreCompactOperator extends PrepareCommitOperator {
}
@Override
- protected List<Committable> prepareCommit(boolean endOfInput) throws IOException {
+ protected List<Committable> prepareCommit(boolean endOfInput, long checkpointId)
+ throws IOException {
return compact.compact().stream()
- .map(c -> new Committable(Committable.Kind.FILE, c))
+ .map(c -> new Committable(checkpointId, Committable.Kind.FILE, c))
.collect(Collectors.toList());
}
}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreWriteOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreWriteOperator.java
index dd332979..89697b07 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreWriteOperator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreWriteOperator.java
@@ -181,11 +181,12 @@ public class StoreWriteOperator extends PrepareCommitOperator {
}
@Override
- protected List<Committable> prepareCommit(boolean endOfInput) throws IOException {
+ protected List<Committable> prepareCommit(boolean endOfInput, long checkpointId)
+ throws IOException {
List<Committable> committables = new ArrayList<>();
try {
for (FileCommittable committable : write.prepareCommit(endOfInput)) {
- committables.add(new Committable(Committable.Kind.FILE, committable));
+ committables.add(new Committable(checkpointId, Committable.Kind.FILE, committable));
}
} catch (Exception e) {
throw new IOException(e);
@@ -203,6 +204,7 @@ public class StoreWriteOperator extends PrepareCommitOperator {
(k, v) ->
committables.add(
new Committable(
+ checkpointId,
Committable.Kind.LOG_OFFSET,
new LogOffsetCommittable(k, v))));
}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommittableSerializerTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommittableSerializerTest.java
index 7752421c..a6ba8953 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommittableSerializerTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommittableSerializerTest.java
@@ -44,10 +44,10 @@ public class CommittableSerializerTest {
(FileCommittable)
serializer
.deserialize(
- 1,
+ 2,
serializer.serialize(
new Committable(
- Committable.Kind.FILE, committable)))
+ 9, Committable.Kind.FILE, committable)))
.wrappedCommittable();
assertThat(newCommittable).isEqualTo(committable);
}
@@ -59,10 +59,12 @@ public class CommittableSerializerTest {
(LogOffsetCommittable)
serializer
.deserialize(
- 1,
+ 2,
serializer.serialize(
new Committable(
- Committable.Kind.LOG_OFFSET, committable)))
+ 8,
+ Committable.Kind.LOG_OFFSET,
+ committable)))
.wrappedCommittable();
assertThat(newCommittable).isEqualTo(committable);
}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommitterOperatorTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommitterOperatorTest.java
index c9ea2710..e8a2627b 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommitterOperatorTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommitterOperatorTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.schema.UpdateSchema;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.FileStoreTableFactory;
import org.apache.flink.table.store.table.sink.FileCommittable;
@@ -87,7 +88,7 @@ public class CommitterOperatorTest {
long timestamp = 1;
for (FileCommittable committable : write.prepareCommit(false)) {
testHarness.processElement(
- new Committable(Committable.Kind.FILE, committable), timestamp++);
+ new Committable(8, Committable.Kind.FILE, committable), timestamp++);
}
// checkpoint is completed but not notified, so no snapshot is committed
OperatorSubtaskState snapshot = testHarness.snapshot(0, timestamp++);
@@ -116,6 +117,37 @@ public class CommitterOperatorTest {
assertResults(table, "1, 10", "2, 20");
}
+ @Test
+ public void testCheckpointAbort() throws Exception {
+ FileStoreTable table = createFileStoreTable();
+ OneInputStreamOperatorTestHarness<Committable, Committable> testHarness =
+ createTestHarness(table);
+ testHarness.open();
+
+ // files from multiple checkpoint
+ // but no snapshot
+ long cpId = 0;
+ for (int i = 0; i < 10; i++) {
+ cpId++;
+ TableWrite write = table.newWrite();
+ write.write(GenericRowData.of(1, 10L));
+ write.write(GenericRowData.of(2, 20L));
+ for (FileCommittable committable : write.prepareCommit(false)) {
+ testHarness.processElement(
+ new Committable(cpId, Committable.Kind.FILE, committable), 1);
+ }
+ }
+
+ // checkpoint is completed but not notified, so no snapshot is committed
+ testHarness.snapshot(cpId, 1);
+ testHarness.notifyOfCompletedCheckpoint(cpId);
+
+ SnapshotManager snapshotManager = new SnapshotManager(tablePath);
+
+ // should create 10 snapshots
+ assertThat(snapshotManager.latestSnapshotId()).isEqualTo(cpId);
+ }
+
private void assertResults(FileStoreTable table, String... expected) {
TableRead read = table.newRead();
List<String> actual = new ArrayList<>();
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Increment.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Increment.java
index 2f65e7ef..ea784e97 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Increment.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Increment.java
@@ -97,4 +97,16 @@ public class Increment {
public int hashCode() {
return Objects.hash(newFiles, compactBefore, compactAfter);
}
+
+ @Override
+ public String toString() {
+ return "Increment{"
+ + "newFiles="
+ + newFiles
+ + ", compactBefore="
+ + compactBefore
+ + ", compactAfter="
+ + compactAfter
+ + '}';
+ }
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/FileCommittable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/FileCommittable.java
index e2a78286..a1da770c 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/FileCommittable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/FileCommittable.java
@@ -68,4 +68,16 @@ public class FileCommittable {
public int hashCode() {
return Objects.hash(partition, bucket, increment);
}
+
+ @Override
+ public String toString() {
+ return "FileCommittable{"
+ + "partition="
+ + partition
+ + ", bucket="
+ + bucket
+ + ", increment="
+ + increment
+ + '}';
+ }
}
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
index 356e97f6..522723b4 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
@@ -650,8 +650,11 @@ public class SparkReadITCase {
.isInstanceOf(NamespaceAlreadyExistsException.class)
.hasMessageContaining("Namespace 'bar' already exists");
- assertThat(spark.sql("SHOW NAMESPACES").collectAsList().toString())
- .isEqualTo("[[bar], [default]]");
+ assertThat(
+ spark.sql("SHOW NAMESPACES").collectAsList().stream()
+ .map(row -> row.getString(0))
+ .collect(Collectors.toList()))
+ .containsExactlyInAnyOrder("bar", "default");
Path nsPath = new Path(warehousePath, "bar.db");
assertThat(new File(nsPath.toUri())).exists();