You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by xu...@apache.org on 2021/09/29 15:54:28 UTC

[hudi] branch master updated: [HUDI-2277] HoodieDeltaStreamer reading ORC files directly using ORCDFSSource (#3413)

This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new dd1bd62  [HUDI-2277] HoodieDeltaStreamer reading ORC files directly using ORCDFSSource (#3413)
dd1bd62 is described below

commit dd1bd62684fdd2115dcc4bad264493d25cc97858
Author: zhangyue19921010 <69...@users.noreply.github.com>
AuthorDate: Wed Sep 29 23:54:12 2021 +0800

    [HUDI-2277] HoodieDeltaStreamer reading ORC files directly using ORCDFSSource (#3413)
    
    * add ORCDFSSource to support reading orc file into hudi format && add UTs
    
    * remove unused import
    
    * simplify test
    
    * code review
    
    * code review
    
    * code review
    
    * code review
    
    * code review
    
    * code review
    
    Co-authored-by: yuezhang <yu...@freewheel.tv>
---
 .../common/testutils/HoodieTestDataGenerator.java  |  4 ++
 .../hudi/utilities/sources/ORCDFSSource.java       | 56 ++++++++++++++++++++++
 .../functional/TestHoodieDeltaStreamer.java        | 44 +++++++++++++++++
 .../functional/TestHoodieDeltaStreamerBase.java    | 29 +++++++++++
 .../utilities/testutils/UtilitiesTestBase.java     | 43 +++++++++++++++++
 5 files changed, 176 insertions(+)

diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index 68d1f2d..86ea1f0 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -31,6 +31,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.AvroOrcUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieIOException;
 
@@ -47,6 +48,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
+import org.apache.orc.TypeDescription;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -129,10 +131,12 @@ public class HoodieTestDataGenerator {
 
 
   public static final Schema AVRO_SCHEMA = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);
+  public static final TypeDescription ORC_SCHEMA = AvroOrcUtils.createOrcSchema(new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA));
   public static final Schema AVRO_SCHEMA_WITH_METADATA_FIELDS =
       HoodieAvroUtils.addMetadataFields(AVRO_SCHEMA);
   public static final Schema AVRO_SHORT_TRIP_SCHEMA = new Schema.Parser().parse(SHORT_TRIP_SCHEMA);
   public static final Schema AVRO_TRIP_SCHEMA = new Schema.Parser().parse(TRIP_SCHEMA);
+  public static final TypeDescription ORC_TRIP_SCHEMA = AvroOrcUtils.createOrcSchema(new Schema.Parser().parse(TRIP_SCHEMA));
   public static final Schema FLATTENED_AVRO_SCHEMA = new Schema.Parser().parse(TRIP_FLATTENED_SCHEMA);
 
   private static final Random RAND = new Random(46474747);
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ORCDFSSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ORCDFSSource.java
new file mode 100644
index 0000000..942bae8
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ORCDFSSource.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.DFSPathSelector;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * DFS Source that reads ORC data.
+ */
+public class ORCDFSSource extends RowSource {
+
+  private final DFSPathSelector pathSelector;
+
+  public ORCDFSSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
+                      SchemaProvider schemaProvider) {
+    super(props, sparkContext, sparkSession, schemaProvider);
+    this.pathSelector = DFSPathSelector.createSourceSelector(props, this.sparkContext.hadoopConfiguration());
+  }
+
+  @Override
+  public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr, long sourceLimit) {
+    Pair<Option<String>, String> selectPathsWithMaxModificationTime =
+        pathSelector.getNextFilePathsAndMaxModificationTime(sparkContext, lastCkptStr, sourceLimit);
+    return selectPathsWithMaxModificationTime.getLeft()
+        .map(pathStr -> Pair.of(Option.of(fromFiles(pathStr)), selectPathsWithMaxModificationTime.getRight()))
+        .orElseGet(() -> Pair.of(Option.empty(), selectPathsWithMaxModificationTime.getRight()));
+  }
+
+  private Dataset<Row> fromFiles(String pathStr) {
+    return sparkSession.read().orc(pathStr.split(","));
+  }
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index fe1a151..6313ab7 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -57,6 +57,7 @@ import org.apache.hudi.utilities.sources.HoodieIncrSource;
 import org.apache.hudi.utilities.sources.InputBatch;
 import org.apache.hudi.utilities.sources.JdbcSource;
 import org.apache.hudi.utilities.sources.JsonKafkaSource;
+import org.apache.hudi.utilities.sources.ORCDFSSource;
 import org.apache.hudi.utilities.sources.ParquetDFSSource;
 import org.apache.hudi.utilities.sources.TestDataSource;
 import org.apache.hudi.utilities.testutils.JdbcTestUtils;
@@ -122,6 +123,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
+import static org.junit.jupiter.params.provider.Arguments.arguments;
 
 /**
  * Basic tests against {@link HoodieDeltaStreamer}, by issuing bulk_inserts, upserts, inserts. Check counts at the end.
@@ -1482,6 +1484,34 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase {
     testNum++;
   }
 
+  private void testORCDFSSource(boolean useSchemaProvider, List<String> transformerClassNames) throws Exception {
+    // prepare ORCDFSSource
+    TypedProperties orcProps = new TypedProperties();
+
+    // Properties used for testing delta-streamer with orc source
+    orcProps.setProperty("include", "base.properties");
+    orcProps.setProperty("hoodie.embed.timeline.server","false");
+    orcProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
+    orcProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
+    if (useSchemaProvider) {
+      orcProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/" + "source.avsc");
+      if (transformerClassNames != null) {
+        orcProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/" + "target.avsc");
+      }
+    }
+    orcProps.setProperty("hoodie.deltastreamer.source.dfs.root", ORC_SOURCE_ROOT);
+    UtilitiesTestBase.Helpers.savePropsToDFS(orcProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_ORC);
+
+    String tableBasePath = dfsBasePath + "/test_orc_source_table" + testNum;
+    HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
+            TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ORCDFSSource.class.getName(),
+                    transformerClassNames, PROPS_FILENAME_TEST_ORC, false,
+                    useSchemaProvider, 100000, false, null, null, "timestamp", null), jsc);
+    deltaStreamer.sync();
+    TestHelpers.assertRecordCount(ORC_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);
+    testNum++;
+  }
+
   private void prepareJsonKafkaDFSSource(String propsFileName, String autoResetValue, String topicName) throws IOException {
     // Properties used for testing delta-streamer with JsonKafka source
     TypedProperties props = new TypedProperties();
@@ -1622,6 +1652,12 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase {
     testParquetDFSSource(true, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
   }
 
+  /**
+   * Parameterized entry point for the ORC DFS source test; it is invoked once for each argument set
+   * produced by the {@code testORCDFSSource} method source and simply delegates to the private helper.
+   * NOTE(review): despite the method name, the method source also supplies a case that uses a schema
+   * provider together with a transformer.
+   */
+  @ParameterizedTest
+  @MethodSource("testORCDFSSource")
+  public void testORCDFSSourceWithoutSchemaProviderAndNoTransformer(boolean useSchemaProvider, List<String> transformerClassNames) throws Exception {
+    testORCDFSSource(useSchemaProvider, transformerClassNames);
+  }
+
   private void prepareCsvDFSSource(
       boolean hasHeader, char sep, boolean useSchemaProvider, boolean hasTransformer) throws IOException {
     String sourceRoot = dfsBasePath + "/csvFiles";
@@ -1936,4 +1972,12 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase {
     }
   }
 
+  private static Stream<Arguments> testORCDFSSource() {
+    // arg1 boolean useSchemaProvider, arg2 List<String> transformerClassNames
+    return Stream.of(
+            arguments(false, null),
+            arguments(true, Collections.singletonList(TripsWithDistanceTransformer.class.getName()))
+    );
+  }
+
 }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerBase.java
