You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2021/06/08 01:48:44 UTC
[hudi] branch master updated: [HUDI-1743] Added support for
SqlFileBasedTransformer (#2747)
This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 57611d1 [HUDI-1743] Added support for SqlFileBasedTransformer (#2747)
57611d1 is described below
commit 57611d10b5ae5cb3580dcbd4abb4cdc131cbd575
Author: Vinoth Govindarajan <vi...@uber.com>
AuthorDate: Mon Jun 7 18:48:27 2021 -0700
[HUDI-1743] Added support for SqlFileBasedTransformer (#2747)
---
.../transform/SqlFileBasedTransformer.java | 106 ++++++++++++
.../transform/TestSqlFileBasedTransformer.java | 178 +++++++++++++++++++++
.../sql-file-transformer-empty.sql | 25 +++
.../sql-file-transformer-invalid.sql | 22 +++
.../delta-streamer-config/sql-file-transformer.sql | 27 ++++
5 files changed, 358 insertions(+)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlFileBasedTransformer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlFileBasedTransformer.java
new file mode 100644
index 0000000..04264bf
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlFileBasedTransformer.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.transform;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+import java.io.IOException;
+import java.util.Scanner;
+import java.util.UUID;
+
+/**
+ * A transformer that runs the Spark SQL statements of a template file against the source before
+ * writing to a Hudi data-set.
+ *
+ * <p>The statements should reference the source as a table named {@code <SRC>}. They are
+ * separated by semicolons, and the result of the final statement is used as the write payload.
+ *
+ * <p>The SQL file is configured with {@code hoodie.deltastreamer.transformer.sql.file}.
+ *
+ * <p>Example Spark SQL file:
+ *
+ * <pre>{@code
+ * CACHE TABLE tmp_personal_trips AS
+ * SELECT * FROM <SRC> WHERE trip_type='personal_trips';
+ *
+ * SELECT * FROM tmp_personal_trips;
+ * }</pre>
+ */
+public class SqlFileBasedTransformer implements Transformer {
+
+  private static final Logger LOG = LogManager.getLogger(SqlFileBasedTransformer.class);
+  private static final String SRC_PATTERN = "<SRC>";
+  private static final String TMP_TABLE = "HOODIE_SRC_TMP_TABLE_";
+
+  /**
+   * Registers {@code rowDataset} as a temp view and runs the configured SQL file's statements.
+   *
+   * @return result of the last non-empty statement, or {@code null} if the file has none
+   * @throws IllegalArgumentException if the SQL file property resolves to {@code null}
+   * @throws HoodieIOException if opening the SQL file fails with an {@link IOException}
+   */
+  @Override
+  public Dataset<Row> apply(final JavaSparkContext jsc, final SparkSession sparkSession,
+      final Dataset<Row> rowDataset, final TypedProperties props) {
+    final String sqlFile = props.getString(Config.TRANSFORMER_SQL_FILE);
+    if (null == sqlFile) {
+      throw new IllegalArgumentException(
+          "Missing required configuration : (" + Config.TRANSFORMER_SQL_FILE + ")");
+    }
+
+    final FileSystem fs = FSUtils.getFs(sqlFile, jsc.hadoopConfiguration(), true);
+    // tmp table name doesn't like dashes, so the UUID's dashes become underscores.
+    final String tmpTable = TMP_TABLE.concat(UUID.randomUUID().toString().replace("-", "_"));
+    LOG.info("Registering tmp table : " + tmpTable);
+    // registerTempTable has been deprecated since Spark 2.0 in favor of this method.
+    rowDataset.createOrReplaceTempView(tmpTable);
+
+    try (final Scanner scanner = new Scanner(fs.open(new Path(sqlFile)), "UTF-8")) {
+      Dataset<Row> rows = null;
+      // Each SQL statement ends with a semicolon, hence that is the token delimiter.
+      scanner.useDelimiter(";");
+      LOG.info("SQL Query for transformation : ");
+      while (scanner.hasNext()) {
+        final String sqlStr = scanner.next().replace(SRC_PATTERN, tmpTable).trim();
+        if (!sqlStr.isEmpty()) {
+          LOG.info(sqlStr);
+          // Overwrite the same reference so that only the last statement's result is returned.
+          rows = sparkSession.sql(sqlStr);
+        }
+      }
+      return rows;
+    } catch (final IOException ioe) {
+      throw new HoodieIOException("Error reading transformer SQL file.", ioe);
+    }
+  }
+
+  /** Configs supported. */
+  private static class Config {
+    private static final String TRANSFORMER_SQL_FILE = "hoodie.deltastreamer.transformer.sql.file";
+  }
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java
new file mode 100644
index 0000000..833d7b4
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.transform;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.catalyst.parser.ParseException;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructType;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Tests {@link SqlFileBasedTransformer} against SQL fixture files copied to the test DFS.
+ */
+public class TestSqlFileBasedTransformer extends UtilitiesTestBase {
+  // Property through which the transformer is told where its SQL file lives.
+  private static final String SQL_FILE_PROP = "hoodie.deltastreamer.transformer.sql.file";
+
+  private TypedProperties props;
+  private SqlFileBasedTransformer sqlFileTransformer;
+  private Dataset<Row> inputDatasetRows;
+  private Dataset<Row> emptyDatasetRow;
+
+  /** Copies the three SQL fixture files from the test resources to the DFS base path. */
+  @BeforeAll
+  public static void initClass() throws Exception {
+    UtilitiesTestBase.initClass();
+    UtilitiesTestBase.Helpers.copyToDFS(
+        "delta-streamer-config/sql-file-transformer.sql",
+        UtilitiesTestBase.dfs,
+        UtilitiesTestBase.dfsBasePath + "/sql-file-transformer.sql");
+    UtilitiesTestBase.Helpers.copyToDFS(
+        "delta-streamer-config/sql-file-transformer-invalid.sql",
+        UtilitiesTestBase.dfs,
+        UtilitiesTestBase.dfsBasePath + "/sql-file-transformer-invalid.sql");
+    UtilitiesTestBase.Helpers.copyToDFS(
+        "delta-streamer-config/sql-file-transformer-empty.sql",
+        UtilitiesTestBase.dfs,
+        UtilitiesTestBase.dfsBasePath + "/sql-file-transformer-empty.sql");
+  }
+
+  @AfterAll
+  public static void cleanupClass() {
+    UtilitiesTestBase.cleanupClass();
+  }
+
+  /** Creates fresh properties, a fresh transformer and the test datasets before each test. */
+  @Override
+  @BeforeEach
+  public void setup() throws Exception {
+    super.setup();
+    props = new TypedProperties();
+    sqlFileTransformer = new SqlFileBasedTransformer();
+    inputDatasetRows = getInputDatasetRows();
+    emptyDatasetRow = getEmptyDatasetRow();
+  }
+
+  @Override
+  @AfterEach
+  public void teardown() throws Exception {
+    super.teardown();
+  }
+
+  @Test
+  public void testSqlFileBasedTransformerIllegalArguments() {
+    // The SQL file property is not set, so apply() must reject the call.
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> sqlFileTransformer.apply(jsc, sparkSession, inputDatasetRows, props));
+  }
+
+  @Test
+  public void testSqlFileBasedTransformerIncorrectConfig() {
+    // A SQL file path that does not exist must surface as a HoodieIOException.
+    props.setProperty(SQL_FILE_PROP, UtilitiesTestBase.dfsBasePath + "/non-exist-sql-file.sql");
+    assertThrows(
+        HoodieIOException.class,
+        () -> sqlFileTransformer.apply(jsc, sparkSession, inputDatasetRows, props));
+  }
+
+  @Test
+  public void testSqlFileBasedTransformerInvalidSQL() {
+    // The statement in this file is misspelled, so Spark must fail to parse it.
+    props.setProperty(
+        SQL_FILE_PROP, UtilitiesTestBase.dfsBasePath + "/sql-file-transformer-invalid.sql");
+    assertThrows(
+        ParseException.class,
+        () -> sqlFileTransformer.apply(jsc, sparkSession, inputDatasetRows, props));
+  }
+
+  @Test
+  public void testSqlFileBasedTransformerEmptyDataset() {
+    // The statements in this file filter out every row, so the result must be empty.
+    props.setProperty(
+        SQL_FILE_PROP, UtilitiesTestBase.dfsBasePath + "/sql-file-transformer-empty.sql");
+    Dataset<Row> emptyRow = sqlFileTransformer.apply(jsc, sparkSession, inputDatasetRows, props);
+    // Both sides go through the String encoder: copying Row objects straight into a String[]
+    // only works while the list is empty.
+    assertArrayEquals(toStringArray(emptyDatasetRow), toStringArray(emptyRow));
+  }
+
+  @Test
+  public void testSqlFileBasedTransformer() {
+    // Test if the SQL file based transformer works as expected for the correct input.
+    props.setProperty(SQL_FILE_PROP, UtilitiesTestBase.dfsBasePath + "/sql-file-transformer.sql");
+    Dataset<Row> transformedRow =
+        sqlFileTransformer.apply(jsc, sparkSession, inputDatasetRows, props);
+
+    // Called distinct() and sort() to match the transformation in this file:
+    // hudi-utilities/src/test/resources/delta-streamer-config/sql-file-transformer.sql
+    String[] expectedRows = toStringArray(inputDatasetRows.distinct().sort("col1"));
+    String[] actualRows = toStringArray(transformedRow);
+    assertArrayEquals(expectedRows, actualRows);
+  }
+
+  /** Collects a dataset with a single string column into an array of its values. */
+  private static String[] toStringArray(final Dataset<Row> rows) {
+    return rows.as(Encoders.STRING()).collectAsList().toArray(new String[0]);
+  }
+
+  /** Builds the schema shared by the test datasets: one nullable string column, col1. */
+  private static StructType singleColumnSchema() {
+    List<org.apache.spark.sql.types.StructField> listOfStructField = new ArrayList<>();
+    listOfStructField.add(DataTypes.createStructField("col1", DataTypes.StringType, true));
+    return DataTypes.createStructType(listOfStructField);
+  }
+
+  /** Returns five single-column rows, one of which ("four") is a duplicate. */
+  private Dataset<Row> getInputDatasetRows() {
+    // Create few rows with duplicate data.
+    List<Row> list = new ArrayList<>();
+    list.add(RowFactory.create("one"));
+    list.add(RowFactory.create("two"));
+    list.add(RowFactory.create("three"));
+    list.add(RowFactory.create("four"));
+    list.add(RowFactory.create("four"));
+    // Create the data frame with the rows and schema.
+    return sparkSession.createDataFrame(list, singleColumnSchema());
+  }
+
+  /** Returns a dataset with the shared schema and no rows. */
+  private Dataset<Row> getEmptyDatasetRow() {
+    // Create empty dataframe with the schema.
+    return sparkSession.createDataFrame(new ArrayList<Row>(), singleColumnSchema());
+  }
+}
diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/sql-file-transformer-empty.sql b/hudi-utilities/src/test/resources/delta-streamer-config/sql-file-transformer-empty.sql
new file mode 100644
index 0000000..ae83b47
--- /dev/null
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/sql-file-transformer-empty.sql
@@ -0,0 +1,25 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"), you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+-- Cache the source rows matching a predicate that is never true, so no row is kept
+CACHE TABLE tmp_trips
+SELECT *
+FROM <SRC>
+WHERE True = False;
+
+-- The last statement provides the transformer output, which is empty here
+SELECT *
+FROM tmp_trips;
\ No newline at end of file
diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/sql-file-transformer-invalid.sql b/hudi-utilities/src/test/resources/delta-streamer-config/sql-file-transformer-invalid.sql
new file mode 100644
index 0000000..42b2e7e
--- /dev/null
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/sql-file-transformer-invalid.sql
@@ -0,0 +1,22 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"), you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+-- SEECT is misspelled on purpose, the parser is expected to reject this statement
+SEECT
+  DISTINCT col1
+FROM <SRC>
+ORDER BY col1;
+
diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/sql-file-transformer.sql b/hudi-utilities/src/test/resources/delta-streamer-config/sql-file-transformer.sql
new file mode 100644
index 0000000..ce14b79
--- /dev/null
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/sql-file-transformer.sql
@@ -0,0 +1,27 @@
+
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"), you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+-- Results of the leading statements are overwritten by the statements that follow
+SELECT RAND();
+
+SELECT 1;
+
+-- Only the result of this last statement is returned by the transformer
+SELECT DISTINCT col1
+FROM <SRC>
+ORDER BY col1;
+