You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@hudi.apache.org by na...@apache.org on 2020/08/24 20:26:56 UTC

[hudi] branch master updated: [HUDI-1137] Add option to configure different path selector

This is an automated email from the ASF dual-hosted git repository.

nagarwal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ea983ff  [HUDI-1137] Add option to configure different path selector
ea983ff is described below

commit ea983ff912dab8604eec3085bc1c041cb6e60bc8
Author: Satish Kotha <sa...@uber.com>
AuthorDate: Mon Aug 24 11:11:10 2020 -0700

    [HUDI-1137] Add option to configure different path selector
---
 .../integ/testsuite/helpers/DFSTestSuitePathSelector.java  | 14 ++++++++++++--
 .../main/java/org/apache/hudi/utilities/UtilHelpers.java   |  9 +++++++--
 .../org/apache/hudi/utilities/sources/AvroDFSSource.java   |  3 +--
 .../hudi/utilities/sources/helpers/DFSPathSelector.java    |  1 +
 4 files changed, 21 insertions(+), 6 deletions(-)

diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java
index b67e21f..bfc8368 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java
@@ -32,12 +32,17 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob;
 import org.apache.hudi.utilities.sources.helpers.DFSPathSelector;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * A custom dfs path selector used only for the hudi test suite. To be used only if workload is not run inline.
  */
 public class DFSTestSuitePathSelector extends DFSPathSelector {
+  private static volatile Logger log = LoggerFactory.getLogger(HoodieTestSuiteJob.class);
 
   public DFSTestSuitePathSelector(TypedProperties props, Configuration hadoopConf) {
     super(props, hadoopConf);
@@ -54,9 +59,12 @@ public class DFSTestSuitePathSelector extends DFSPathSelector {
         lastBatchId = Integer.parseInt(lastCheckpointStr.get());
         nextBatchId = lastBatchId + 1;
       } else {
-        lastBatchId = -1;
-        nextBatchId = 0;
+        lastBatchId = 0;
+        nextBatchId = 1;
       }
+
+      log.info("Using DFSTestSuitePathSelector, checkpoint: " + lastCheckpointStr + " sourceLimit: " + sourceLimit
+          + " lastBatchId: " + lastBatchId + " nextBatchId: " + nextBatchId);
       // obtain all eligible files for the batch
       List<FileStatus> eligibleFiles = new ArrayList<>();
       FileStatus[] fileStatuses = fs.globStatus(
@@ -73,6 +81,8 @@ public class DFSTestSuitePathSelector extends DFSPathSelector {
           }
         }
       }
+
+      log.info("Reading " + eligibleFiles.size() + " files. ");
       // no data to readAvro
       if (eligibleFiles.size() == 0) {
         return new ImmutablePair<>(Option.empty(),
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index 0531196..14e16ab 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -352,12 +352,17 @@ public class UtilHelpers {
     }
   }
 
-  public static DFSPathSelector createSourceSelector(String sourceSelectorClass, TypedProperties props,
+  public static DFSPathSelector createSourceSelector(TypedProperties props,
       Configuration conf) throws IOException {
+    String sourceSelectorClass =
+        props.getString(DFSPathSelector.Config.SOURCE_INPUT_SELECTOR, DFSPathSelector.class.getName());
     try {
-      return (DFSPathSelector) ReflectionUtils.loadClass(sourceSelectorClass,
+      DFSPathSelector selector = (DFSPathSelector) ReflectionUtils.loadClass(sourceSelectorClass,
           new Class<?>[]{TypedProperties.class, Configuration.class},
           props, conf);
+
+      LOG.info("Using path selector " + selector.getClass().getName());
+      return selector;
     } catch (Throwable e) {
       throw new IOException("Could not load source selector class " + sourceSelectorClass, e);
     }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroDFSSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroDFSSource.java
index b5ce96f..b8f29e8 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroDFSSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroDFSSource.java
@@ -47,8 +47,7 @@ public class AvroDFSSource extends AvroSource {
       SchemaProvider schemaProvider) throws IOException {
     super(props, sparkContext, sparkSession, schemaProvider);
     this.pathSelector = UtilHelpers
-        .createSourceSelector(DFSPathSelector.class.getName(), props, sparkContext
-        .hadoopConfiguration());
+        .createSourceSelector(props, sparkContext.hadoopConfiguration());
   }
 
   @Override
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java
index 59263e4..5d56f2a 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java
@@ -52,6 +52,7 @@ public class DFSPathSelector {
   public static class Config {
 
     public static final String ROOT_INPUT_PATH_PROP = "hoodie.deltastreamer.source.dfs.root";
+    public static final String SOURCE_INPUT_SELECTOR = "hoodie.deltastreamer.source.input.selector";
   }
 
   protected static final List<String> IGNORE_FILEPREFIX_LIST = Arrays.asList(".", "_");