index 5a1cfc3..51b51d8 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerBase.java
@@ -50,12 +50,16 @@ public class TestHoodieDeltaStreamerBase extends UtilitiesTestBase {
   static final String PROPS_FILENAME_TEST_INVALID = "test-invalid.properties";
   static final String PROPS_FILENAME_TEST_CSV = "test-csv-dfs-source.properties";
   static final String PROPS_FILENAME_TEST_PARQUET = "test-parquet-dfs-source.properties";
+  static final String PROPS_FILENAME_TEST_ORC = "test-orc-dfs-source.properties";
   static final String PROPS_FILENAME_TEST_JSON_KAFKA = "test-json-kafka-dfs-source.properties";
   static final String PROPS_FILENAME_TEST_MULTI_WRITER = "test-multi-writer.properties";
   static final String FIRST_PARQUET_FILE_NAME = "1.parquet";
+  static final String FIRST_ORC_FILE_NAME = "1.orc";
   static String PARQUET_SOURCE_ROOT;
+  static String ORC_SOURCE_ROOT;
   static String JSON_KAFKA_SOURCE_ROOT;
   static final int PARQUET_NUM_RECORDS = 5;
+  static final int ORC_NUM_RECORDS = 5;
   static final int CSV_NUM_RECORDS = 3;
   static final int JSON_KAFKA_NUM_RECORDS = 5;
   String kafkaCheckpointType = "string";
@@ -84,6 +88,7 @@ public class TestHoodieDeltaStreamerBase extends UtilitiesTestBase {
   public static void initClass() throws Exception {
     UtilitiesTestBase.initClass(true);
     PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFiles";
+    ORC_SOURCE_ROOT = dfsBasePath + "/orcFiles";
     JSON_KAFKA_SOURCE_ROOT = dfsBasePath + "/jsonKafkaFiles";
     testUtils = new KafkaTestUtils();
     testUtils.setup();
@@ -147,6 +152,7 @@ public class TestHoodieDeltaStreamerBase extends UtilitiesTestBase {
     UtilitiesTestBase.Helpers.savePropsToDFS(invalidHiveSyncProps, dfs, dfsBasePath + "/" + PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1);
 
     prepareParquetDFSFiles(PARQUET_NUM_RECORDS, PARQUET_SOURCE_ROOT);
+    prepareORCDFSFiles(ORC_NUM_RECORDS, ORC_SOURCE_ROOT);
   }
 
   protected static void writeCommonPropsToFile() throws IOException {
@@ -247,4 +253,27 @@ public class TestHoodieDeltaStreamerBase extends UtilitiesTestBase {
           dataGenerator.generateInserts("000", numRecords)), new Path(path));
     }
   }
+
+  protected static void prepareORCDFSFiles(int numRecords) throws IOException {
+    prepareORCDFSFiles(numRecords, ORC_SOURCE_ROOT);
+  }
+
+  protected static void prepareORCDFSFiles(int numRecords, String baseORCPath) throws IOException {
+    prepareORCDFSFiles(numRecords, baseORCPath, FIRST_ORC_FILE_NAME, false, null, null);
+  }
+
+  protected static void prepareORCDFSFiles(int numRecords, String baseORCPath, String fileName, boolean useCustomSchema,
+                                               String schemaStr, Schema schema) throws IOException {
+    String path = baseORCPath + "/" + fileName;
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    if (useCustomSchema) {
+      Helpers.saveORCToDFS(Helpers.toGenericRecords(
+              dataGenerator.generateInsertsAsPerSchema("000", numRecords, schemaStr),
+              schema), new Path(path), HoodieTestDataGenerator.ORC_TRIP_SCHEMA);
+    } else {
+      Helpers.saveORCToDFS(Helpers.toGenericRecords(
+              dataGenerator.generateInserts("000", numRecords)), new Path(path));
+    }
+  }
+
 }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
