You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to commits@hudi.apache.org by vi...@apache.org on 2020/04/01 15:21:37 UTC
[incubator-hudi] branch master updated: [HUDI-731] Add ChainedTransformer (#1440)
This is an automated email from the ASF dual-hosted git repository.
vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 5b53b0d [HUDI-731] Add ChainedTransformer (#1440)
5b53b0d is described below
commit 5b53b0d85e0d60a37c37941b5a653b0718534e7b
Author: Raymond Xu <27...@users.noreply.github.com>
AuthorDate: Wed Apr 1 08:21:31 2020 -0700
[HUDI-731] Add ChainedTransformer (#1440)
* [HUDI-731] Add ChainedTransformer
---
.../org/apache/hudi/utilities/UtilHelpers.java | 13 ++-
.../hudi/utilities/deltastreamer/DeltaSync.java | 8 +-
.../deltastreamer/HoodieDeltaStreamer.java | 21 ++++-
.../utilities/transform/ChainedTransformer.java | 54 +++++++++++
.../hudi/utilities/TestHoodieDeltaStreamer.java | 67 +++++++-------
.../org/apache/hudi/utilities/TestUtilHelpers.java | 101 +++++++++++++++++++++
.../transform/TestChainedTransformer.java | 92 +++++++++++++++++++
.../{ => transform}/TestFlatteningTransformer.java | 4 +-
8 files changed, 314 insertions(+), 46 deletions(-)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index 8930084..222a391 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -34,6 +34,7 @@ import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.Source;
+import org.apache.hudi.utilities.transform.ChainedTransformer;
import org.apache.hudi.utilities.transform.Transformer;
import org.apache.avro.Schema;
@@ -67,7 +68,9 @@ import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
@@ -102,11 +105,15 @@ public class UtilHelpers {
}
}
- public static Transformer createTransformer(String transformerClass) throws IOException {
+ public static Option<Transformer> createTransformer(List<String> classNames) throws IOException {
try {
- return transformerClass == null ? null : (Transformer) ReflectionUtils.loadClass(transformerClass);
+ List<Transformer> transformers = new ArrayList<>();
+ for (String className : Option.ofNullable(classNames).orElse(Collections.emptyList())) {
+ transformers.add(ReflectionUtils.loadClass(className));
+ }
+ return transformers.isEmpty() ? Option.empty() : Option.of(new ChainedTransformer(transformers));
} catch (Throwable e) {
- throw new IOException("Could not load transformer class " + transformerClass, e);
+ throw new IOException("Could not load transformer class(es) " + classNames, e);
}
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 99cb497..5cc33ee 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -106,7 +106,7 @@ public class DeltaSync implements Serializable {
/**
* Allows transforming source to target table before writing.
*/
- private transient Transformer transformer;
+ private transient Option<Transformer> transformer;
/**
* Extract the key for the target table.
@@ -173,7 +173,7 @@ public class DeltaSync implements Serializable {
refreshTimeline();
- this.transformer = UtilHelpers.createTransformer(cfg.transformerClassName);
+ this.transformer = UtilHelpers.createTransformer(cfg.transformerClassNames);
this.keyGenerator = DataSourceUtils.createKeyGenerator(props);
this.formatAdapter = new SourceFormatAdapter(
@@ -281,14 +281,14 @@ public class DeltaSync implements Serializable {
final Option<JavaRDD<GenericRecord>> avroRDDOptional;
final String checkpointStr;
final SchemaProvider schemaProvider;
- if (transformer != null) {
+ if (transformer.isPresent()) {
// Transformation is needed. Fetch New rows in Row Format, apply transformation and then convert them
// to generic records for writing
InputBatch<Dataset<Row>> dataAndCheckpoint =
formatAdapter.fetchNewDataInRowFormat(resumeCheckpointStr, cfg.sourceLimit);
Option<Dataset<Row>> transformed =
- dataAndCheckpoint.getBatch().map(data -> transformer.apply(jssc, sparkSession, data, props));
+ dataAndCheckpoint.getBatch().map(data -> transformer.get().apply(jssc, sparkSession, data, props));
checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
if (this.schemaProvider != null && this.schemaProvider.getTargetSchema() != null) {
// If the target schema is specified through Avro schema,
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index 948033f..bc4c85d 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -54,6 +54,7 @@ import org.apache.spark.sql.SparkSession;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
@@ -64,6 +65,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
@@ -148,6 +150,17 @@ public class HoodieDeltaStreamer implements Serializable {
}
}
+ private static class TransformersConverter implements IStringConverter<List<String>> {
+
+ @Override
+ public List<String> convert(String value) throws ParameterException {
+ return value == null ? null : Arrays.stream(value.split(","))
+ .map(String::trim)
+ .filter(s -> !s.isEmpty())
+ .collect(Collectors.toList());
+ }
+ }
+
public static class Config implements Serializable {
@Parameter(names = {"--target-base-path"},
@@ -196,11 +209,13 @@ public class HoodieDeltaStreamer implements Serializable {
public String schemaProviderClassName = null;
@Parameter(names = {"--transformer-class"},
- description = "subclass of org.apache.hudi.utilities.transform.Transformer"
+ description = "A subclass or a list of subclasses of org.apache.hudi.utilities.transform.Transformer"
+ ". Allows transforming raw source Dataset to a target Dataset (conforming to target schema) before "
+ "writing. Default : Not set. E:g - org.apache.hudi.utilities.transform.SqlQueryBasedTransformer (which "
- + "allows a SQL query templated to be passed as a transformation function)")
- public String transformerClassName = null;
+ + "allows a SQL query templated to be passed as a transformation function). "
+ + "Pass a comma-separated list of subclass names to chain the transformations.",
+ converter = TransformersConverter.class)
+ public List<String> transformerClassNames = null;
@Parameter(names = {"--source-limit"}, description = "Maximum amount of data to read from source. "
+ "Default: No limit For e.g: DFS-Source => max bytes to read, Kafka-Source => max events to read")
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java
new file mode 100644
index 0000000..1161a73
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.transform;
+
+import org.apache.hudi.common.config.TypedProperties;
+
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * A {@link Transformer} to chain other {@link Transformer}s and apply sequentially.
+ */
+public class ChainedTransformer implements Transformer {
+
+ private List<Transformer> transformers;
+
+ public ChainedTransformer(List<Transformer> transformers) {
+ this.transformers = transformers;
+ }
+
+ public List<String> getTransformersNames() {
+ return transformers.stream().map(t -> t.getClass().getName()).collect(Collectors.toList());
+ }
+
+ @Override
+ public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset, TypedProperties properties) {
+ Dataset<Row> dataset = rowDataset;
+ for (Transformer t : transformers) {
+ dataset = t.apply(jsc, sparkSession, dataset, properties);
+ }
+ return dataset;
+ }
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
index 5ada456..b7323d4 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
@@ -78,6 +78,7 @@ import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.Random;
@@ -183,39 +184,39 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
static class TestHelpers {
static HoodieDeltaStreamer.Config makeDropAllConfig(String basePath, Operation op) {
- return makeConfig(basePath, op, DropAllTransformer.class.getName());
+ return makeConfig(basePath, op, Collections.singletonList(DropAllTransformer.class.getName()));
}
static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op) {
- return makeConfig(basePath, op, TripsWithDistanceTransformer.class.getName());
+ return makeConfig(basePath, op, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
}
- static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, String transformerClassName) {
- return makeConfig(basePath, op, transformerClassName, PROPS_FILENAME_TEST_SOURCE, false);
+ static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, List<String> transformerClassNames) {
+ return makeConfig(basePath, op, transformerClassNames, PROPS_FILENAME_TEST_SOURCE, false);
}
- static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, String transformerClassName,
- String propsFilename, boolean enableHiveSync) {
- return makeConfig(basePath, op, transformerClassName, propsFilename, enableHiveSync, true,
- false, null, null);
+ static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, List<String> transformerClassNames,
+ String propsFilename, boolean enableHiveSync) {
+ return makeConfig(basePath, op, transformerClassNames, propsFilename, enableHiveSync, true,
+ false, null, null);
}
- static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, String transformerClassName,
+ static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, List<String> transformerClassNames,
String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass, boolean updatePayloadClass,
- String payloadClassName, String tableType) {
- return makeConfig(basePath, op, TestDataSource.class.getName(), transformerClassName, propsFilename, enableHiveSync,
+ String payloadClassName, String tableType) {
+ return makeConfig(basePath, op, TestDataSource.class.getName(), transformerClassNames, propsFilename, enableHiveSync,
useSchemaProviderClass, 1000, updatePayloadClass, payloadClassName, tableType, "timestamp");
}
static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, String sourceClassName,
- String transformerClassName, String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass,
+ List<String> transformerClassNames, String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass,
int sourceLimit, boolean updatePayloadClass, String payloadClassName, String tableType, String sourceOrderingField) {
HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
cfg.targetBasePath = basePath;
cfg.targetTableName = "hoodie_trips";
cfg.tableType = tableType == null ? "COPY_ON_WRITE" : tableType;
cfg.sourceClassName = sourceClassName;
- cfg.transformerClassName = transformerClassName;
+ cfg.transformerClassNames = transformerClassNames;
cfg.operation = op;
cfg.enableHiveSync = enableHiveSync;
cfg.sourceOrderingField = sourceOrderingField;
@@ -339,7 +340,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
String tableBasePath = dfsBasePath + "/test_table";
HoodieDeltaStreamer deltaStreamer =
new HoodieDeltaStreamer(TestHelpers.makeConfig(tableBasePath, Operation.BULK_INSERT,
- TripsWithDistanceTransformer.class.getName(), PROPS_FILENAME_TEST_INVALID, false), jsc);
+ Collections.singletonList(TripsWithDistanceTransformer.class.getName()), PROPS_FILENAME_TEST_INVALID, false), jsc);
deltaStreamer.sync();
fail("Should error out when setting the key generator class property to an invalid value");
} catch (IOException e) {
@@ -451,7 +452,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
// Initial bulk insert to ingest to first hudi table
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, Operation.BULK_INSERT,
- SqlQueryBasedTransformer.class.getName(), PROPS_FILENAME_TEST_SOURCE, true);
+ Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, true);
new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
TestHelpers.assertRecordCount(1000, tableBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCount(1000, tableBasePath + "/*/*.parquet", sqlContext);
@@ -524,7 +525,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
public void testNullSchemaProvider() throws Exception {
String tableBasePath = dfsBasePath + "/test_table";
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, Operation.BULK_INSERT,
- SqlQueryBasedTransformer.class.getName(), PROPS_FILENAME_TEST_SOURCE, true,
+ Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, true,
false, false, null, null);
try {
new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
@@ -539,15 +540,15 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
public void testPayloadClassUpdate() throws Exception {
String dataSetBasePath = dfsBasePath + "/test_dataset_mor";
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath, Operation.BULK_INSERT,
- SqlQueryBasedTransformer.class.getName(), PROPS_FILENAME_TEST_SOURCE, true,
+ Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, true,
true, false, null, "MERGE_ON_READ");
new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
TestHelpers.assertRecordCount(1000, dataSetBasePath + "/*/*.parquet", sqlContext);
//now create one more deltaStreamer instance and update payload class
cfg = TestHelpers.makeConfig(dataSetBasePath, Operation.BULK_INSERT,
- SqlQueryBasedTransformer.class.getName(), PROPS_FILENAME_TEST_SOURCE, true,
- true, true, DummyAvroPayload.class.getName(), "MERGE_ON_READ");
+ Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, true,
+ true, true, DummyAvroPayload.class.getName(), "MERGE_ON_READ");
new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf());
//now assert that hoodie.properties file now has updated payload class name
@@ -565,14 +566,14 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
public void testPayloadClassUpdateWithCOWTable() throws Exception {
String dataSetBasePath = dfsBasePath + "/test_dataset_cow";
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath, Operation.BULK_INSERT,
- SqlQueryBasedTransformer.class.getName(), PROPS_FILENAME_TEST_SOURCE, true,
+ Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, true,
true, false, null, null);
new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
TestHelpers.assertRecordCount(1000, dataSetBasePath + "/*/*.parquet", sqlContext);
//now create one more deltaStreamer instance and update payload class
cfg = TestHelpers.makeConfig(dataSetBasePath, Operation.BULK_INSERT,
- SqlQueryBasedTransformer.class.getName(), PROPS_FILENAME_TEST_SOURCE, true,
+ Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, true,
true, true, DummyAvroPayload.class.getName(), null);
new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf());
@@ -668,12 +669,12 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
UtilitiesTestBase.Helpers.savePropsToDFS(parquetProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_PARQUET);
}
- private void testParquetDFSSource(boolean useSchemaProvider, String transformerClassName) throws Exception {
- prepareParquetDFSSource(useSchemaProvider, transformerClassName != null);
+ private void testParquetDFSSource(boolean useSchemaProvider, List<String> transformerClassNames) throws Exception {
+ prepareParquetDFSSource(useSchemaProvider, transformerClassNames != null);
String tableBasePath = dfsBasePath + "/test_parquet_table" + testNum;
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
TestHelpers.makeConfig(tableBasePath, Operation.INSERT, ParquetDFSSource.class.getName(),
- transformerClassName, PROPS_FILENAME_TEST_PARQUET, false,
+ transformerClassNames, PROPS_FILENAME_TEST_PARQUET, false,
useSchemaProvider, 100000, false, null, null, "timestamp"), jsc);
deltaStreamer.sync();
TestHelpers.assertRecordCount(PARQUET_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);
@@ -687,7 +688,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
@Test
public void testParquetDFSSourceWithoutSchemaProviderAndTransformer() throws Exception {
- testParquetDFSSource(false, TripsWithDistanceTransformer.class.getName());
+ testParquetDFSSource(false, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
}
@Test
@@ -697,7 +698,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
@Test
public void testParquetDFSSourceWithSchemaFilesAndTransformer() throws Exception {
- testParquetDFSSource(true, TripsWithDistanceTransformer.class.getName());
+ testParquetDFSSource(true, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
}
private void prepareCsvDFSSource(
@@ -740,14 +741,14 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
}
private void testCsvDFSSource(
- boolean hasHeader, char sep, boolean useSchemaProvider, String transformerClassName) throws Exception {
- prepareCsvDFSSource(hasHeader, sep, useSchemaProvider, transformerClassName != null);
+ boolean hasHeader, char sep, boolean useSchemaProvider, List<String> transformerClassNames) throws Exception {
+ prepareCsvDFSSource(hasHeader, sep, useSchemaProvider, transformerClassNames != null);
String tableBasePath = dfsBasePath + "/test_csv_table" + testNum;
String sourceOrderingField = (hasHeader || useSchemaProvider) ? "timestamp" : "_c0";
HoodieDeltaStreamer deltaStreamer =
new HoodieDeltaStreamer(TestHelpers.makeConfig(
tableBasePath, Operation.INSERT, CsvDFSSource.class.getName(),
- transformerClassName, PROPS_FILENAME_TEST_CSV, false,
+ transformerClassNames, PROPS_FILENAME_TEST_CSV, false,
useSchemaProvider, 1000, false, null, null, sourceOrderingField), jsc);
deltaStreamer.sync();
TestHelpers.assertRecordCount(CSV_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);
@@ -785,7 +786,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
// No schema provider is specified, transformer is applied
// In this case, the source schema comes from the inferred schema of the CSV files.
// Target schema is determined based on the Dataframe after transformation
- testCsvDFSSource(true, '\t', false, TripsWithDistanceTransformer.class.getName());
+ testCsvDFSSource(true, '\t', false, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
}
@Test
@@ -793,7 +794,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
// The CSV files have header, the columns are separated by '\t'
// File schema provider is used, transformer is applied
// In this case, the source and target schema come from the Avro schema files
- testCsvDFSSource(true, '\t', true, TripsWithDistanceTransformer.class.getName());
+ testCsvDFSSource(true, '\t', true, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
}
@Test
@@ -824,7 +825,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
// No CSV header and no schema provider at the same time are not recommended,
// as the transformer behavior may be unexpected
try {
- testCsvDFSSource(false, '\t', false, TripsWithDistanceTransformer.class.getName());
+ testCsvDFSSource(false, '\t', false, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
fail("Should error out when doing the transformation.");
} catch (AnalysisException e) {
LOG.error("Expected error during transformation", e);
@@ -837,7 +838,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
// The CSV files do not have header, the columns are separated by '\t'
// File schema provider is used, transformer is applied
// In this case, the source and target schema come from the Avro schema files
- testCsvDFSSource(false, '\t', true, TripsWithDistanceTransformer.class.getName());
+ testCsvDFSSource(false, '\t', true, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
}
/**
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestUtilHelpers.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestUtilHelpers.java
new file mode 100644
index 0000000..f49e750
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestUtilHelpers.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.transform.ChainedTransformer;
+import org.apache.hudi.utilities.transform.Transformer;
+
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Enclosed.class)
+public class TestUtilHelpers {
+
+ public static class TestCreateTransformer {
+
+ public static class TransformerFoo implements Transformer {
+
+ @Override
+ public Dataset apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset, TypedProperties properties) {
+ return null;
+ }
+ }
+
+ public static class TransformerBar implements Transformer {
+
+ @Override
+ public Dataset apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset, TypedProperties properties) {
+ return null;
+ }
+ }
+
+ @Rule
+ public ExpectedException exceptionRule = ExpectedException.none();
+
+ @Test
+ public void testCreateTransformerNotPresent() throws IOException {
+ assertFalse(UtilHelpers.createTransformer(null).isPresent());
+ }
+
+ @Test
+ public void testCreateTransformerLoadOneClass() throws IOException {
+ Transformer transformer = UtilHelpers.createTransformer(Collections.singletonList(TransformerFoo.class.getName())).get();
+ assertTrue(transformer instanceof ChainedTransformer);
+ List<String> transformerNames = ((ChainedTransformer) transformer).getTransformersNames();
+ assertEquals(1, transformerNames.size());
+ assertEquals(TransformerFoo.class.getName(), transformerNames.get(0));
+ }
+
+ @Test
+ public void testCreateTransformerLoadMultipleClasses() throws IOException {
+ List<String> classNames = Arrays.asList(TransformerFoo.class.getName(), TransformerBar.class.getName());
+ Transformer transformer = UtilHelpers.createTransformer(classNames).get();
+ assertTrue(transformer instanceof ChainedTransformer);
+ List<String> transformerNames = ((ChainedTransformer) transformer).getTransformersNames();
+ assertEquals(2, transformerNames.size());
+ assertEquals(TransformerFoo.class.getName(), transformerNames.get(0));
+ assertEquals(TransformerBar.class.getName(), transformerNames.get(1));
+ }
+
+ @Test
+ public void testCreateTransformerThrowsException() throws IOException {
+ exceptionRule.expect(IOException.class);
+ exceptionRule.expectMessage("Could not load transformer class(es) [foo, bar]");
+ UtilHelpers.createTransformer(Arrays.asList("foo", "bar"));
+ }
+ }
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestChainedTransformer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestChainedTransformer.java
new file mode 100644
index 0000000..dd5b8b9
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestChainedTransformer.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.transform;
+
+import org.apache.hudi.utilities.UtilHelpers;
+
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.spark.sql.types.DataTypes.IntegerType;
+import static org.apache.spark.sql.types.DataTypes.StringType;
+import static org.apache.spark.sql.types.DataTypes.createStructField;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+public class TestChainedTransformer {
+
+ private JavaSparkContext jsc;
+ private SparkSession sparkSession;
+
+ @Before
+ public void setUp() {
+ jsc = UtilHelpers.buildSparkContext(this.getClass().getName() + "-hoodie", "local[2]");
+ sparkSession = SparkSession.builder().config(jsc.getConf()).getOrCreate();
+ }
+
+ @After
+ public void tearDown() {
+ jsc.stop();
+ }
+
+ @Test
+ public void testChainedTransformation() {
+ StructType schema = DataTypes.createStructType(
+ new StructField[] {
+ createStructField("foo", StringType, false)
+ });
+ Row r1 = RowFactory.create("100");
+ Row r2 = RowFactory.create("200");
+ Dataset<Row> original = sparkSession.sqlContext().createDataFrame(Arrays.asList(r1, r2), schema);
+
+ Transformer t1 = (jsc, sparkSession, dataset, properties) -> dataset.withColumnRenamed("foo", "bar");
+ Transformer t2 = (jsc, sparkSession, dataset, properties) -> dataset.withColumn("bar", dataset.col("bar").cast(IntegerType));
+ ChainedTransformer transformer = new ChainedTransformer(Arrays.asList(t1, t2));
+ Dataset<Row> transformed = transformer.apply(jsc, sparkSession, original, null);
+
+ assertEquals(2, transformed.count());
+ assertArrayEquals(new String[] {"bar"}, transformed.columns());
+ List<Row> rows = transformed.collectAsList();
+ assertEquals(100, rows.get(0).getInt(0));
+ assertEquals(200, rows.get(1).getInt(0));
+ }
+
+ @Test
+ public void testGetTransformersNames() {
+ Transformer t1 = (jsc, sparkSession, dataset, properties) -> dataset.withColumnRenamed("foo", "bar");
+ Transformer t2 = (jsc, sparkSession, dataset, properties) -> dataset.withColumn("bar", dataset.col("bar").cast(IntegerType));
+ ChainedTransformer transformer = new ChainedTransformer(Arrays.asList(t1, t2));
+ List<String> classNames = transformer.getTransformersNames();
+ assertEquals(t1.getClass().getName(), classNames.get(0));
+ assertEquals(t2.getClass().getName(), classNames.get(1));
+ }
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestFlatteningTransformer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestFlatteningTransformer.java
similarity index 95%
rename from hudi-utilities/src/test/java/org/apache/hudi/utilities/TestFlatteningTransformer.java
rename to hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestFlatteningTransformer.java
index d119102..bb95629 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestFlatteningTransformer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestFlatteningTransformer.java
@@ -16,9 +16,7 @@
* limitations under the License.
*/
-package org.apache.hudi.utilities;
-
-import org.apache.hudi.utilities.transform.FlatteningTransformer;
+package org.apache.hudi.utilities.transform;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;