You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2021/06/10 22:03:26 UTC
[hudi] branch master updated: [HUDI-1790] Added SqlSource to fetch
data from any partitions for backfill use case (#2896)
This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 9e4114d [HUDI-1790] Added SqlSource to fetch data from any partitions for backfill use case (#2896)
9e4114d is described below
commit 9e4114dd46cda4db23a24889cd267998734202da
Author: Vinoth Govindarajan <vi...@uber.com>
AuthorDate: Thu Jun 10 15:03:07 2021 -0700
[HUDI-1790] Added SqlSource to fetch data from any partitions for backfill use case (#2896)
---
.../apache/hudi/utilities/sources/SqlSource.java | 94 ++++++++++
.../hudi/utilities/sources/TestSqlSource.java | 190 +++++++++++++++++++++
2 files changed, 284 insertions(+)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SqlSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SqlSource.java
new file mode 100644
index 0000000..d832e43
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SqlSource.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+/**
+ * SQL Source that reads from any table, used mainly for backfill jobs which process specific partition dates.
+ *
+ * <p>The Spark SQL query is configured with this hoodie config:
+ *
+ * <p>hoodie.deltastreamer.source.sql.sql.query = 'select * from source_table'
+ *
+ * <p>The query is executed in full on every fetch: the source limit and the last checkpoint are ignored, and the
+ * whole result set is returned as a single batch.
+ *
+ * <p>SQL Source is intended for one-time backfill scenarios. It returns no checkpoint of its own, so it does not
+ * update deltastreamer.checkpoint.key to the processed commit; instead the latest successful checkpoint key is
+ * carried over as this backfill commit's checkpoint so that the regular incremental processing is not interrupted.
+ *
+ * <p>To fetch and reuse the latest incremental checkpoint, also set this hoodie config for deltastreamer jobs:
+ *
+ * <p>hoodie.write.meta.key.prefixes = 'deltastreamer.checkpoint.key'
+ */
+public class SqlSource extends RowSource {
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = LogManager.getLogger(SqlSource.class);
+  private final String sourceSql;
+  private final SparkSession spark;
+
+  /**
+   * Creates a SQL source.
+   *
+   * @param props          source properties; must contain {@code hoodie.deltastreamer.source.sql.sql.query}
+   *                       (validated via {@code DataSourceUtils.checkRequiredProperties})
+   * @param sparkContext   java spark context, passed to the parent source
+   * @param sparkSession   spark session used to execute the configured query
+   * @param schemaProvider schema provider, passed to the parent source
+   */
+  public SqlSource(
+      TypedProperties props,
+      JavaSparkContext sparkContext,
+      SparkSession sparkSession,
+      SchemaProvider schemaProvider) {
+    super(props, sparkContext, sparkSession, schemaProvider);
+    DataSourceUtils.checkRequiredProperties(
+        props, Collections.singletonList(SqlSource.Config.SOURCE_SQL));
+    sourceSql = props.getString(SqlSource.Config.SOURCE_SQL);
+    spark = sparkSession;
+  }
+
+  /**
+   * Runs the configured SQL query and returns its result as one batch.
+   *
+   * @param lastCkptStr last checkpoint; not used, the query is always executed in full
+   * @param sourceLimit not used; every row produced by the query is part of the batch
+   * @return the query result, with Hoodie meta columns other than the partition path removed, paired with a
+   *     {@code null} checkpoint
+   */
+  @Override
+  protected Pair<Option<Dataset<Row>>, String> fetchNextBatch(
+      Option<String> lastCkptStr, long sourceLimit) {
+    LOG.debug(sourceSql);
+    Dataset<Row> source = spark.sql(sourceSql);
+    // showString() executes a Spark job; only pay that cost when debug logging is actually enabled.
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(source.showString(10, 0, true));
+    }
+    // Remove Hoodie meta columns except partition path from input source. The commit-time meta column is used
+    // as the marker that the query reads from a Hoodie table.
+    if (Arrays.asList(source.columns()).contains(HoodieRecord.COMMIT_TIME_METADATA_FIELD)) {
+      source =
+          source.drop(
+              HoodieRecord.HOODIE_META_COLUMNS.stream()
+                  .filter(x -> !x.equals(HoodieRecord.PARTITION_PATH_METADATA_FIELD))
+                  .toArray(String[]::new));
+    }
+    // No checkpoint is returned: a backfill run must not advance the incremental checkpoint.
+    return Pair.of(Option.of(source), null);
+  }
+
+  /**
+   * Configs supported.
+   */
+  private static class Config {
+
+    /** The Spark SQL query whose result is ingested. Required. */
+    private static final String SOURCE_SQL = "hoodie.deltastreamer.source.sql.sql.query";
+  }
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java
new file mode 100644
index 0000000..1395ae6
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.testutils.sources;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
+import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+import org.apache.hudi.utilities.sources.InputBatch;
+import org.apache.hudi.utilities.sources.SqlSource;
+import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Test against {@link SqlSource}.
+ */
+public class TestSqlSource extends UtilitiesTestBase {
+
+  /** Config key from which {@link SqlSource} reads the query to execute. */
+  private static final String SQL_SOURCE_CONFIG = "hoodie.deltastreamer.source.sql.sql.query";
+  /** Name of the temp view registered over the generated parquet data. */
+  private static final String TEST_TABLE = "test_sql_table";
+  /** Number of records written to the test table before each test. */
+  private static final int NUM_RECORDS = 10000;
+  private static final boolean USE_FLATTENED_SCHEMA = false;
+
+  protected FilebasedSchemaProvider schemaProvider;
+  protected HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+  private String dfsRoot;
+  private TypedProperties props;
+  private SqlSource sqlSource;
+  private SourceFormatAdapter sourceFormatAdapter;
+
+  @BeforeAll
+  public static void initClass() throws Exception {
+    UtilitiesTestBase.initClass();
+  }
+
+  @AfterAll
+  public static void cleanupClass() {
+    UtilitiesTestBase.cleanupClass();
+  }
+
+  /**
+   * Creates a fresh parquet directory and property set, then registers a test table with {@link #NUM_RECORDS} rows.
+   */
+  @BeforeEach
+  public void setup() throws Exception {
+    dfsRoot = UtilitiesTestBase.dfsBasePath + "/parquetFiles";
+    UtilitiesTestBase.dfs.mkdirs(new Path(dfsRoot));
+    props = new TypedProperties();
+    super.setup();
+    schemaProvider = new FilebasedSchemaProvider(Helpers.setupSchemaOnDFS(), jsc);
+    // Produce new data, extract new data
+    generateTestTable("1", "001", NUM_RECORDS);
+  }
+
+  @AfterEach
+  public void teardown() throws Exception {
+    super.teardown();
+  }
+
+  /**
+   * Generates a batch of test data, writes it to a parquet file and registers it as a test table.
+   *
+   * @param filename    The name of the file.
+   * @param instantTime The commit time.
+   * @param n           The number of records to generate.
+   * @throws IOException if writing the parquet file fails
+   */
+  private void generateTestTable(String filename, String instantTime, int n) throws IOException {
+    Path path = new Path(dfsRoot, filename);
+    Helpers.saveParquetToDFS(
+        Helpers.toGenericRecords(dataGenerator.generateInserts(instantTime, n, USE_FLATTENED_SCHEMA)), path);
+    sparkSession.read().parquet(dfsRoot).createOrReplaceTempView(TEST_TABLE);
+  }
+
+  /**
+   * Configures a {@link SqlSource} with the given query and wraps it in a {@link SourceFormatAdapter}.
+   *
+   * @param sql the query the source should execute
+   * @return the adapter over the newly created source
+   */
+  private SourceFormatAdapter createSourceFormatAdapter(String sql) {
+    props.setProperty(SQL_SOURCE_CONFIG, sql);
+    sqlSource = new SqlSource(props, jsc, sparkSession, schemaProvider);
+    sourceFormatAdapter = new SourceFormatAdapter(sqlSource);
+    return sourceFormatAdapter;
+  }
+
+  /**
+   * Runs the test scenario of reading data from the source in avro format.
+   */
+  @Test
+  public void testSqlSourceAvroFormat() throws IOException {
+    createSourceFormatAdapter("select * from " + TEST_TABLE);
+
+    // Test fetching Avro format
+    InputBatch<JavaRDD<GenericRecord>> fetch1 =
+        sourceFormatAdapter.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
+
+    // Test Avro to Row format
+    Dataset<Row> fetch1Rows = AvroConversionUtils
+        .createDataFrame(JavaRDD.toRDD(fetch1.getBatch().get()),
+            schemaProvider.getSourceSchema().toString(), sparkSession);
+    assertEquals(NUM_RECORDS, fetch1Rows.count());
+  }
+
+  /**
+   * Runs the test scenario of reading data from the source in row format.
+   * Source has fewer records than the source limit.
+   */
+  @Test
+  public void testSqlSourceRowFormat() throws IOException {
+    createSourceFormatAdapter("select * from " + TEST_TABLE);
+
+    // Test fetching Row format
+    InputBatch<Dataset<Row>> fetch1AsRows =
+        sourceFormatAdapter.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE);
+    assertEquals(NUM_RECORDS, fetch1AsRows.getBatch().get().count());
+  }
+
+  /**
+   * Runs the test scenario of reading data from the source in row format.
+   * Source has more records than the source limit.
+   */
+  @Test
+  public void testSqlSourceMoreRecordsThanSourceLimit() throws IOException {
+    createSourceFormatAdapter("select * from " + TEST_TABLE);
+
+    // SqlSource ignores the source limit, so every record is still returned.
+    InputBatch<Dataset<Row>> fetch1AsRows =
+        sourceFormatAdapter.fetchNewDataInRowFormat(Option.empty(), 1000);
+    assertEquals(NUM_RECORDS, fetch1AsRows.getBatch().get().count());
+  }
+
+  /**
+   * Runs the test scenario of reading data from the source in row format.
+   * Source has no records.
+   */
+  @Test
+  public void testSqlSourceZeroRecord() throws IOException {
+    createSourceFormatAdapter("select * from " + TEST_TABLE + " where 1=0");
+
+    InputBatch<Dataset<Row>> fetch1AsRows =
+        sourceFormatAdapter.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE);
+    assertEquals(0, fetch1AsRows.getBatch().get().count());
+  }
+
+  /**
+   * Runs the test scenario of reading data from the source in row format.
+   * Source table doesn't exist; the query fails when it is analyzed during the fetch.
+   */
+  @Test
+  public void testSqlSourceInvalidTable() throws IOException {
+    createSourceFormatAdapter("select * from not_exist_sql_table");
+
+    assertThrows(
+        AnalysisException.class,
+        () -> sourceFormatAdapter.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE));
+  }
+}