index 8bff475..bb00d2f 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.testutils.RawTripTestPayload;
 import org.apache.hudi.common.testutils.minicluster.HdfsTestService;
 import org.apache.hudi.common.testutils.minicluster.ZookeeperTestService;
+import org.apache.hudi.common.util.AvroOrcUtils;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieIOException;
@@ -57,6 +58,11 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hive.service.server.HiveServer2;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+import org.apache.orc.storage.ql.exec.vector.ColumnVector;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
 import org.apache.parquet.avro.AvroParquetWriter;
 import org.apache.parquet.hadoop.ParquetFileWriter.Mode;
 import org.apache.parquet.hadoop.ParquetWriter;
@@ -314,6 +320,27 @@ public class UtilitiesTestBase {
       }
     }
 
+    /**
+     * Saves the records as an ORC file using {@code HoodieTestDataGenerator.ORC_SCHEMA}.
+     *
+     * @param records    Avro records to write
+     * @param targetFile path of the ORC file to create
+     * @throws IOException if the ORC writer fails
+     */
+    public static void saveORCToDFS(List<GenericRecord> records, Path targetFile) throws IOException {
+      saveORCToDFS(records, targetFile, HoodieTestDataGenerator.ORC_SCHEMA);
+    }
+
+    /**
+     * Saves the records as an ORC file with the given ORC schema.
+     *
+     * <p>Records are copied into a {@link VectorizedRowBatch}; the batch is handed to the writer whenever it
+     * is full, and whatever is left over is written once after the loop.
+     *
+     * @param records    Avro records to write
+     * @param targetFile path of the ORC file to create
+     * @param schema     ORC schema used for the writer and the row batch
+     * @throws IOException if the ORC writer fails
+     */
+    public static void saveORCToDFS(List<GenericRecord> records, Path targetFile, TypeDescription schema) throws IOException {
+      OrcFile.WriterOptions options = OrcFile.writerOptions(HoodieTestUtils.getDefaultHadoopConf()).setSchema(schema);
+      try (Writer writer = OrcFile.createWriter(targetFile, options)) {
+        VectorizedRowBatch batch = schema.createRowBatch();
+        for (GenericRecord record : records) {
+          // addAvroRecord fills row index batch.size, so the size is bumped only after the row is populated.
+          addAvroRecord(batch, record, schema);
+          batch.size++;
+          if (batch.size == batch.getMaxSize()) {
+            writer.addRowBatch(batch);
+            // reset() also sets batch.size back to 0.
+            batch.reset();
+          }
+        }
+        // Flush the partially filled tail; skip the call when there is nothing left to write.
+        if (batch.size != 0) {
+          writer.addRowBatch(batch);
+        }
+      }
+    }
+
     public static TypedProperties setupSchemaOnDFS() throws IOException {
       return setupSchemaOnDFS("delta-streamer-config", "source.avsc");
     }
