You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by da...@apache.org on 2021/08/28 12:17:08 UTC

[hudi] branch master updated: [HUDI-2371] Improvement flink streaming reader (#3552)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 57668d0  [HUDI-2371] Improvement flink streaming reader (#3552)
57668d0 is described below

commit 57668d02a0aa723dd4b2245dc7659fe18113eb59
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Sat Aug 28 20:16:54 2021 +0800

    [HUDI-2371] Improvement flink streaming reader (#3552)
    
    - Support reading empty table
    - Fix filtering by partition path
    - Support reading from earliest commit
---
 .../apache/hudi/configuration/FlinkOptions.java    |   1 +
 .../hudi/source/StreamReadMonitoringFunction.java  |  44 +++++-
 .../org/apache/hudi/table/HoodieTableSource.java   | 153 +++++++++++++++------
 .../apache/hudi/table/format/FilePathUtils.java    |  30 +++-
 .../java/org/apache/hudi/util/StreamerUtil.java    |  45 +++++-
 .../source/TestStreamReadMonitoringFunction.java   |  31 +++++
 .../apache/hudi/table/HoodieDataSourceITCase.java  |  75 +++++++++-
 .../apache/hudi/table/TestHoodieTableSource.java   |  36 ++++-
 .../test/java/org/apache/hudi/utils/TestUtils.java |   5 +-
 9 files changed, 347 insertions(+), 73 deletions(-)

diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index a1c4417..0e2b0b3 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -196,6 +196,7 @@ public class FlinkOptions extends HoodieConfig {
       .defaultValue(60)// default 1 minute
       .withDescription("Check interval for streaming read of SECOND, default 1 minute");
 
+  public static final String START_COMMIT_EARLIEST = "earliest";
   public static final ConfigOption<String> READ_STREAMING_START_COMMIT = ConfigOptions
       .key("read.streaming.start-commit")
       .stringType()
diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
index 92c06e9..c7dcc0a 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
@@ -48,6 +48,8 @@ import org.apache.hadoop.fs.FileStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -105,20 +107,23 @@ public class StreamReadMonitoringFunction
 
   private transient org.apache.hadoop.conf.Configuration hadoopConf;
 
-  private final HoodieTableMetaClient metaClient;
+  private HoodieTableMetaClient metaClient;
 
   private final long maxCompactionMemoryInBytes;
 
+  // for partition pruning
+  private final Set<String> requiredPartitionPaths;
+
   public StreamReadMonitoringFunction(
       Configuration conf,
       Path path,
-      HoodieTableMetaClient metaClient,
-      long maxCompactionMemoryInBytes) {
+      long maxCompactionMemoryInBytes,
+      Set<String> requiredPartitionPaths) {
     this.conf = conf;
     this.path = path;
-    this.metaClient = metaClient;
     this.interval = conf.getInteger(FlinkOptions.READ_STREAMING_CHECK_INTERVAL);
     this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
+    this.requiredPartitionPaths = requiredPartitionPaths;
   }
 
   @Override
@@ -180,8 +185,26 @@ public class StreamReadMonitoringFunction
     }
   }
 
+  @Nullable
+  private HoodieTableMetaClient getOrCreateMetaClient() {
+    if (this.metaClient != null) {
+      return this.metaClient;
+    }
+    if (StreamerUtil.tableExists(this.path.toString(), hadoopConf)) {
+      this.metaClient = StreamerUtil.createMetaClient(this.path.toString(), hadoopConf);
+      return this.metaClient;
+    }
+    // fallback
+    return null;
+  }
+
   @VisibleForTesting
   public void monitorDirAndForwardSplits(SourceContext<MergeOnReadInputSplit> context) {
+    HoodieTableMetaClient metaClient = getOrCreateMetaClient();
+    if (metaClient == null) {
+      // table does not exist
+      return;
+    }
     metaClient.reloadActiveTimeline();
     HoodieTimeline commitTimeline = metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
     if (commitTimeline.empty()) {
@@ -200,8 +223,9 @@ public class StreamReadMonitoringFunction
       } else if (this.conf.getOptional(FlinkOptions.READ_STREAMING_START_COMMIT).isPresent()) {
         // first time consume and has a start commit
         final String specifiedStart = this.conf.getString(FlinkOptions.READ_STREAMING_START_COMMIT);
-        instantRange = InstantRange.getInstance(specifiedStart, instantToIssue.getTimestamp(),
-            InstantRange.RangeType.CLOSE_CLOSE);
+        instantRange = specifiedStart.equalsIgnoreCase(FlinkOptions.START_COMMIT_EARLIEST)
+            ? null
+            : InstantRange.getInstance(specifiedStart, instantToIssue.getTimestamp(), InstantRange.RangeType.CLOSE_CLOSE);
       } else {
         // first time consume and no start commit, consumes the latest incremental data set.
         HoodieInstant latestCommitInstant = metaClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get();
@@ -222,6 +246,11 @@ public class StreamReadMonitoringFunction
     List<HoodieCommitMetadata> metadataList = instants.stream()
         .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, commitTimeline)).collect(Collectors.toList());
     Set<String> writePartitions = getWritePartitionPaths(metadataList);
+    // apply partition push down
+    if (this.requiredPartitionPaths.size() > 0) {
+      writePartitions = writePartitions.stream()
+          .filter(this.requiredPartitionPaths::contains).collect(Collectors.toSet());
+    }
     FileStatus[] fileStatuses = WriteProfiles.getWritePathsOfInstants(path, hadoopConf, metadataList);
     if (fileStatuses.length == 0) {
       LOG.warn("No files found for reading in user provided path.");
@@ -310,7 +339,8 @@ public class StreamReadMonitoringFunction
       return commitTimeline.getInstants()
           .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN, issuedInstant))
           .collect(Collectors.toList());
