You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by op...@apache.org on 2021/04/12 07:39:56 UTC
[iceberg] branch master updated: Flink: Avoid committing too
frequently for checkpoints that produce no data (#2042)
This is an automated email from the ASF dual-hosted git repository.
openinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new e200884 Flink: Avoid committing too frequently for checkpoints that produce no data (#2042)
e200884 is described below
commit e20088449daec9ed431754044b520b3ac5fa3eaa
Author: wgcn <10...@qq.com>
AuthorDate: Mon Apr 12 15:39:46 2021 +0800
Flink: Avoid committing too frequently for checkpoints that produce no data (#2042)
---
.../iceberg/flink/sink/IcebergFilesCommitter.java | 26 +++++++++++++++-------
.../flink/sink/TestIcebergFilesCommitter.java | 26 ++++++++++++++++++++++
2 files changed, 44 insertions(+), 8 deletions(-)
diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
index c1d3440..ff9174a 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
@@ -55,6 +55,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PropertyUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,6 +74,7 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
// avoiding committing the same data files twice. This id will be attached to iceberg's meta when committing the
// iceberg transaction.
private static final String MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id";
+ static final String MAX_CONTINUOUS_EMPTY_COMMITS = "flink.max-continuous-empty-commits";
// TableLoader to load iceberg table lazily.
private final TableLoader tableLoader;
@@ -95,7 +97,8 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
private transient Table table;
private transient ManifestOutputFileFactory manifestOutputFileFactory;
private transient long maxCommittedCheckpointId;
-
+ private transient int continuousEmptyCheckpoints;
+ private transient int maxContinuousEmptyCommits;
// There're two cases that we restore from flink checkpoints: the first case is restoring from snapshot created by the
// same flink job; another case is restoring from snapshot created by another different job. For the second case, we
// need to maintain the old flink job's id in flink state backend to find the max-committed-checkpoint-id when
@@ -121,6 +124,10 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
this.tableLoader.open();
this.table = tableLoader.loadTable();
+ maxContinuousEmptyCommits = PropertyUtil.propertyAsInt(table.properties(), MAX_CONTINUOUS_EMPTY_COMMITS, 10);
+ Preconditions.checkArgument(maxContinuousEmptyCommits > 0,
+ MAX_CONTINUOUS_EMPTY_COMMITS + " must be positive");
+
int subTaskId = getRuntimeContext().getIndexOfThisSubtask();
int attemptId = getRuntimeContext().getAttemptNumber();
this.manifestOutputFileFactory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, subTaskId, attemptId);
@@ -157,7 +164,6 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
// Update the checkpoint state.
dataFilesPerCheckpoint.put(checkpointId, writeToManifest(checkpointId));
-
// Reset the snapshot state to the latest state.
checkpointsState.clear();
checkpointsState.add(dataFilesPerCheckpoint);
@@ -189,7 +195,6 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
String newFlinkJobId,
long checkpointId) throws IOException {
NavigableMap<Long, byte[]> pendingMap = deltaManifestsMap.headMap(checkpointId, true);
-
List<ManifestFile> manifests = Lists.newArrayList();
NavigableMap<Long, WriteResult> pendingResults = Maps.newTreeMap();
for (Map.Entry<Long, byte[]> e : pendingMap.entrySet()) {
@@ -204,12 +209,17 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
manifests.addAll(deltaManifests.manifests());
}
- if (replacePartitions) {
- replacePartitions(pendingResults, newFlinkJobId, checkpointId);
- } else {
- commitDeltaTxn(pendingResults, newFlinkJobId, checkpointId);
+ int totalFiles = pendingResults.values().stream()
+ .mapToInt(r -> r.dataFiles().length + r.deleteFiles().length).sum();
+ continuousEmptyCheckpoints = totalFiles == 0 ? continuousEmptyCheckpoints + 1 : 0;
+ if (totalFiles != 0 || continuousEmptyCheckpoints % maxContinuousEmptyCommits == 0) {
+ if (replacePartitions) {
+ replacePartitions(pendingResults, newFlinkJobId, checkpointId);
+ } else {
+ commitDeltaTxn(pendingResults, newFlinkJobId, checkpointId);
+ }
+ continuousEmptyCheckpoints = 0;
}
-
pendingMap.clear();
// Delete the committed manifests.
diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
index fb12d91..4ada31a 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
@@ -67,6 +67,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.flink.sink.IcebergFilesCommitter.MAX_CONTINUOUS_EMPTY_COMMITS;
import static org.apache.iceberg.flink.sink.ManifestOutputFileFactory.FLINK_MANIFEST_LOCATION;
@RunWith(Parameterized.class)
@@ -110,6 +111,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
table.updateProperties()
.set(DEFAULT_FILE_FORMAT, format.name())
.set(FLINK_MANIFEST_LOCATION, flinkManifestFolder.getAbsolutePath())
+ .set(MAX_CONTINUOUS_EMPTY_COMMITS, "1")
.commit();
}
@@ -141,6 +143,30 @@ public class TestIcebergFilesCommitter extends TableTestBase {
}
}
+ @Test
+ public void testMaxContinuousEmptyCommits() throws Exception { // verifies empty checkpoints are committed only once per MAX_CONTINUOUS_EMPTY_COMMITS
+ table.updateProperties()
+ .set(MAX_CONTINUOUS_EMPTY_COMMITS, "3") // replaces the "1" this test class sets on the table elsewhere
+ .commit();
+
+ JobID jobId = new JobID();
+ long checkpointId = 0;
+ long timestamp = 0;
+ try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = createStreamSink(jobId)) {
+ harness.setup();
+ harness.open();
+
+ assertSnapshotSize(0); // nothing has been committed before the first checkpoint
+
+ for (int i = 1; i <= 9; i++) { // no elements are processed, so every checkpoint carries zero data/delete files
+ harness.snapshot(++checkpointId, ++timestamp);
+ harness.notifyOfCompletedCheckpoint(checkpointId);
+
+ assertSnapshotSize(i / 3); // an empty commit happens only on every 3rd consecutive empty checkpoint (i = 3, 6, 9)
+ }
+ }
+ }
+
private WriteResult of(DataFile dataFile) {
return WriteResult.builder().addDataFiles(dataFile).build();
}