You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2021/06/08 01:48:44 UTC

[hudi] branch master updated: [HUDI-1743] Added support for SqlFileBasedTransformer (#2747)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 57611d1  [HUDI-1743] Added support for SqlFileBasedTransformer (#2747)
57611d1 is described below

commit 57611d10b5ae5cb3580dcbd4abb4cdc131cbd575
Author: Vinoth Govindarajan <vi...@uber.com>
AuthorDate: Mon Jun 7 18:48:27 2021 -0700

    [HUDI-1743] Added support for SqlFileBasedTransformer (#2747)
---
 .../transform/SqlFileBasedTransformer.java         | 106 ++++++++++++
 .../transform/TestSqlFileBasedTransformer.java     | 178 +++++++++++++++++++++
 .../sql-file-transformer-empty.sql                 |  25 +++
 .../sql-file-transformer-invalid.sql               |  22 +++
 .../delta-streamer-config/sql-file-transformer.sql |  27 ++++
 5 files changed, 358 insertions(+)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlFileBasedTransformer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlFileBasedTransformer.java
new file mode 100644
index 0000000..04264bf
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlFileBasedTransformer.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.transform;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+import java.io.IOException;
+import java.util.Scanner;
+import java.util.UUID;
+
+/**
+ * A transformer that allows a SQL template file to be used to transform the source before
+ * writing to a Hudi data-set.
+ *
+ * <p>The query should reference the source as a table named {@code <SRC>}.
+ *
+ * <p>Statements are separated by semicolons and executed in order. The result of the final
+ * statement is used as the write payload. The file is split on every semicolon, so a semicolon
+ * inside a comment or a string literal also ends a statement.
+ *
+ * <p>The SQL file is configured with this hoodie property:
+ * hoodie.deltastreamer.transformer.sql.file
+ *
+ * <p>Example Spark SQL Query:
+ *
+ * <pre>{@code
+ * CACHE TABLE tmp_personal_trips AS
+ * SELECT * FROM <SRC> WHERE trip_type='personal_trips';
+ *
+ * SELECT * FROM tmp_personal_trips;
+ * }</pre>
+ */
+public class SqlFileBasedTransformer implements Transformer {
+
+  private static final Logger LOG = LogManager.getLogger(SqlFileBasedTransformer.class);
+
+  /** Placeholder in the SQL file that is substituted with the temp view name of the source. */
+  private static final String SRC_PATTERN = "<SRC>";
+  /** Prefix of the per-invocation temp view name; a sanitized UUID is appended to it. */
+  private static final String TMP_TABLE = "HOODIE_SRC_TMP_TABLE_";
+
+  /**
+   * Runs every statement of the configured SQL file with {@code rowDataset} exposed as
+   * {@code <SRC>}.
+   *
+   * @param jsc Spark context; its Hadoop configuration is used to resolve the file system.
+   * @param sparkSession session used to execute the statements.
+   * @param rowDataset source rows that the SQL file refers to as {@code <SRC>}.
+   * @param props properties holding {@code hoodie.deltastreamer.transformer.sql.file}.
+   * @return the result of the last non-empty statement, or {@code null} when the file contains
+   *     no non-empty statement.
+   * @throws IllegalArgumentException if the SQL file property resolves to {@code null}.
+   * @throws HoodieIOException if the SQL file cannot be read.
+   */
+  @Override
+  public Dataset<Row> apply(
+      final JavaSparkContext jsc,
+      final SparkSession sparkSession,
+      final Dataset<Row> rowDataset,
+      final TypedProperties props) {
+
+    final String sqlFile = props.getString(Config.TRANSFORMER_SQL_FILE);
+    if (null == sqlFile) {
+      throw new IllegalArgumentException(
+          "Missing required configuration : (" + Config.TRANSFORMER_SQL_FILE + ")");
+    }
+
+    final FileSystem fs = FSUtils.getFs(sqlFile, jsc.hadoopConfiguration(), true);
+    // tmp table name doesn't like dashes
+    final String tmpTable = TMP_TABLE.concat(UUID.randomUUID().toString().replace("-", "_"));
+    LOG.info("Registering tmp table : " + tmpTable);
+    // registerTempTable is deprecated; createOrReplaceTempView is its replacement.
+    rowDataset.createOrReplaceTempView(tmpTable);
+
+    try (final Scanner scanner = new Scanner(fs.open(new Path(sqlFile)), "UTF-8")) {
+      Dataset<Row> rows = null;
+      // each sql statement is separated with semicolon hence set that as delimiter.
+      scanner.useDelimiter(";");
+      LOG.info("SQL Query for transformation : ");
+      while (scanner.hasNext()) {
+        // SRC_PATTERN is a literal placeholder, so use the non-regex replace.
+        final String sqlStr = scanner.next().replace(SRC_PATTERN, tmpTable).trim();
+        if (!sqlStr.isEmpty()) {
+          LOG.info(sqlStr);
+          // overwrite the same dataset object until the last statement then return.
+          rows = sparkSession.sql(sqlStr);
+        }
+      }
+      return rows;
+    } catch (final IOException ioe) {
+      throw new HoodieIOException("Error reading transformer SQL file.", ioe);
+    } finally {
+      // The view name is unique per call, so without this every invocation (including ones that
+      // fail to read the file) would leave another temp view behind in the session catalog.
+      sparkSession.catalog().dropTempView(tmpTable);
+    }
+  }
+
+  /** Configs supported. */
+  private static class Config {
+
+    /** Property holding the path of the SQL file to execute. */
+    private static final String TRANSFORMER_SQL_FILE = "hoodie.deltastreamer.transformer.sql.file";
+  }
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java
new file mode 100644
index 0000000..833d7b4
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.transform;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.catalyst.parser.ParseException;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructType;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Tests for {@link SqlFileBasedTransformer} covering a missing config, a missing SQL file, an
+ * unparsable SQL file, a SQL file yielding no rows and a SQL file yielding distinct sorted rows.
+ */
+public class TestSqlFileBasedTransformer extends UtilitiesTestBase {
+  /** Property the transformer reads the SQL file path from. */
+  private static final String SQL_FILE_CONFIG = "hoodie.deltastreamer.transformer.sql.file";
+
+  private TypedProperties props;
+  private SqlFileBasedTransformer sqlFileTransformer;
+  private Dataset<Row> inputDatasetRows;
+  private Dataset<Row> emptyDatasetRow;
+
+  @BeforeAll
+  public static void initClass() throws Exception {
+    UtilitiesTestBase.initClass();
+    // Copy the SQL fixtures to the test DFS so the transformer can read them through Hadoop FS.
+    UtilitiesTestBase.Helpers.copyToDFS(
+        "delta-streamer-config/sql-file-transformer.sql",
+        UtilitiesTestBase.dfs,
+        UtilitiesTestBase.dfsBasePath + "/sql-file-transformer.sql");
+    UtilitiesTestBase.Helpers.copyToDFS(
+        "delta-streamer-config/sql-file-transformer-invalid.sql",
+        UtilitiesTestBase.dfs,
+        UtilitiesTestBase.dfsBasePath + "/sql-file-transformer-invalid.sql");
+    UtilitiesTestBase.Helpers.copyToDFS(
+        "delta-streamer-config/sql-file-transformer-empty.sql",
+        UtilitiesTestBase.dfs,
+        UtilitiesTestBase.dfsBasePath + "/sql-file-transformer-empty.sql");
+  }
+
+  @AfterAll
+  public static void cleanupClass() {
+    UtilitiesTestBase.cleanupClass();
+  }
+
+  @Override
+  @BeforeEach
+  public void setup() throws Exception {
+    super.setup();
+    props = new TypedProperties();
+    sqlFileTransformer = new SqlFileBasedTransformer();
+    inputDatasetRows = getInputDatasetRows();
+    emptyDatasetRow = getEmptyDatasetRow();
+  }
+
+  @Override
+  @AfterEach
+  public void teardown() throws Exception {
+    super.teardown();
+  }
+
+  @Test
+  public void testSqlFileBasedTransformerIllegalArguments() {
+    // Test if the class throws illegal argument exception when argument not present.
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> sqlFileTransformer.apply(jsc, sparkSession, inputDatasetRows, props));
+  }
+
+  @Test
+  public void testSqlFileBasedTransformerIncorrectConfig() {
+    // Test if the class throws hoodie IO exception correctly when given an incorrect config.
+    props.setProperty(SQL_FILE_CONFIG, UtilitiesTestBase.dfsBasePath + "/non-exist-sql-file.sql");
+    assertThrows(
+        HoodieIOException.class,
+        () -> sqlFileTransformer.apply(jsc, sparkSession, inputDatasetRows, props));
+  }
+
+  @Test
+  public void testSqlFileBasedTransformerInvalidSQL() {
+    // Test if the SQL file based transformer works as expected for the invalid SQL statements.
+    props.setProperty(
+        SQL_FILE_CONFIG, UtilitiesTestBase.dfsBasePath + "/sql-file-transformer-invalid.sql");
+    assertThrows(
+        ParseException.class,
+        () -> sqlFileTransformer.apply(jsc, sparkSession, inputDatasetRows, props));
+  }
+
+  @Test
+  public void testSqlFileBasedTransformerEmptyDataset() {
+    // Test if the SQL file based transformer works as expected when the SQL yields no rows.
+    props.setProperty(
+        SQL_FILE_CONFIG, UtilitiesTestBase.dfsBasePath + "/sql-file-transformer-empty.sql");
+    Dataset<Row> emptyRow = sqlFileTransformer.apply(jsc, sparkSession, inputDatasetRows, props);
+    String[] actualRows = toStringArray(emptyRow);
+    // Convert through the String encoder like the actual rows: copying a List<Row> straight into
+    // a String[] only works while the list is empty and throws ArrayStoreException otherwise.
+    String[] expectedRows = toStringArray(emptyDatasetRow);
+    assertArrayEquals(expectedRows, actualRows);
+  }
+
+  @Test
+  public void testSqlFileBasedTransformer() {
+    // Test if the SQL file based transformer works as expected for the correct input.
+    props.setProperty(SQL_FILE_CONFIG, UtilitiesTestBase.dfsBasePath + "/sql-file-transformer.sql");
+    Dataset<Row> transformedRow =
+        sqlFileTransformer.apply(jsc, sparkSession, inputDatasetRows, props);
+
+    // Called distinct() and sort() to match the transformation in this file:
+    // hudi-utilities/src/test/resources/delta-streamer-config/sql-file-transformer.sql
+    String[] expectedRows = toStringArray(inputDatasetRows.distinct().sort("col1"));
+    String[] actualRows = toStringArray(transformedRow);
+    assertArrayEquals(expectedRows, actualRows);
+  }
+
+  /** Collects a single string column dataset into an array of its values. */
+  private static String[] toStringArray(Dataset<Row> rows) {
+    return rows.as(Encoders.STRING()).collectAsList().toArray(new String[0]);
+  }
+
+  /** Builds the schema shared by the test datasets: one nullable string column "col1". */
+  private static StructType getSingleColumnSchema() {
+    List<org.apache.spark.sql.types.StructField> listOfStructField = new ArrayList<>();
+    listOfStructField.add(DataTypes.createStructField("col1", DataTypes.StringType, true));
+    return DataTypes.createStructType(listOfStructField);
+  }
+
+  private Dataset<Row> getInputDatasetRows() {
+    // Create few rows with duplicate data.
+    List<Row> list = new ArrayList<>();
+    list.add(RowFactory.create("one"));
+    list.add(RowFactory.create("two"));
+    list.add(RowFactory.create("three"));
+    list.add(RowFactory.create("four"));
+    list.add(RowFactory.create("four"));
+    // Create the data frame with the rows and schema.
+    return sparkSession.createDataFrame(list, getSingleColumnSchema());
+  }
+
+  private Dataset<Row> getEmptyDatasetRow() {
+    // Create empty dataframe with the schema.
+    List<Row> list = new ArrayList<>();
+    return sparkSession.createDataFrame(list, getSingleColumnSchema());
+  }
+}
diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/sql-file-transformer-empty.sql b/hudi-utilities/src/test/resources/delta-streamer-config/sql-file-transformer-empty.sql
new file mode 100644
index 0000000..ae83b47
--- /dev/null
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/sql-file-transformer-empty.sql
@@ -0,0 +1,25 @@
+--  Licensed to the Apache Software Foundation (ASF) under one
+--  or more contributor license agreements.  See the NOTICE file
+--  distributed with this work for additional information
+--  regarding copyright ownership.  The ASF licenses this file
+--  to you under the Apache License, Version 2.0 (the
+--  "License"), you may not use this file except in compliance
+--  with the License.  You may obtain a copy of the License at
+--
+--      http://www.apache.org/licenses/LICENSE-2.0
+--
+--  Unless required by applicable law or agreed to in writing, software
+--  distributed under the License is distributed on an "AS IS" BASIS,
+--  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+--  See the License for the specific language governing permissions and
+--  limitations under the License.
+
+-- The predicate is always false, so tmp_trips is cached with zero rows.
+CACHE TABLE tmp_trips
+SELECT * FROM <SRC> WHERE True = False;
+
+-- Final statement, its result is what the transformer returns.
+SELECT * FROM tmp_trips;
\ No newline at end of file
diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/sql-file-transformer-invalid.sql b/hudi-utilities/src/test/resources/delta-streamer-config/sql-file-transformer-invalid.sql
new file mode 100644
index 0000000..42b2e7e
--- /dev/null
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/sql-file-transformer-invalid.sql
@@ -0,0 +1,22 @@
+--  Licensed to the Apache Software Foundation (ASF) under one
+--  or more contributor license agreements.  See the NOTICE file
+--  distributed with this work for additional information
+--  regarding copyright ownership.  The ASF licenses this file
+--  to you under the Apache License, Version 2.0 (the
+--  "License"), you may not use this file except in compliance
+--  with the License.  You may obtain a copy of the License at
+--
+--      http://www.apache.org/licenses/LICENSE-2.0
+--
+--  Unless required by applicable law or agreed to in writing, software
+--  distributed under the License is distributed on an "AS IS" BASIS,
+--  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+--  See the License for the specific language governing permissions and
+--  limitations under the License.
+
+-- SEECT is an intentional misspelling of SELECT so that this statement fails to parse.
+SEECT
+DISTINCT col1
+FROM
+        <SRC>
+ORDER BY col1;
+
diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/sql-file-transformer.sql b/hudi-utilities/src/test/resources/delta-streamer-config/sql-file-transformer.sql
new file mode 100644
index 0000000..ce14b79
--- /dev/null
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/sql-file-transformer.sql
@@ -0,0 +1,27 @@
+
+--  Licensed to the Apache Software Foundation (ASF) under one
+--  or more contributor license agreements.  See the NOTICE file
+--  distributed with this work for additional information
+--  regarding copyright ownership.  The ASF licenses this file
+--  to you under the Apache License, Version 2.0 (the
+--  "License"), you may not use this file except in compliance
+--  with the License.  You may obtain a copy of the License at
+--
+--      http://www.apache.org/licenses/LICENSE-2.0
+--
+--  Unless required by applicable law or agreed to in writing, software
+--  distributed under the License is distributed on an "AS IS" BASIS,
+--  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+--  See the License for the specific language governing permissions and
+--  limitations under the License.
+
+-- The results of the next two statements are discarded, only the final result is returned.
+SELECT RAND();
+
+SELECT 1;
+
+-- Deduplicate and sort the source rows.
+SELECT DISTINCT col1
+FROM <SRC>
+ORDER BY col1;
+