You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by op...@apache.org on 2021/04/12 07:39:56 UTC

[iceberg] branch master updated: Flink: Avoid to commit too frequently for checkpoints that produce no data (#2042)

This is an automated email from the ASF dual-hosted git repository.

openinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new e200884  Flink:  Avoid to commit too frequently for checkpoints that produce no data (#2042)
e200884 is described below

commit e20088449daec9ed431754044b520b3ac5fa3eaa
Author: wgcn <10...@qq.com>
AuthorDate: Mon Apr 12 15:39:46 2021 +0800

    Flink:  Avoid to commit too frequently for checkpoints that produce no data (#2042)
---
 .../iceberg/flink/sink/IcebergFilesCommitter.java  | 26 +++++++++++++++-------
 .../flink/sink/TestIcebergFilesCommitter.java      | 26 ++++++++++++++++++++++
 2 files changed, 44 insertions(+), 8 deletions(-)

diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
index c1d3440..ff9174a 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
@@ -55,6 +55,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Comparators;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PropertyUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,6 +74,7 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
   // avoiding committing the same data files twice. This id will be attached to iceberg's meta when committing the
   // iceberg transaction.
   private static final String MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id";
+  static final String MAX_CONTINUOUS_EMPTY_COMMITS = "flink.max-continuous-empty-commits";
 
   // TableLoader to load iceberg table lazily.
   private final TableLoader tableLoader;
@@ -95,7 +97,8 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
   private transient Table table;
   private transient ManifestOutputFileFactory manifestOutputFileFactory;
   private transient long maxCommittedCheckpointId;
-
+  private transient int continuousEmptyCheckpoints;
+  private transient int maxContinuousEmptyCommits;
   // There're two cases that we restore from flink checkpoints: the first case is restoring from snapshot created by the
   // same flink job; another case is restoring from snapshot created by another different job. For the second case, we
   // need to maintain the old flink job's id in flink state backend to find the max-committed-checkpoint-id when
@@ -121,6 +124,10 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
     this.tableLoader.open();
     this.table = tableLoader.loadTable();
 
+    maxContinuousEmptyCommits = PropertyUtil.propertyAsInt(table.properties(), MAX_CONTINUOUS_EMPTY_COMMITS, 10);
+    Preconditions.checkArgument(maxContinuousEmptyCommits > 0,
+        MAX_CONTINUOUS_EMPTY_COMMITS + " must be positive");
+
     int subTaskId = getRuntimeContext().getIndexOfThisSubtask();
     int attemptId = getRuntimeContext().getAttemptNumber();
     this.manifestOutputFileFactory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, subTaskId, attemptId);
@@ -157,7 +164,6 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
 
     // Update the checkpoint state.
     dataFilesPerCheckpoint.put(checkpointId, writeToManifest(checkpointId));
-
     // Reset the snapshot state to the latest state.
     checkpointsState.clear();
     checkpointsState.add(dataFilesPerCheckpoint);
@@ -189,7 +195,6 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
                                     String newFlinkJobId,
                                     long checkpointId) throws IOException {
     NavigableMap<Long, byte[]> pendingMap = deltaManifestsMap.headMap(checkpointId, true);
-
     List<ManifestFile> manifests = Lists.newArrayList();
     NavigableMap<Long, WriteResult> pendingResults = Maps.newTreeMap();
     for (Map.Entry<Long, byte[]> e : pendingMap.entrySet()) {
@@ -204,12 +209,17 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
       manifests.addAll(deltaManifests.manifests());
     }
 
-    if (replacePartitions) {
-      replacePartitions(pendingResults, newFlinkJobId, checkpointId);
-    } else {
-      commitDeltaTxn(pendingResults, newFlinkJobId, checkpointId);
+    int totalFiles = pendingResults.values().stream()
+        .mapToInt(r -> r.dataFiles().length + r.deleteFiles().length).sum();
+    continuousEmptyCheckpoints = totalFiles == 0 ? continuousEmptyCheckpoints + 1 : 0;
+    if (totalFiles != 0 || continuousEmptyCheckpoints % maxContinuousEmptyCommits == 0) {
+      if (replacePartitions) {
+        replacePartitions(pendingResults, newFlinkJobId, checkpointId);
+      } else {
+        commitDeltaTxn(pendingResults, newFlinkJobId, checkpointId);
+      }
+      continuousEmptyCheckpoints = 0;
     }
-
     pendingMap.clear();
 
     // Delete the committed manifests.
diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
index fb12d91..4ada31a 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
@@ -67,6 +67,7 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.flink.sink.IcebergFilesCommitter.MAX_CONTINUOUS_EMPTY_COMMITS;
 import static org.apache.iceberg.flink.sink.ManifestOutputFileFactory.FLINK_MANIFEST_LOCATION;
 
 @RunWith(Parameterized.class)
@@ -110,6 +111,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
     table.updateProperties()
         .set(DEFAULT_FILE_FORMAT, format.name())
         .set(FLINK_MANIFEST_LOCATION, flinkManifestFolder.getAbsolutePath())
+        .set(MAX_CONTINUOUS_EMPTY_COMMITS, "1")
         .commit();
   }
 
@@ -141,6 +143,30 @@ public class TestIcebergFilesCommitter extends TableTestBase {
     }
   }
 
+  @Test
+  public void testMaxContinuousEmptyCommits() throws Exception {
+    table.updateProperties()
+        .set(MAX_CONTINUOUS_EMPTY_COMMITS, "3")
+        .commit();
+
+    JobID jobId = new JobID();
+    long checkpointId = 0;
+    long timestamp = 0;
+    try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = createStreamSink(jobId)) {
+      harness.setup();
+      harness.open();
+
+      assertSnapshotSize(0);
+
+      for (int i = 1; i <= 9; i++) {
+        harness.snapshot(++checkpointId, ++timestamp);
+        harness.notifyOfCompletedCheckpoint(checkpointId);
+
+        assertSnapshotSize(i / 3);
+      }
+    }
+  }
+
   private WriteResult of(DataFile dataFile) {
     return WriteResult.builder().addDataFiles(dataFile).build();
   }