Posted to commits@iceberg.apache.org by st...@apache.org on 2022/12/08 21:03:07 UTC

[iceberg] branch master updated: Flink: Support read options in flink source (#5967)

This is an automated email from the ASF dual-hosted git repository.

stevenwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 0694269cd2 Flink: Support read options in flink source (#5967)
0694269cd2 is described below

commit 0694269cd20a3625d5587d544bec621e9bec305a
Author: liliwei <hi...@gmail.com>
AuthorDate: Fri Dec 9 05:03:00 2022 +0800

    Flink: Support read options in flink source (#5967)
---
 docs/flink-getting-started.md                      |  53 ++++++-
 .../org/apache/iceberg/flink/FlinkConfParser.java  |  35 ++++-
 .../org/apache/iceberg/flink/FlinkReadConf.java    | 168 +++++++++++++++++++++
 .../org/apache/iceberg/flink/FlinkReadOptions.java |  96 ++++++++++++
 .../apache/iceberg/flink/source/FlinkSource.java   |  51 +++++--
 .../iceberg/flink/source/FlinkSplitPlanner.java    |  18 +--
 .../apache/iceberg/flink/source/IcebergSource.java |  87 ++++++++---
 .../apache/iceberg/flink/source/ScanContext.java   | 128 ++++++----------
 .../flink/source/TestFlinkSourceConfig.java        |  62 ++++++++
 9 files changed, 570 insertions(+), 128 deletions(-)

diff --git a/docs/flink-getting-started.md b/docs/flink-getting-started.md
index 7fdc8fc58a..9af84fadf3 100644
--- a/docs/flink-getting-started.md
+++ b/docs/flink-getting-started.md
@@ -684,7 +684,58 @@ env.execute("Test Iceberg DataStream");
 OVERWRITE and UPSERT can't be set together. In UPSERT mode, if the table is partitioned, the partition fields should be included in equality fields.
 {{< /hint >}}
 
-## Write options
+## Options
+### Read options
+
+Flink read options are passed when configuring the Flink IcebergSource, like this:
+
+```
+IcebergSource.forRowData()
+    .tableLoader(TableLoader.fromCatalog(...))
+    .assignerFactory(new SimpleSplitAssignerFactory())
+    .streaming(true)
+    .streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
+    .startSnapshotId(3821550127947089987L)
+    .monitorInterval(Duration.ofSeconds(10L)) // or .set("monitor-interval", "10s") or .set(FlinkReadOptions.MONITOR_INTERVAL, "10s")
+    .build()
+```
+For Flink SQL, read options can be passed in via SQL hints like this:
+```
+SELECT * FROM tableName /*+ OPTIONS('monitor-interval'='10s') */
+...
+```
+
+Options can also be passed in via the Flink configuration, which is applied to the current session. Note that not all options support this mode.
+
+```
+env.getConfig()
+    .getConfiguration()
+    .set(FlinkReadOptions.SPLIT_FILE_OPEN_COST_OPTION, 1000L);
+...
+```
+
+`Read option` has the highest priority, followed by `Flink configuration` and then `Table property`.
+
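+For example, the following sketch shows the hierarchy in action (the table name is illustrative): a `limit` set through a SQL hint overrides the same option set in the session-level Flink configuration.
+
+```
+// Session-level default: return at most 1 row per scan.
+env.getConfig()
+    .getConfiguration()
+    .set(FlinkReadOptions.LIMIT_OPTION, 1L);
+```
+```
+-- The hint takes precedence over the session configuration, so this returns up to 3 rows.
+SELECT * FROM tableName /*+ OPTIONS('limit'='3') */
+```
+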
+| Read option                 | Flink configuration                           | Table property               | Default                          | Description                                                  |
+| --------------------------- | --------------------------------------------- | ---------------------------- | -------------------------------- | ------------------------------------------------------------ |
+| snapshot-id                 | N/A                                           | N/A                          | null                             | For time travel in batch mode. Read data from the specified snapshot-id. |
+| case-sensitive              | connector.iceberg.case-sensitive              | N/A                          | false                            | If true, match column names case-sensitively.                |
+| as-of-timestamp             | N/A                                           | N/A                          | null                             | For time travel in batch mode. Read data from the most recent snapshot as of the given time in milliseconds. |
+| starting-strategy           | connector.iceberg.starting-strategy           | N/A                          | INCREMENTAL_FROM_LATEST_SNAPSHOT | Starting strategy for streaming execution. TABLE_SCAN_THEN_INCREMENTAL: Do a regular table scan then switch to the incremental mode. The incremental mode starts from the current snapshot exclusive. INCREMENTAL_FROM_LATEST_SNAPSHOT: Start incremental mode from the latest snapshot inclusive. If it is an empty map, all future append snapshots shou [...]
+| start-snapshot-timestamp    | N/A                                           | N/A                          | null                             | Start to read data from the most recent snapshot as of the given time in milliseconds. |
+| start-snapshot-id           | N/A                                           | N/A                          | null                             | Start to read data from the specified snapshot-id.           |
+| end-snapshot-id             | N/A                                           | N/A                          | The latest snapshot id           | Specifies the end snapshot.                                  |
+| split-size                  | connector.iceberg.split-size                  | read.split.target-size       | 128 MB                           | Target size when combining input splits.                     |
+| split-lookback              | connector.iceberg.split-lookback              | read.split.planning-lookback | 10                               | Number of bins to consider when combining input splits.      |
+| split-file-open-cost        | connector.iceberg.split-file-open-cost        | read.split.open-file-cost    | 4 MB                             | The estimated cost to open a file, used as a minimum weight when combining splits. |
+| streaming                   | connector.iceberg.streaming                   | N/A                          | false                            | Sets whether the current task runs in streaming or batch mode. |
+| monitor-interval            | connector.iceberg.monitor-interval            | N/A                          | 60s                              | Monitor interval to discover splits from new snapshots. Applicable only for streaming read. |
+| include-column-stats        | connector.iceberg.include-column-stats        | N/A                          | false                            | If true, the scan loads the column stats with each data file. Column stats include: value count, null value count, lower bounds, and upper bounds. |
+| max-planning-snapshot-count | connector.iceberg.max-planning-snapshot-count | N/A                          | Integer.MAX_VALUE                | Maximum number of snapshots processed per split enumeration. Applicable only to streaming read. |
+| limit                       | connector.iceberg.limit                       | N/A                          | -1                               | Limits the number of output rows; -1 means no limit.         |
+
+
+### Write options
 
 Flink write options are passed when configuring the FlinkSink, like this:
 
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java
index 83fa09de54..5543341be9 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.ReadableConfig;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 
 class FlinkConfParser {
 
@@ -34,7 +35,7 @@ class FlinkConfParser {
   private final ReadableConfig readableConfig;
 
   FlinkConfParser(Table table, Map<String, String> options, ReadableConfig readableConfig) {
-    this.tableProperties = table.properties();
+    this.tableProperties = table == null ? Maps.newHashMap() : table.properties();
     this.options = options;
     this.readableConfig = readableConfig;
   }
@@ -51,6 +52,10 @@ class FlinkConfParser {
     return new LongConfParser();
   }
 
+  public <E extends Enum<E>> EnumConfParser<E> enumConfParser(Class<E> enumClass) {
+    return new EnumConfParser<>(enumClass);
+  }
+
   public StringConfParser stringConf() {
     return new StringConfParser();
   }
@@ -148,6 +153,34 @@ class FlinkConfParser {
     }
   }
 
+  class EnumConfParser<E extends Enum<E>> extends ConfParser<EnumConfParser<E>, E> {
+    private E defaultValue;
+    private final Class<E> enumClass;
+
+    EnumConfParser(Class<E> enumClass) {
+      this.enumClass = enumClass;
+    }
+
+    @Override
+    protected EnumConfParser<E> self() {
+      return this;
+    }
+
+    public EnumConfParser<E> defaultValue(E value) {
+      this.defaultValue = value;
+      return self();
+    }
+
+    public E parse() {
+      Preconditions.checkArgument(defaultValue != null, "Default value cannot be null");
+      return parse(s -> Enum.valueOf(enumClass, s), defaultValue);
+    }
+
+    public E parseOptional() {
+      return parse(s -> Enum.valueOf(enumClass, s), null);
+    }
+  }
+
   abstract class ConfParser<ThisT, T> {
     private final List<String> optionNames = Lists.newArrayList();
     private String tablePropertyName;
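The new `EnumConfParser` turns a string setting into an enum via `Enum.valueOf`, using the same precedence chain as the other parsers (read option, then Flink configuration, then table property, then default). A self-contained sketch of that parsing step, using a hypothetical `Mode` enum:

```
import java.util.Map;

public class EnumParseSketch {
  enum Mode { BATCH, STREAMING }

  public static void main(String[] args) {
    Map<String, String> options = Map.of("mode", "STREAMING");
    // EnumConfParser.parse() applies Enum.valueOf to the raw string and falls
    // back to the declared default when no source supplies a value.
    String raw = options.get("mode");
    Mode mode = raw != null ? Enum.valueOf(Mode.class, raw) : Mode.BATCH;
    System.out.println(mode); // prints STREAMING
  }
}
```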
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java
new file mode 100644
index 0000000000..e2cacc2adf
--- /dev/null
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import java.time.Duration;
+import java.util.Map;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.util.TimeUtils;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.flink.source.StreamingStartingStrategy;
+
+public class FlinkReadConf {
+
+  private final FlinkConfParser confParser;
+
+  public FlinkReadConf(
+      Table table, Map<String, String> readOptions, ReadableConfig readableConfig) {
+    this.confParser = new FlinkConfParser(table, readOptions, readableConfig);
+  }
+
+  public Long snapshotId() {
+    return confParser.longConf().option(FlinkReadOptions.SNAPSHOT_ID.key()).parseOptional();
+  }
+
+  public boolean caseSensitive() {
+    return confParser
+        .booleanConf()
+        .option(FlinkReadOptions.CASE_SENSITIVE)
+        .flinkConfig(FlinkReadOptions.CASE_SENSITIVE_OPTION)
+        .defaultValue(FlinkReadOptions.CASE_SENSITIVE_OPTION.defaultValue())
+        .parse();
+  }
+
+  public Long asOfTimestamp() {
+    return confParser.longConf().option(FlinkReadOptions.AS_OF_TIMESTAMP.key()).parseOptional();
+  }
+
+  public StreamingStartingStrategy startingStrategy() {
+    return confParser
+        .enumConfParser(StreamingStartingStrategy.class)
+        .option(FlinkReadOptions.STARTING_STRATEGY)
+        .flinkConfig(FlinkReadOptions.STARTING_STRATEGY_OPTION)
+        .defaultValue(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
+        .parse();
+  }
+
+  public Long startSnapshotTimestamp() {
+    return confParser
+        .longConf()
+        .option(FlinkReadOptions.START_SNAPSHOT_TIMESTAMP.key())
+        .parseOptional();
+  }
+
+  public Long startSnapshotId() {
+    return confParser.longConf().option(FlinkReadOptions.START_SNAPSHOT_ID.key()).parseOptional();
+  }
+
+  public Long endSnapshotId() {
+    return confParser.longConf().option(FlinkReadOptions.END_SNAPSHOT_ID.key()).parseOptional();
+  }
+
+  public long splitSize() {
+    return confParser
+        .longConf()
+        .option(FlinkReadOptions.SPLIT_SIZE)
+        .flinkConfig(FlinkReadOptions.SPLIT_SIZE_OPTION)
+        .tableProperty(TableProperties.SPLIT_SIZE)
+        .defaultValue(TableProperties.SPLIT_SIZE_DEFAULT)
+        .parse();
+  }
+
+  public int splitLookback() {
+    return confParser
+        .intConf()
+        .option(FlinkReadOptions.SPLIT_LOOKBACK)
+        .flinkConfig(FlinkReadOptions.SPLIT_LOOKBACK_OPTION)
+        .tableProperty(TableProperties.SPLIT_LOOKBACK)
+        .defaultValue(TableProperties.SPLIT_LOOKBACK_DEFAULT)
+        .parse();
+  }
+
+  public long splitFileOpenCost() {
+    return confParser
+        .longConf()
+        .option(FlinkReadOptions.SPLIT_FILE_OPEN_COST)
+        .flinkConfig(FlinkReadOptions.SPLIT_FILE_OPEN_COST_OPTION)
+        .tableProperty(TableProperties.SPLIT_OPEN_FILE_COST)
+        .defaultValue(TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT)
+        .parse();
+  }
+
+  public boolean streaming() {
+    return confParser
+        .booleanConf()
+        .option(FlinkReadOptions.STREAMING)
+        .flinkConfig(FlinkReadOptions.STREAMING_OPTION)
+        .defaultValue(FlinkReadOptions.STREAMING_OPTION.defaultValue())
+        .parse();
+  }
+
+  public Duration monitorInterval() {
+    String duration =
+        confParser
+            .stringConf()
+            .option(FlinkReadOptions.MONITOR_INTERVAL)
+            .flinkConfig(FlinkReadOptions.MONITOR_INTERVAL_OPTION)
+            .defaultValue(FlinkReadOptions.MONITOR_INTERVAL_OPTION.defaultValue())
+            .parse();
+
+    return TimeUtils.parseDuration(duration);
+  }
+
+  public boolean includeColumnStats() {
+    return confParser
+        .booleanConf()
+        .option(FlinkReadOptions.INCLUDE_COLUMN_STATS)
+        .flinkConfig(FlinkReadOptions.INCLUDE_COLUMN_STATS_OPTION)
+        .defaultValue(FlinkReadOptions.INCLUDE_COLUMN_STATS_OPTION.defaultValue())
+        .parse();
+  }
+
+  public int maxPlanningSnapshotCount() {
+    return confParser
+        .intConf()
+        .option(FlinkReadOptions.MAX_PLANNING_SNAPSHOT_COUNT)
+        .flinkConfig(FlinkReadOptions.MAX_PLANNING_SNAPSHOT_COUNT_OPTION)
+        .defaultValue(FlinkReadOptions.MAX_PLANNING_SNAPSHOT_COUNT_OPTION.defaultValue())
+        .parse();
+  }
+
+  public String nameMapping() {
+    return confParser.stringConf().option(TableProperties.DEFAULT_NAME_MAPPING).parseOptional();
+  }
+
+  public long limit() {
+    return confParser
+        .longConf()
+        .option(FlinkReadOptions.LIMIT)
+        .flinkConfig(FlinkReadOptions.LIMIT_OPTION)
+        .defaultValue(FlinkReadOptions.LIMIT_OPTION.defaultValue())
+        .parse();
+  }
+
+  public int workerPoolSize() {
+    return confParser
+        .intConf()
+        .flinkConfig(FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE)
+        .defaultValue(FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE.defaultValue())
+        .parse();
+  }
+}
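A minimal usage sketch for `FlinkReadConf` (values are illustrative; a null table is tolerated after the `FlinkConfParser` change above, in which case table properties are simply skipped):

```
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.iceberg.flink.FlinkReadConf;
import org.apache.iceberg.flink.FlinkReadOptions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

public class ReadConfSketch {
  public static void main(String[] args) {
    Map<String, String> readOptions = Maps.newHashMap();
    readOptions.put(FlinkReadOptions.SPLIT_SIZE, "134217728"); // 128 MB

    // Configuration implements ReadableConfig; an empty one means no session overrides.
    FlinkReadConf readConf = new FlinkReadConf(null, readOptions, new Configuration());
    System.out.println(readConf.splitSize()); // 134217728, from the read option
  }
}
```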
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java
new file mode 100644
index 0000000000..54f64dbfa8
--- /dev/null
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.flink.source.StreamingStartingStrategy;
+
+/** Flink source read options */
+public class FlinkReadOptions {
+  private static final String PREFIX = "connector.iceberg.";
+
+  private FlinkReadOptions() {}
+
+  public static final ConfigOption<Long> SNAPSHOT_ID =
+      ConfigOptions.key("snapshot-id").longType().defaultValue(null);
+
+  public static final String CASE_SENSITIVE = "case-sensitive";
+  public static final ConfigOption<Boolean> CASE_SENSITIVE_OPTION =
+      ConfigOptions.key(PREFIX + CASE_SENSITIVE).booleanType().defaultValue(false);
+
+  public static final ConfigOption<Long> AS_OF_TIMESTAMP =
+      ConfigOptions.key("as-of-timestamp").longType().defaultValue(null);
+
+  public static final String STARTING_STRATEGY = "starting-strategy";
+  public static final ConfigOption<StreamingStartingStrategy> STARTING_STRATEGY_OPTION =
+      ConfigOptions.key(PREFIX + STARTING_STRATEGY)
+          .enumType(StreamingStartingStrategy.class)
+          .defaultValue(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT);
+
+  public static final ConfigOption<Long> START_SNAPSHOT_TIMESTAMP =
+      ConfigOptions.key("start-snapshot-timestamp").longType().defaultValue(null);
+
+  public static final ConfigOption<Long> START_SNAPSHOT_ID =
+      ConfigOptions.key("start-snapshot-id").longType().defaultValue(null);
+
+  public static final ConfigOption<Long> END_SNAPSHOT_ID =
+      ConfigOptions.key("end-snapshot-id").longType().defaultValue(null);
+
+  public static final String SPLIT_SIZE = "split-size";
+  public static final ConfigOption<Long> SPLIT_SIZE_OPTION =
+      ConfigOptions.key(PREFIX + SPLIT_SIZE)
+          .longType()
+          .defaultValue(TableProperties.SPLIT_SIZE_DEFAULT);
+
+  public static final String SPLIT_LOOKBACK = "split-lookback";
+  public static final ConfigOption<Integer> SPLIT_LOOKBACK_OPTION =
+      ConfigOptions.key(PREFIX + SPLIT_LOOKBACK)
+          .intType()
+          .defaultValue(TableProperties.SPLIT_LOOKBACK_DEFAULT);
+
+  public static final String SPLIT_FILE_OPEN_COST = "split-file-open-cost";
+  public static final ConfigOption<Long> SPLIT_FILE_OPEN_COST_OPTION =
+      ConfigOptions.key(PREFIX + SPLIT_FILE_OPEN_COST)
+          .longType()
+          .defaultValue(TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
+
+  public static final String STREAMING = "streaming";
+  public static final ConfigOption<Boolean> STREAMING_OPTION =
+      ConfigOptions.key(PREFIX + STREAMING).booleanType().defaultValue(false);
+
+  public static final String MONITOR_INTERVAL = "monitor-interval";
+  public static final ConfigOption<String> MONITOR_INTERVAL_OPTION =
+      ConfigOptions.key(PREFIX + MONITOR_INTERVAL).stringType().defaultValue("60s");
+
+  public static final String INCLUDE_COLUMN_STATS = "include-column-stats";
+  public static final ConfigOption<Boolean> INCLUDE_COLUMN_STATS_OPTION =
+      ConfigOptions.key(PREFIX + INCLUDE_COLUMN_STATS).booleanType().defaultValue(false);
+
+  public static final String MAX_PLANNING_SNAPSHOT_COUNT = "max-planning-snapshot-count";
+  public static final ConfigOption<Integer> MAX_PLANNING_SNAPSHOT_COUNT_OPTION =
+      ConfigOptions.key(PREFIX + MAX_PLANNING_SNAPSHOT_COUNT)
+          .intType()
+          .defaultValue(Integer.MAX_VALUE);
+
+  public static final String LIMIT = "limit";
+  public static final ConfigOption<Long> LIMIT_OPTION =
+      ConfigOptions.key(PREFIX + LIMIT).longType().defaultValue(-1L);
+}
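Note the pairing convention: the bare `String` constants (for example `"split-size"`) are the keys accepted by SQL hints and the source builders, while the `*_OPTION` `ConfigOption`s carry the `connector.iceberg.` prefix for the session-level Flink configuration. A sketch of both spellings, mirroring the tests below (the table name is illustrative):

```
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.iceberg.flink.FlinkReadOptions;

public class ReadOptionsSketch {
  public static void main(String[] args) {
    StreamTableEnvironment tableEnv =
        StreamTableEnvironment.create(StreamExecutionEnvironment.getExecutionEnvironment());

    // Session level: the prefixed ConfigOption applies to all subsequent scans.
    tableEnv.getConfig().set(FlinkReadOptions.STREAMING_OPTION, true);

    // Per-query level: the bare key in a SQL hint overrides the session value.
    tableEnv.executeSql("SELECT * FROM sample /*+ OPTIONS('streaming'='false') */");
  }
}
```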
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
index cb1c5b0889..84a96d4dc7 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iceberg.flink.source;
 
+import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
+
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.time.Duration;
@@ -36,11 +38,14 @@ import org.apache.iceberg.TableScan;
 import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.flink.FlinkConfigOptions;
+import org.apache.iceberg.flink.FlinkReadOptions;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.PropertyUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -80,6 +85,8 @@ public class FlinkSource {
     private final ScanContext.Builder contextBuilder = ScanContext.builder();
     private Boolean exposeLocality;
 
+    private final Map<String, String> readOptions = Maps.newHashMap();
+
     public Builder tableLoader(TableLoader newLoader) {
       this.tableLoader = newLoader;
       return this;
@@ -110,53 +117,65 @@ public class FlinkSource {
       return this;
     }
 
+    public Builder set(String property, String value) {
+      readOptions.put(property, value);
+      return this;
+    }
+
+    public Builder setAll(Map<String, String> properties) {
+      readOptions.putAll(properties);
+      return this;
+    }
+
+    /** @deprecated Use {@link #setAll} instead. */
+    @Deprecated
     public Builder properties(Map<String, String> properties) {
-      contextBuilder.fromProperties(properties);
+      readOptions.putAll(properties);
       return this;
     }
 
     public Builder caseSensitive(boolean caseSensitive) {
-      contextBuilder.caseSensitive(caseSensitive);
+      readOptions.put(FlinkReadOptions.CASE_SENSITIVE, Boolean.toString(caseSensitive));
       return this;
     }
 
     public Builder snapshotId(Long snapshotId) {
-      contextBuilder.useSnapshotId(snapshotId);
+      readOptions.put(FlinkReadOptions.SNAPSHOT_ID.key(), Long.toString(snapshotId));
       return this;
     }
 
     public Builder startSnapshotId(Long startSnapshotId) {
-      contextBuilder.startSnapshotId(startSnapshotId);
+      readOptions.put(FlinkReadOptions.START_SNAPSHOT_ID.key(), Long.toString(startSnapshotId));
       return this;
     }
 
     public Builder endSnapshotId(Long endSnapshotId) {
-      contextBuilder.endSnapshotId(endSnapshotId);
+      readOptions.put(FlinkReadOptions.END_SNAPSHOT_ID.key(), Long.toString(endSnapshotId));
       return this;
     }
 
     public Builder asOfTimestamp(Long asOfTimestamp) {
-      contextBuilder.asOfTimestamp(asOfTimestamp);
+      readOptions.put(FlinkReadOptions.AS_OF_TIMESTAMP.key(), Long.toString(asOfTimestamp));
       return this;
     }
 
     public Builder splitSize(Long splitSize) {
-      contextBuilder.splitSize(splitSize);
+      readOptions.put(FlinkReadOptions.SPLIT_SIZE, Long.toString(splitSize));
       return this;
     }
 
     public Builder splitLookback(Integer splitLookback) {
-      contextBuilder.splitLookback(splitLookback);
+      readOptions.put(FlinkReadOptions.SPLIT_LOOKBACK, Integer.toString(splitLookback));
       return this;
     }
 
     public Builder splitOpenFileCost(Long splitOpenFileCost) {
-      contextBuilder.splitOpenFileCost(splitOpenFileCost);
+      readOptions.put(FlinkReadOptions.SPLIT_FILE_OPEN_COST, Long.toString(splitOpenFileCost));
       return this;
     }
 
     public Builder streaming(boolean streaming) {
-      contextBuilder.streaming(streaming);
+      readOptions.put(FlinkReadOptions.STREAMING, Boolean.toString(streaming));
       return this;
     }
 
@@ -166,17 +185,19 @@ public class FlinkSource {
     }
 
     public Builder nameMapping(String nameMapping) {
-      contextBuilder.nameMapping(nameMapping);
+      readOptions.put(DEFAULT_NAME_MAPPING, nameMapping);
       return this;
     }
 
     public Builder monitorInterval(Duration interval) {
-      contextBuilder.monitorInterval(interval);
+      readOptions.put(FlinkReadOptions.MONITOR_INTERVAL, interval.toNanos() + " ns");
       return this;
     }
 
     public Builder maxPlanningSnapshotCount(int newMaxPlanningSnapshotCount) {
-      contextBuilder.maxPlanningSnapshotCount(newMaxPlanningSnapshotCount);
+      readOptions.put(
+          FlinkReadOptions.MAX_PLANNING_SNAPSHOT_COUNT,
+          Integer.toString(newMaxPlanningSnapshotCount));
       return this;
     }
 
@@ -219,6 +240,8 @@ public class FlinkSource {
       contextBuilder.planParallelism(
           readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE));
 
+      contextBuilder.resolveConfig(table, readOptions, readableConfig);
+
       return new FlinkInputFormat(
           tableLoader, icebergSchema, io, encryption, contextBuilder.build());
     }
@@ -261,6 +284,6 @@ public class FlinkSource {
   }
 
   public static boolean isBounded(Map<String, String> properties) {
-    return !ScanContext.builder().fromProperties(properties).build().isStreaming();
+    return !PropertyUtil.propertyAsBoolean(properties, FlinkReadOptions.STREAMING, false);
   }
 }
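With the `FlinkSource` builder now funneling everything into a single read-options map, typed setters and string options can be mixed freely. A sketch under illustrative assumptions (the warehouse path and job name would differ in practice):

```
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.FlinkReadOptions;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.FlinkSource;

public class FlinkSourceSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");

    DataStream<RowData> batchStream =
        FlinkSource.forRowData()
            .env(env)
            .tableLoader(tableLoader)
            .streaming(false)                             // typed setter
            .set(FlinkReadOptions.SPLIT_SIZE, "67108864") // string form into the same map (64 MB)
            .build();

    batchStream.print();
    env.execute("FlinkSource sketch");
  }
}
```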
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java
index 4746625310..3ff349dd8b 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java
@@ -136,20 +136,14 @@ public class FlinkSplitPlanner {
       refinedScan = refinedScan.includeColumnStats();
     }
 
-    if (context.splitSize() != null) {
-      refinedScan = refinedScan.option(TableProperties.SPLIT_SIZE, context.splitSize().toString());
-    }
+    refinedScan = refinedScan.option(TableProperties.SPLIT_SIZE, context.splitSize().toString());
 
-    if (context.splitLookback() != null) {
-      refinedScan =
-          refinedScan.option(TableProperties.SPLIT_LOOKBACK, context.splitLookback().toString());
-    }
+    refinedScan =
+        refinedScan.option(TableProperties.SPLIT_LOOKBACK, context.splitLookback().toString());
 
-    if (context.splitOpenFileCost() != null) {
-      refinedScan =
-          refinedScan.option(
-              TableProperties.SPLIT_OPEN_FILE_COST, context.splitOpenFileCost().toString());
-    }
+    refinedScan =
+        refinedScan.option(
+            TableProperties.SPLIT_OPEN_FILE_COST, context.splitOpenFileCost().toString());
 
     if (context.filters() != null) {
       for (Expression filter : context.filters()) {
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
index 9728aeb2b3..0f50d866d9 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
@@ -41,7 +41,10 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.util.Preconditions;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.flink.FlinkConfigOptions;
+import org.apache.iceberg.flink.FlinkReadOptions;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.flink.source.assigner.SplitAssigner;
@@ -58,6 +61,7 @@ import org.apache.iceberg.flink.source.reader.ReaderFunction;
 import org.apache.iceberg.flink.source.reader.RowDataReaderFunction;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplitSerializer;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.util.ThreadPools;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -212,6 +216,8 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
     private TableSchema projectedFlinkSchema;
     private Boolean exposeLocality;
 
+    private final Map<String, String> readOptions = Maps.newHashMap();
+
     Builder() {}
 
     public Builder<T> tableLoader(TableLoader loader) {
@@ -235,67 +241,89 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
     }
 
     public Builder<T> caseSensitive(boolean newCaseSensitive) {
-      this.contextBuilder.caseSensitive(newCaseSensitive);
+      readOptions.put(FlinkReadOptions.CASE_SENSITIVE, Boolean.toString(newCaseSensitive));
       return this;
     }
 
     public Builder<T> useSnapshotId(Long newSnapshotId) {
-      this.contextBuilder.useSnapshotId(newSnapshotId);
+      if (newSnapshotId != null) {
+        readOptions.put(FlinkReadOptions.SNAPSHOT_ID.key(), Long.toString(newSnapshotId));
+      }
       return this;
     }
 
     public Builder<T> streamingStartingStrategy(StreamingStartingStrategy newStartingStrategy) {
-      this.contextBuilder.startingStrategy(newStartingStrategy);
+      readOptions.put(FlinkReadOptions.STARTING_STRATEGY, newStartingStrategy.name());
       return this;
     }
 
     public Builder<T> startSnapshotTimestamp(Long newStartSnapshotTimestamp) {
-      this.contextBuilder.startSnapshotTimestamp(newStartSnapshotTimestamp);
+      if (newStartSnapshotTimestamp != null) {
+        readOptions.put(
+            FlinkReadOptions.START_SNAPSHOT_TIMESTAMP.key(),
+            Long.toString(newStartSnapshotTimestamp));
+      }
       return this;
     }
 
     public Builder<T> startSnapshotId(Long newStartSnapshotId) {
-      this.contextBuilder.startSnapshotId(newStartSnapshotId);
+      if (newStartSnapshotId != null) {
+        readOptions.put(
+            FlinkReadOptions.START_SNAPSHOT_ID.key(), Long.toString(newStartSnapshotId));
+      }
       return this;
     }
 
     public Builder<T> endSnapshotId(Long newEndSnapshotId) {
-      this.contextBuilder.endSnapshotId(newEndSnapshotId);
+      if (newEndSnapshotId != null) {
+        readOptions.put(FlinkReadOptions.END_SNAPSHOT_ID.key(), Long.toString(newEndSnapshotId));
+      }
       return this;
     }
 
     public Builder<T> asOfTimestamp(Long newAsOfTimestamp) {
-      this.contextBuilder.asOfTimestamp(newAsOfTimestamp);
+      if (newAsOfTimestamp != null) {
+        readOptions.put(FlinkReadOptions.AS_OF_TIMESTAMP.key(), Long.toString(newAsOfTimestamp));
+      }
       return this;
     }
 
     public Builder<T> splitSize(Long newSplitSize) {
-      this.contextBuilder.splitSize(newSplitSize);
+      if (newSplitSize != null) {
+        readOptions.put(FlinkReadOptions.SPLIT_SIZE, Long.toString(newSplitSize));
+      }
       return this;
     }
 
     public Builder<T> splitLookback(Integer newSplitLookback) {
-      this.contextBuilder.splitLookback(newSplitLookback);
+      if (newSplitLookback != null) {
+        readOptions.put(FlinkReadOptions.SPLIT_LOOKBACK, Integer.toString(newSplitLookback));
+      }
       return this;
     }
 
     public Builder<T> splitOpenFileCost(Long newSplitOpenFileCost) {
-      this.contextBuilder.splitOpenFileCost(newSplitOpenFileCost);
+      if (newSplitOpenFileCost != null) {
+        readOptions.put(FlinkReadOptions.SPLIT_FILE_OPEN_COST, Long.toString(newSplitOpenFileCost));
+      }
+
       return this;
     }
 
     public Builder<T> streaming(boolean streaming) {
-      this.contextBuilder.streaming(streaming);
+      readOptions.put(FlinkReadOptions.STREAMING, Boolean.toString(streaming));
       return this;
     }
 
     public Builder<T> monitorInterval(Duration newMonitorInterval) {
-      this.contextBuilder.monitorInterval(newMonitorInterval);
+      if (newMonitorInterval != null) {
+        readOptions.put(FlinkReadOptions.MONITOR_INTERVAL, newMonitorInterval.toNanos() + " ns");
+      }
       return this;
     }
 
     public Builder<T> nameMapping(String newNameMapping) {
-      this.contextBuilder.nameMapping(newNameMapping);
+      readOptions.put(TableProperties.DEFAULT_NAME_MAPPING, newNameMapping);
       return this;
     }
 
@@ -315,17 +343,20 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
     }
 
     public Builder<T> limit(long newLimit) {
-      this.contextBuilder.limit(newLimit);
+      readOptions.put(FlinkReadOptions.LIMIT, Long.toString(newLimit));
       return this;
     }
 
     public Builder<T> includeColumnStats(boolean newIncludeColumnStats) {
-      this.contextBuilder.includeColumnStats(newIncludeColumnStats);
+      readOptions.put(
+          FlinkReadOptions.INCLUDE_COLUMN_STATS, Boolean.toString(newIncludeColumnStats));
       return this;
     }
 
     public Builder<T> planParallelism(int planParallelism) {
-      this.contextBuilder.planParallelism(planParallelism);
+      readOptions.put(
+          FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE.key(),
+          Integer.toString(planParallelism));
       return this;
     }
 
@@ -334,8 +365,28 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
       return this;
     }
 
+    /**
+     * Set the read properties for Flink source. View the supported properties in {@link
+     * FlinkReadOptions}
+     */
+    public Builder<T> set(String property, String value) {
+      readOptions.put(property, value);
+      return this;
+    }
+
+    /**
+     * Set the read properties for Flink source. View the supported properties in {@link
+     * FlinkReadOptions}
+     */
+    public Builder<T> setAll(Map<String, String> properties) {
+      readOptions.putAll(properties);
+      return this;
+    }
+
+    /** @deprecated Use {@link #setAll} instead. */
+    @Deprecated
     public Builder<T> properties(Map<String, String> properties) {
-      contextBuilder.fromProperties(properties);
+      readOptions.putAll(properties);
       return this;
     }
 
@@ -348,6 +399,8 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
         throw new UncheckedIOException(e);
       }
 
+      contextBuilder.resolveConfig(table, readOptions, flinkConfig);
+
       Schema icebergSchema = table.schema();
       if (projectedFlinkSchema != null) {
         contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedFlinkSchema));
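The same pattern applies to `IcebergSource`: `set`/`setAll` accept the bare `FlinkReadOptions` keys as plain strings. A sketch with an illustrative warehouse path:

```
import java.util.Map;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.FlinkReadOptions;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.IcebergSource;
import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

public class IcebergSourceSketch {
  public static void main(String[] args) {
    TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");

    // setAll accepts the bare FlinkReadOptions keys; values are plain strings.
    Map<String, String> options = Maps.newHashMap();
    options.put(FlinkReadOptions.STREAMING, "true");
    options.put(FlinkReadOptions.MONITOR_INTERVAL, "10s");

    IcebergSource<RowData> source =
        IcebergSource.forRowData()
            .tableLoader(tableLoader)
            .assignerFactory(new SimpleSplitAssignerFactory())
            .setAll(options)
            .build();
  }
}
```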
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
index 1d99e441b4..02c4943fe9 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
@@ -18,20 +18,20 @@
  */
 package org.apache.iceberg.flink.source;
 
-import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
-
 import java.io.Serializable;
 import java.time.Duration;
 import java.util.List;
 import java.util.Map;
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TimeUtils;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.flink.FlinkConfigOptions;
+import org.apache.iceberg.flink.FlinkReadConf;
+import org.apache.iceberg.flink.FlinkReadOptions;
 
 /** Context object with optional arguments for a Flink Scan. */
 @Internal
@@ -39,50 +39,6 @@ public class ScanContext implements Serializable {
 
   private static final long serialVersionUID = 1L;
 
-  private static final ConfigOption<Long> SNAPSHOT_ID =
-      ConfigOptions.key("snapshot-id").longType().defaultValue(null);
-
-  private static final ConfigOption<Boolean> CASE_SENSITIVE =
-      ConfigOptions.key("case-sensitive").booleanType().defaultValue(false);
-
-  private static final ConfigOption<Long> AS_OF_TIMESTAMP =
-      ConfigOptions.key("as-of-timestamp").longType().defaultValue(null);
-
-  private static final ConfigOption<StreamingStartingStrategy> STARTING_STRATEGY =
-      ConfigOptions.key("starting-strategy")
-          .enumType(StreamingStartingStrategy.class)
-          .defaultValue(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT);
-
-  private static final ConfigOption<Long> START_SNAPSHOT_TIMESTAMP =
-      ConfigOptions.key("start-snapshot-timestamp").longType().defaultValue(null);
-
-  private static final ConfigOption<Long> START_SNAPSHOT_ID =
-      ConfigOptions.key("start-snapshot-id").longType().defaultValue(null);
-
-  private static final ConfigOption<Long> END_SNAPSHOT_ID =
-      ConfigOptions.key("end-snapshot-id").longType().defaultValue(null);
-
-  private static final ConfigOption<Long> SPLIT_SIZE =
-      ConfigOptions.key("split-size").longType().defaultValue(null);
-
-  private static final ConfigOption<Integer> SPLIT_LOOKBACK =
-      ConfigOptions.key("split-lookback").intType().defaultValue(null);
-
-  private static final ConfigOption<Long> SPLIT_FILE_OPEN_COST =
-      ConfigOptions.key("split-file-open-cost").longType().defaultValue(null);
-
-  private static final ConfigOption<Boolean> STREAMING =
-      ConfigOptions.key("streaming").booleanType().defaultValue(false);
-
-  private static final ConfigOption<Duration> MONITOR_INTERVAL =
-      ConfigOptions.key("monitor-interval").durationType().defaultValue(Duration.ofSeconds(10));
-
-  private static final ConfigOption<Boolean> INCLUDE_COLUMN_STATS =
-      ConfigOptions.key("include-column-stats").booleanType().defaultValue(false);
-
-  private static final ConfigOption<Integer> MAX_PLANNING_SNAPSHOT_COUNT =
-      ConfigOptions.key("max-planning-snapshot-count").intType().defaultValue(Integer.MAX_VALUE);
-
   private final boolean caseSensitive;
   private final boolean exposeLocality;
   private final Long snapshotId;
@@ -303,27 +259,31 @@ public class ScanContext implements Serializable {
   }
 
   public static class Builder {
-    private boolean caseSensitive = CASE_SENSITIVE.defaultValue();
-    private Long snapshotId = SNAPSHOT_ID.defaultValue();
-    private StreamingStartingStrategy startingStrategy = STARTING_STRATEGY.defaultValue();
-    private Long startSnapshotTimestamp = START_SNAPSHOT_TIMESTAMP.defaultValue();
-    private Long startSnapshotId = START_SNAPSHOT_ID.defaultValue();
-    private Long endSnapshotId = END_SNAPSHOT_ID.defaultValue();
-    private Long asOfTimestamp = AS_OF_TIMESTAMP.defaultValue();
-    private Long splitSize = SPLIT_SIZE.defaultValue();
-    private Integer splitLookback = SPLIT_LOOKBACK.defaultValue();
-    private Long splitOpenFileCost = SPLIT_FILE_OPEN_COST.defaultValue();
-    private boolean isStreaming = STREAMING.defaultValue();
-    private Duration monitorInterval = MONITOR_INTERVAL.defaultValue();
+    private boolean caseSensitive = FlinkReadOptions.CASE_SENSITIVE_OPTION.defaultValue();
+    private Long snapshotId = FlinkReadOptions.SNAPSHOT_ID.defaultValue();
+    private StreamingStartingStrategy startingStrategy =
+        FlinkReadOptions.STARTING_STRATEGY_OPTION.defaultValue();
+    private Long startSnapshotTimestamp = FlinkReadOptions.START_SNAPSHOT_TIMESTAMP.defaultValue();
+    private Long startSnapshotId = FlinkReadOptions.START_SNAPSHOT_ID.defaultValue();
+    private Long endSnapshotId = FlinkReadOptions.END_SNAPSHOT_ID.defaultValue();
+    private Long asOfTimestamp = FlinkReadOptions.AS_OF_TIMESTAMP.defaultValue();
+    private Long splitSize = FlinkReadOptions.SPLIT_SIZE_OPTION.defaultValue();
+    private Integer splitLookback = FlinkReadOptions.SPLIT_LOOKBACK_OPTION.defaultValue();
+    private Long splitOpenFileCost = FlinkReadOptions.SPLIT_FILE_OPEN_COST_OPTION.defaultValue();
+    private boolean isStreaming = FlinkReadOptions.STREAMING_OPTION.defaultValue();
+    private Duration monitorInterval =
+        TimeUtils.parseDuration(FlinkReadOptions.MONITOR_INTERVAL_OPTION.defaultValue());
     private String nameMapping;
     private Schema projectedSchema;
     private List<Expression> filters;
-    private long limit = -1L;
-    private boolean includeColumnStats = INCLUDE_COLUMN_STATS.defaultValue();
+    private long limit = FlinkReadOptions.LIMIT_OPTION.defaultValue();
+    private boolean includeColumnStats =
+        FlinkReadOptions.INCLUDE_COLUMN_STATS_OPTION.defaultValue();
     private boolean exposeLocality;
     private Integer planParallelism =
         FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE.defaultValue();
-    private int maxPlanningSnapshotCount = MAX_PLANNING_SNAPSHOT_COUNT.defaultValue();
+    private int maxPlanningSnapshotCount =
+        FlinkReadOptions.MAX_PLANNING_SNAPSHOT_COUNT_OPTION.defaultValue();
 
     private Builder() {}
 
@@ -427,25 +387,27 @@ public class ScanContext implements Serializable {
       return this;
     }
 
-    public Builder fromProperties(Map<String, String> properties) {
-      Configuration config = new Configuration();
-      properties.forEach(config::setString);
-
-      return this.useSnapshotId(config.get(SNAPSHOT_ID))
-          .caseSensitive(config.get(CASE_SENSITIVE))
-          .asOfTimestamp(config.get(AS_OF_TIMESTAMP))
-          .startingStrategy(config.get(STARTING_STRATEGY))
-          .startSnapshotTimestamp(config.get(START_SNAPSHOT_TIMESTAMP))
-          .startSnapshotId(config.get(START_SNAPSHOT_ID))
-          .endSnapshotId(config.get(END_SNAPSHOT_ID))
-          .splitSize(config.get(SPLIT_SIZE))
-          .splitLookback(config.get(SPLIT_LOOKBACK))
-          .splitOpenFileCost(config.get(SPLIT_FILE_OPEN_COST))
-          .streaming(config.get(STREAMING))
-          .monitorInterval(config.get(MONITOR_INTERVAL))
-          .nameMapping(properties.get(DEFAULT_NAME_MAPPING))
-          .includeColumnStats(config.get(INCLUDE_COLUMN_STATS))
-          .maxPlanningSnapshotCount(config.get(MAX_PLANNING_SNAPSHOT_COUNT));
+    public Builder resolveConfig(
+        Table table, Map<String, String> readOptions, ReadableConfig readableConfig) {
+      FlinkReadConf flinkReadConf = new FlinkReadConf(table, readOptions, readableConfig);
+
+      return this.useSnapshotId(flinkReadConf.snapshotId())
+          .caseSensitive(flinkReadConf.caseSensitive())
+          .asOfTimestamp(flinkReadConf.asOfTimestamp())
+          .startingStrategy(flinkReadConf.startingStrategy())
+          .startSnapshotTimestamp(flinkReadConf.startSnapshotTimestamp())
+          .startSnapshotId(flinkReadConf.startSnapshotId())
+          .endSnapshotId(flinkReadConf.endSnapshotId())
+          .splitSize(flinkReadConf.splitSize())
+          .splitLookback(flinkReadConf.splitLookback())
+          .splitOpenFileCost(flinkReadConf.splitFileOpenCost())
+          .streaming(flinkReadConf.streaming())
+          .monitorInterval(flinkReadConf.monitorInterval())
+          .nameMapping(flinkReadConf.nameMapping())
+          .limit(flinkReadConf.limit())
+          .planParallelism(flinkReadConf.workerPoolSize())
+          .includeColumnStats(flinkReadConf.includeColumnStats())
+          .maxPlanningSnapshotCount(flinkReadConf.maxPlanningSnapshotCount());
     }
 
     public ScanContext build() {
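Both source builders converge on this single resolution entry point. A fragment mirroring the calls in `FlinkSource.buildFormat()` and `IcebergSource.build()` above, with `table`, `readOptions`, and `readableConfig` as collected by the builders:

```
// Read options beat the Flink configuration, which beats table properties.
ScanContext context =
    ScanContext.builder()
        .resolveConfig(table, readOptions, readableConfig)
        .build();
```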
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSourceConfig.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSourceConfig.java
new file mode 100644
index 0000000000..974b8539b3
--- /dev/null
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSourceConfig.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import java.util.List;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.flink.FlinkReadOptions;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestFlinkSourceConfig extends TestFlinkTableSource {
+  private static final String TABLE = "test_table";
+
+  @Test
+  public void testFlinkSessionConfig() {
+    getTableEnv().getConfig().set(FlinkReadOptions.STREAMING_OPTION, true);
+    AssertHelpers.assertThrows(
+        "Should throw exception because of cannot set snapshot-id option for streaming reader",
+        IllegalArgumentException.class,
+        "Cannot set as-of-timestamp option for streaming reader",
+        () -> {
+          sql("SELECT * FROM %s /*+ OPTIONS('as-of-timestamp'='1')*/", TABLE);
+          return null;
+        });
+  }
+
+  @Test
+  public void testFlinkHintConfig() {
+    List<Row> result =
+        sql(
+            "SELECT * FROM %s /*+ OPTIONS('as-of-timestamp'='%d','streaming'='false')*/",
+            TABLE, System.currentTimeMillis());
+    Assert.assertEquals(3, result.size());
+  }
+
+  @Test
+  public void testReadOptionHierarchy() {
+    getTableEnv().getConfig().set(FlinkReadOptions.LIMIT_OPTION, 1L);
+    List<Row> result = sql("SELECT * FROM %s", TABLE);
+    Assert.assertEquals(1, result.size());
+
+    result = sql("SELECT * FROM %s /*+ OPTIONS('limit'='3')*/", TABLE);
+    Assert.assertEquals(3, result.size());
+  }
+}