You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2020/11/23 18:40:53 UTC

[GitHub] [hudi] vinothchandar commented on a change in pull request #2264: [HUDI-1406] Add date partition based source input selector for DeltaStreamer

vinothchandar commented on a change in pull request #2264:
URL: https://github.com/apache/hudi/pull/2264#discussion_r528904411



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.spark.api.java.JavaSparkContext;
+import org.slf4j.Logger;

Review comment:
       Can we just use log4j directly here instead of slf4j? I think that's what we use elsewhere in the codebase.

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.spark.api.java.JavaSparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.LocalDate;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DATE_PARTITION_DEPTH;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_DATE_PARTITION_DEPTH;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_NUM_DAYS_TO_LIST;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_PARTITIONS_LIST_PARALLELISM;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.NUM_PREV_DAYS_TO_LIST;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.PARTITIONS_LIST_PARALLELISM;
+import static org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP;
+
+/**
+ * Custom dfs path selector used to list just the last few days provided there is a date based
+ * partition.
+ *
+ * <p>This is useful for workloads where there are multiple partition fields and only recent
+ * partitions are affected by new writes. Especially if the data sits in S3, listing all historical
+ * data can be time expensive and unnecessary for the above type of workload.
+ *
+ * <p>The date based partition is expected to be of the format '<date string>=yyyy-mm-dd' or
+ * 'yyyy-mm-dd'. The date partition can be at any level. For ex. the partition path can be of the
+ * form `<basepath>/<partition-field1>/<date-based-partition>/<partition-field3>/` or
+ * `<basepath>/<<date-based-partition>/`
+ */
+public class DatePartitionPathSelector extends DFSPathSelector {
+  private static volatile Logger log = LoggerFactory.getLogger(DatePartitionPathSelector.class);
+  private final int datePartitionDepth;
+  private final int numPrevDaysToList;
+  private final LocalDate fromDate;
+  private final LocalDate currentDate;
+  private final int partitionsListParallelism;
+
+  /** Configs supported. */
+  public static class Config {
+    public static final String DATE_PARTITION_DEPTH =
+        "hoodie.deltastreamer.source.input.date_partition.selector.date_partition_depth";

Review comment:
       Suggested rename for this property key (suffix): `.source.dfs.datepartitioned.selector.depth`

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.spark.api.java.JavaSparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.LocalDate;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DATE_PARTITION_DEPTH;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_DATE_PARTITION_DEPTH;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_NUM_DAYS_TO_LIST;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_PARTITIONS_LIST_PARALLELISM;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.NUM_PREV_DAYS_TO_LIST;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.PARTITIONS_LIST_PARALLELISM;
+import static org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP;
+
+/**
+ * Custom dfs path selector used to list just the last few days provided there is a date based
+ * partition.
+ *
+ * <p>This is useful for workloads where there are multiple partition fields and only recent
+ * partitions are affected by new writes. Especially if the data sits in S3, listing all historical
+ * data can be time expensive and unnecessary for the above type of workload.
+ *
+ * <p>The date based partition is expected to be of the format '<date string>=yyyy-mm-dd' or
+ * 'yyyy-mm-dd'. The date partition can be at any level. For ex. the partition path can be of the
+ * form `<basepath>/<partition-field1>/<date-based-partition>/<partition-field3>/` or
+ * `<basepath>/<<date-based-partition>/`
+ */
+public class DatePartitionPathSelector extends DFSPathSelector {
+  private static volatile Logger log = LoggerFactory.getLogger(DatePartitionPathSelector.class);
+  private final int datePartitionDepth;
+  private final int numPrevDaysToList;
+  private final LocalDate fromDate;
+  private final LocalDate currentDate;
+  private final int partitionsListParallelism;
+
+  /** Configs supported. */
+  public static class Config {
+    public static final String DATE_PARTITION_DEPTH =
+        "hoodie.deltastreamer.source.input.date_partition.selector.date_partition_depth";
+    public static final String NUM_PREV_DAYS_TO_LIST =
+        "hoodie.deltastreamer.source.input.date_partition.selector.num_prev_days_to_list";

Review comment:
       Suggested rename for this property key (suffix): `.source.dfs.datepartitioned.selector.lookback.days`

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.spark.api.java.JavaSparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.LocalDate;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DATE_PARTITION_DEPTH;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_DATE_PARTITION_DEPTH;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_NUM_DAYS_TO_LIST;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_PARTITIONS_LIST_PARALLELISM;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.NUM_PREV_DAYS_TO_LIST;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.PARTITIONS_LIST_PARALLELISM;
+import static org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP;
+
+/**
+ * Custom dfs path selector used to list just the last few days provided there is a date based
+ * partition.
+ *
+ * <p>This is useful for workloads where there are multiple partition fields and only recent
+ * partitions are affected by new writes. Especially if the data sits in S3, listing all historical
+ * data can be time expensive and unnecessary for the above type of workload.
+ *
+ * <p>The date based partition is expected to be of the format '<date string>=yyyy-mm-dd' or
+ * 'yyyy-mm-dd'. The date partition can be at any level. For ex. the partition path can be of the
+ * form `<basepath>/<partition-field1>/<date-based-partition>/<partition-field3>/` or
+ * `<basepath>/<<date-based-partition>/`
+ */
+public class DatePartitionPathSelector extends DFSPathSelector {
+  private static volatile Logger log = LoggerFactory.getLogger(DatePartitionPathSelector.class);
+  private final int datePartitionDepth;
+  private final int numPrevDaysToList;
+  private final LocalDate fromDate;
+  private final LocalDate currentDate;
+  private final int partitionsListParallelism;
+
+  /** Configs supported. */
+  public static class Config {
+    public static final String DATE_PARTITION_DEPTH =
+        "hoodie.deltastreamer.source.input.date_partition.selector.date_partition_depth";
+    public static final String NUM_PREV_DAYS_TO_LIST =
+        "hoodie.deltastreamer.source.input.date_partition.selector.num_prev_days_to_list";
+    public static final String CURRENT_DATE =
+        "hoodie.deltastreamer.source.input.date_partition.selector.current_date";

Review comment:
       Suggested rename for this property key (suffix): `.source.dfs.datepartitioned.selector.currentdate`

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.spark.api.java.JavaSparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.LocalDate;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DATE_PARTITION_DEPTH;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_DATE_PARTITION_DEPTH;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_NUM_DAYS_TO_LIST;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_PARTITIONS_LIST_PARALLELISM;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.NUM_PREV_DAYS_TO_LIST;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.PARTITIONS_LIST_PARALLELISM;
+import static org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP;
+
+/**
+ * Custom dfs path selector used to list just the last few days provided there is a date based
+ * partition.
+ *
+ * <p>This is useful for workloads where there are multiple partition fields and only recent
+ * partitions are affected by new writes. Especially if the data sits in S3, listing all historical
+ * data can be time expensive and unnecessary for the above type of workload.
+ *
+ * <p>The date based partition is expected to be of the format '<date string>=yyyy-mm-dd' or
+ * 'yyyy-mm-dd'. The date partition can be at any level. For ex. the partition path can be of the
+ * form `<basepath>/<partition-field1>/<date-based-partition>/<partition-field3>/` or
+ * `<basepath>/<<date-based-partition>/`
+ */
+public class DatePartitionPathSelector extends DFSPathSelector {
+  private static volatile Logger log = LoggerFactory.getLogger(DatePartitionPathSelector.class);
+  private final int datePartitionDepth;
+  private final int numPrevDaysToList;
+  private final LocalDate fromDate;
+  private final LocalDate currentDate;
+  private final int partitionsListParallelism;
+
+  /** Configs supported. */
+  public static class Config {
+    public static final String DATE_PARTITION_DEPTH =
+        "hoodie.deltastreamer.source.input.date_partition.selector.date_partition_depth";
+    public static final String NUM_PREV_DAYS_TO_LIST =
+        "hoodie.deltastreamer.source.input.date_partition.selector.num_prev_days_to_list";
+    public static final String CURRENT_DATE =
+        "hoodie.deltastreamer.source.input.date_partition.selector.current_date";
+    public static final String PARTITIONS_LIST_PARALLELISM =
+        "hoodie.deltastreamer.source.input.date_partition.selector.partitions.list.parallelism";
+    public static final int DEFAULT_DATE_PARTITION_DEPTH = 0; // Implies no (date) partition

Review comment:
       Can we place each default adjacent to its property, so it's easier to read?
   

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.spark.api.java.JavaSparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.LocalDate;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DATE_PARTITION_DEPTH;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_DATE_PARTITION_DEPTH;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_NUM_DAYS_TO_LIST;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_PARTITIONS_LIST_PARALLELISM;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.NUM_PREV_DAYS_TO_LIST;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.PARTITIONS_LIST_PARALLELISM;
+import static org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP;
+
+/**
+ * Custom dfs path selector used to list just the last few days provided there is a date based
+ * partition.
+ *
+ * <p>This is useful for workloads where there are multiple partition fields and only recent
+ * partitions are affected by new writes. Especially if the data sits in S3, listing all historical
+ * data can be time expensive and unnecessary for the above type of workload.
+ *
+ * <p>The date based partition is expected to be of the format '<date string>=yyyy-mm-dd' or
+ * 'yyyy-mm-dd'. The date partition can be at any level. For ex. the partition path can be of the
+ * form `<basepath>/<partition-field1>/<date-based-partition>/<partition-field3>/` or
+ * `<basepath>/<<date-based-partition>/`
+ */
+public class DatePartitionPathSelector extends DFSPathSelector {
+  private static volatile Logger log = LoggerFactory.getLogger(DatePartitionPathSelector.class);
+  private final int datePartitionDepth;
+  private final int numPrevDaysToList;
+  private final LocalDate fromDate;
+  private final LocalDate currentDate;
+  private final int partitionsListParallelism;
+
+  /** Configs supported. */
+  public static class Config {
+    public static final String DATE_PARTITION_DEPTH =
+        "hoodie.deltastreamer.source.input.date_partition.selector.date_partition_depth";
+    public static final String NUM_PREV_DAYS_TO_LIST =
+        "hoodie.deltastreamer.source.input.date_partition.selector.num_prev_days_to_list";
+    public static final String CURRENT_DATE =
+        "hoodie.deltastreamer.source.input.date_partition.selector.current_date";
+    public static final String PARTITIONS_LIST_PARALLELISM =
+        "hoodie.deltastreamer.source.input.date_partition.selector.partitions.list.parallelism";
+    public static final int DEFAULT_DATE_PARTITION_DEPTH = 0; // Implies no (date) partition
+    public static final int DEFAULT_NUM_DAYS_TO_LIST = 2;
+    public static final int DEFAULT_PARTITIONS_LIST_PARALLELISM = 20;
+  }
+
+  public DatePartitionPathSelector(TypedProperties props, Configuration hadoopConf) {
+    super(props, hadoopConf);
+    /*
+     * datePartitionDepth = 0 is same as basepath and there is no partition. In which case
+     * this path selector would be a no-op and lists all paths under the table basepath.
+     */
+    datePartitionDepth = props.getInteger(DATE_PARTITION_DEPTH, DEFAULT_DATE_PARTITION_DEPTH);
+    // If not specified the current date is assumed by default.
+    currentDate = LocalDate.parse(props.getString(Config.CURRENT_DATE, LocalDate.now().toString()));
+    numPrevDaysToList = props.getInteger(NUM_PREV_DAYS_TO_LIST, DEFAULT_NUM_DAYS_TO_LIST);
+    fromDate = currentDate.minusDays(numPrevDaysToList);
+    partitionsListParallelism =
+        props.getInteger(PARTITIONS_LIST_PARALLELISM, DEFAULT_PARTITIONS_LIST_PARALLELISM);
+  }
+
+  @Override
+  public Pair<Option<String>, String> getNextFilePathsAndMaxModificationTime(
+      JavaSparkContext sparkContext, Option<String> lastCheckpointStr, long sourceLimit) {
+    try {
+      // obtain all eligible files under root folder.
+      log.info(
+          "Root path => "
+              + props.getString(ROOT_INPUT_PATH_PROP)
+              + " source limit => "
+              + sourceLimit
+              + " depth of day partition => "
+              + datePartitionDepth
+              + " num prev days to list => "
+              + numPrevDaysToList
+              + " from current date => "
+              + currentDate);
+      long lastCheckpointTime = lastCheckpointStr.map(Long::parseLong).orElse(Long.MIN_VALUE);
+      HoodieSparkEngineContext context = new HoodieSparkEngineContext(sparkContext);
+      List<String> prunedPaths =
+          pruneDatePartitionPaths(context, fs, props.getString(ROOT_INPUT_PATH_PROP));
+      List<FileStatus> eligibleFiles = new ArrayList<>();
+      for (String path : prunedPaths) {
+        eligibleFiles.addAll(listEligibleFiles(fs, new Path(path), lastCheckpointTime));
+      }
+      // sort them by modification time.
+      eligibleFiles.sort(Comparator.comparingLong(FileStatus::getModificationTime));
+      // Filter based on checkpoint & input size, if needed
+      long currentBytes = 0;
+      long maxModificationTime = Long.MIN_VALUE;
+      List<FileStatus> filteredFiles = new ArrayList<>();
+      for (FileStatus f : eligibleFiles) {
+        if (currentBytes + f.getLen() >= sourceLimit) {
+          // we have enough data, we are done
+          break;
+        }
+
+        maxModificationTime = f.getModificationTime();
+        currentBytes += f.getLen();
+        filteredFiles.add(f);
+      }
+
+      // no data to read
+      if (filteredFiles.isEmpty()) {
+        return new ImmutablePair<>(
+            Option.empty(), lastCheckpointStr.orElseGet(() -> String.valueOf(Long.MIN_VALUE)));
+      }
+
+      // read the files out.
+      String pathStr =
+          filteredFiles.stream().map(f -> f.getPath().toString()).collect(Collectors.joining(","));
+
+      return new ImmutablePair<>(Option.ofNullable(pathStr), String.valueOf(maxModificationTime));
+    } catch (IOException ioe) {
+      throw new HoodieIOException(
+          "Unable to read from source from checkpoint: " + lastCheckpointStr, ioe);
+    }
+  }
+
+  /**
+   * Prunes date level partitions to last few days configured by 'NUM_PREV_DAYS_TO_LIST' from
+   * 'CURRENT_DATE'. Parallelizes listing by leveraging HoodieSparkEngineContext's methods.
+   */
+  public List<String> pruneDatePartitionPaths(
+      HoodieSparkEngineContext context, FileSystem fs, String rootPath) {

Review comment:
       Can the parameter list be moved up onto the previous line, so the signature is on a single line?

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.spark.api.java.JavaSparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.LocalDate;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DATE_PARTITION_DEPTH;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_DATE_PARTITION_DEPTH;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_NUM_DAYS_TO_LIST;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_PARTITIONS_LIST_PARALLELISM;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.NUM_PREV_DAYS_TO_LIST;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.PARTITIONS_LIST_PARALLELISM;
+import static org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP;
+
+/**
+ * Custom dfs path selector used to list just the last few days provided there is a date based
+ * partition.
+ *
+ * <p>This is useful for workloads where there are multiple partition fields and only recent
+ * partitions are affected by new writes. Especially if the data sits in S3, listing all historical
+ * data can be time expensive and unnecessary for the above type of workload.
+ *
+ * <p>The date based partition is expected to be of the format '<date string>=yyyy-mm-dd' or
+ * 'yyyy-mm-dd'. The date partition can be at any level. For ex. the partition path can be of the
+ * form `<basepath>/<partition-field1>/<date-based-partition>/<partition-field3>/` or
+ * `<basepath>/<<date-based-partition>/`
+ */
+public class DatePartitionPathSelector extends DFSPathSelector {
+  private static volatile Logger log = LoggerFactory.getLogger(DatePartitionPathSelector.class);
+  private final int datePartitionDepth;
+  private final int numPrevDaysToList;
+  private final LocalDate fromDate;
+  private final LocalDate currentDate;
+  private final int partitionsListParallelism;
+
+  /** Configs supported. */
+  public static class Config {
+    public static final String DATE_PARTITION_DEPTH =
+        "hoodie.deltastreamer.source.input.date_partition.selector.date_partition_depth";
+    public static final String NUM_PREV_DAYS_TO_LIST =
+        "hoodie.deltastreamer.source.input.date_partition.selector.num_prev_days_to_list";
+    public static final String CURRENT_DATE =
+        "hoodie.deltastreamer.source.input.date_partition.selector.current_date";

Review comment:
       Are you assuming a certain format for the current date to be specified in? It would be good to document that in a comment, or better, to include the format in the property name.

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.spark.api.java.JavaSparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.LocalDate;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DATE_PARTITION_DEPTH;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_DATE_PARTITION_DEPTH;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_NUM_DAYS_TO_LIST;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_PARTITIONS_LIST_PARALLELISM;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.NUM_PREV_DAYS_TO_LIST;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.PARTITIONS_LIST_PARALLELISM;
+import static org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP;
+
+/**
+ * Custom dfs path selector used to list just the last few days provided there is a date based
+ * partition.
+ *
+ * <p>This is useful for workloads where there are multiple partition fields and only recent
+ * partitions are affected by new writes. Especially if the data sits in S3, listing all historical
+ * data can be time expensive and unnecessary for the above type of workload.
+ *
+ * <p>The date based partition is expected to be of the format '<date string>=yyyy-mm-dd' or
+ * 'yyyy-mm-dd'. The date partition can be at any level. For example, the partition path can be
+ * of the form `<basepath>/<partition-field1>/<date-based-partition>/<partition-field3>/` or
+ * `<basepath>/<date-based-partition>/`.
+ */
+public class DatePartitionPathSelector extends DFSPathSelector {
+  private static volatile Logger log = LoggerFactory.getLogger(DatePartitionPathSelector.class);
+  private final int datePartitionDepth;
+  private final int numPrevDaysToList;
+  private final LocalDate fromDate;
+  private final LocalDate currentDate;
+  private final int partitionsListParallelism;
+
+  /** Property keys, and their default values, read by the enclosing selector's constructor. */
+  public static class Config {
+    /** Number of directory levels listed below the root input path to reach the date partition. */
+    public static final String DATE_PARTITION_DEPTH =
+        "hoodie.deltastreamer.source.input.date_partition.selector.date_partition_depth";
+    /** Number of days subtracted from the current date to obtain the selector's from-date. */
+    public static final String NUM_PREV_DAYS_TO_LIST =
+        "hoodie.deltastreamer.source.input.date_partition.selector.num_prev_days_to_list";
+    /**
+     * Optional override of the current date. The constructor reads the value with
+     * {@code LocalDate.parse(String)}, so it has to be an ISO-8601 date such as 2020-11-23. When
+     * the key is absent, {@code LocalDate.now()} is used.
+     */
+    public static final String CURRENT_DATE =
+        "hoodie.deltastreamer.source.input.date_partition.selector.current_date";
+    /** Parallelism handed to the engine context when partition directories are listed. */
+    public static final String PARTITIONS_LIST_PARALLELISM =
+        "hoodie.deltastreamer.source.input.date_partition.selector.partitions.list.parallelism";
+    public static final int DEFAULT_DATE_PARTITION_DEPTH = 0; // Implies no (date) partition
+    public static final int DEFAULT_NUM_DAYS_TO_LIST = 2;
+    public static final int DEFAULT_PARTITIONS_LIST_PARALLELISM = 20;
+  }
+
+  /**
+   * Creates the selector and resolves its settings from the supplied properties.
+   *
+   * @param props properties carrying the {@link Config} keys; absent keys fall back to defaults
+   * @param hadoopConf Hadoop configuration forwarded to the parent selector
+   */
+  public DatePartitionPathSelector(TypedProperties props, Configuration hadoopConf) {
+    super(props, hadoopConf);
+    // A depth of 0 keeps the root path as the only listing target (see pruneDatePartitionPaths).
+    this.datePartitionDepth = props.getInteger(DATE_PARTITION_DEPTH, DEFAULT_DATE_PARTITION_DEPTH);
+    // Today's date is the fallback; either way the text is parsed by LocalDate.parse.
+    String todayText = LocalDate.now().toString();
+    String anchorDateText = props.getString(Config.CURRENT_DATE, todayText);
+    this.currentDate = LocalDate.parse(anchorDateText);
+    this.numPrevDaysToList = props.getInteger(NUM_PREV_DAYS_TO_LIST, DEFAULT_NUM_DAYS_TO_LIST);
+    this.fromDate = this.currentDate.minusDays(this.numPrevDaysToList);
+    this.partitionsListParallelism =
+        props.getInteger(PARTITIONS_LIST_PARALLELISM, DEFAULT_PARTITIONS_LIST_PARALLELISM);
+  }
+
+  @Override
+  public Pair<Option<String>, String> getNextFilePathsAndMaxModificationTime(
+      JavaSparkContext sparkContext, Option<String> lastCheckpointStr, long sourceLimit) {
+    try {
+      // obtain all eligible files under root folder.
+      log.info(
+          "Root path => "
+              + props.getString(ROOT_INPUT_PATH_PROP)
+              + " source limit => "
+              + sourceLimit
+              + " depth of day partition => "
+              + datePartitionDepth
+              + " num prev days to list => "
+              + numPrevDaysToList
+              + " from current date => "
+              + currentDate);
+      long lastCheckpointTime = lastCheckpointStr.map(Long::parseLong).orElse(Long.MIN_VALUE);
+      HoodieSparkEngineContext context = new HoodieSparkEngineContext(sparkContext);
+      List<String> prunedPaths =
+          pruneDatePartitionPaths(context, fs, props.getString(ROOT_INPUT_PATH_PROP));
+      List<FileStatus> eligibleFiles = new ArrayList<>();
+      for (String path : prunedPaths) {
+        eligibleFiles.addAll(listEligibleFiles(fs, new Path(path), lastCheckpointTime));
+      }
+      // sort them by modification time.
+      eligibleFiles.sort(Comparator.comparingLong(FileStatus::getModificationTime));

Review comment:
       Once again, this sorting can happen in parallel, right? Then just collect the result at the end.

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.spark.api.java.JavaSparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.LocalDate;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DATE_PARTITION_DEPTH;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_DATE_PARTITION_DEPTH;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_NUM_DAYS_TO_LIST;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_PARTITIONS_LIST_PARALLELISM;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.NUM_PREV_DAYS_TO_LIST;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.PARTITIONS_LIST_PARALLELISM;
+import static org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP;
+
+/**
+ * Custom dfs path selector used to list just the last few days provided there is a date based
+ * partition.
+ *
+ * <p>This is useful for workloads where there are multiple partition fields and only recent
+ * partitions are affected by new writes. Especially if the data sits in S3, listing all historical
+ * data can be time expensive and unnecessary for the above type of workload.
+ *
+ * <p>The date based partition is expected to be of the format '<date string>=yyyy-mm-dd' or
+ * 'yyyy-mm-dd'. The date partition can be at any level. For example, the partition path can be
+ * of the form `<basepath>/<partition-field1>/<date-based-partition>/<partition-field3>/` or
+ * `<basepath>/<date-based-partition>/`.
+ */
+public class DatePartitionPathSelector extends DFSPathSelector {
+  private static volatile Logger log = LoggerFactory.getLogger(DatePartitionPathSelector.class);
+  private final int datePartitionDepth;
+  private final int numPrevDaysToList;
+  private final LocalDate fromDate;
+  private final LocalDate currentDate;
+  private final int partitionsListParallelism;
+
+  /** Property keys, and their default values, read by the enclosing selector's constructor. */
+  public static class Config {
+    /** Number of directory levels listed below the root input path to reach the date partition. */
+    public static final String DATE_PARTITION_DEPTH =
+        "hoodie.deltastreamer.source.input.date_partition.selector.date_partition_depth";
+    /** Number of days subtracted from the current date to obtain the selector's from-date. */
+    public static final String NUM_PREV_DAYS_TO_LIST =
+        "hoodie.deltastreamer.source.input.date_partition.selector.num_prev_days_to_list";
+    /**
+     * Optional override of the current date. The constructor reads the value with
+     * {@code LocalDate.parse(String)}, so it has to be an ISO-8601 date such as 2020-11-23. When
+     * the key is absent, {@code LocalDate.now()} is used.
+     */
+    public static final String CURRENT_DATE =
+        "hoodie.deltastreamer.source.input.date_partition.selector.current_date";
+    /** Parallelism handed to the engine context when partition directories are listed. */
+    public static final String PARTITIONS_LIST_PARALLELISM =
+        "hoodie.deltastreamer.source.input.date_partition.selector.partitions.list.parallelism";
+    public static final int DEFAULT_DATE_PARTITION_DEPTH = 0; // Implies no (date) partition
+    public static final int DEFAULT_NUM_DAYS_TO_LIST = 2;
+    public static final int DEFAULT_PARTITIONS_LIST_PARALLELISM = 20;
+  }
+
+  /**
+   * Creates the selector and resolves its settings from the supplied properties.
+   *
+   * @param props properties carrying the {@link Config} keys; absent keys fall back to defaults
+   * @param hadoopConf Hadoop configuration forwarded to the parent selector
+   */
+  public DatePartitionPathSelector(TypedProperties props, Configuration hadoopConf) {
+    super(props, hadoopConf);
+    // A depth of 0 keeps the root path as the only listing target (see pruneDatePartitionPaths).
+    this.datePartitionDepth = props.getInteger(DATE_PARTITION_DEPTH, DEFAULT_DATE_PARTITION_DEPTH);
+    // Today's date is the fallback; either way the text is parsed by LocalDate.parse.
+    String todayText = LocalDate.now().toString();
+    String anchorDateText = props.getString(Config.CURRENT_DATE, todayText);
+    this.currentDate = LocalDate.parse(anchorDateText);
+    this.numPrevDaysToList = props.getInteger(NUM_PREV_DAYS_TO_LIST, DEFAULT_NUM_DAYS_TO_LIST);
+    this.fromDate = this.currentDate.minusDays(this.numPrevDaysToList);
+    this.partitionsListParallelism =
+        props.getInteger(PARTITIONS_LIST_PARALLELISM, DEFAULT_PARTITIONS_LIST_PARALLELISM);
+  }
+
+  /**
+   * Selects the next batch of input files from the date-pruned partition paths.
+   *
+   * <p>Every path returned by {@code pruneDatePartitionPaths} is handed to
+   * {@code listEligibleFiles} together with the checkpoint time. The results are ordered by
+   * modification time and accepted one by one until adding the next file would reach
+   * {@code sourceLimit} bytes.
+   *
+   * @param sparkContext wrapped in a {@link HoodieSparkEngineContext} for the partition listing
+   * @param lastCheckpointStr previous checkpoint, parsed as a long; absent means Long.MIN_VALUE
+   * @param sourceLimit byte budget; a file is rejected once the running total would reach it
+   * @return the accepted paths joined with "," (empty option if none) paired with the
+   *     modification time of the last accepted file, or the unchanged checkpoint if none
+   * @throws HoodieIOException when an IOException is raised inside the method
+   */
+  @Override
+  public Pair<Option<String>, String> getNextFilePathsAndMaxModificationTime(
+      JavaSparkContext sparkContext, Option<String> lastCheckpointStr, long sourceLimit) {
+    try {
+      String rootPath = props.getString(ROOT_INPUT_PATH_PROP);
+      log.info("Root path => " + rootPath
+          + " source limit => " + sourceLimit
+          + " depth of day partition => " + datePartitionDepth
+          + " num prev days to list => " + numPrevDaysToList
+          + " from current date => " + currentDate);
+      long checkpointTime = lastCheckpointStr.map(Long::parseLong).orElse(Long.MIN_VALUE);
+
+      // Gather candidates partition by partition, then order them oldest first.
+      HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(sparkContext);
+      List<FileStatus> candidates = new ArrayList<>();
+      for (String partition : pruneDatePartitionPaths(engineContext, fs, rootPath)) {
+        candidates.addAll(listEligibleFiles(fs, new Path(partition), checkpointTime));
+      }
+      candidates.sort(Comparator.comparingLong(FileStatus::getModificationTime));
+
+      // Accept files while the byte budget allows; the last accepted file sets the checkpoint.
+      long bytesTaken = 0;
+      long newestTaken = Long.MIN_VALUE;
+      List<String> takenPaths = new ArrayList<>();
+      for (FileStatus candidate : candidates) {
+        long size = candidate.getLen();
+        if (bytesTaken + size >= sourceLimit) {
+          break;
+        }
+        newestTaken = candidate.getModificationTime();
+        bytesTaken += size;
+        takenPaths.add(candidate.getPath().toString());
+      }
+
+      // Nothing accepted: hand back the incoming checkpoint (or Long.MIN_VALUE as text).
+      if (takenPaths.isEmpty()) {
+        return new ImmutablePair<>(
+            Option.empty(), lastCheckpointStr.orElseGet(() -> String.valueOf(Long.MIN_VALUE)));
+      }
+
+      String joinedPaths = String.join(",", takenPaths);
+      return new ImmutablePair<>(Option.ofNullable(joinedPaths), String.valueOf(newestTaken));
+    } catch (IOException ioe) {
+      throw new HoodieIOException(
+          "Unable to read from source from checkpoint: " + lastCheckpointStr, ioe);
+    }
+  }
+
+  /**
+   * Prunes date level partitions to last few days configured by 'NUM_PREV_DAYS_TO_LIST' from
+   * 'CURRENT_DATE'. Parallelizes listing by leveraging HoodieSparkEngineContext's methods.
+   */
+  public List<String> pruneDatePartitionPaths(
+      HoodieSparkEngineContext context, FileSystem fs, String rootPath) {
+    List<String> partitionPaths = new ArrayList<>();
+    // get all partition paths before date partition level
+    partitionPaths.add(rootPath);
+    if (datePartitionDepth <= 0) {
+      return partitionPaths;
+    }
+    SerializableConfiguration serializedConf = new SerializableConfiguration(fs.getConf());
+    for (int i = 0; i < datePartitionDepth; i++) {
+      partitionPaths =
+          context.flatMap(
+              partitionPaths,
+              path -> {
+                Path subDir = new Path(path);
+                FileSystem fileSystem = subDir.getFileSystem(serializedConf.get());
+                // skip files/dirs whose names start with (_, ., etc)
+                FileStatus[] statuses =
+                    fileSystem.listStatus(
+                        subDir,
+                        file ->
+                            IGNORE_FILEPREFIX_LIST.stream()
+                                .noneMatch(pfx -> file.getName().startsWith(pfx)));
+                List<String> res = new ArrayList<>();
+                for (FileStatus status : statuses) {
+                  res.add(status.getPath().toString());
+                }
+                return res.stream();
+              },
+              partitionsListParallelism);
+    }
+
+    // Prune date partitions to last few days

Review comment:
       Why collect this and then prune? Can't we prune in parallel as well?

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.spark.api.java.JavaSparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.LocalDate;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DATE_PARTITION_DEPTH;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_DATE_PARTITION_DEPTH;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_NUM_DAYS_TO_LIST;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_PARTITIONS_LIST_PARALLELISM;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.NUM_PREV_DAYS_TO_LIST;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.PARTITIONS_LIST_PARALLELISM;
+import static org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP;
+
+/**
+ * Custom dfs path selector used to list just the last few days provided there is a date based
+ * partition.
+ *
+ * <p>This is useful for workloads where there are multiple partition fields and only recent
+ * partitions are affected by new writes. Especially if the data sits in S3, listing all historical
+ * data can be time expensive and unnecessary for the above type of workload.
+ *
+ * <p>The date based partition is expected to be of the format '<date string>=yyyy-mm-dd' or
+ * 'yyyy-mm-dd'. The date partition can be at any level. For example, the partition path can be
+ * of the form `<basepath>/<partition-field1>/<date-based-partition>/<partition-field3>/` or
+ * `<basepath>/<date-based-partition>/`.
+ */
+public class DatePartitionPathSelector extends DFSPathSelector {
+  private static volatile Logger log = LoggerFactory.getLogger(DatePartitionPathSelector.class);
+  private final int datePartitionDepth;
+  private final int numPrevDaysToList;
+  private final LocalDate fromDate;
+  private final LocalDate currentDate;
+  private final int partitionsListParallelism;
+
+  /** Property keys, and their default values, read by the enclosing selector's constructor. */
+  public static class Config {
+    /** Number of directory levels listed below the root input path to reach the date partition. */
+    public static final String DATE_PARTITION_DEPTH =
+        "hoodie.deltastreamer.source.input.date_partition.selector.date_partition_depth";
+    /** Number of days subtracted from the current date to obtain the selector's from-date. */
+    public static final String NUM_PREV_DAYS_TO_LIST =
+        "hoodie.deltastreamer.source.input.date_partition.selector.num_prev_days_to_list";
+    /**
+     * Optional override of the current date. The constructor reads the value with
+     * {@code LocalDate.parse(String)}, so it has to be an ISO-8601 date such as 2020-11-23. When
+     * the key is absent, {@code LocalDate.now()} is used.
+     */
+    public static final String CURRENT_DATE =
+        "hoodie.deltastreamer.source.input.date_partition.selector.current_date";
+    /** Parallelism handed to the engine context when partition directories are listed. */
+    public static final String PARTITIONS_LIST_PARALLELISM =
+        "hoodie.deltastreamer.source.input.date_partition.selector.partitions.list.parallelism";
+    public static final int DEFAULT_DATE_PARTITION_DEPTH = 0; // Implies no (date) partition
+    public static final int DEFAULT_NUM_DAYS_TO_LIST = 2;
+    public static final int DEFAULT_PARTITIONS_LIST_PARALLELISM = 20;
+  }
+
+  /**
+   * Creates the selector and resolves its settings from the supplied properties.
+   *
+   * @param props properties carrying the {@link Config} keys; absent keys fall back to defaults
+   * @param hadoopConf Hadoop configuration forwarded to the parent selector
+   */
+  public DatePartitionPathSelector(TypedProperties props, Configuration hadoopConf) {
+    super(props, hadoopConf);
+    // A depth of 0 keeps the root path as the only listing target (see pruneDatePartitionPaths).
+    this.datePartitionDepth = props.getInteger(DATE_PARTITION_DEPTH, DEFAULT_DATE_PARTITION_DEPTH);
+    // Today's date is the fallback; either way the text is parsed by LocalDate.parse.
+    String todayText = LocalDate.now().toString();
+    String anchorDateText = props.getString(Config.CURRENT_DATE, todayText);
+    this.currentDate = LocalDate.parse(anchorDateText);
+    this.numPrevDaysToList = props.getInteger(NUM_PREV_DAYS_TO_LIST, DEFAULT_NUM_DAYS_TO_LIST);
+    this.fromDate = this.currentDate.minusDays(this.numPrevDaysToList);
+    this.partitionsListParallelism =
+        props.getInteger(PARTITIONS_LIST_PARALLELISM, DEFAULT_PARTITIONS_LIST_PARALLELISM);
+  }
+
+  /**
+   * Selects the next batch of input files from the date-pruned partition paths.
+   *
+   * <p>Every path returned by {@code pruneDatePartitionPaths} is handed to
+   * {@code listEligibleFiles} together with the checkpoint time. The results are ordered by
+   * modification time and accepted one by one until adding the next file would reach
+   * {@code sourceLimit} bytes.
+   *
+   * @param sparkContext wrapped in a {@link HoodieSparkEngineContext} for the partition listing
+   * @param lastCheckpointStr previous checkpoint, parsed as a long; absent means Long.MIN_VALUE
+   * @param sourceLimit byte budget; a file is rejected once the running total would reach it
+   * @return the accepted paths joined with "," (empty option if none) paired with the
+   *     modification time of the last accepted file, or the unchanged checkpoint if none
+   * @throws HoodieIOException when an IOException is raised inside the method
+   */
+  @Override
+  public Pair<Option<String>, String> getNextFilePathsAndMaxModificationTime(
+      JavaSparkContext sparkContext, Option<String> lastCheckpointStr, long sourceLimit) {
+    try {
+      String rootPath = props.getString(ROOT_INPUT_PATH_PROP);
+      log.info("Root path => " + rootPath
+          + " source limit => " + sourceLimit
+          + " depth of day partition => " + datePartitionDepth
+          + " num prev days to list => " + numPrevDaysToList
+          + " from current date => " + currentDate);
+      long checkpointTime = lastCheckpointStr.map(Long::parseLong).orElse(Long.MIN_VALUE);
+
+      // Gather candidates partition by partition, then order them oldest first.
+      HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(sparkContext);
+      List<FileStatus> candidates = new ArrayList<>();
+      for (String partition : pruneDatePartitionPaths(engineContext, fs, rootPath)) {
+        candidates.addAll(listEligibleFiles(fs, new Path(partition), checkpointTime));
+      }
+      candidates.sort(Comparator.comparingLong(FileStatus::getModificationTime));
+
+      // Accept files while the byte budget allows; the last accepted file sets the checkpoint.
+      long bytesTaken = 0;
+      long newestTaken = Long.MIN_VALUE;
+      List<String> takenPaths = new ArrayList<>();
+      for (FileStatus candidate : candidates) {
+        long size = candidate.getLen();
+        if (bytesTaken + size >= sourceLimit) {
+          break;
+        }
+        newestTaken = candidate.getModificationTime();
+        bytesTaken += size;
+        takenPaths.add(candidate.getPath().toString());
+      }
+
+      // Nothing accepted: hand back the incoming checkpoint (or Long.MIN_VALUE as text).
+      if (takenPaths.isEmpty()) {
+        return new ImmutablePair<>(
+            Option.empty(), lastCheckpointStr.orElseGet(() -> String.valueOf(Long.MIN_VALUE)));
+      }
+
+      String joinedPaths = String.join(",", takenPaths);
+      return new ImmutablePair<>(Option.ofNullable(joinedPaths), String.valueOf(newestTaken));
+    } catch (IOException ioe) {
+      throw new HoodieIOException(
+          "Unable to read from source from checkpoint: " + lastCheckpointStr, ioe);
+    }
+  }
+
+  /**
+   * Prunes date level partitions to last few days configured by 'NUM_PREV_DAYS_TO_LIST' from
+   * 'CURRENT_DATE'. Parallelizes listing by leveraging HoodieSparkEngineContext's methods.
+   */
+  public List<String> pruneDatePartitionPaths(
+      HoodieSparkEngineContext context, FileSystem fs, String rootPath) {
+    List<String> partitionPaths = new ArrayList<>();
+    // get all partition paths before date partition level
+    partitionPaths.add(rootPath);
+    if (datePartitionDepth <= 0) {
+      return partitionPaths;
+    }
+    SerializableConfiguration serializedConf = new SerializableConfiguration(fs.getConf());
+    for (int i = 0; i < datePartitionDepth; i++) {
+      partitionPaths =
+          context.flatMap(
+              partitionPaths,
+              path -> {
+                Path subDir = new Path(path);
+                FileSystem fileSystem = subDir.getFileSystem(serializedConf.get());
+                // skip files/dirs whose names start with (_, ., etc)
+                FileStatus[] statuses =

Review comment:
       Can we keep these on a single line?

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.spark.api.java.JavaSparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.LocalDate;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DATE_PARTITION_DEPTH;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_DATE_PARTITION_DEPTH;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_NUM_DAYS_TO_LIST;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_PARTITIONS_LIST_PARALLELISM;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.NUM_PREV_DAYS_TO_LIST;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.PARTITIONS_LIST_PARALLELISM;
+import static org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP;
+
+/**
+ * Custom dfs path selector used to list just the last few days provided there is a date based
+ * partition.
+ *
+ * <p>This is useful for workloads where there are multiple partition fields and only recent
+ * partitions are affected by new writes. Especially if the data sits in S3, listing all historical
+ * data can be time expensive and unnecessary for the above type of workload.
+ *
+ * <p>The date based partition is expected to be of the format '<date string>=yyyy-mm-dd' or
+ * 'yyyy-mm-dd'. The date partition can be at any level. For example, the partition path can be
+ * of the form `<basepath>/<partition-field1>/<date-based-partition>/<partition-field3>/` or
+ * `<basepath>/<date-based-partition>/`.
+ */
+public class DatePartitionPathSelector extends DFSPathSelector {
+  private static volatile Logger log = LoggerFactory.getLogger(DatePartitionPathSelector.class);
+  private final int datePartitionDepth;
+  private final int numPrevDaysToList;
+  private final LocalDate fromDate;
+  private final LocalDate currentDate;
+  private final int partitionsListParallelism;
+
+  /** Configs supported. */
+  public static class Config {
+    public static final String DATE_PARTITION_DEPTH =
+        "hoodie.deltastreamer.source.input.date_partition.selector.date_partition_depth";
+    public static final String NUM_PREV_DAYS_TO_LIST =
+        "hoodie.deltastreamer.source.input.date_partition.selector.num_prev_days_to_list";
+    public static final String CURRENT_DATE =
+        "hoodie.deltastreamer.source.input.date_partition.selector.current_date";
+    public static final String PARTITIONS_LIST_PARALLELISM =
+        "hoodie.deltastreamer.source.input.date_partition.selector.partitions.list.parallelism";

Review comment:
       Suggested key name: use the suffix `.source.dfs.datepartitioned.selector.listparallelism` instead.

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.spark.api.java.JavaSparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.LocalDate;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DATE_PARTITION_DEPTH;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_DATE_PARTITION_DEPTH;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_NUM_DAYS_TO_LIST;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_PARTITIONS_LIST_PARALLELISM;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.NUM_PREV_DAYS_TO_LIST;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.PARTITIONS_LIST_PARALLELISM;
+import static org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP;
+
+/**
+ * Custom dfs path selector used to list just the last few days provided there is a date based
+ * partition.
+ *
+ * <p>This is useful for workloads where there are multiple partition fields and only recent
+ * partitions are affected by new writes. Especially if the data sits in S3, listing all historical
+ * data can be time-consuming and is unnecessary for this type of workload.
+ *
+ * <p>The date-based partition is expected to be of the format '<date string>=yyyy-MM-dd' or
+ * 'yyyy-MM-dd'. The date partition can be at any level. For example, the partition path can be
+ * of the form `<basepath>/<partition-field1>/<date-based-partition>/<partition-field3>/` or
+ * `<basepath>/<date-based-partition>/`.
+ */
+public class DatePartitionPathSelector extends DFSPathSelector {
+  private static volatile Logger log = LoggerFactory.getLogger(DatePartitionPathSelector.class);
+  private final int datePartitionDepth;
+  private final int numPrevDaysToList;
+  private final LocalDate fromDate;
+  private final LocalDate currentDate;
+  private final int partitionsListParallelism;
+
+  /** Configs supported. */
+  public static class Config {
+    public static final String DATE_PARTITION_DEPTH =
+        "hoodie.deltastreamer.source.input.date_partition.selector.date_partition_depth";
+    public static final String NUM_PREV_DAYS_TO_LIST =
+        "hoodie.deltastreamer.source.input.date_partition.selector.num_prev_days_to_list";
+    public static final String CURRENT_DATE =
+        "hoodie.deltastreamer.source.input.date_partition.selector.current_date";
+    public static final String PARTITIONS_LIST_PARALLELISM =
+        "hoodie.deltastreamer.source.input.date_partition.selector.partitions.list.parallelism";
+    public static final int DEFAULT_DATE_PARTITION_DEPTH = 0; // Implies no (date) partition
+    public static final int DEFAULT_NUM_DAYS_TO_LIST = 2;
+    public static final int DEFAULT_PARTITIONS_LIST_PARALLELISM = 20;
+  }
+
+  public DatePartitionPathSelector(TypedProperties props, Configuration hadoopConf) {
+    super(props, hadoopConf);
+    /*
+     * Everything below reads this selector's settings from props, substituting the Config
+     * defaults for any key that is absent.
+     *
+     * datePartitionDepth = 0 means the date partition is the base path itself, i.e. there is no
+     * partition. In that case this path selector is a no-op and lists all paths under the table
+     * base path.
+     *
+     * NOTE(review): neither integer is range-checked here; a negative numPrevDaysToList would
+     * place fromDate after currentDate.
+     */
+    datePartitionDepth = props.getInteger(DATE_PARTITION_DEPTH, DEFAULT_DATE_PARTITION_DEPTH);
+    /*
+     * CURRENT_DATE is optional: when it is absent, LocalDate.now() (system clock, default
+     * time zone) supplies the value. Either way the string goes through LocalDate.parse, which
+     * expects the ISO-8601 form yyyy-MM-dd and throws DateTimeParseException otherwise.
+     */
+    currentDate = LocalDate.parse(props.getString(Config.CURRENT_DATE, LocalDate.now().toString()));
+    numPrevDaysToList = props.getInteger(NUM_PREV_DAYS_TO_LIST, DEFAULT_NUM_DAYS_TO_LIST);
+    fromDate = currentDate.minusDays(numPrevDaysToList); // numPrevDaysToList days before currentDate
+    partitionsListParallelism =
+        props.getInteger(PARTITIONS_LIST_PARALLELISM, DEFAULT_PARTITIONS_LIST_PARALLELISM);
+  }
+
+  @Override
+  public Pair<Option<String>, String> getNextFilePathsAndMaxModificationTime(
+      JavaSparkContext sparkContext, Option<String> lastCheckpointStr, long sourceLimit) {
+    try {
+      // obtain all eligible files under root folder.
+      log.info(
+          "Root path => "
+              + props.getString(ROOT_INPUT_PATH_PROP)
+              + " source limit => "
+              + sourceLimit
+              + " depth of day partition => "
+              + datePartitionDepth
+              + " num prev days to list => "
+              + numPrevDaysToList
+              + " from current date => "
+              + currentDate);
+      long lastCheckpointTime = lastCheckpointStr.map(Long::parseLong).orElse(Long.MIN_VALUE);
+      HoodieSparkEngineContext context = new HoodieSparkEngineContext(sparkContext);
+      List<String> prunedPaths =
+          pruneDatePartitionPaths(context, fs, props.getString(ROOT_INPUT_PATH_PROP));
+      List<FileStatus> eligibleFiles = new ArrayList<>();
+      for (String path : prunedPaths) {
+        eligibleFiles.addAll(listEligibleFiles(fs, new Path(path), lastCheckpointTime));
+      }
+      // sort them by modification time.
+      eligibleFiles.sort(Comparator.comparingLong(FileStatus::getModificationTime));
+      // Filter based on checkpoint & input size, if needed
+      long currentBytes = 0;
+      long maxModificationTime = Long.MIN_VALUE;
+      List<FileStatus> filteredFiles = new ArrayList<>();

Review comment:
       can we do this also in spark?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org