-    } else if (this.conf.getOptional(FlinkOptions.READ_STREAMING_START_COMMIT).isPresent()) {
+    } else if (this.conf.getOptional(FlinkOptions.READ_STREAMING_START_COMMIT).isPresent()
+        && !this.conf.get(FlinkOptions.READ_STREAMING_START_COMMIT).equalsIgnoreCase(FlinkOptions.START_COMMIT_EARLIEST)) {
       String definedStartCommit = this.conf.get(FlinkOptions.READ_STREAMING_START_COMMIT);
       return commitTimeline.getInstants()
           .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN_OR_EQUALS, definedStartCommit))
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 1d58e4e..fc42394 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table;
 
+import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.BaseFile;
 import org.apache.hudi.common.model.HoodieLogFile;
@@ -85,6 +86,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -151,9 +153,8 @@ public class HoodieTableSource implements
         : requiredPos;
     this.limit = limit == null ? NO_LIMIT_CONSTANT : limit;
     this.filters = filters == null ? Collections.emptyList() : filters;
-    final String basePath = this.conf.getString(FlinkOptions.PATH);
     this.hadoopConf = StreamerUtil.getHadoopConf();
-    this.metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
+    this.metaClient = StreamerUtil.metaClientForReader(conf, hadoopConf);
     this.maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(new JobConf(this.hadoopConf));
   }
 
@@ -173,11 +174,8 @@ public class HoodieTableSource implements
             (TypeInformation<RowData>) TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(getProducedDataType());
         if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) {
           StreamReadMonitoringFunction monitoringFunction = new StreamReadMonitoringFunction(
-              conf, FilePathUtils.toFlinkPath(path), metaClient, maxCompactionMemoryInBytes);
+              conf, FilePathUtils.toFlinkPath(path), maxCompactionMemoryInBytes, getRequiredPartitionPaths());
           InputFormat<RowData, ?> inputFormat = getInputFormat(true);
-          if (!(inputFormat instanceof MergeOnReadInputFormat)) {
-            throw new HoodieException("No successful commits under path " + path);
-          }
           OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
           SingleOutputStreamOperator<RowData> source = execEnv.addSource(monitoringFunction, "streaming_source")
               .uid("uid_streaming_source_" + conf.getString(FlinkOptions.TABLE_NAME))
