You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vi...@apache.org on 2020/10/20 09:52:52 UTC
[hudi] branch master updated: [MINOR] Make sure factory method is
used to instantiate DFSPathSelector (#2187)
This is an automated email from the ASF dual-hosted git repository.
vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new fd269dd [MINOR] Make sure factory method is used to instantiate DFSPathSelector (#2187)
fd269dd is described below
commit fd269ddeb07fad015f71fc2c7daa53ace0e0e2da
Author: Ho Tien Vu <ho...@gmail.com>
AuthorDate: Tue Oct 20 17:52:31 2020 +0800
[MINOR] Make sure factory method is used to instantiate DFSPathSelector (#2187)
* Move createSourceSelector from UtilHelpers into DFSPathSelector as a static factory method
* Replace direct constructor calls with the factory method
* Add some Javadoc
---
.../org/apache/hudi/utilities/UtilHelpers.java | 18 --------------
.../hudi/utilities/sources/AvroDFSSource.java | 3 +--
.../hudi/utilities/sources/CsvDFSSource.java | 2 +-
.../hudi/utilities/sources/JsonDFSSource.java | 2 +-
.../hudi/utilities/sources/ParquetDFSSource.java | 2 +-
.../utilities/sources/helpers/DFSPathSelector.java | 29 ++++++++++++++++++++++
6 files changed, 33 insertions(+), 23 deletions(-)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index 29fc195..e767909 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -18,7 +18,6 @@
package org.apache.hudi.utilities;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
@@ -46,7 +45,6 @@ import org.apache.hudi.utilities.schema.SchemaProviderWithPostProcessor;
import org.apache.hudi.utilities.sources.AvroKafkaSource;
import org.apache.hudi.utilities.sources.JsonKafkaSource;
import org.apache.hudi.utilities.sources.Source;
-import org.apache.hudi.utilities.sources.helpers.DFSPathSelector;
import org.apache.hudi.utilities.transform.ChainedTransformer;
import org.apache.hudi.utilities.transform.Transformer;
@@ -377,22 +375,6 @@ public class UtilHelpers {
}
}
- public static DFSPathSelector createSourceSelector(TypedProperties props,
- Configuration conf) throws IOException {
- String sourceSelectorClass =
- props.getString(DFSPathSelector.Config.SOURCE_INPUT_SELECTOR, DFSPathSelector.class.getName());
- try {
- DFSPathSelector selector = (DFSPathSelector) ReflectionUtils.loadClass(sourceSelectorClass,
- new Class<?>[]{TypedProperties.class, Configuration.class},
- props, conf);
-
- LOG.info("Using path selector " + selector.getClass().getName());
- return selector;
- } catch (Throwable e) {
- throw new IOException("Could not load source selector class " + sourceSelectorClass, e);
- }
- }
-
public static SchemaProvider getOriginalSchemaProvider(SchemaProvider schemaProvider) {
SchemaProvider originalProvider = schemaProvider;
if (schemaProvider instanceof SchemaProviderWithPostProcessor) {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroDFSSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroDFSSource.java
index b8f29e8..1152cd6 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroDFSSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroDFSSource.java
@@ -21,7 +21,6 @@ package org.apache.hudi.utilities.sources;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.DFSPathSelector;
@@ -46,7 +45,7 @@ public class AvroDFSSource extends AvroSource {
public AvroDFSSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
SchemaProvider schemaProvider) throws IOException {
super(props, sparkContext, sparkSession, schemaProvider);
- this.pathSelector = UtilHelpers
+ this.pathSelector = DFSPathSelector
.createSourceSelector(props, sparkContext.hadoopConfiguration());
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/CsvDFSSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/CsvDFSSource.java
index 3d158ba..dc40b47 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/CsvDFSSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/CsvDFSSource.java
@@ -79,7 +79,7 @@ public class CsvDFSSource extends RowSource {
SparkSession sparkSession,
SchemaProvider schemaProvider) {
super(props, sparkContext, sparkSession, schemaProvider);
- this.pathSelector = new DFSPathSelector(props, sparkContext.hadoopConfiguration());
+ this.pathSelector = DFSPathSelector.createSourceSelector(props, sparkContext.hadoopConfiguration());
if (schemaProvider != null) {
sourceSchema = (StructType) SchemaConverters.toSqlType(schemaProvider.getSourceSchema())
.dataType();
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonDFSSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonDFSSource.java
index e66bfdf..d34289d 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonDFSSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonDFSSource.java
@@ -38,7 +38,7 @@ public class JsonDFSSource extends JsonSource {
public JsonDFSSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
SchemaProvider schemaProvider) {
super(props, sparkContext, sparkSession, schemaProvider);
- this.pathSelector = new DFSPathSelector(props, sparkContext.hadoopConfiguration());
+ this.pathSelector = DFSPathSelector.createSourceSelector(props, sparkContext.hadoopConfiguration());
}
@Override
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java
index dea410a..55d2de2 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java
@@ -39,7 +39,7 @@ public class ParquetDFSSource extends RowSource {
public ParquetDFSSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
SchemaProvider schemaProvider) {
super(props, sparkContext, sparkSession, schemaProvider);
- this.pathSelector = new DFSPathSelector(props, this.sparkContext.hadoopConfiguration());
+ this.pathSelector = DFSPathSelector.createSourceSelector(props, this.sparkContext.hadoopConfiguration());
}
@Override
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java
index c8690f5..6b58003 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java
@@ -22,8 +22,10 @@ import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hadoop.conf.Configuration;
@@ -66,6 +68,33 @@ public class DFSPathSelector {
this.fs = FSUtils.getFs(props.getString(Config.ROOT_INPUT_PATH_PROP), hadoopConf);
}
+ /**
+ * Factory method for creating custom DFSPathSelector. Default selector
+ * to use is {@link DFSPathSelector}
+ */
+ public static DFSPathSelector createSourceSelector(TypedProperties props,
+ Configuration conf) {
+ String sourceSelectorClass = props.getString(DFSPathSelector.Config.SOURCE_INPUT_SELECTOR,
+ DFSPathSelector.class.getName());
+ try {
+ DFSPathSelector selector = (DFSPathSelector) ReflectionUtils.loadClass(sourceSelectorClass,
+ new Class<?>[]{TypedProperties.class, Configuration.class},
+ props, conf);
+
+ log.info("Using path selector " + selector.getClass().getName());
+ return selector;
+ } catch (Exception e) {
+ throw new HoodieException("Could not load source selector class " + sourceSelectorClass, e);
+ }
+ }
+
+ /**
+ * Get the list of files changed since last checkpoint.
+ *
+ * @param lastCheckpointStr the last checkpoint time string, empty if first run
+ * @param sourceLimit max bytes to read each time
+ * @return the list of files concatenated and their latest modified time
+ */
public Pair<Option<String>, String> getNextFilePathsAndMaxModificationTime(Option<String> lastCheckpointStr,
long sourceLimit) {