Posted to commits@hudi.apache.org by si...@apache.org on 2021/06/10 22:03:26 UTC

[hudi] branch master updated: [HUDI-1790] Added SqlSource to fetch data from any partitions for backfill use case (#2896)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e4114d  [HUDI-1790] Added SqlSource to fetch data from any partitions for backfill use case (#2896)
9e4114d is described below

commit 9e4114dd46cda4db23a24889cd267998734202da
Author: Vinoth Govindarajan <vi...@uber.com>
AuthorDate: Thu Jun 10 15:03:07 2021 -0700

    [HUDI-1790] Added SqlSource to fetch data from any partitions for backfill use case (#2896)
---
 .../apache/hudi/utilities/sources/SqlSource.java   |  94 ++++++++++
 .../hudi/utilities/sources/TestSqlSource.java      | 190 +++++++++++++++++++++
 2 files changed, 284 insertions(+)
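
For a one-time backfill run, SqlSource is configured entirely through properties. A minimal
sketch of wiring it up (jsc, sparkSession, and schemaProvider are assumed to be available, as in
the tests below; the ds partition column and its values are hypothetical):

    TypedProperties props = new TypedProperties();
    props.setProperty("hoodie.deltastreamer.source.sql.sql.query",
        "select * from source_table where ds in ('2021-06-01', '2021-06-02')");
    SqlSource sqlSource = new SqlSource(props, jsc, sparkSession, schemaProvider);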

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SqlSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SqlSource.java
new file mode 100644
index 0000000..d832e43
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SqlSource.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+/**
+ * SQL Source that reads from any table; used mainly for backfill jobs that process specific partition dates.
+ *
+ * <p>The Spark SQL query should be configured via this hoodie config:
+ *
+ * <p>hoodie.deltastreamer.source.sql.sql.query = 'select * from source_table'
+ *
+ * <p>SqlSource is meant for one-time backfill scenarios: it does not update the deltastreamer.checkpoint.key to the
+ * processed commit. Instead, it fetches the latest successful checkpoint key and sets that value as this backfill
+ * commit's checkpoint, so that it does not interrupt the regular incremental processing.
+ *
+ * <p>To fetch and use the latest incremental checkpoint, you also need to set this hoodie config for DeltaStreamer jobs:
+ *
+ * <p>hoodie.write.meta.key.prefixes = 'deltastreamer.checkpoint.key'
+ */
+public class SqlSource extends RowSource {
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = LogManager.getLogger(SqlSource.class);
+  private final String sourceSql;
+  private final SparkSession spark;
+
+  public SqlSource(
+      TypedProperties props,
+      JavaSparkContext sparkContext,
+      SparkSession sparkSession,
+      SchemaProvider schemaProvider) {
+    super(props, sparkContext, sparkSession, schemaProvider);
+    DataSourceUtils.checkRequiredProperties(
+        props, Collections.singletonList(SqlSource.Config.SOURCE_SQL));
+    sourceSql = props.getString(SqlSource.Config.SOURCE_SQL);
+    spark = sparkSession;
+  }
+
+  @Override
+  protected Pair<Option<Dataset<Row>>, String> fetchNextBatch(
+      Option<String> lastCkptStr, long sourceLimit) {
+    LOG.debug(sourceSql);
+    Dataset<Row> source = spark.sql(sourceSql);
+    if (LOG.isDebugEnabled()) { LOG.debug(source.showString(10, 0, true)); }
+    // Remove Hoodie meta columns except partition path from input source.
+    if (Arrays.asList(source.columns()).contains(HoodieRecord.COMMIT_TIME_METADATA_FIELD)) {
+      source =
+        source.drop(
+            HoodieRecord.HOODIE_META_COLUMNS.stream()
+                .filter(x -> !x.equals(HoodieRecord.PARTITION_PATH_METADATA_FIELD))
+                .toArray(String[]::new));
+    }
+    return Pair.of(Option.of(source), null);
+  }
+
+  /**
+   * Configs supported.
+   */
+  private static class Config {
+
+    private static final String SOURCE_SQL = "hoodie.deltastreamer.source.sql.sql.query";
+  }
+}
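
For context before the tests, a minimal sketch of consuming a batch from SqlSource through
SourceFormatAdapter (constructed as above; note that fetchNextBatch ignores the sourceLimit
argument and returns a null checkpoint by design):

    SourceFormatAdapter adapter = new SourceFormatAdapter(sqlSource);
    InputBatch<Dataset<Row>> batch =
        adapter.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE);
    long fetched = batch.getBatch().get().count();
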
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java
new file mode 100644
index 0000000..1395ae6
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
+import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+import org.apache.hudi.utilities.sources.InputBatch;
+import org.apache.hudi.utilities.sources.SqlSource;
+import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Test against {@link SqlSource}.
+ */
+public class TestSqlSource extends UtilitiesTestBase {
+
+  private final boolean useFlattenedSchema = false;
+  private final String sqlSourceConfig = "hoodie.deltastreamer.source.sql.sql.query";
+  protected FilebasedSchemaProvider schemaProvider;
+  protected HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+  private String dfsRoot;
+  private TypedProperties props;
+  private SqlSource sqlSource;
+  private SourceFormatAdapter sourceFormatAdapter;
+
+  @BeforeAll
+  public static void initClass() throws Exception {
+    UtilitiesTestBase.initClass();
+  }
+
+  @AfterAll
+  public static void cleanupClass() {
+    UtilitiesTestBase.cleanupClass();
+  }
+
+  @BeforeEach
+  public void setup() throws Exception {
+    dfsRoot = UtilitiesTestBase.dfsBasePath + "/parquetFiles";
+    UtilitiesTestBase.dfs.mkdirs(new Path(dfsRoot));
+    props = new TypedProperties();
+    super.setup();
+    schemaProvider = new FilebasedSchemaProvider(Helpers.setupSchemaOnDFS(), jsc);
+    // Produce new data, extract new data
+    generateTestTable("1", "001", 10000);
+  }
+
+  @AfterEach
+  public void teardown() throws Exception {
+    super.teardown();
+  }
+
+  /**
+   * Generates a batch of test data, writes the data to a file, and registers a test table.
+   *
+   * @param filename    The name of the file.
+   * @param instantTime The commit time.
+   * @param n           The number of records to generate.
+   */
+  private void generateTestTable(String filename, String instantTime, int n) throws IOException {
+    Path path = new Path(dfsRoot, filename);
+    Helpers.saveParquetToDFS(Helpers.toGenericRecords(dataGenerator.generateInserts(instantTime, n, useFlattenedSchema)), path);
+    sparkSession.read().parquet(dfsRoot).createOrReplaceTempView("test_sql_table");
+  }
+
+  /**
+   * Runs the test scenario of reading data from the source in Avro format.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testSqlSourceAvroFormat() throws IOException {
+    props.setProperty(sqlSourceConfig, "select * from test_sql_table");
+    sqlSource = new SqlSource(props, jsc, sparkSession, schemaProvider);
+    sourceFormatAdapter = new SourceFormatAdapter(sqlSource);
+
+    // Test fetching Avro format
+    InputBatch<JavaRDD<GenericRecord>> fetch1 =
+        sourceFormatAdapter.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
+
+    // Test Avro to Row format
+    Dataset<Row> fetch1Rows = AvroConversionUtils
+        .createDataFrame(JavaRDD.toRDD(fetch1.getBatch().get()),
+            schemaProvider.getSourceSchema().toString(), sparkSession);
+    assertEquals(10000, fetch1Rows.count());
+  }
+
+  /**
+   * Runs the test scenario of reading data from the source in row format.
+   * Source has fewer records than the source limit.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testSqlSourceRowFormat() throws IOException {
+    props.setProperty(sqlSourceConfig, "select * from test_sql_table");
+    sqlSource = new SqlSource(props, jsc, sparkSession, schemaProvider);
+    sourceFormatAdapter = new SourceFormatAdapter(sqlSource);
+
+    // Test fetching Row format
+    InputBatch<Dataset<Row>> fetch1AsRows =
+        sourceFormatAdapter.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE);
+    assertEquals(10000, fetch1AsRows.getBatch().get().count());
+  }
+
+  /**
+   * Runs the test scenario of reading data from the source in row format.
+   * Source has more records than the source limit (SqlSource ignores the limit, so all records are returned).
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testSqlSourceMoreRecordsThanSourceLimit() throws IOException {
+    props.setProperty(sqlSourceConfig, "select * from test_sql_table");
+    sqlSource = new SqlSource(props, jsc, sparkSession, schemaProvider);
+    sourceFormatAdapter = new SourceFormatAdapter(sqlSource);
+
+    InputBatch<Dataset<Row>> fetch1AsRows =
+        sourceFormatAdapter.fetchNewDataInRowFormat(Option.empty(), 1000);
+    assertEquals(10000, fetch1AsRows.getBatch().get().count());
+  }
+
+  /**
+   * Runs the test scenario of reading data from the source in row format.
+   * Source has no records.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testSqlSourceZeroRecord() throws IOException {
+    props.setProperty(sqlSourceConfig, "select * from test_sql_table where 1=0");
+    sqlSource = new SqlSource(props, jsc, sparkSession, schemaProvider);
+    sourceFormatAdapter = new SourceFormatAdapter(sqlSource);
+
+    InputBatch<Dataset<Row>> fetch1AsRows =
+        sourceFormatAdapter.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE);
+    assertEquals(0, fetch1AsRows.getBatch().get().count());
+  }
+
+  /**
+   * Runs the test scenario of reading data from the source in row format.
+   * Source table doesn't exist.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testSqlSourceInvalidTable() throws IOException {
+    props.setProperty(sqlSourceConfig, "select * from not_exist_sql_table");
+    sqlSource = new SqlSource(props, jsc, sparkSession, schemaProvider);
+    sourceFormatAdapter = new SourceFormatAdapter(sqlSource);
+
+    assertThrows(
+        AnalysisException.class,
+        () -> sourceFormatAdapter.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE));
+  }
+}
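
Finally, per the SqlSource Javadoc, a backfill commit should carry the regular pipeline's latest
checkpoint forward rather than advance it. A sketch of the extra property that enables this, with
the key and value taken from the Javadoc above:

    props.setProperty("hoodie.write.meta.key.prefixes", "deltastreamer.checkpoint.key");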