@@ -218,7 +216,8 @@ public class HoodieTableSource implements
   @Override
   public Result applyFilters(List<ResolvedExpression> filters) {
     this.filters = new ArrayList<>(filters);
-    return Result.of(new ArrayList<>(filters), new ArrayList<>(filters));
+    // refuse all the filters now
+    return Result.of(Collections.emptyList(), new ArrayList<>(filters));
   }
 
   @Override
@@ -266,7 +265,18 @@ public class HoodieTableSource implements
     return requiredPartitions;
   }
 
+  private Set<String> getRequiredPartitionPaths() {
+    if (this.requiredPartitions == null) {
+      return Collections.emptySet();
+    }
+    return FilePathUtils.toRelativePartitionPaths(this.partitionKeys, this.requiredPartitions,
+        conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING));
+  }
+
   private List<MergeOnReadInputSplit> buildFileIndex(Path[] paths) {
+    if (paths.length == 0) {
+      return Collections.emptyList();
+    }
     FileStatus[] fileStatuses = Arrays.stream(paths)
         .flatMap(path ->
             Arrays.stream(FilePathUtils.getFileStatusRecursively(path, 1, hadoopConf)))
@@ -304,6 +314,10 @@ public class HoodieTableSource implements
 
   @VisibleForTesting
   public InputFormat<RowData, ?> getInputFormat(boolean isStreaming) {
+    return isStreaming ? getStreamInputFormat() : getBatchInputFormat();
+  }
+
+  private InputFormat<RowData, ?> getBatchInputFormat() {
     // When this table has no partition, just return an empty source.
     if (!partitionKeys.isEmpty() && getOrFetchPartitions().isEmpty()) {
       return new CollectionInputFormat<>(Collections.emptyList(), null);
@@ -314,13 +328,7 @@ public class HoodieTableSource implements
       return new CollectionInputFormat<>(Collections.emptyList(), null);
     }
 
-    TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient, conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED));
-    final Schema tableAvroSchema;
-    try {
-      tableAvroSchema = schemaUtil.getTableAvroSchema();
-    } catch (Exception e) {
-      throw new HoodieException("Get table avro schema error", e);
-    }
+    final Schema tableAvroSchema = getTableAvroSchema();
     final DataType rowDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema);
     final RowType rowType = (RowType) rowDataType.getLogicalType();
     final RowType requiredRowType = (RowType) getProducedDataType().notNull().getLogicalType();
@@ -331,17 +339,11 @@ public class HoodieTableSource implements
         final HoodieTableType tableType = HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE));
         switch (tableType) {
           case MERGE_ON_READ:
-            final List<MergeOnReadInputSplit> inputSplits;
-            if (!isStreaming) {
-              inputSplits = buildFileIndex(paths);
-              if (inputSplits.size() == 0) {
-                // When there is no input splits, just return an empty source.
-                LOG.warn("No input splits generate for MERGE_ON_READ input format, returns empty collection instead");
-                return new CollectionInputFormat<>(Collections.emptyList(), null);
-              }
-            } else {
-              // streaming reader would build the splits automatically.
-              inputSplits = Collections.emptyList();
+            final List<MergeOnReadInputSplit> inputSplits = buildFileIndex(paths);
+            if (inputSplits.size() == 0) {
+              // When there is no input splits, just return an empty source.
+              LOG.warn("No input splits generate for MERGE_ON_READ input format, returns empty collection instead");
+              return new CollectionInputFormat<>(Collections.emptyList(), null);
             }
             final MergeOnReadTableState hoodieTableState = new MergeOnReadTableState(
                 rowType,
@@ -359,28 +361,9 @@ public class HoodieTableSource implements
                 .fieldTypes(rowDataType.getChildren())
                 .defaultPartName(conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME))
                 .limit(this.limit)
-                .emitDelete(isStreaming)
+                .emitDelete(false)
                 .build();
           case COPY_ON_WRITE:
-            if (isStreaming) {
-              final MergeOnReadTableState hoodieTableState2 = new MergeOnReadTableState(
-                  rowType,
-                  requiredRowType,
-                  tableAvroSchema.toString(),
-                  AvroSchemaConverter.convertToSchema(requiredRowType).toString(),
-                  Collections.emptyList(),
-                  conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(","));
-              return MergeOnReadInputFormat.builder()
-                  .config(this.conf)
-                  .paths(FilePathUtils.toFlinkPaths(paths))
-                  .tableState(hoodieTableState2)
-                  // use the explicit fields data type because the AvroSchemaConverter
-                  // is not very stable.
-                  .fieldTypes(rowDataType.getChildren())
-                  .defaultPartName(conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME))
-                  .limit(this.limit)
-                  .build();
-            }
             FileInputFormat<RowData> format = new CopyOnWriteInputFormat(
                 FilePathUtils.toFlinkPaths(paths),
                 this.schema.getColumnNames().toArray(new String[0]),
@@ -416,6 +399,86 @@ public class HoodieTableSource implements
     }
   }
 