@@ -364,5 +391,21 @@ public class UtilitiesTestBase {
     public static String[] jsonifyRecords(List<HoodieRecord> records) {
       return records.stream().map(Helpers::toJsonString).toArray(String[]::new);
     }
+
+    /**
+     * Copies one Avro record into the row at index {@code batch.size} of the given batch.
+     *
+     * <p>For every column of the batch, the ORC field name and child type at that position are looked up,
+     * the same-named field is read from the record, and the value is handed to
+     * {@code AvroOrcUtils.addToVector}. {@code batch.size} is not modified here.
+     *
+     * @param batch     row batch to fill
+     * @param record    Avro record supplying the values
+     * @param orcSchema ORC schema providing the field names and child types, by column position
+     */
+    private static void addAvroRecord(
+            VectorizedRowBatch batch,
+            GenericRecord record,
+            TypeDescription orcSchema
+    ) {
+      final List<String> fieldNames = orcSchema.getFieldNames();
+      final List<TypeDescription> fieldTypes = orcSchema.getChildren();
+      final int rowIndex = batch.size;
+
+      for (int col = 0; col < batch.numCols; col++) {
+        final String fieldName = fieldNames.get(col);
+        final Object value = record.get(fieldName);
+        final Schema.Field avroField = record.getSchema().getField(fieldName);
+        AvroOrcUtils.addToVector(fieldTypes.get(col), batch.cols[col], avroField.schema(), value, rowIndex);
+      }
+    }
   }
 }