Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/06/24 16:03:14 UTC

[iceberg] branch master updated: Flink: Add option to limit the number of snapshots in a planning operation (#4943)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 95dfb2b6d Flink: Add option to limit the number of snapshots in a planning operation (#4943)
95dfb2b6d is described below

commit 95dfb2b6d2d7566a319fc780debed845b53b3617
Author: Chen, Junjie <ji...@tencent.com>
AuthorDate: Sat Jun 25 00:03:08 2022 +0800

    Flink: Add option to limit the number of snapshots in a planning operation (#4943)
---
 .../apache/iceberg/flink/source/FlinkSource.java   |  5 ++
 .../apache/iceberg/flink/source/ScanContext.java   | 25 ++++++-
 .../flink/source/StreamingMonitorFunction.java     | 29 +++++++-
 .../flink/source/TestStreamingMonitorFunction.java | 78 ++++++++++++++++++++++
 4 files changed, 133 insertions(+), 4 deletions(-)
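
For context, a minimal usage sketch of the new builder option (the warehouse path and the cap value are illustrative, not part of this commit):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.data.RowData;
    import org.apache.iceberg.flink.TableLoader;
    import org.apache.iceberg.flink.source.FlinkSource;

    // Hypothetical streaming read that caps each incremental planning pass at
    // 100 snapshots; the default cap is Integer.MAX_VALUE (effectively unbounded).
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    TableLoader loader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/db/tbl");
    DataStream<RowData> stream = FlinkSource.forRowData()
        .env(env)
        .tableLoader(loader)
        .streaming(true)
        .maxPlanningSnapshotCount(100)
        .build();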

diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
index 3feafc137..f6cf902b1 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
@@ -184,6 +184,11 @@ public class FlinkSource {
       return this;
     }
 
+    public Builder maxPlanningSnapshotCount(int newMaxPlanningSnapshotCount) {
+      contextBuilder.maxPlanningSnapshotCount(newMaxPlanningSnapshotCount);
+      return this;
+    }
+
     public Builder flinkConf(ReadableConfig config) {
       this.readableConfig = config;
       return this;
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
index 84c7652ef..905205c88 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
@@ -82,6 +82,9 @@ public class ScanContext implements Serializable {
   private static final ConfigOption<Boolean> INCLUDE_COLUMN_STATS =
       ConfigOptions.key("include-column-stats").booleanType().defaultValue(false);
 
+  private static final ConfigOption<Integer> MAX_PLANNING_SNAPSHOT_COUNT =
+      ConfigOptions.key("max-planning-snapshot-count").intType().defaultValue(Integer.MAX_VALUE);
+
   private final boolean caseSensitive;
   private final boolean exposeLocality;
   private final Long snapshotId;
@@ -102,12 +105,14 @@ public class ScanContext implements Serializable {
   private final long limit;
   private final boolean includeColumnStats;
   private final Integer planParallelism;
+  private final int maxPlanningSnapshotCount;
 
   private ScanContext(boolean caseSensitive, Long snapshotId, StreamingStartingStrategy startingStrategy,
                       Long startSnapshotTimestamp, Long startSnapshotId, Long endSnapshotId, Long asOfTimestamp,
                       Long splitSize, Integer splitLookback, Long splitOpenFileCost, boolean isStreaming,
                       Duration monitorInterval, String nameMapping, Schema schema, List<Expression> filters,
-                      long limit, boolean includeColumnStats, boolean exposeLocality, Integer planParallelism) {
+                      long limit, boolean includeColumnStats, boolean exposeLocality, Integer planParallelism,
+                      int maxPlanningSnapshotCount) {
     this.caseSensitive = caseSensitive;
     this.snapshotId = snapshotId;
     this.startingStrategy = startingStrategy;
@@ -128,6 +133,7 @@ public class ScanContext implements Serializable {
     this.includeColumnStats = includeColumnStats;
     this.exposeLocality = exposeLocality;
     this.planParallelism = planParallelism;
+    this.maxPlanningSnapshotCount = maxPlanningSnapshotCount;
 
     validate();
   }
@@ -225,6 +231,10 @@ public class ScanContext implements Serializable {
     return planParallelism;
   }
 
+  public int maxPlanningSnapshotCount() {
+    return maxPlanningSnapshotCount;
+  }
+
   public ScanContext copyWithAppendsBetween(Long newStartSnapshotId, long newEndSnapshotId) {
     return ScanContext.builder()
         .caseSensitive(caseSensitive)
@@ -244,6 +254,7 @@ public class ScanContext implements Serializable {
         .includeColumnStats(includeColumnStats)
         .exposeLocality(exposeLocality)
         .planParallelism(planParallelism)
+        .maxPlanningSnapshotCount(maxPlanningSnapshotCount)
         .build();
   }
 
@@ -266,6 +277,7 @@ public class ScanContext implements Serializable {
         .includeColumnStats(includeColumnStats)
         .exposeLocality(exposeLocality)
         .planParallelism(planParallelism)
+        .maxPlanningSnapshotCount(maxPlanningSnapshotCount)
         .build();
   }
 
@@ -293,6 +305,7 @@ public class ScanContext implements Serializable {
     private boolean includeColumnStats = INCLUDE_COLUMN_STATS.defaultValue();
     private boolean exposeLocality;
     private Integer planParallelism = FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE.defaultValue();
+    private int maxPlanningSnapshotCount = MAX_PLANNING_SNAPSHOT_COUNT.defaultValue();
 
     private Builder() {
     }
@@ -392,6 +405,11 @@ public class ScanContext implements Serializable {
       return this;
     }
 
+    public Builder maxPlanningSnapshotCount(int newMaxPlanningSnapshotCount) {
+      this.maxPlanningSnapshotCount = newMaxPlanningSnapshotCount;
+      return this;
+    }
+
     public Builder fromProperties(Map<String, String> properties) {
       Configuration config = new Configuration();
       properties.forEach(config::setString);
@@ -409,14 +427,15 @@ public class ScanContext implements Serializable {
           .streaming(config.get(STREAMING))
           .monitorInterval(config.get(MONITOR_INTERVAL))
           .nameMapping(properties.get(DEFAULT_NAME_MAPPING))
-          .includeColumnStats(config.get(INCLUDE_COLUMN_STATS));
+          .includeColumnStats(config.get(INCLUDE_COLUMN_STATS))
+          .maxPlanningSnapshotCount(config.get(MAX_PLANNING_SNAPSHOT_COUNT));
     }
 
     public ScanContext build() {
       return new ScanContext(caseSensitive, snapshotId, startingStrategy, startSnapshotTimestamp,
           startSnapshotId, endSnapshotId, asOfTimestamp, splitSize, splitLookback,
           splitOpenFileCost, isStreaming, monitorInterval, nameMapping, projectedSchema,
-          filters, limit, includeColumnStats, exposeLocality, planParallelism);
+          filters, limit, includeColumnStats, exposeLocality, planParallelism, maxPlanningSnapshotCount);
     }
   }
 }
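
Because the new ConfigOption is also wired through fromProperties, the cap can be supplied as a string-keyed read option; a hedged sketch (the surrounding property map is illustrative):

    import java.util.Map;
    import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

    // Hypothetical property map, parsed the way Builder.fromProperties does;
    // "max-planning-snapshot-count" maps to MAX_PLANNING_SNAPSHOT_COUNT above.
    Map<String, String> props = ImmutableMap.of(
        "streaming", "true",
        "monitor-interval", "30s",
        "max-planning-snapshot-count", "200");
    ScanContext context = ScanContext.builder()
        .fromProperties(props)
        .build();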
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java
index dc499078f..06def6508 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.flink.source;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.util.List;
 import java.util.concurrent.ExecutorService;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.state.ListState;
@@ -36,6 +37,7 @@ import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.iceberg.util.ThreadPools;
@@ -82,6 +84,8 @@ public class StreamingMonitorFunction extends RichSourceFunction<FlinkInputSplit
         "Cannot set as-of-timestamp option for streaming reader");
     Preconditions.checkArgument(scanContext.endSnapshotId() == null,
         "Cannot set end-snapshot-id option for streaming reader");
+    Preconditions.checkArgument(scanContext.maxPlanningSnapshotCount() > 0,
+        "The max-planning-snapshot-count must be greater than zero");
     this.tableLoader = tableLoader;
     this.scanContext = scanContext;
   }
@@ -139,7 +143,24 @@ public class StreamingMonitorFunction extends RichSourceFunction<FlinkInputSplit
     }
   }
 
-  private void monitorAndForwardSplits() {
+  private long toSnapshotIdInclusive(long lastConsumedSnapshotId, long currentSnapshotId,
+                                     int maxPlanningSnapshotCount) {
+    List<Long> snapshotIds = SnapshotUtil.snapshotIdsBetween(table, lastConsumedSnapshotId, currentSnapshotId);
+    if (snapshotIds.size() <= maxPlanningSnapshotCount) {
+      return currentSnapshotId;
+    } else {
+      // Index from the end of the list, since snapshotIdsBetween returns IDs ordered by commit time, descending.
+      return snapshotIds.get(snapshotIds.size() - maxPlanningSnapshotCount);
+    }
+  }
+
+  @VisibleForTesting
+  void sourceContext(SourceContext<FlinkInputSplit> ctx) {
+    this.sourceContext = ctx;
+  }
+
+  @VisibleForTesting
+  void monitorAndForwardSplits() {
     // Refresh the table to get the latest committed snapshot.
     table.refresh();
 
@@ -151,12 +172,17 @@ public class StreamingMonitorFunction extends RichSourceFunction<FlinkInputSplit
       if (lastSnapshotId == INIT_LAST_SNAPSHOT_ID) {
         newScanContext = scanContext.copyWithSnapshotId(snapshotId);
       } else {
+        snapshotId = toSnapshotIdInclusive(lastSnapshotId, snapshotId, scanContext.maxPlanningSnapshotCount());
         newScanContext = scanContext.copyWithAppendsBetween(lastSnapshotId, snapshotId);
       }
 
+      LOG.debug("Start discovering splits from {} (exclusive) to {} (inclusive)", lastSnapshotId, snapshotId);
+      long start = System.currentTimeMillis();
       FlinkInputSplit[] splits = FlinkSplitPlanner.planInputSplits(table, newScanContext, workerPool);
+      LOG.debug("Discovered {} splits, time elapsed {}ms", splits.length, System.currentTimeMillis() - start);
 
       // only need to hold the checkpoint lock when emitting the splits and updating lastSnapshotId
+      start = System.currentTimeMillis();
       synchronized (sourceContext.getCheckpointLock()) {
         for (FlinkInputSplit split : splits) {
           sourceContext.collect(split);
@@ -164,6 +190,7 @@ public class StreamingMonitorFunction extends RichSourceFunction<FlinkInputSplit
 
         lastSnapshotId = snapshotId;
       }
+      LOG.debug("Forwarded {} splits, time elapsed {}ms", splits.length, System.currentTimeMillis() - start);
     }
   }
 
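To make the bounding arithmetic concrete, a self-contained mirror of toSnapshotIdInclusive over plain snapshot IDs (the real method resolves the list via SnapshotUtil.snapshotIdsBetween; the numbering here is hypothetical):

    import java.util.List;

    public class BoundedPlanningExample {
      // Same selection rule as toSnapshotIdInclusive, on a precomputed newest-first list.
      static long toSnapshotIdInclusive(List<Long> newestFirstIds, long currentSnapshotId, int maxCount) {
        if (newestFirstIds.size() <= maxCount) {
          return currentSnapshotId;  // everything fits in one planning pass
        }
        // The list is newest-first, so index from the end to keep the oldest maxCount snapshots.
        return newestFirstIds.get(newestFirstIds.size() - maxCount);
      }

      public static void main(String[] args) {
        // Snapshots s2..s10 committed after the already-consumed s1, newest first.
        List<Long> ids = List.of(10L, 9L, 8L, 7L, 6L, 5L, 4L, 3L, 2L);
        // With a cap of 3, this pass plans (s1, s4]; later cycles pick up s5..s7, then s8..s10.
        System.out.println(toSnapshotIdInclusive(ids, 10L, 3));  // prints 4
      }
    }
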
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java
index 84fbf42c6..1b1cf70a3 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java
@@ -32,6 +32,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.types.Row;
+import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
@@ -45,6 +46,8 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.ThreadPools;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -208,6 +211,81 @@ public class TestStreamingMonitorFunction extends TableTestBase {
     }
   }
 
+  @Test
+  public void testInvalidMaxPlanningSnapshotCount() {
+    ScanContext scanContext1 = ScanContext.builder()
+        .monitorInterval(Duration.ofMillis(100))
+        .maxPlanningSnapshotCount(0)
+        .build();
+
+    AssertHelpers.assertThrows("Should throw exception because of invalid config",
+        IllegalArgumentException.class, "must be greater than zero",
+        () -> {
+          createFunction(scanContext1);
+          return null;
+        }
+    );
+
+    ScanContext scanContext2 = ScanContext.builder()
+        .monitorInterval(Duration.ofMillis(100))
+        .maxPlanningSnapshotCount(-10)
+        .build();
+
+    AssertHelpers.assertThrows("Should throw exception because of invalid config",
+        IllegalArgumentException.class, "must be greater than zero",
+        () -> {
+          createFunction(scanContext2);
+          return null;
+        }
+    );
+  }
+
+  @Test
+  public void testConsumeWithMaxPlanningSnapshotCount() throws Exception {
+    generateRecordsAndCommitTxn(10);
+
+    // Use the oldest snapshot as the starting point to avoid the initial full-table scan case.
+    long oldestSnapshotId = SnapshotUtil.oldestAncestor(table).snapshotId();
+
+    ScanContext scanContext = ScanContext.builder()
+        .monitorInterval(Duration.ofMillis(100))
+        .splitSize(1000L)
+        .startSnapshotId(oldestSnapshotId)
+        .maxPlanningSnapshotCount(Integer.MAX_VALUE)
+        .build();
+
+    FlinkInputSplit[] expectedSplits = FlinkSplitPlanner
+        .planInputSplits(table, scanContext, ThreadPools.getWorkerPool());
+
+    Assert.assertEquals("should produce 9 splits", 9, expectedSplits.length);
+
+    // Covers the three cases where maxPlanningSnapshotCount is less than, equal to, or greater than the total number of splits
+    for (int maxPlanningSnapshotCount : ImmutableList.of(1, 9, 15)) {
+      scanContext = ScanContext.builder()
+          .monitorInterval(Duration.ofMillis(500))
+          .startSnapshotId(oldestSnapshotId)
+          .splitSize(1000L)
+          .maxPlanningSnapshotCount(maxPlanningSnapshotCount)
+          .build();
+
+      StreamingMonitorFunction function = createFunction(scanContext);
+      try (AbstractStreamOperatorTestHarness<FlinkInputSplit> harness = createHarness(function)) {
+        harness.setup();
+        harness.open();
+
+        CountDownLatch latch = new CountDownLatch(1);
+        TestSourceContext sourceContext = new TestSourceContext(latch);
+        function.sourceContext(sourceContext);
+        function.monitorAndForwardSplits();
+
+        if (maxPlanningSnapshotCount < 10) {
+          Assert.assertEquals("Should produce same splits as max-planning-snapshot-count",
+              maxPlanningSnapshotCount, sourceContext.splits.size());
+        }
+      }
+    }
+  }
+
   private List<List<Record>> generateRecordsAndCommitTxn(int commitTimes) throws IOException {
     List<List<Record>> expectedRecords = Lists.newArrayList();
     for (int i = 0; i < commitTimes; i++) {