+  private InputFormat<RowData, ?> getStreamInputFormat() {
+    // if table does not exist, use schema from the DDL
+    Schema tableAvroSchema = this.metaClient == null ? inferSchemaFromDdl() : getTableAvroSchema();
+    final DataType rowDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema);
+    final RowType rowType = (RowType) rowDataType.getLogicalType();
+    final RowType requiredRowType = (RowType) getProducedDataType().notNull().getLogicalType();
+
+    final String queryType = this.conf.getString(FlinkOptions.QUERY_TYPE);
+    org.apache.flink.core.fs.Path[] paths = new org.apache.flink.core.fs.Path[0];
+    if (FlinkOptions.QUERY_TYPE_SNAPSHOT.equals(queryType)) {
+      final HoodieTableType tableType = HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE));
+      switch (tableType) {
+        case MERGE_ON_READ:
+          final MergeOnReadTableState hoodieTableState = new MergeOnReadTableState(
+              rowType,
+              requiredRowType,
+              tableAvroSchema.toString(),
+              AvroSchemaConverter.convertToSchema(requiredRowType).toString(),
+              Collections.emptyList(),
+              conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(","));
+          return MergeOnReadInputFormat.builder()
+              .config(this.conf)
+              .paths(paths)
+              .tableState(hoodieTableState)
+              // use the explicit fields data type because the AvroSchemaConverter
+              // is not very stable.
+              .fieldTypes(rowDataType.getChildren())
+              .defaultPartName(conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME))
+              .limit(this.limit)
+              .emitDelete(true)
+              .build();
+        case COPY_ON_WRITE:
+          final MergeOnReadTableState hoodieTableState2 = new MergeOnReadTableState(
+              rowType,
+              requiredRowType,
+              tableAvroSchema.toString(),
+              AvroSchemaConverter.convertToSchema(requiredRowType).toString(),
+              Collections.emptyList(),
+              conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(","));
+          return MergeOnReadInputFormat.builder()
+              .config(this.conf)
+              .paths(paths)
+              .tableState(hoodieTableState2)
+              // use the explicit fields data type because the AvroSchemaConverter
+              // is not very stable.
+              .fieldTypes(rowDataType.getChildren())
+              .defaultPartName(conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME))
+              .limit(this.limit)
+              .build();
+        default:
+          throw new HoodieException("Unexpected table type: " + this.conf.getString(FlinkOptions.TABLE_TYPE));
+      }
+    }
+    String errMsg = String.format("Invalid query type : '%s', options ['%s'] are supported now", queryType,
+        FlinkOptions.QUERY_TYPE_SNAPSHOT);
+    throw new HoodieException(errMsg);
+  }
+
+  private Schema inferSchemaFromDdl() {
+    Schema schema = AvroSchemaConverter.convertToSchema(this.schema.toSourceRowDataType().getLogicalType());
+    return HoodieAvroUtils.addMetadataFields(schema, conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED));
+  }
+
+  @VisibleForTesting
+  public Schema getTableAvroSchema() {
+    try {
+      TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient, conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED));
+      return schemaUtil.getTableAvroSchema();
+    } catch (Throwable e) {
+      // table exists but has no written data
+      LOG.warn("Get table avro schema error, use schema from the DDL instead", e);
+      return inferSchemaFromDdl();
+    }
+  }
+
+  @VisibleForTesting
+  public HoodieTableMetaClient getMetaClient() {
+    return this.metaClient;
+  }
+
   @VisibleForTesting
   public Configuration getConf() {
     return this.conf;
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/FilePathUtils.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/FilePathUtils.java
index 83607cd..8f1347f 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/FilePathUtils.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/FilePathUtils.java
@@ -36,6 +36,7 @@ import java.util.BitSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -82,11 +83,13 @@ public class FilePathUtils {
    * @param partitionKVs  The partition key value mapping
    * @param hivePartition Whether the partition path is with Hive style,
    *                      e.g. {partition key} = {partition value}
+   * @param sepSuffix     Whether to append the file separator as suffix
    * @return an escaped, valid partition name
    */
   public static String generatePartitionPath(
       LinkedHashMap<String, String> partitionKVs,
-      boolean hivePartition) {
+      boolean hivePartition,
+      boolean sepSuffix) {
     if (partitionKVs.isEmpty()) {
       return "";
     }
@@ -103,7 +106,9 @@ public class FilePathUtils {
       suffixBuf.append(escapePathName(e.getValue()));
       i++;
     }
-    suffixBuf.append(File.separator);
+    if (sepSuffix) {
+      suffixBuf.append(File.separator);
+    }
     return suffixBuf.toString();
   }
 
@@ -371,12 +376,31 @@ public class FilePathUtils {
       boolean hivePartition) {
     return partitionPaths.stream()
         .map(m -> validateAndReorderPartitions(m, partitionKeys))
-        .map(kvs -> FilePathUtils.generatePartitionPath(kvs, hivePartition))
+        .map(kvs -> FilePathUtils.generatePartitionPath(kvs, hivePartition, true))
         .map(n -> new Path(path, n))
         .toArray(Path[]::new);
   }
 
   /**
+   * Transforms the given partition key value mapping to relative partition paths.
+   *
+   * @param partitionKeys The partition key list
+   * @param partitionPaths The partition key value mapping
+   * @param hivePartition Whether the partition path is in Hive style
+   *
+   * @see #getReadPaths
+   */
+  public static Set<String> toRelativePartitionPaths(
+      List<String> partitionKeys,
+      List<Map<String, String>> partitionPaths,
+      boolean hivePartition) {
+    return partitionPaths.stream()
+        .map(m -> validateAndReorderPartitions(m, partitionKeys))
+        .map(kvs -> FilePathUtils.generatePartitionPath(kvs, hivePartition, false))
+        .collect(Collectors.toSet());
+  }
+
+  /**
    * Transforms the array of Hadoop paths to Flink paths.
    */
   public static org.apache.flink.core.fs.Path[] toFlinkPaths(Path[] paths) {
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 6dc6add..9f625ba 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -229,9 +229,7 @@ public class StreamerUtil {
   public static void initTableIfNotExists(Configuration conf) throws IOException {
     final String basePath = conf.getString(FlinkOptions.PATH);
     final org.apache.hadoop.conf.Configuration hadoopConf = StreamerUtil.getHadoopConf();
-    // Hadoop FileSystem
-    FileSystem fs = FSUtils.getFs(basePath, hadoopConf);
-    if (!fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))) {
+    if (!tableExists(basePath, hadoopConf)) {
       HoodieTableMetaClient.withPropertyBuilder()
           .setTableType(conf.getString(FlinkOptions.TABLE_TYPE))
           .setTableName(conf.getString(FlinkOptions.TABLE_NAME))
@@ -252,6 +250,19 @@ public class StreamerUtil {
   }
 
   /**
+   * Returns whether the hoodie table exists under given path {@code basePath}.
+   */
+  public static boolean tableExists(String basePath, org.apache.hadoop.conf.Configuration hadoopConf) {
+    // Hadoop FileSystem
+    FileSystem fs = FSUtils.getFs(basePath, hadoopConf);
+    try {
+      return fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME));
+    } catch (IOException e) {
+      throw new HoodieException("Error while checking whether table exists under path:" + basePath, e);
+    }
+  }
+
+  /**
    * Generates the bucket ID using format {partition path}_{fileID}.
    */
   public static String generateBucketKey(String partitionPath, String fileId) {
@@ -283,10 +294,36 @@ public class StreamerUtil {
   }
 
   /**
+   * Creates the meta client for reader.
+   *
+   * <p>A streaming pipeline is long running, so a path with no hoodie table yet is allowed:
+   * {@code null} is returned and the reader later checks for the table and creates the meta client.
+   *
+   * @see org.apache.hudi.source.StreamReadMonitoringFunction
+   */
+  public static HoodieTableMetaClient metaClientForReader(
+      Configuration conf,
+      org.apache.hadoop.conf.Configuration hadoopConf) {
+    final String basePath = conf.getString(FlinkOptions.PATH);
+    if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING) && !tableExists(basePath, hadoopConf)) {
+      return null;
+    } else {
+      return createMetaClient(basePath, hadoopConf);
+    }
+  }
+
+  /**
+   * Creates the meta client.
+   */
+  public static HoodieTableMetaClient createMetaClient(String basePath, org.apache.hadoop.conf.Configuration hadoopConf) {
+    return HoodieTableMetaClient.builder().setBasePath(basePath).setConf(hadoopConf).build();
+  }
+
+  /**
    * Creates the meta client.
    */
   public static HoodieTableMetaClient createMetaClient(String basePath) {
-    return HoodieTableMetaClient.builder().setBasePath(basePath).setConf(FlinkClientUtil.getHadoopConf()).build();
+    return createMetaClient(basePath, FlinkClientUtil.getHadoopConf());
   }
 
   /**
diff --git a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
index f145744..d13f683 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
@@ -169,6 +169,37 @@ public class TestStreamReadMonitoringFunction {
   }
 
   @Test
+  public void testConsumeFromEarliestCommit() throws Exception {
+    // write 2 commits first, then specify the start commit as 'earliest':
+    // the splits should carry no instant range and be issued at the latest commit.
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+    TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
+    String specifiedCommit = TestUtils.getLatestCommit(tempFile.getAbsolutePath());
+    conf.setString(FlinkOptions.READ_STREAMING_START_COMMIT, FlinkOptions.START_COMMIT_EARLIEST);
+    StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf);
+    try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = createHarness(function)) {
+      harness.setup();
+      harness.open();
+
+      CountDownLatch latch = new CountDownLatch(4);
+      CollectingSourceContext sourceContext = new CollectingSourceContext(latch);
+
+      runAsync(sourceContext, function);
+
+      assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should finish splits generation");
+      assertThat("Should produce the expected splits",
+          sourceContext.getPartitionPaths(), is("par1,par2,par3,par4"));
+      assertTrue(sourceContext.splits.stream().noneMatch(split -> split.getInstantRange().isPresent()),
+          "No instants should have range limit");
+      assertTrue(sourceContext.splits.stream().allMatch(split -> split.getLatestCommit().equals(specifiedCommit)),
+          "All the splits should be with specified instant time");
+
+      // Stop the stream task.
+      function.close();
+    }
+  }
+
+  @Test
   public void testCheckpointRestore() throws Exception {
     TestData.writeData(TestData.DATA_SET_INSERT, conf);
 
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index 9effdcc..26e0be6 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -21,7 +21,6 @@ package org.apache.hudi.table;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestConfigurations;
 import org.apache.hudi.utils.TestData;
@@ -124,6 +123,13 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     execInsertSql(streamTableEnv, insertInto);
     List<Row> rows2 = execSelectSql(streamTableEnv, "select * from t1", 10);
     assertRowsEquals(rows2, TestData.DATA_SET_SOURCE_INSERT);
+
+    streamTableEnv.getConfig().getConfiguration()
+        .setBoolean("table.dynamic-table-options.enabled", true);
+    // specify the start commit as earliest
+    List<Row> rows3 = execSelectSql(streamTableEnv,
+        "select * from t1/*+options('read.streaming.start-commit'='earliest')*/", 10);
+    assertRowsEquals(rows3, TestData.DATA_SET_SOURCE_INSERT);
   }
 
   @ParameterizedTest
@@ -301,6 +307,34 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
   }
 
   @ParameterizedTest
