You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by xu...@apache.org on 2021/09/29 15:54:28 UTC
[hudi] branch master updated: [HUDI-2277] HoodieDeltaStreamer
reading ORC files directly using ORCDFSSource (#3413)
This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new dd1bd62 [HUDI-2277] HoodieDeltaStreamer reading ORC files directly using ORCDFSSource (#3413)
dd1bd62 is described below
commit dd1bd62684fdd2115dcc4bad264493d25cc97858
Author: zhangyue19921010 <69...@users.noreply.github.com>
AuthorDate: Wed Sep 29 23:54:12 2021 +0800
[HUDI-2277] HoodieDeltaStreamer reading ORC files directly using ORCDFSSource (#3413)
* add ORCDFSSource to support reading orc file into hudi format && add UTs
* remove unused import
* simplify test
* code review
* code review
* code review
* code review
* code review
* code review
Co-authored-by: yuezhang <yu...@freewheel.tv>
---
.../common/testutils/HoodieTestDataGenerator.java | 4 ++
.../hudi/utilities/sources/ORCDFSSource.java | 56 ++++++++++++++++++++++
.../functional/TestHoodieDeltaStreamer.java | 44 +++++++++++++++++
.../functional/TestHoodieDeltaStreamerBase.java | 29 +++++++++++
.../utilities/testutils/UtilitiesTestBase.java | 43 +++++++++++++++++
5 files changed, 176 insertions(+)
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index 68d1f2d..86ea1f0 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -31,6 +31,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.AvroOrcUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIOException;
@@ -47,6 +48,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
+import org.apache.orc.TypeDescription;
import java.io.IOException;
import java.io.Serializable;
@@ -129,10 +131,12 @@ public class HoodieTestDataGenerator {
public static final Schema AVRO_SCHEMA = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);
+  // ORC view of the trip example schema, derived from the already-parsed AVRO_SCHEMA so the text is parsed once.
+  public static final TypeDescription ORC_SCHEMA = AvroOrcUtils.createOrcSchema(AVRO_SCHEMA);
public static final Schema AVRO_SCHEMA_WITH_METADATA_FIELDS =
HoodieAvroUtils.addMetadataFields(AVRO_SCHEMA);
public static final Schema AVRO_SHORT_TRIP_SCHEMA = new Schema.Parser().parse(SHORT_TRIP_SCHEMA);
public static final Schema AVRO_TRIP_SCHEMA = new Schema.Parser().parse(TRIP_SCHEMA);
+  // ORC view of the trip schema, derived from AVRO_TRIP_SCHEMA instead of parsing TRIP_SCHEMA a second time.
+  public static final TypeDescription ORC_TRIP_SCHEMA = AvroOrcUtils.createOrcSchema(AVRO_TRIP_SCHEMA);
public static final Schema FLATTENED_AVRO_SCHEMA = new Schema.Parser().parse(TRIP_FLATTENED_SCHEMA);
private static final Random RAND = new Random(46474747);
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ORCDFSSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ORCDFSSource.java
new file mode 100644
index 0000000..942bae8
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ORCDFSSource.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.DFSPathSelector;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * {@link RowSource} that reads ORC files from a DFS location.
+ *
+ * <p>The files for each batch are chosen by a {@link DFSPathSelector} built from the source properties;
+ * the second element of the selector's result is passed through unchanged as the batch's checkpoint string.
+ */
+public class ORCDFSSource extends RowSource {
+
+  private final DFSPathSelector pathSelector;
+
+  public ORCDFSSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
+      SchemaProvider schemaProvider) {
+    super(props, sparkContext, sparkSession, schemaProvider);
+    this.pathSelector = DFSPathSelector.createSourceSelector(props, this.sparkContext.hadoopConfiguration());
+  }
+
+  /**
+   * Asks the path selector for the next set of files and loads them as one ORC dataset.
+   *
+   * @return the loaded rows (empty when the selector returned no paths) paired with the selector's
+   *         checkpoint string
+   */
+  @Override
+  public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr, long sourceLimit) {
+    Pair<Option<String>, String> selection =
+        pathSelector.getNextFilePathsAndMaxModificationTime(sparkContext, lastCkptStr, sourceLimit);
+    Option<String> commaSeparatedPaths = selection.getLeft();
+    String nextCheckpoint = selection.getRight();
+
+    if (!commaSeparatedPaths.isPresent()) {
+      return Pair.of(Option.empty(), nextCheckpoint);
+    }
+    return Pair.of(Option.of(fromFiles(commaSeparatedPaths.get())), nextCheckpoint);
+  }
+
+  /**
+   * Loads every path in the comma-separated {@code pathStr} into a single dataset.
+   */
+  private Dataset<Row> fromFiles(String pathStr) {
+    String[] orcPaths = pathStr.split(",");
+    return sparkSession.read().orc(orcPaths);
+  }
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index fe1a151..6313ab7 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -57,6 +57,7 @@ import org.apache.hudi.utilities.sources.HoodieIncrSource;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.JdbcSource;
import org.apache.hudi.utilities.sources.JsonKafkaSource;
+import org.apache.hudi.utilities.sources.ORCDFSSource;
import org.apache.hudi.utilities.sources.ParquetDFSSource;
import org.apache.hudi.utilities.sources.TestDataSource;
import org.apache.hudi.utilities.testutils.JdbcTestUtils;
@@ -122,6 +123,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
+import static org.junit.jupiter.params.provider.Arguments.arguments;
/**
* Basic tests against {@link HoodieDeltaStreamer}, by issuing bulk_inserts, upserts, inserts. Check counts at the end.
@@ -1482,6 +1484,34 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase {
testNum++;
}
+  /**
+   * Saves the delta-streamer properties for an ORC DFS source, runs a single INSERT sync through
+   * {@link ORCDFSSource} and asserts that {@code ORC_NUM_RECORDS} records are readable from the table path.
+   *
+   * @param useSchemaProvider     whether schema-file properties are written and a schema provider is requested
+   * @param transformerClassNames transformers to apply, or {@code null} for none
+   */
+  private void testORCDFSSource(boolean useSchemaProvider, List<String> transformerClassNames) throws Exception {
+    final String propsPath = dfsBasePath + "/" + PROPS_FILENAME_TEST_ORC;
+    final boolean hasTransformers = transformerClassNames != null;
+
+    // Properties used for testing delta-streamer with orc source
+    TypedProperties orcProps = new TypedProperties();
+    orcProps.setProperty("include", "base.properties");
+    orcProps.setProperty("hoodie.embed.timeline.server", "false");
+    orcProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
+    orcProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
+    if (useSchemaProvider) {
+      orcProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
+    }
+    // The target schema file is only configured when a schema provider and transformers are both in play.
+    if (useSchemaProvider && hasTransformers) {
+      orcProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
+    }
+    orcProps.setProperty("hoodie.deltastreamer.source.dfs.root", ORC_SOURCE_ROOT);
+    UtilitiesTestBase.Helpers.savePropsToDFS(orcProps, dfs, propsPath);
+
+    String tableBasePath = dfsBasePath + "/test_orc_source_table" + testNum;
+    new HoodieDeltaStreamer(
+        TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ORCDFSSource.class.getName(),
+            transformerClassNames, PROPS_FILENAME_TEST_ORC, false,
+            useSchemaProvider, 100000, false, null, null, "timestamp", null), jsc)
+        .sync();
+
+    TestHelpers.assertRecordCount(ORC_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);
+    testNum++;
+  }
+
private void prepareJsonKafkaDFSSource(String propsFileName, String autoResetValue, String topicName) throws IOException {
// Properties used for testing delta-streamer with JsonKafka source
TypedProperties props = new TypedProperties();
@@ -1622,6 +1652,12 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase {
testParquetDFSSource(true, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
}
+ /**
+ * Runs the ORC DFS source scenario once per argument set supplied by the {@code testORCDFSSource()}
+ * method source.
+ * NOTE(review): the method name mentions only the "without schema provider, no transformer" case, but the
+ * method source also supplies a case with a schema provider and a transformer - consider renaming.
+ */
+ @ParameterizedTest
+ @MethodSource("testORCDFSSource")
+ public void testORCDFSSourceWithoutSchemaProviderAndNoTransformer(boolean useSchemaProvider, List<String> transformerClassNames) throws Exception {
+ testORCDFSSource(useSchemaProvider, transformerClassNames);
+ }
+
private void prepareCsvDFSSource(
boolean hasHeader, char sep, boolean useSchemaProvider, boolean hasTransformer) throws IOException {
String sourceRoot = dfsBasePath + "/csvFiles";
@@ -1936,4 +1972,12 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase {
}
}
+ /**
+ * Argument sets for the parameterized ORC DFS source test, each of the form
+ * {@code (useSchemaProvider, transformerClassNames)}: one without schema provider and without
+ * transformers, and one with a schema provider and a single {@code TripsWithDistanceTransformer}.
+ */
+ private static Stream<Arguments> testORCDFSSource() {
+ // arg1 boolean useSchemaProvider, arg2 List<String> transformerClassNames
+ return Stream.of(
+ arguments(false, null),
+ arguments(true, Collections.singletonList(TripsWithDistanceTransformer.class.getName()))
+ );
+ }
+
}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerBase.java
index 5a1cfc3..51b51d8 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerBase.java
@@ -50,12 +50,16 @@ public class TestHoodieDeltaStreamerBase extends UtilitiesTestBase {
static final String PROPS_FILENAME_TEST_INVALID = "test-invalid.properties";
static final String PROPS_FILENAME_TEST_CSV = "test-csv-dfs-source.properties";
static final String PROPS_FILENAME_TEST_PARQUET = "test-parquet-dfs-source.properties";
+ static final String PROPS_FILENAME_TEST_ORC = "test-orc-dfs-source.properties";
static final String PROPS_FILENAME_TEST_JSON_KAFKA = "test-json-kafka-dfs-source.properties";
static final String PROPS_FILENAME_TEST_MULTI_WRITER = "test-multi-writer.properties";
static final String FIRST_PARQUET_FILE_NAME = "1.parquet";
+ static final String FIRST_ORC_FILE_NAME = "1.orc";
static String PARQUET_SOURCE_ROOT;
+ static String ORC_SOURCE_ROOT;
static String JSON_KAFKA_SOURCE_ROOT;
static final int PARQUET_NUM_RECORDS = 5;
+ static final int ORC_NUM_RECORDS = 5;
static final int CSV_NUM_RECORDS = 3;
static final int JSON_KAFKA_NUM_RECORDS = 5;
String kafkaCheckpointType = "string";
@@ -84,6 +88,7 @@ public class TestHoodieDeltaStreamerBase extends UtilitiesTestBase {
public static void initClass() throws Exception {
UtilitiesTestBase.initClass(true);
PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFiles";
+ ORC_SOURCE_ROOT = dfsBasePath + "/orcFiles";
JSON_KAFKA_SOURCE_ROOT = dfsBasePath + "/jsonKafkaFiles";
testUtils = new KafkaTestUtils();
testUtils.setup();
@@ -147,6 +152,7 @@ public class TestHoodieDeltaStreamerBase extends UtilitiesTestBase {
UtilitiesTestBase.Helpers.savePropsToDFS(invalidHiveSyncProps, dfs, dfsBasePath + "/" + PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1);
prepareParquetDFSFiles(PARQUET_NUM_RECORDS, PARQUET_SOURCE_ROOT);
+ prepareORCDFSFiles(ORC_NUM_RECORDS, ORC_SOURCE_ROOT);
}
protected static void writeCommonPropsToFile() throws IOException {
@@ -247,4 +253,27 @@ public class TestHoodieDeltaStreamerBase extends UtilitiesTestBase {
dataGenerator.generateInserts("000", numRecords)), new Path(path));
}
}
+
+ /**
+ * Writes {@code numRecords} generated records as an ORC file under {@code ORC_SOURCE_ROOT}.
+ */
+ protected static void prepareORCDFSFiles(int numRecords) throws IOException {
+ prepareORCDFSFiles(numRecords, ORC_SOURCE_ROOT);
+ }
+
+ /**
+ * Writes {@code numRecords} generated records to {@code FIRST_ORC_FILE_NAME} under {@code baseORCPath},
+ * without a custom schema.
+ */
+ protected static void prepareORCDFSFiles(int numRecords, String baseORCPath) throws IOException {
+ prepareORCDFSFiles(numRecords, baseORCPath, FIRST_ORC_FILE_NAME, false, null, null);
+ }
+
+  /**
+   * Generates {@code numRecords} insert records for instant {@code "000"} and stores them as one ORC file
+   * at {@code baseORCPath + "/" + fileName}.
+   *
+   * <p>With {@code useCustomSchema} the records are generated from {@code schemaStr}, converted using
+   * {@code schema} and written with {@code HoodieTestDataGenerator.ORC_TRIP_SCHEMA}; otherwise the
+   * generator's default inserts are written through the two-argument {@code Helpers.saveORCToDFS}.
+   *
+   * @param schemaStr only read when {@code useCustomSchema} is true
+   * @param schema    only read when {@code useCustomSchema} is true
+   */
+  protected static void prepareORCDFSFiles(int numRecords, String baseORCPath, String fileName, boolean useCustomSchema,
+                                           String schemaStr, Schema schema) throws IOException {
+    final Path orcFile = new Path(baseORCPath + "/" + fileName);
+    final HoodieTestDataGenerator generator = new HoodieTestDataGenerator();
+
+    if (!useCustomSchema) {
+      Helpers.saveORCToDFS(Helpers.toGenericRecords(generator.generateInserts("000", numRecords)), orcFile);
+      return;
+    }
+
+    Helpers.saveORCToDFS(
+        Helpers.toGenericRecords(generator.generateInsertsAsPerSchema("000", numRecords, schemaStr), schema),
+        orcFile,
+        HoodieTestDataGenerator.ORC_TRIP_SCHEMA);
+  }
+
}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
index 8bff475..bb00d2f 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.common.testutils.minicluster.HdfsTestService;
import org.apache.hudi.common.testutils.minicluster.ZookeeperTestService;
+import org.apache.hudi.common.util.AvroOrcUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIOException;
@@ -57,6 +58,11 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.service.server.HiveServer2;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+import org.apache.orc.storage.ql.exec.vector.ColumnVector;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetFileWriter.Mode;
import org.apache.parquet.hadoop.ParquetWriter;
@@ -314,6 +320,27 @@ public class UtilitiesTestBase {
}
}
+ /**
+ * Writes {@code records} to {@code targetFile} as ORC using {@code HoodieTestDataGenerator.ORC_SCHEMA}.
+ */
+ public static void saveORCToDFS(List<GenericRecord> records, Path targetFile) throws IOException {
+ saveORCToDFS(records, targetFile, HoodieTestDataGenerator.ORC_SCHEMA);
+ }
+
+    /**
+     * Writes {@code records} to {@code targetFile} as a single ORC file described by {@code schema}.
+     *
+     * <p>Rows are staged in a {@link VectorizedRowBatch}; the batch is handed to the writer whenever it
+     * reaches its maximum size, and once more at the end if any rows remain.
+     *
+     * @param records    Avro records to write
+     * @param targetFile destination ORC file
+     * @param schema     ORC schema used both for the writer and for creating the row batch
+     * @throws IOException if creating the writer or writing a batch fails
+     */
+    public static void saveORCToDFS(List<GenericRecord> records, Path targetFile, TypeDescription schema) throws IOException {
+      OrcFile.WriterOptions options = OrcFile.writerOptions(HoodieTestUtils.getDefaultHadoopConf()).setSchema(schema);
+      try (Writer writer = OrcFile.createWriter(targetFile, options)) {
+        VectorizedRowBatch batch = schema.createRowBatch();
+        for (GenericRecord record : records) {
+          // addAvroRecord fills row index batch.size, so the counter is advanced only afterwards.
+          addAvroRecord(batch, record, schema);
+          batch.size++;
+          if (batch.size == batch.getMaxSize()) {
+            writer.addRowBatch(batch);
+            // reset() also puts batch.size back to 0, so no manual reassignment is needed.
+            batch.reset();
+          }
+        }
+        // Flush the partially filled tail; nothing to do when the last row already triggered a flush.
+        if (batch.size != 0) {
+          writer.addRowBatch(batch);
+        }
+      }
+    }
+
public static TypedProperties setupSchemaOnDFS() throws IOException {
return setupSchemaOnDFS("delta-streamer-config", "source.avsc");
}
@@ -364,5 +391,21 @@ public class UtilitiesTestBase {
public static String[] jsonifyRecords(List<HoodieRecord> records) {
return records.stream().map(Helpers::toJsonString).toArray(String[]::new);
}
+
+    /**
+     * Copies one Avro record into row {@code batch.size} of {@code batch}, one column at a time.
+     *
+     * <p>This method does not change {@code batch.size}; the caller advances it after the row is filled.
+     *
+     * @param batch     row batch being filled (assumed to have been created from {@code orcSchema} so that
+     *                  column {@code c} matches field {@code c} - TODO confirm for other callers)
+     * @param record    Avro record supplying the field values
+     * @param orcSchema ORC schema whose field names and child types drive the conversion
+     * @throws IllegalArgumentException if the record's Avro schema has no field for one of the ORC field names
+     */
+    private static void addAvroRecord(
+        VectorizedRowBatch batch,
+        GenericRecord record,
+        TypeDescription orcSchema
+    ) {
+      // These lookups do not depend on the column index, so do them once per record.
+      final List<String> fieldNames = orcSchema.getFieldNames();
+      final List<TypeDescription> fieldTypes = orcSchema.getChildren();
+      final Schema avroSchema = record.getSchema();
+
+      for (int c = 0; c < batch.numCols; c++) {
+        final String thisField = fieldNames.get(c);
+        final Schema.Field avroField = avroSchema.getField(thisField);
+        if (avroField == null) {
+          // Fail with the offending field name rather than an anonymous NullPointerException.
+          throw new IllegalArgumentException(
+              "Avro record schema " + avroSchema.getFullName() + " has no field '" + thisField
+                  + "' required by ORC schema " + orcSchema);
+        }
+        ColumnVector colVector = batch.cols[c];
+        Object fieldValue = record.get(thisField);
+        AvroOrcUtils.addToVector(fieldTypes.get(c), colVector, avroField.schema(), fieldValue, batch.size);
+      }
+    }
}
}