Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/06/24 16:03:14 UTC
[iceberg] branch master updated: Flink: Add option to limit the number of snapshots in a planning operation (#4943)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 95dfb2b6d Flink: Add option to limit the number of snapshots in a planning operation (#4943)
95dfb2b6d is described below
commit 95dfb2b6d2d7566a319fc780debed845b53b3617
Author: Chen, Junjie <ji...@tencent.com>
AuthorDate: Sat Jun 25 00:03:08 2022 +0800
Flink: Add option to limit the number of snapshots in a planning operation (#4943)
---
.../apache/iceberg/flink/source/FlinkSource.java | 5 ++
.../apache/iceberg/flink/source/ScanContext.java | 25 ++++++-
.../flink/source/StreamingMonitorFunction.java | 29 +++++++-
.../flink/source/TestStreamingMonitorFunction.java | 78 ++++++++++++++++++++++
4 files changed, 133 insertions(+), 4 deletions(-)
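For context, a minimal sketch of how a streaming read might enable the new option through the FlinkSource builder. The environment setup and table path below are illustrative assumptions; only maxPlanningSnapshotCount(...) is introduced by this commit.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.FlinkSource;

// Illustrative setup; the warehouse path is a placeholder.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/db/tbl");

// Plan at most 100 newly committed snapshots per monitor cycle.
DataStream<RowData> stream = FlinkSource.forRowData()
    .env(env)
    .tableLoader(tableLoader)
    .streaming(true)
    .maxPlanningSnapshotCount(100)
    .build();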
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
index 3feafc137..f6cf902b1 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
@@ -184,6 +184,11 @@ public class FlinkSource {
return this;
}
+ public Builder maxPlanningSnapshotCount(int newMaxPlanningSnapshotCount) {
+ contextBuilder.maxPlanningSnapshotCount(newMaxPlanningSnapshotCount);
+ return this;
+ }
+
public Builder flinkConf(ReadableConfig config) {
this.readableConfig = config;
return this;
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
index 84c7652ef..905205c88 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
@@ -82,6 +82,9 @@ public class ScanContext implements Serializable {
private static final ConfigOption<Boolean> INCLUDE_COLUMN_STATS =
ConfigOptions.key("include-column-stats").booleanType().defaultValue(false);
+ private static final ConfigOption<Integer> MAX_PLANNING_SNAPSHOT_COUNT =
+ ConfigOptions.key("max-planning-snapshot-count").intType().defaultValue(Integer.MAX_VALUE);
+
private final boolean caseSensitive;
private final boolean exposeLocality;
private final Long snapshotId;
@@ -102,12 +105,14 @@ public class ScanContext implements Serializable {
private final long limit;
private final boolean includeColumnStats;
private final Integer planParallelism;
+ private final int maxPlanningSnapshotCount;
private ScanContext(boolean caseSensitive, Long snapshotId, StreamingStartingStrategy startingStrategy,
Long startSnapshotTimestamp, Long startSnapshotId, Long endSnapshotId, Long asOfTimestamp,
Long splitSize, Integer splitLookback, Long splitOpenFileCost, boolean isStreaming,
Duration monitorInterval, String nameMapping, Schema schema, List<Expression> filters,
- long limit, boolean includeColumnStats, boolean exposeLocality, Integer planParallelism) {
+ long limit, boolean includeColumnStats, boolean exposeLocality, Integer planParallelism,
+ int maxPlanningSnapshotCount) {
this.caseSensitive = caseSensitive;
this.snapshotId = snapshotId;
this.startingStrategy = startingStrategy;
@@ -128,6 +133,7 @@ public class ScanContext implements Serializable {
this.includeColumnStats = includeColumnStats;
this.exposeLocality = exposeLocality;
this.planParallelism = planParallelism;
+ this.maxPlanningSnapshotCount = maxPlanningSnapshotCount;
validate();
}
@@ -225,6 +231,10 @@ public class ScanContext implements Serializable {
return planParallelism;
}
+ public int maxPlanningSnapshotCount() {
+ return maxPlanningSnapshotCount;
+ }
+
public ScanContext copyWithAppendsBetween(Long newStartSnapshotId, long newEndSnapshotId) {
return ScanContext.builder()
.caseSensitive(caseSensitive)
@@ -244,6 +254,7 @@ public class ScanContext implements Serializable {
.includeColumnStats(includeColumnStats)
.exposeLocality(exposeLocality)
.planParallelism(planParallelism)
+ .maxPlanningSnapshotCount(maxPlanningSnapshotCount)
.build();
}
@@ -266,6 +277,7 @@ public class ScanContext implements Serializable {
.includeColumnStats(includeColumnStats)
.exposeLocality(exposeLocality)
.planParallelism(planParallelism)
+ .maxPlanningSnapshotCount(maxPlanningSnapshotCount)
.build();
}
@@ -293,6 +305,7 @@ public class ScanContext implements Serializable {
private boolean includeColumnStats = INCLUDE_COLUMN_STATS.defaultValue();
private boolean exposeLocality;
private Integer planParallelism = FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE.defaultValue();
+ private int maxPlanningSnapshotCount = MAX_PLANNING_SNAPSHOT_COUNT.defaultValue();
private Builder() {
}
@@ -392,6 +405,11 @@ public class ScanContext implements Serializable {
return this;
}
+ public Builder maxPlanningSnapshotCount(int newMaxPlanningSnapshotCount) {
+ this.maxPlanningSnapshotCount = newMaxPlanningSnapshotCount;
+ return this;
+ }
+
public Builder fromProperties(Map<String, String> properties) {
Configuration config = new Configuration();
properties.forEach(config::setString);
@@ -409,14 +427,15 @@ public class ScanContext implements Serializable {
.streaming(config.get(STREAMING))
.monitorInterval(config.get(MONITOR_INTERVAL))
.nameMapping(properties.get(DEFAULT_NAME_MAPPING))
- .includeColumnStats(config.get(INCLUDE_COLUMN_STATS));
+ .includeColumnStats(config.get(INCLUDE_COLUMN_STATS))
+ .maxPlanningSnapshotCount(config.get(MAX_PLANNING_SNAPSHOT_COUNT));
}
public ScanContext build() {
return new ScanContext(caseSensitive, snapshotId, startingStrategy, startSnapshotTimestamp,
startSnapshotId, endSnapshotId, asOfTimestamp, splitSize, splitLookback,
splitOpenFileCost, isStreaming, monitorInterval, nameMapping, projectedSchema,
- filters, limit, includeColumnStats, exposeLocality, planParallelism);
+ filters, limit, includeColumnStats, exposeLocality, planParallelism, maxPlanningSnapshotCount);
}
}
}
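Because fromProperties(...) now parses the new key, the limit can also be supplied as a string property. A minimal sketch, assuming the option is passed through a plain properties map (the value 100 is arbitrary):

import java.time.Duration;
import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

// "max-planning-snapshot-count" is read via the new ConfigOption and
// falls back to Integer.MAX_VALUE (effectively unlimited) when absent.
Map<String, String> properties = ImmutableMap.of("max-planning-snapshot-count", "100");
ScanContext context = ScanContext.builder()
    .fromProperties(properties)
    .streaming(true)
    .monitorInterval(Duration.ofSeconds(10))
    .build();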
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java
index dc499078f..06def6508 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.flink.source;
import java.io.IOException;
import java.io.UncheckedIOException;
+import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ListState;
@@ -36,6 +37,7 @@ import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.ThreadPools;
@@ -82,6 +84,8 @@ public class StreamingMonitorFunction extends RichSourceFunction<FlinkInputSplit
"Cannot set as-of-timestamp option for streaming reader");
Preconditions.checkArgument(scanContext.endSnapshotId() == null,
"Cannot set end-snapshot-id option for streaming reader");
+ Preconditions.checkArgument(scanContext.maxPlanningSnapshotCount() > 0,
+ "The max-planning-snapshot-count must be greater than zero");
this.tableLoader = tableLoader;
this.scanContext = scanContext;
}
@@ -139,7 +143,24 @@ public class StreamingMonitorFunction extends RichSourceFunction<FlinkInputSplit
}
}
- private void monitorAndForwardSplits() {
+ private long toSnapshotIdInclusive(long lastConsumedSnapshotId, long currentSnapshotId,
+ int maxPlanningSnapshotCount) {
+ List<Long> snapshotIds = SnapshotUtil.snapshotIdsBetween(table, lastConsumedSnapshotId, currentSnapshotId);
+ if (snapshotIds.size() <= maxPlanningSnapshotCount) {
+ return currentSnapshotId;
+ } else {
+ // Uses a reversed index since snapshotIdsBetween returns IDs ordered by commit time, descending.
+ return snapshotIds.get(snapshotIds.size() - maxPlanningSnapshotCount);
+ }
+ }
+
+ @VisibleForTesting
+ void sourceContext(SourceContext<FlinkInputSplit> ctx) {
+ this.sourceContext = ctx;
+ }
+
+ @VisibleForTesting
+ void monitorAndForwardSplits() {
// Refresh the table to get the latest committed snapshot.
table.refresh();
@@ -151,12 +172,17 @@ public class StreamingMonitorFunction extends RichSourceFunction<FlinkInputSplit
if (lastSnapshotId == INIT_LAST_SNAPSHOT_ID) {
newScanContext = scanContext.copyWithSnapshotId(snapshotId);
} else {
+ snapshotId = toSnapshotIdInclusive(lastSnapshotId, snapshotId, scanContext.maxPlanningSnapshotCount());
newScanContext = scanContext.copyWithAppendsBetween(lastSnapshotId, snapshotId);
}
+ LOG.debug("Start discovering splits from {} (exclusive) to {} (inclusive)", lastSnapshotId, snapshotId);
+ long start = System.currentTimeMillis();
FlinkInputSplit[] splits = FlinkSplitPlanner.planInputSplits(table, newScanContext, workerPool);
+ LOG.debug("Discovered {} splits, time elapsed {}ms", splits.length, System.currentTimeMillis() - start);
// only need to hold the checkpoint lock when emitting the splits and updating lastSnapshotId
+ start = System.currentTimeMillis();
synchronized (sourceContext.getCheckpointLock()) {
for (FlinkInputSplit split : splits) {
sourceContext.collect(split);
@@ -164,6 +190,7 @@ public class StreamingMonitorFunction extends RichSourceFunction<FlinkInputSplit
lastSnapshotId = snapshotId;
}
+ LOG.debug("Forwarded {} splits, time elapsed {}ms", splits.length, System.currentTimeMillis() - start);
}
}
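To make the reversed-index selection in toSnapshotIdInclusive concrete, a worked example with hypothetical snapshot IDs:

import java.util.List;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;

// snapshotIdsBetween returns IDs newest-first; suppose snapshots 2..5
// were committed after the last consumed snapshot 1.
List<Long> snapshotIds = ImmutableList.of(5L, 4L, 3L, 2L);
int maxPlanningSnapshotCount = 2;

// 4 IDs exceed the limit of 2, so index 4 - 2 = 2 selects snapshot 3:
// this cycle plans only the two oldest unconsumed snapshots, i.e.
// the range (1, 3], and the next cycle resumes from snapshot 3.
long toSnapshotIdInclusive = snapshotIds.get(snapshotIds.size() - maxPlanningSnapshotCount);
// toSnapshotIdInclusive == 3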
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java
index 84fbf42c6..1b1cf70a3 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java
@@ -32,6 +32,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.Row;
+import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
@@ -45,6 +46,8 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.ThreadPools;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -208,6 +211,81 @@ public class TestStreamingMonitorFunction extends TableTestBase {
}
}
+ @Test
+ public void testInvalidMaxPlanningSnapshotCount() {
+ ScanContext scanContext1 = ScanContext.builder()
+ .monitorInterval(Duration.ofMillis(100))
+ .maxPlanningSnapshotCount(0)
+ .build();
+
+ AssertHelpers.assertThrows("Should throw exception because of invalid config",
+ IllegalArgumentException.class, "must be greater than zero",
+ () -> {
+ createFunction(scanContext1);
+ return null;
+ }
+ );
+
+ ScanContext scanContext2 = ScanContext.builder()
+ .monitorInterval(Duration.ofMillis(100))
+ .maxPlanningSnapshotCount(-10)
+ .build();
+
+ AssertHelpers.assertThrows("Should throw exception because of invalid config",
+ IllegalArgumentException.class, "must be greater than zero",
+ () -> {
+ createFunction(scanContext2);
+ return null;
+ }
+ );
+ }
+
+ @Test
+ public void testConsumeWithMaxPlanningSnapshotCount() throws Exception {
+ generateRecordsAndCommitTxn(10);
+
+ // Use the oldest snapshot as the starting point to avoid the initial (non-incremental) scan case.
+ long oldestSnapshotId = SnapshotUtil.oldestAncestor(table).snapshotId();
+
+ ScanContext scanContext = ScanContext.builder()
+ .monitorInterval(Duration.ofMillis(100))
+ .splitSize(1000L)
+ .startSnapshotId(oldestSnapshotId)
+ .maxPlanningSnapshotCount(Integer.MAX_VALUE)
+ .build();
+
+ FlinkInputSplit[] expectedSplits = FlinkSplitPlanner
+ .planInputSplits(table, scanContext, ThreadPools.getWorkerPool());
+
+ Assert.assertEquals("should produce 9 splits", 9, expectedSplits.length);
+
+ // This covers the three cases where maxPlanningSnapshotCount is less than, equal to, or greater than the total number of splits.
+ for (int maxPlanningSnapshotCount : ImmutableList.of(1, 9, 15)) {
+ scanContext = ScanContext.builder()
+ .monitorInterval(Duration.ofMillis(500))
+ .startSnapshotId(oldestSnapshotId)
+ .splitSize(1000L)
+ .maxPlanningSnapshotCount(maxPlanningSnapshotCount)
+ .build();
+
+ StreamingMonitorFunction function = createFunction(scanContext);
+ try (AbstractStreamOperatorTestHarness<FlinkInputSplit> harness = createHarness(function)) {
+ harness.setup();
+ harness.open();
+
+ CountDownLatch latch = new CountDownLatch(1);
+ TestSourceContext sourceContext = new TestSourceContext(latch);
+ function.sourceContext(sourceContext);
+ function.monitorAndForwardSplits();
+
+ if (maxPlanningSnapshotCount < 10) {
+ Assert.assertEquals("Should produce the same number of splits as max-planning-snapshot-count",
+ maxPlanningSnapshotCount, sourceContext.splits.size());
+ }
+ }
+ }
+ }
+
private List<List<Record>> generateRecordsAndCommitTxn(int commitTimes) throws IOException {
List<List<Record>> expectedRecords = Lists.newArrayList();
for (int i = 0; i < commitTimes; i++) {