+  @MethodSource("tableTypeAndPartitioningParams")
+  void testStreamReadFilterByPartition(HoodieTableType tableType, boolean hiveStylePartitioning) throws Exception {
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.setString(FlinkOptions.TABLE_NAME, "t1");
+    conf.setString(FlinkOptions.TABLE_TYPE, tableType.name());
+    conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning);
+
+    // write one commit
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
+        .option(FlinkOptions.READ_AS_STREAMING, "true")
+        .option(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, "2")
+        .option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning)
+        .end();
+    streamTableEnv.executeSql(hoodieTableDDL);
+
+    List<Row> result = execSelectSql(streamTableEnv,
+        "select * from t1 where `partition`='par1'", 10);
+    final String expected = "["
+        + "+I(+I[id1, Danny, 23, 1970-01-01T00:00:00.001, par1]), "
+        + "+I(+I[id2, Stephen, 33, 1970-01-01T00:00:00.002, par1])]";
+    assertRowsEquals(result, expected, true);
+  }
+
+  @ParameterizedTest
   @MethodSource("executionModeAndPartitioningParams")
   void testWriteAndRead(ExecMode execMode, boolean hiveStylePartitioning) {
     TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv;
@@ -568,22 +602,48 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
 
   @Test
   void testStreamReadEmptyTablePath() throws Exception {
-    // create an empty table
+    // case1: table metadata path does not exist
+    // create a flink source table
+    String createHoodieTable = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.READ_AS_STREAMING, "true")
+        .option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
+        .end();
+    streamTableEnv.executeSql(createHoodieTable);
+
+    // no exception is expected to be thrown
+    List<Row> rows1 = execSelectSql(streamTableEnv, "select * from t1", 10);
+    assertRowsEquals(rows1, "[]");
+
+    // case2: empty table without data files
     Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
     StreamerUtil.initTableIfNotExists(conf);
 
+    List<Row> rows2 = execSelectSql(streamTableEnv, "select * from t1", 10);
+    assertRowsEquals(rows2, "[]");
+  }
+
+  @Test
+  void testBatchReadEmptyTablePath() throws Exception {
+    // case1: table metadata path does not exist
     // create a flink source table
     String createHoodieTable = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
-        .option(FlinkOptions.READ_AS_STREAMING, "true")
         .option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
         .end();
-    streamTableEnv.executeSql(createHoodieTable);
+    batchTableEnv.executeSql(createHoodieTable);
 
-    // execute query and assert throws exception
-    assertThrows(HoodieException.class, () -> execSelectSql(streamTableEnv, "select * from t1", 10),
-        "No successful commits under path " + tempFile.getAbsolutePath());
+    // an exception is expected here: batch mode cannot query a table that does not exist
+    assertThrows(Exception.class,
+        () -> execSelectSql(batchTableEnv, "select * from t1", 10),
+        "Exception should throw when querying non-exists table in batch mode");
 
+    // case2: empty table without data files
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    StreamerUtil.initTableIfNotExists(conf);
+
+    List<Row> rows2 = CollectionUtil.iteratorToList(batchTableEnv.executeSql("select * from t1").collect());
+    assertRowsEquals(rows2, "[]");
   }
 
   @ParameterizedTest
