Posted to commits@iceberg.apache.org by st...@apache.org on 2022/12/08 21:03:07 UTC
[iceberg] branch master updated: Flink: Support read options in flink source (#5967)
This is an automated email from the ASF dual-hosted git repository.
stevenwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 0694269cd2 Flink: Support read options in flink source (#5967)
0694269cd2 is described below
commit 0694269cd20a3625d5587d544bec621e9bec305a
Author: liliwei <hi...@gmail.com>
AuthorDate: Fri Dec 9 05:03:00 2022 +0800
Flink: Support read options in flink source (#5967)
---
docs/flink-getting-started.md | 53 ++++++-
.../org/apache/iceberg/flink/FlinkConfParser.java | 35 ++++-
.../org/apache/iceberg/flink/FlinkReadConf.java | 168 +++++++++++++++++++++
.../org/apache/iceberg/flink/FlinkReadOptions.java | 96 ++++++++++++
.../apache/iceberg/flink/source/FlinkSource.java | 51 +++++--
.../iceberg/flink/source/FlinkSplitPlanner.java | 18 +--
.../apache/iceberg/flink/source/IcebergSource.java | 87 ++++++++---
.../apache/iceberg/flink/source/ScanContext.java | 128 ++++++----------
.../flink/source/TestFlinkSourceConfig.java | 62 ++++++++
9 files changed, 570 insertions(+), 128 deletions(-)
diff --git a/docs/flink-getting-started.md b/docs/flink-getting-started.md
index 7fdc8fc58a..9af84fadf3 100644
--- a/docs/flink-getting-started.md
+++ b/docs/flink-getting-started.md
@@ -684,7 +684,58 @@ env.execute("Test Iceberg DataStream");
OVERWRITE and UPSERT can't be set together. In UPSERT mode, if the table is partitioned, the partition fields should be included in equality fields.
{{< /hint >}}
-## Write options
+## Options
+### Read options
+
+Flink read options are passed when configuring the Flink IcebergSource, like this:
+
+```
+IcebergSource.forRowData()
+ .tableLoader(TableLoader.fromCatalog(...))
+ .assignerFactory(new SimpleSplitAssignerFactory())
+ .streaming(true)
+ .streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
+ .startSnapshotId(3821550127947089987L)
+ .monitorInterval(Duration.ofSeconds(10L)) // or .set("monitor-interval", "10s") or .set(FlinkReadOptions.MONITOR_INTERVAL, "10s")
+ .build()
+```
+For Flink SQL, read options can be passed in via SQL hints like this:
+```
+SELECT * FROM tableName /*+ OPTIONS('monitor-interval'='10s') */
+...
+```
+
+Options can also be passed in via the Flink configuration, in which case they apply to the current session. Note that not all options support this mode.
+
+```
+env.getConfig()
+ .getConfiguration()
+ .set(FlinkReadOptions.SPLIT_FILE_OPEN_COST_OPTION, 1000L);
+...
+```
+
+`Read option` has the highest priority, followed by `Flink configuration` and then `Table property`.
+
+| Read option | Flink configuration | Table property | Default | Description |
+| --------------------------- | --------------------------------------------- | ---------------------------- | -------------------------------- | ------------------------------------------------------------ |
+| snapshot-id | N/A | N/A | null | For time travel in batch mode. Read data from the specified snapshot-id. |
+| case-sensitive | connector.iceberg.case-sensitive | N/A | false | If true, match column names in a case-sensitive way. |
+| as-of-timestamp | N/A | N/A | null | For time travel in batch mode. Read data from the most recent snapshot as of the given time in milliseconds. |
+| starting-strategy | connector.iceberg.starting-strategy | N/A | INCREMENTAL_FROM_LATEST_SNAPSHOT | Starting strategy for streaming execution. TABLE_SCAN_THEN_INCREMENTAL: Do a regular table scan then switch to the incremental mode. The incremental mode starts from the current snapshot exclusive. INCREMENTAL_FROM_LATEST_SNAPSHOT: Start incremental mode from the latest snapshot inclusive. If it is an empty map, all future append snapshots shou [...]
+| start-snapshot-timestamp | N/A | N/A | null | Start to read data from the most recent snapshot as of the given time in milliseconds. |
+| start-snapshot-id | N/A | N/A | null | Start to read data from the specified snapshot-id. |
+| end-snapshot-id | N/A | N/A | The latest snapshot id | Specifies the end snapshot. |
+| split-size | connector.iceberg.split-size | read.split.target-size | 128 MB | Target size when combining input splits. |
+| split-lookback | connector.iceberg.split-lookback | read.split.planning-lookback | 10 | Number of bins to consider when combining input splits. |
+| split-file-open-cost | connector.iceberg.split-file-open-cost | read.split.open-file-cost | 4 MB | The estimated cost to open a file, used as a minimum weight when combining splits. |
+| streaming | connector.iceberg.streaming | N/A | false | Sets whether the current task runs in streaming or batch mode. |
+| monitor-interval | connector.iceberg.monitor-interval | N/A | 60s | Monitor interval to discover splits from new snapshots. Applicable only for streaming read. |
+| include-column-stats | connector.iceberg.include-column-stats | N/A | false | If true, the scan loads the column stats with each data file. Column stats include: value count, null value count, lower bounds, and upper bounds. |
+| max-planning-snapshot-count | connector.iceberg.max-planning-snapshot-count | N/A | Integer.MAX_VALUE | Maximum number of snapshots to process per split enumeration. Applicable only to streaming read. |
+| limit | connector.iceberg.limit | N/A | -1 | Maximum number of rows to output. |
+
+
+### Write options
Flink write options are passed when configuring the FlinkSink, like this:
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java
index 83fa09de54..5543341be9 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.ReadableConfig;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
class FlinkConfParser {
@@ -34,7 +35,7 @@ class FlinkConfParser {
private final ReadableConfig readableConfig;
FlinkConfParser(Table table, Map<String, String> options, ReadableConfig readableConfig) {
- this.tableProperties = table.properties();
+ this.tableProperties = table == null ? Maps.newHashMap() : table.properties();
this.options = options;
this.readableConfig = readableConfig;
}
@@ -51,6 +52,10 @@ class FlinkConfParser {
return new LongConfParser();
}
+ public <E extends Enum<E>> EnumConfParser<E> enumConfParser(Class<E> enumClass) {
+ return new EnumConfParser<>(enumClass);
+ }
+
public StringConfParser stringConf() {
return new StringConfParser();
}
@@ -148,6 +153,34 @@ class FlinkConfParser {
}
}
+ class EnumConfParser<E extends Enum<E>> extends ConfParser<EnumConfParser<E>, E> {
+ private E defaultValue;
+ private final Class<E> enumClass;
+
+ EnumConfParser(Class<E> enumClass) {
+ this.enumClass = enumClass;
+ }
+
+ @Override
+ protected EnumConfParser<E> self() {
+ return this;
+ }
+
+ public EnumConfParser<E> defaultValue(E value) {
+ this.defaultValue = value;
+ return self();
+ }
+
+ public E parse() {
+ Preconditions.checkArgument(defaultValue != null, "Default value cannot be null");
+ return parse(s -> Enum.valueOf(enumClass, s), defaultValue);
+ }
+
+ public E parseOptional() {
+ return parse(s -> Enum.valueOf(enumClass, s), null);
+ }
+ }
+
abstract class ConfParser<ThisT, T> {
private final List<String> optionNames = Lists.newArrayList();
private String tablePropertyName;
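
The new EnumConfParser converts the raw string with Enum.valueOf, which matches constant names exactly. The following is a minimal sketch of that behavior, not the Iceberg code itself: StartingStrategySketch is a hypothetical stand-in for StreamingStartingStrategy, and the sketch assumes that an unset value falls back to the supplied default.

```java
// Hypothetical stand-in for StreamingStartingStrategy, reduced to two constants.
enum StartingStrategySketch {
  TABLE_SCAN_THEN_INCREMENTAL,
  INCREMENTAL_FROM_LATEST_SNAPSHOT
}

final class EnumParseSketch {
  private EnumParseSketch() {}

  // Assumption: a missing (null) raw value resolves to the default.
  // Enum.valueOf is case-sensitive and throws IllegalArgumentException
  // when no constant has exactly the given name.
  static <E extends Enum<E>> E parse(Class<E> enumClass, String raw, E defaultValue) {
    return raw == null ? defaultValue : Enum.valueOf(enumClass, raw);
  }

  public static void main(String[] args) {
    StartingStrategySketch fallback = StartingStrategySketch.INCREMENTAL_FROM_LATEST_SNAPSHOT;
    System.out.println(parse(StartingStrategySketch.class, null, fallback));
    System.out.println(
        parse(StartingStrategySketch.class, "TABLE_SCAN_THEN_INCREMENTAL", fallback));
    try {
      parse(StartingStrategySketch.class, "table_scan_then_incremental", fallback);
    } catch (IllegalArgumentException e) {
      System.out.println("lower-case name rejected");
    }
  }
}
```

One consequence worth noting: a value such as 'starting-strategy'='table_scan_then_incremental' passed through a SQL hint would be rejected under this conversion, so the constant name has to be written in upper case.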
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java
new file mode 100644
index 0000000000..e2cacc2adf
--- /dev/null
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import java.time.Duration;
+import java.util.Map;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.util.TimeUtils;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.flink.source.StreamingStartingStrategy;
+
+public class FlinkReadConf {
+
+ private final FlinkConfParser confParser;
+
+ public FlinkReadConf(
+ Table table, Map<String, String> readOptions, ReadableConfig readableConfig) {
+ this.confParser = new FlinkConfParser(table, readOptions, readableConfig);
+ }
+
+ public Long snapshotId() {
+ return confParser.longConf().option(FlinkReadOptions.SNAPSHOT_ID.key()).parseOptional();
+ }
+
+ public boolean caseSensitive() {
+ return confParser
+ .booleanConf()
+ .option(FlinkReadOptions.CASE_SENSITIVE)
+ .flinkConfig(FlinkReadOptions.CASE_SENSITIVE_OPTION)
+ .defaultValue(FlinkReadOptions.CASE_SENSITIVE_OPTION.defaultValue())
+ .parse();
+ }
+
+ public Long asOfTimestamp() {
+ return confParser.longConf().option(FlinkReadOptions.AS_OF_TIMESTAMP.key()).parseOptional();
+ }
+
+ public StreamingStartingStrategy startingStrategy() {
+ return confParser
+ .enumConfParser(StreamingStartingStrategy.class)
+ .option(FlinkReadOptions.STARTING_STRATEGY)
+ .flinkConfig(FlinkReadOptions.STARTING_STRATEGY_OPTION)
+ .defaultValue(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
+ .parse();
+ }
+
+ public Long startSnapshotTimestamp() {
+ return confParser
+ .longConf()
+ .option(FlinkReadOptions.START_SNAPSHOT_TIMESTAMP.key())
+ .parseOptional();
+ }
+
+ public Long startSnapshotId() {
+ return confParser.longConf().option(FlinkReadOptions.START_SNAPSHOT_ID.key()).parseOptional();
+ }
+
+ public Long endSnapshotId() {
+ return confParser.longConf().option(FlinkReadOptions.END_SNAPSHOT_ID.key()).parseOptional();
+ }
+
+ public long splitSize() {
+ return confParser
+ .longConf()
+ .option(FlinkReadOptions.SPLIT_SIZE)
+ .flinkConfig(FlinkReadOptions.SPLIT_SIZE_OPTION)
+ .tableProperty(TableProperties.SPLIT_SIZE)
+ .defaultValue(TableProperties.SPLIT_SIZE_DEFAULT)
+ .parse();
+ }
+
+ public int splitLookback() {
+ return confParser
+ .intConf()
+ .option(FlinkReadOptions.SPLIT_LOOKBACK)
+ .flinkConfig(FlinkReadOptions.SPLIT_LOOKBACK_OPTION)
+ .tableProperty(TableProperties.SPLIT_LOOKBACK)
+ .defaultValue(TableProperties.SPLIT_LOOKBACK_DEFAULT)
+ .parse();
+ }
+
+ public long splitFileOpenCost() {
+ return confParser
+ .longConf()
+ .option(FlinkReadOptions.SPLIT_FILE_OPEN_COST)
+ .flinkConfig(FlinkReadOptions.SPLIT_FILE_OPEN_COST_OPTION)
+ .tableProperty(TableProperties.SPLIT_OPEN_FILE_COST)
+ .defaultValue(TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT)
+ .parse();
+ }
+
+ public boolean streaming() {
+ return confParser
+ .booleanConf()
+ .option(FlinkReadOptions.STREAMING)
+ .flinkConfig(FlinkReadOptions.STREAMING_OPTION)
+ .defaultValue(FlinkReadOptions.STREAMING_OPTION.defaultValue())
+ .parse();
+ }
+
+ public Duration monitorInterval() {
+ String duration =
+ confParser
+ .stringConf()
+ .option(FlinkReadOptions.MONITOR_INTERVAL)
+ .flinkConfig(FlinkReadOptions.MONITOR_INTERVAL_OPTION)
+ .defaultValue(FlinkReadOptions.MONITOR_INTERVAL_OPTION.defaultValue())
+ .parse();
+
+ return TimeUtils.parseDuration(duration);
+ }
+
+ public boolean includeColumnStats() {
+ return confParser
+ .booleanConf()
+ .option(FlinkReadOptions.INCLUDE_COLUMN_STATS)
+ .flinkConfig(FlinkReadOptions.INCLUDE_COLUMN_STATS_OPTION)
+ .defaultValue(FlinkReadOptions.INCLUDE_COLUMN_STATS_OPTION.defaultValue())
+ .parse();
+ }
+
+ public int maxPlanningSnapshotCount() {
+ return confParser
+ .intConf()
+ .option(FlinkReadOptions.MAX_PLANNING_SNAPSHOT_COUNT)
+ .flinkConfig(FlinkReadOptions.MAX_PLANNING_SNAPSHOT_COUNT_OPTION)
+ .defaultValue(FlinkReadOptions.MAX_PLANNING_SNAPSHOT_COUNT_OPTION.defaultValue())
+ .parse();
+ }
+
+ public String nameMapping() {
+ return confParser.stringConf().option(TableProperties.DEFAULT_NAME_MAPPING).parseOptional();
+ }
+
+ public long limit() {
+ return confParser
+ .longConf()
+ .option(FlinkReadOptions.LIMIT)
+ .flinkConfig(FlinkReadOptions.LIMIT_OPTION)
+ .defaultValue(FlinkReadOptions.LIMIT_OPTION.defaultValue())
+ .parse();
+ }
+
+ public int workerPoolSize() {
+ return confParser
+ .intConf()
+ .flinkConfig(FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE)
+ .defaultValue(FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE.defaultValue())
+ .parse();
+ }
+}
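
Each getter in FlinkReadConf chains option, flinkConfig, tableProperty and defaultValue, which matches the precedence stated in the docs: read option first, then Flink configuration, then table property, then the default. Below is a rough sketch of that lookup order using plain maps. The class and method names are hypothetical and the real ConfParser may differ in detail.

```java
import java.util.HashMap;
import java.util.Map;

final class ReadConfPrecedenceSketch {
  // Same prefix that FlinkReadOptions uses for session-level keys.
  static final String PREFIX = "connector.iceberg.";

  private ReadConfPrecedenceSketch() {}

  // Resolves a long-valued setting: read option, then Flink configuration,
  // then table property (if the option has one), then the default.
  static long resolveLong(
      String optionName,
      String tablePropertyName,
      Map<String, String> readOptions,
      Map<String, String> flinkConf,
      Map<String, String> tableProperties,
      long defaultValue) {
    String raw = readOptions.get(optionName);
    if (raw == null) {
      raw = flinkConf.get(PREFIX + optionName);
    }
    if (raw == null && tablePropertyName != null) {
      raw = tableProperties.get(tablePropertyName);
    }
    return raw == null ? defaultValue : Long.parseLong(raw);
  }

  public static void main(String[] args) {
    Map<String, String> readOptions = new HashMap<>();
    Map<String, String> flinkConf = new HashMap<>();
    Map<String, String> tableProperties = new HashMap<>();
    long defaultSplitSize = 128L * 1024 * 1024;

    tableProperties.put("read.split.target-size", "1000");
    flinkConf.put(PREFIX + "split-size", "2000");
    readOptions.put("split-size", "3000");

    // The read option wins because it is consulted first.
    System.out.println(
        resolveLong(
            "split-size",
            "read.split.target-size",
            readOptions,
            flinkConf,
            tableProperties,
            defaultSplitSize));
  }
}
```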
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java
new file mode 100644
index 0000000000..54f64dbfa8
--- /dev/null
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.flink.source.StreamingStartingStrategy;
+
+/** Flink source read options */
+public class FlinkReadOptions {
+ private static final String PREFIX = "connector.iceberg.";
+
+ private FlinkReadOptions() {}
+
+ public static final ConfigOption<Long> SNAPSHOT_ID =
+ ConfigOptions.key("snapshot-id").longType().defaultValue(null);
+
+ public static final String CASE_SENSITIVE = "case-sensitive";
+ public static final ConfigOption<Boolean> CASE_SENSITIVE_OPTION =
+ ConfigOptions.key(PREFIX + CASE_SENSITIVE).booleanType().defaultValue(false);
+
+ public static final ConfigOption<Long> AS_OF_TIMESTAMP =
+ ConfigOptions.key("as-of-timestamp").longType().defaultValue(null);
+
+ public static final String STARTING_STRATEGY = "starting-strategy";
+ public static final ConfigOption<StreamingStartingStrategy> STARTING_STRATEGY_OPTION =
+ ConfigOptions.key(PREFIX + STARTING_STRATEGY)
+ .enumType(StreamingStartingStrategy.class)
+ .defaultValue(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT);
+
+ public static final ConfigOption<Long> START_SNAPSHOT_TIMESTAMP =
+ ConfigOptions.key("start-snapshot-timestamp").longType().defaultValue(null);
+
+ public static final ConfigOption<Long> START_SNAPSHOT_ID =
+ ConfigOptions.key("start-snapshot-id").longType().defaultValue(null);
+
+ public static final ConfigOption<Long> END_SNAPSHOT_ID =
+ ConfigOptions.key("end-snapshot-id").longType().defaultValue(null);
+
+ public static final String SPLIT_SIZE = "split-size";
+ public static final ConfigOption<Long> SPLIT_SIZE_OPTION =
+ ConfigOptions.key(PREFIX + SPLIT_SIZE)
+ .longType()
+ .defaultValue(TableProperties.SPLIT_SIZE_DEFAULT);
+
+ public static final String SPLIT_LOOKBACK = "split-lookback";
+ public static final ConfigOption<Integer> SPLIT_LOOKBACK_OPTION =
+ ConfigOptions.key(PREFIX + SPLIT_LOOKBACK)
+ .intType()
+ .defaultValue(TableProperties.SPLIT_LOOKBACK_DEFAULT);
+
+ public static final String SPLIT_FILE_OPEN_COST = "split-file-open-cost";
+ public static final ConfigOption<Long> SPLIT_FILE_OPEN_COST_OPTION =
+ ConfigOptions.key(PREFIX + SPLIT_FILE_OPEN_COST)
+ .longType()
+ .defaultValue(TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
+
+ public static final String STREAMING = "streaming";
+ public static final ConfigOption<Boolean> STREAMING_OPTION =
+ ConfigOptions.key(PREFIX + STREAMING).booleanType().defaultValue(false);
+
+ public static final String MONITOR_INTERVAL = "monitor-interval";
+ public static final ConfigOption<String> MONITOR_INTERVAL_OPTION =
+ ConfigOptions.key(PREFIX + MONITOR_INTERVAL).stringType().defaultValue("60s");
+
+ public static final String INCLUDE_COLUMN_STATS = "include-column-stats";
+ public static final ConfigOption<Boolean> INCLUDE_COLUMN_STATS_OPTION =
+ ConfigOptions.key(PREFIX + INCLUDE_COLUMN_STATS).booleanType().defaultValue(false);
+
+ public static final String MAX_PLANNING_SNAPSHOT_COUNT = "max-planning-snapshot-count";
+ public static final ConfigOption<Integer> MAX_PLANNING_SNAPSHOT_COUNT_OPTION =
+ ConfigOptions.key(PREFIX + MAX_PLANNING_SNAPSHOT_COUNT)
+ .intType()
+ .defaultValue(Integer.MAX_VALUE);
+
+ public static final String LIMIT = "limit";
+ public static final ConfigOption<Long> LIMIT_OPTION =
+ ConfigOptions.key(PREFIX + LIMIT).longType().defaultValue(-1L);
+}
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
index cb1c5b0889..84a96d4dc7 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
@@ -18,6 +18,8 @@
*/
package org.apache.iceberg.flink.source;
+import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
+
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
@@ -36,11 +38,14 @@ import org.apache.iceberg.TableScan;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.flink.FlinkConfigOptions;
+import org.apache.iceberg.flink.FlinkReadOptions;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.PropertyUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -80,6 +85,8 @@ public class FlinkSource {
private final ScanContext.Builder contextBuilder = ScanContext.builder();
private Boolean exposeLocality;
+ private final Map<String, String> readOptions = Maps.newHashMap();
+
public Builder tableLoader(TableLoader newLoader) {
this.tableLoader = newLoader;
return this;
@@ -110,53 +117,65 @@ public class FlinkSource {
return this;
}
+ public Builder set(String property, String value) {
+ readOptions.put(property, value);
+ return this;
+ }
+
+ public Builder setAll(Map<String, String> properties) {
+ readOptions.putAll(properties);
+ return this;
+ }
+
+ /** @deprecated Use {@link #setAll} instead. */
+ @Deprecated
public Builder properties(Map<String, String> properties) {
- contextBuilder.fromProperties(properties);
+ readOptions.putAll(properties);
return this;
}
public Builder caseSensitive(boolean caseSensitive) {
- contextBuilder.caseSensitive(caseSensitive);
+ readOptions.put(FlinkReadOptions.CASE_SENSITIVE, Boolean.toString(caseSensitive));
return this;
}
public Builder snapshotId(Long snapshotId) {
- contextBuilder.useSnapshotId(snapshotId);
+ readOptions.put(FlinkReadOptions.SNAPSHOT_ID.key(), Long.toString(snapshotId));
return this;
}
public Builder startSnapshotId(Long startSnapshotId) {
- contextBuilder.startSnapshotId(startSnapshotId);
+ readOptions.put(FlinkReadOptions.START_SNAPSHOT_ID.key(), Long.toString(startSnapshotId));
return this;
}
public Builder endSnapshotId(Long endSnapshotId) {
- contextBuilder.endSnapshotId(endSnapshotId);
+ readOptions.put(FlinkReadOptions.END_SNAPSHOT_ID.key(), Long.toString(endSnapshotId));
return this;
}
public Builder asOfTimestamp(Long asOfTimestamp) {
- contextBuilder.asOfTimestamp(asOfTimestamp);
+ readOptions.put(FlinkReadOptions.AS_OF_TIMESTAMP.key(), Long.toString(asOfTimestamp));
return this;
}
public Builder splitSize(Long splitSize) {
- contextBuilder.splitSize(splitSize);
+ readOptions.put(FlinkReadOptions.SPLIT_SIZE, Long.toString(splitSize));
return this;
}
public Builder splitLookback(Integer splitLookback) {
- contextBuilder.splitLookback(splitLookback);
+ readOptions.put(FlinkReadOptions.SPLIT_LOOKBACK, Integer.toString(splitLookback));
return this;
}
public Builder splitOpenFileCost(Long splitOpenFileCost) {
- contextBuilder.splitOpenFileCost(splitOpenFileCost);
+ readOptions.put(FlinkReadOptions.SPLIT_FILE_OPEN_COST, Long.toString(splitOpenFileCost));
return this;
}
public Builder streaming(boolean streaming) {
- contextBuilder.streaming(streaming);
+ readOptions.put(FlinkReadOptions.STREAMING, Boolean.toString(streaming));
return this;
}
@@ -166,17 +185,19 @@ public class FlinkSource {
}
public Builder nameMapping(String nameMapping) {
- contextBuilder.nameMapping(nameMapping);
+ readOptions.put(DEFAULT_NAME_MAPPING, nameMapping);
return this;
}
public Builder monitorInterval(Duration interval) {
- contextBuilder.monitorInterval(interval);
+ readOptions.put(FlinkReadOptions.MONITOR_INTERVAL, interval.toNanos() + " ns");
return this;
}
public Builder maxPlanningSnapshotCount(int newMaxPlanningSnapshotCount) {
- contextBuilder.maxPlanningSnapshotCount(newMaxPlanningSnapshotCount);
+ readOptions.put(
+ FlinkReadOptions.MAX_PLANNING_SNAPSHOT_COUNT,
+ Integer.toString(newMaxPlanningSnapshotCount));
return this;
}
@@ -219,6 +240,8 @@ public class FlinkSource {
contextBuilder.planParallelism(
readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE));
+ contextBuilder.resolveConfig(table, readOptions, readableConfig);
+
return new FlinkInputFormat(
tableLoader, icebergSchema, io, encryption, contextBuilder.build());
}
@@ -261,6 +284,6 @@ public class FlinkSource {
}
public static boolean isBounded(Map<String, String> properties) {
- return !ScanContext.builder().fromProperties(properties).build().isStreaming();
+ return !PropertyUtil.propertyAsBoolean(properties, FlinkReadOptions.STREAMING, false);
}
}
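
The builder now stores monitorInterval as a string of the form "<nanos> ns", and FlinkReadConf later turns it back into a Duration with Flink's TimeUtils.parseDuration. The round trip can be illustrated with the simplified sketch below. The decode method is a hypothetical stand-in that understands only the ns, ms and s units, not a reimplementation of TimeUtils.

```java
import java.time.Duration;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class MonitorIntervalSketch {
  // Simplified grammar: digits, optional whitespace, then one of ns, ms, s.
  private static final Pattern DURATION = Pattern.compile("\\s*(\\d+)\\s*(ns|ms|s)\\s*");

  private MonitorIntervalSketch() {}

  // Same encoding as the builder in the diff: nanoseconds plus " ns".
  static String encode(Duration interval) {
    return interval.toNanos() + " ns";
  }

  // Hypothetical stand-in for TimeUtils.parseDuration (three units only).
  static Duration decode(String text) {
    Matcher matcher = DURATION.matcher(text);
    if (!matcher.matches()) {
      throw new IllegalArgumentException("Unsupported duration: " + text);
    }
    long value = Long.parseLong(matcher.group(1));
    switch (matcher.group(2)) {
      case "ns":
        return Duration.ofNanos(value);
      case "ms":
        return Duration.ofMillis(value);
      default:
        return Duration.ofSeconds(value);
    }
  }

  public static void main(String[] args) {
    String stored = encode(Duration.ofSeconds(10));
    System.out.println(stored);
    System.out.println(decode(stored).equals(decode("10s")));
  }
}
```

Encoding in nanoseconds keeps sub-millisecond durations intact, at the cost of a less readable stored value than "10s".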
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java
index 4746625310..3ff349dd8b 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java
@@ -136,20 +136,14 @@ public class FlinkSplitPlanner {
refinedScan = refinedScan.includeColumnStats();
}
- if (context.splitSize() != null) {
- refinedScan = refinedScan.option(TableProperties.SPLIT_SIZE, context.splitSize().toString());
- }
+ refinedScan = refinedScan.option(TableProperties.SPLIT_SIZE, context.splitSize().toString());
- if (context.splitLookback() != null) {
- refinedScan =
- refinedScan.option(TableProperties.SPLIT_LOOKBACK, context.splitLookback().toString());
- }
+ refinedScan =
+ refinedScan.option(TableProperties.SPLIT_LOOKBACK, context.splitLookback().toString());
- if (context.splitOpenFileCost() != null) {
- refinedScan =
- refinedScan.option(
- TableProperties.SPLIT_OPEN_FILE_COST, context.splitOpenFileCost().toString());
- }
+ refinedScan =
+ refinedScan.option(
+ TableProperties.SPLIT_OPEN_FILE_COST, context.splitOpenFileCost().toString());
if (context.filters() != null) {
for (Expression filter : context.filters()) {
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
index 9728aeb2b3..0f50d866d9 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
@@ -41,7 +41,10 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Preconditions;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.flink.FlinkConfigOptions;
+import org.apache.iceberg.flink.FlinkReadOptions;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.assigner.SplitAssigner;
@@ -58,6 +61,7 @@ import org.apache.iceberg.flink.source.reader.ReaderFunction;
import org.apache.iceberg.flink.source.reader.RowDataReaderFunction;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.IcebergSourceSplitSerializer;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -212,6 +216,8 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
private TableSchema projectedFlinkSchema;
private Boolean exposeLocality;
+ private final Map<String, String> readOptions = Maps.newHashMap();
+
Builder() {}
public Builder<T> tableLoader(TableLoader loader) {
@@ -235,67 +241,89 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
}
public Builder<T> caseSensitive(boolean newCaseSensitive) {
- this.contextBuilder.caseSensitive(newCaseSensitive);
+ readOptions.put(FlinkReadOptions.CASE_SENSITIVE, Boolean.toString(newCaseSensitive));
return this;
}
public Builder<T> useSnapshotId(Long newSnapshotId) {
- this.contextBuilder.useSnapshotId(newSnapshotId);
+ if (newSnapshotId != null) {
+ readOptions.put(FlinkReadOptions.SNAPSHOT_ID.key(), Long.toString(newSnapshotId));
+ }
return this;
}
public Builder<T> streamingStartingStrategy(StreamingStartingStrategy newStartingStrategy) {
- this.contextBuilder.startingStrategy(newStartingStrategy);
+ readOptions.put(FlinkReadOptions.STARTING_STRATEGY, newStartingStrategy.name());
return this;
}
public Builder<T> startSnapshotTimestamp(Long newStartSnapshotTimestamp) {
- this.contextBuilder.startSnapshotTimestamp(newStartSnapshotTimestamp);
+ if (newStartSnapshotTimestamp != null) {
+ readOptions.put(
+ FlinkReadOptions.START_SNAPSHOT_TIMESTAMP.key(),
+ Long.toString(newStartSnapshotTimestamp));
+ }
return this;
}
public Builder<T> startSnapshotId(Long newStartSnapshotId) {
- this.contextBuilder.startSnapshotId(newStartSnapshotId);
+ if (newStartSnapshotId != null) {
+ readOptions.put(
+ FlinkReadOptions.START_SNAPSHOT_ID.key(), Long.toString(newStartSnapshotId));
+ }
return this;
}
public Builder<T> endSnapshotId(Long newEndSnapshotId) {
- this.contextBuilder.endSnapshotId(newEndSnapshotId);
+ if (newEndSnapshotId != null) {
+ readOptions.put(FlinkReadOptions.END_SNAPSHOT_ID.key(), Long.toString(newEndSnapshotId));
+ }
return this;
}
public Builder<T> asOfTimestamp(Long newAsOfTimestamp) {
- this.contextBuilder.asOfTimestamp(newAsOfTimestamp);
+ if (newAsOfTimestamp != null) {
+ readOptions.put(FlinkReadOptions.AS_OF_TIMESTAMP.key(), Long.toString(newAsOfTimestamp));
+ }
return this;
}
public Builder<T> splitSize(Long newSplitSize) {
- this.contextBuilder.splitSize(newSplitSize);
+ if (newSplitSize != null) {
+ readOptions.put(FlinkReadOptions.SPLIT_SIZE, Long.toString(newSplitSize));
+ }
return this;
}
public Builder<T> splitLookback(Integer newSplitLookback) {
- this.contextBuilder.splitLookback(newSplitLookback);
+ if (newSplitLookback != null) {
+ readOptions.put(FlinkReadOptions.SPLIT_LOOKBACK, Integer.toString(newSplitLookback));
+ }
return this;
}
public Builder<T> splitOpenFileCost(Long newSplitOpenFileCost) {
- this.contextBuilder.splitOpenFileCost(newSplitOpenFileCost);
+ if (newSplitOpenFileCost != null) {
+ readOptions.put(FlinkReadOptions.SPLIT_FILE_OPEN_COST, Long.toString(newSplitOpenFileCost));
+ }
+
return this;
}
public Builder<T> streaming(boolean streaming) {
- this.contextBuilder.streaming(streaming);
+ readOptions.put(FlinkReadOptions.STREAMING, Boolean.toString(streaming));
return this;
}
public Builder<T> monitorInterval(Duration newMonitorInterval) {
- this.contextBuilder.monitorInterval(newMonitorInterval);
+ if (newMonitorInterval != null) {
+ readOptions.put(FlinkReadOptions.MONITOR_INTERVAL, newMonitorInterval.toNanos() + " ns");
+ }
return this;
}
public Builder<T> nameMapping(String newNameMapping) {
- this.contextBuilder.nameMapping(newNameMapping);
+ readOptions.put(TableProperties.DEFAULT_NAME_MAPPING, newNameMapping);
return this;
}
@@ -315,17 +343,20 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
}
public Builder<T> limit(long newLimit) {
- this.contextBuilder.limit(newLimit);
+ readOptions.put(FlinkReadOptions.LIMIT, Long.toString(newLimit));
return this;
}
public Builder<T> includeColumnStats(boolean newIncludeColumnStats) {
- this.contextBuilder.includeColumnStats(newIncludeColumnStats);
+ readOptions.put(
+ FlinkReadOptions.INCLUDE_COLUMN_STATS, Boolean.toString(newIncludeColumnStats));
return this;
}
public Builder<T> planParallelism(int planParallelism) {
- this.contextBuilder.planParallelism(planParallelism);
+ readOptions.put(
+ FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE.key(),
+ Integer.toString(planParallelism));
return this;
}
@@ -334,8 +365,28 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
return this;
}
+ /**
+ * Set the read properties for Flink source. View the supported properties in {@link
+ * FlinkReadOptions}
+ */
+ public Builder<T> set(String property, String value) {
+ readOptions.put(property, value);
+ return this;
+ }
+
+ /**
+ * Set the read properties for Flink source. View the supported properties in {@link
+ * FlinkReadOptions}
+ */
+ public Builder<T> setAll(Map<String, String> properties) {
+ readOptions.putAll(properties);
+ return this;
+ }
+
+ /** @deprecated Use {@link #setAll} instead. */
+ @Deprecated
public Builder<T> properties(Map<String, String> properties) {
- contextBuilder.fromProperties(properties);
+ readOptions.putAll(properties);
return this;
}
@@ -348,6 +399,8 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
throw new UncheckedIOException(e);
}
+ contextBuilder.resolveConfig(table, readOptions, flinkConfig);
+
Schema icebergSchema = table.schema();
if (projectedFlinkSchema != null) {
contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedFlinkSchema));
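
In IcebergSource.Builder the typed setters and the generic set/setAll methods all write into the same readOptions map, and setters that take a boxed argument skip null values so that the later resolution step can still fall back to the Flink configuration, table property or default. A minimal sketch of that pattern, with a hypothetical class name and only one typed setter, might look like this:

```java
import java.util.HashMap;
import java.util.Map;

final class ReadOptionsBuilderSketch {
  private final Map<String, String> readOptions = new HashMap<>();

  // Typed setter: a null argument leaves the option unset instead of failing.
  ReadOptionsBuilderSketch splitSize(Long newSplitSize) {
    if (newSplitSize != null) {
      readOptions.put("split-size", Long.toString(newSplitSize));
    }
    return this;
  }

  // Generic setter: writes to the same map, so the most recent call wins.
  ReadOptionsBuilderSketch set(String property, String value) {
    readOptions.put(property, value);
    return this;
  }

  Map<String, String> options() {
    return readOptions;
  }

  public static void main(String[] args) {
    Map<String, String> options =
        new ReadOptionsBuilderSketch()
            .splitSize(null)
            .set("split-size", "1024")
            .splitSize(2048L)
            .options();
    System.out.println(options.get("split-size"));
  }
}
```

Because both paths share one map, mixing a typed setter with set for the same key is order-dependent: whichever call comes last determines the value.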
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
index 1d99e441b4..02c4943fe9 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
@@ -18,20 +18,20 @@
*/
package org.apache.iceberg.flink.source;
-import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
-
import java.io.Serializable;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TimeUtils;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.flink.FlinkConfigOptions;
+import org.apache.iceberg.flink.FlinkReadConf;
+import org.apache.iceberg.flink.FlinkReadOptions;
/** Context object with optional arguments for a Flink Scan. */
@Internal
@@ -39,50 +39,6 @@ public class ScanContext implements Serializable {
private static final long serialVersionUID = 1L;
- private static final ConfigOption<Long> SNAPSHOT_ID =
- ConfigOptions.key("snapshot-id").longType().defaultValue(null);
-
- private static final ConfigOption<Boolean> CASE_SENSITIVE =
- ConfigOptions.key("case-sensitive").booleanType().defaultValue(false);
-
- private static final ConfigOption<Long> AS_OF_TIMESTAMP =
- ConfigOptions.key("as-of-timestamp").longType().defaultValue(null);
-
- private static final ConfigOption<StreamingStartingStrategy> STARTING_STRATEGY =
- ConfigOptions.key("starting-strategy")
- .enumType(StreamingStartingStrategy.class)
- .defaultValue(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT);
-
- private static final ConfigOption<Long> START_SNAPSHOT_TIMESTAMP =
- ConfigOptions.key("start-snapshot-timestamp").longType().defaultValue(null);
-
- private static final ConfigOption<Long> START_SNAPSHOT_ID =
- ConfigOptions.key("start-snapshot-id").longType().defaultValue(null);
-
- private static final ConfigOption<Long> END_SNAPSHOT_ID =
- ConfigOptions.key("end-snapshot-id").longType().defaultValue(null);
-
- private static final ConfigOption<Long> SPLIT_SIZE =
- ConfigOptions.key("split-size").longType().defaultValue(null);
-
- private static final ConfigOption<Integer> SPLIT_LOOKBACK =
- ConfigOptions.key("split-lookback").intType().defaultValue(null);
-
- private static final ConfigOption<Long> SPLIT_FILE_OPEN_COST =
- ConfigOptions.key("split-file-open-cost").longType().defaultValue(null);
-
- private static final ConfigOption<Boolean> STREAMING =
- ConfigOptions.key("streaming").booleanType().defaultValue(false);
-
- private static final ConfigOption<Duration> MONITOR_INTERVAL =
- ConfigOptions.key("monitor-interval").durationType().defaultValue(Duration.ofSeconds(10));
-
- private static final ConfigOption<Boolean> INCLUDE_COLUMN_STATS =
- ConfigOptions.key("include-column-stats").booleanType().defaultValue(false);
-
- private static final ConfigOption<Integer> MAX_PLANNING_SNAPSHOT_COUNT =
- ConfigOptions.key("max-planning-snapshot-count").intType().defaultValue(Integer.MAX_VALUE);
-
private final boolean caseSensitive;
private final boolean exposeLocality;
private final Long snapshotId;
@@ -303,27 +259,31 @@ public class ScanContext implements Serializable {
}
public static class Builder {
- private boolean caseSensitive = CASE_SENSITIVE.defaultValue();
- private Long snapshotId = SNAPSHOT_ID.defaultValue();
- private StreamingStartingStrategy startingStrategy = STARTING_STRATEGY.defaultValue();
- private Long startSnapshotTimestamp = START_SNAPSHOT_TIMESTAMP.defaultValue();
- private Long startSnapshotId = START_SNAPSHOT_ID.defaultValue();
- private Long endSnapshotId = END_SNAPSHOT_ID.defaultValue();
- private Long asOfTimestamp = AS_OF_TIMESTAMP.defaultValue();
- private Long splitSize = SPLIT_SIZE.defaultValue();
- private Integer splitLookback = SPLIT_LOOKBACK.defaultValue();
- private Long splitOpenFileCost = SPLIT_FILE_OPEN_COST.defaultValue();
- private boolean isStreaming = STREAMING.defaultValue();
- private Duration monitorInterval = MONITOR_INTERVAL.defaultValue();
+ private boolean caseSensitive = FlinkReadOptions.CASE_SENSITIVE_OPTION.defaultValue();
+ private Long snapshotId = FlinkReadOptions.SNAPSHOT_ID.defaultValue();
+ private StreamingStartingStrategy startingStrategy =
+ FlinkReadOptions.STARTING_STRATEGY_OPTION.defaultValue();
+ private Long startSnapshotTimestamp = FlinkReadOptions.START_SNAPSHOT_TIMESTAMP.defaultValue();
+ private Long startSnapshotId = FlinkReadOptions.START_SNAPSHOT_ID.defaultValue();
+ private Long endSnapshotId = FlinkReadOptions.END_SNAPSHOT_ID.defaultValue();
+ private Long asOfTimestamp = FlinkReadOptions.AS_OF_TIMESTAMP.defaultValue();
+ private Long splitSize = FlinkReadOptions.SPLIT_SIZE_OPTION.defaultValue();
+ private Integer splitLookback = FlinkReadOptions.SPLIT_LOOKBACK_OPTION.defaultValue();
+ private Long splitOpenFileCost = FlinkReadOptions.SPLIT_FILE_OPEN_COST_OPTION.defaultValue();
+ private boolean isStreaming = FlinkReadOptions.STREAMING_OPTION.defaultValue();
+ private Duration monitorInterval =
+ TimeUtils.parseDuration(FlinkReadOptions.MONITOR_INTERVAL_OPTION.defaultValue());
private String nameMapping;
private Schema projectedSchema;
private List<Expression> filters;
- private long limit = -1L;
- private boolean includeColumnStats = INCLUDE_COLUMN_STATS.defaultValue();
+ private long limit = FlinkReadOptions.LIMIT_OPTION.defaultValue();
+ private boolean includeColumnStats =
+ FlinkReadOptions.INCLUDE_COLUMN_STATS_OPTION.defaultValue();
private boolean exposeLocality;
private Integer planParallelism =
FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE.defaultValue();
- private int maxPlanningSnapshotCount = MAX_PLANNING_SNAPSHOT_COUNT.defaultValue();
+ private int maxPlanningSnapshotCount =
+ FlinkReadOptions.MAX_PLANNING_SNAPSHOT_COUNT_OPTION.defaultValue();
private Builder() {}
@@ -427,25 +387,27 @@ public class ScanContext implements Serializable {
return this;
}
- public Builder fromProperties(Map<String, String> properties) {
- Configuration config = new Configuration();
- properties.forEach(config::setString);
-
- return this.useSnapshotId(config.get(SNAPSHOT_ID))
- .caseSensitive(config.get(CASE_SENSITIVE))
- .asOfTimestamp(config.get(AS_OF_TIMESTAMP))
- .startingStrategy(config.get(STARTING_STRATEGY))
- .startSnapshotTimestamp(config.get(START_SNAPSHOT_TIMESTAMP))
- .startSnapshotId(config.get(START_SNAPSHOT_ID))
- .endSnapshotId(config.get(END_SNAPSHOT_ID))
- .splitSize(config.get(SPLIT_SIZE))
- .splitLookback(config.get(SPLIT_LOOKBACK))
- .splitOpenFileCost(config.get(SPLIT_FILE_OPEN_COST))
- .streaming(config.get(STREAMING))
- .monitorInterval(config.get(MONITOR_INTERVAL))
- .nameMapping(properties.get(DEFAULT_NAME_MAPPING))
- .includeColumnStats(config.get(INCLUDE_COLUMN_STATS))
- .maxPlanningSnapshotCount(config.get(MAX_PLANNING_SNAPSHOT_COUNT));
+ public Builder resolveConfig(
+ Table table, Map<String, String> readOptions, ReadableConfig readableConfig) {
+ FlinkReadConf flinkReadConf = new FlinkReadConf(table, readOptions, readableConfig);
+
+ return this.useSnapshotId(flinkReadConf.snapshotId())
+ .caseSensitive(flinkReadConf.caseSensitive())
+ .asOfTimestamp(flinkReadConf.asOfTimestamp())
+ .startingStrategy(flinkReadConf.startingStrategy())
+ .startSnapshotTimestamp(flinkReadConf.startSnapshotTimestamp())
+ .startSnapshotId(flinkReadConf.startSnapshotId())
+ .endSnapshotId(flinkReadConf.endSnapshotId())
+ .splitSize(flinkReadConf.splitSize())
+ .splitLookback(flinkReadConf.splitLookback())
+ .splitOpenFileCost(flinkReadConf.splitFileOpenCost())
+ .streaming(flinkReadConf.streaming())
+ .monitorInterval(flinkReadConf.monitorInterval())
+ .nameMapping(flinkReadConf.nameMapping())
+ .limit(flinkReadConf.limit())
+ .planParallelism(flinkReadConf.workerPoolSize())
+ .includeColumnStats(flinkReadConf.includeColumnStats())
+ .maxPlanningSnapshotCount(flinkReadConf.maxPlanningSnapshotCount());
}
public ScanContext build() {
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSourceConfig.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSourceConfig.java
new file mode 100644
index 0000000000..974b8539b3
--- /dev/null
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSourceConfig.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import java.util.List;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.flink.FlinkReadOptions;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestFlinkSourceConfig extends TestFlinkTableSource {
+ private static final String TABLE = "test_table";
+
+ @Test
+ public void testFlinkSessionConfig() {
+ getTableEnv().getConfig().set(FlinkReadOptions.STREAMING_OPTION, true);
+ AssertHelpers.assertThrows(
+ "Should throw exception because as-of-timestamp option cannot be set for streaming reader",
+ IllegalArgumentException.class,
+ "Cannot set as-of-timestamp option for streaming reader",
+ () -> {
+ sql("SELECT * FROM %s /*+ OPTIONS('as-of-timestamp'='1')*/", TABLE);
+ return null;
+ });
+ }
+
+ @Test
+ public void testFlinkHintConfig() {
+ List<Row> result =
+ sql(
+ "SELECT * FROM %s /*+ OPTIONS('as-of-timestamp'='%d','streaming'='false')*/",
+ TABLE, System.currentTimeMillis());
+ Assert.assertEquals(3, result.size());
+ }
+
+ @Test
+ public void testReadOptionHierarchy() {
+ getTableEnv().getConfig().set(FlinkReadOptions.LIMIT_OPTION, 1L);
+ List<Row> result = sql("SELECT * FROM %s", TABLE);
+ Assert.assertEquals(1, result.size());
+
+ result = sql("SELECT * FROM %s /*+ OPTIONS('limit'='3')*/", TABLE);
+ Assert.assertEquals(3, result.size());
+ }
+}
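
The testReadOptionHierarchy case above suggests that a per-query read option (the SQL hint) takes precedence over a value set in the Flink session configuration, which in turn takes precedence over the option's default. A minimal sketch of that lookup order follows. It is an illustration only, not the FlinkReadConf or FlinkConfParser implementation: the class ReadOptionSketch, its resolve method, and the use of plain maps with unprefixed keys are all assumptions made for the example.

```java
import java.util.Collections;
import java.util.Map;

/** Hypothetical helper illustrating the lookup order; not part of Iceberg. */
final class ReadOptionSketch {
  private ReadOptionSketch() {}

  /**
   * Returns the first value found, checking the per-query read options first,
   * then the session configuration, and finally falling back to the default.
   */
  static String resolve(
      String key,
      Map<String, String> readOptions,
      Map<String, String> sessionConfig,
      String defaultValue) {
    String value = readOptions.get(key);
    if (value != null) {
      return value;
    }
    value = sessionConfig.get(key);
    if (value != null) {
      return value;
    }
    return defaultValue;
  }

  public static void main(String[] args) {
    // Assumed key name "limit", mirroring the hint used in the test.
    Map<String, String> session = Collections.singletonMap("limit", "1");
    Map<String, String> hint = Collections.singletonMap("limit", "3");
    Map<String, String> none = Collections.emptyMap();

    // Only the session value is present, so it is used.
    System.out.println(resolve("limit", none, session, "-1")); // prints 1
    // The per-query option overrides the session value.
    System.out.println(resolve("limit", hint, session, "-1")); // prints 3
    // Nothing is set, so the default applies.
    System.out.println(resolve("limit", none, none, "-1")); // prints -1
  }
}
```

In the commit itself the resolution is delegated to FlinkReadConf, which also receives the table, so the real lookup may consult further sources that this sketch leaves out.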