@@ -781,6 +841,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
 
   private List<Row> execSelectSql(TableEnvironment tEnv, String select, String sinkDDL, long timeout)
           throws InterruptedException {
+    tEnv.executeSql("DROP TABLE IF EXISTS sink");
     tEnv.executeSql(sinkDDL);
     TableResult tableResult = tEnv.executeSql("insert into sink " + select);
     // wait for the timeout then cancels the job
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
index fed3748..25742a7 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
@@ -25,12 +25,12 @@ import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestConfigurations;
 import org.apache.hudi.utils.TestData;
 
+import org.apache.avro.Schema;
 import org.apache.flink.api.common.io.FileInputFormat;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.data.RowData;
 import org.apache.hadoop.fs.Path;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.slf4j.Logger;
@@ -43,12 +43,14 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 /**
@@ -62,17 +64,17 @@ public class TestHoodieTableSource {
   @TempDir
   File tempFile;
 
-  @BeforeEach
   void beforeEach() throws IOException {
     final String path = tempFile.getAbsolutePath();
-    conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf = TestConfigurations.getDefaultConf(path);
     StreamerUtil.initTableIfNotExists(conf);
     IntStream.range(1, 5)
         .forEach(i -> new File(path + File.separator + "par" + i).mkdirs());
   }
 
   @Test
-  void testGetReadPaths() {
+  void testGetReadPaths() throws Exception {
+    beforeEach();
     HoodieTableSource tableSource = new HoodieTableSource(
         TestConfigurations.TABLE_SCHEMA,
         new Path(tempFile.getPath()),
@@ -99,6 +101,7 @@ public class TestHoodieTableSource {
 
   @Test
   void testGetInputFormat() throws Exception {
+    beforeEach();
     // write some data to let the TableSchemaResolver get the right instant
     TestData.writeData(TestData.DATA_SET_INSERT, conf);
 
@@ -118,4 +121,29 @@ public class TestHoodieTableSource {
         () -> tableSource.getInputFormat(),
         "Invalid query type : 'incremental'. Only 'snapshot' is supported now");
   }
+
+  @Test
+  void testGetTableAvroSchema() {
+    final String path = tempFile.getAbsolutePath();
+    conf = TestConfigurations.getDefaultConf(path);
+    conf.setBoolean(FlinkOptions.READ_AS_STREAMING, true);
+
+    HoodieTableSource tableSource = new HoodieTableSource(
+        TestConfigurations.TABLE_SCHEMA,
+        new Path(tempFile.getPath()),
+        Arrays.asList(conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(",")),
+        "default-par",
+        conf);
+    assertNull(tableSource.getMetaClient(), "Streaming source with empty table path is allowed");
+    final String schemaFields = tableSource.getTableAvroSchema().getFields().stream()
+        .map(Schema.Field::name)
+        .collect(Collectors.joining(","));
+    final String expected = "_hoodie_commit_time,"
+        + "_hoodie_commit_seqno,"
+        + "_hoodie_record_key,"
+        + "_hoodie_partition_path,"
+        + "_hoodie_file_name,"
+        + "uuid,name,age,ts,partition";
+    assertThat(schemaFields, is(expected));
+  }
 }
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java
index ecc86e7..4e9ad51 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java
@@ -28,6 +28,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 
 import java.io.File;
+import java.util.Collections;
 
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -57,8 +58,6 @@ public class TestUtils {
 
   public static StreamReadMonitoringFunction getMonitorFunc(Configuration conf) {
     final String basePath = conf.getString(FlinkOptions.PATH);
-    final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
-        .setConf(StreamerUtil.getHadoopConf()).setBasePath(basePath).build();
-    return new StreamReadMonitoringFunction(conf, new Path(basePath), metaClient, 1024 * 1024L);
+    return new StreamReadMonitoringFunction(conf, new Path(basePath), 1024 * 1024L, Collections.emptySet());
   }
 }