Posted to commits@hudi.apache.org by pw...@apache.org on 2021/06/15 22:24:38 UTC
[hudi] branch master updated: [HUDI-764] [HUDI-765] ORC reader writer Implementation (#2999)
This is an automated email from the ASF dual-hosted git repository.
pwason pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b8fe5b9 [HUDI-764] [HUDI-765] ORC reader writer Implementation (#2999)
b8fe5b9 is described below
commit b8fe5b91d599418cd908d833fd63edc7f362c548
Author: Jintao Guan <ji...@uber.com>
AuthorDate: Tue Jun 15 15:21:43 2021 -0700
[HUDI-764] [HUDI-765] ORC reader writer Implementation (#2999)
Co-authored-by: Qingyun (Teresa) Kang <kt...@uber.com>
---
LICENSE | 12 +
NOTICE | 12 +
.../apache/hudi/config/HoodieStorageConfig.java | 42 ++
.../org/apache/hudi/config/HoodieWriteConfig.java | 17 +
.../apache/hudi/io/storage/HoodieFileWriter.java | 10 +
.../hudi/io/storage/HoodieFileWriterFactory.java | 13 +
.../apache/hudi/io/storage/HoodieHFileWriter.java | 10 +-
.../apache/hudi/io/storage/HoodieOrcConfig.java | 72 ++
.../apache/hudi/io/storage/HoodieOrcWriter.java | 172 +++++
.../hudi/io/storage/HoodieParquetWriter.java | 9 +-
.../java/org/apache/hudi/table/HoodieTable.java | 1 +
.../hudi/io/storage/TestHoodieOrcReaderWriter.java | 261 +++++++
.../src/test/resources/exampleSchemaWithUDT.avsc | 67 ++
.../io/storage/TestHoodieFileWriterFactory.java | 7 +
hudi-common/pom.xml | 8 +
.../apache/hudi/common/model/HoodieFileFormat.java | 3 +-
.../org/apache/hudi/common/util/AvroOrcUtils.java | 799 +++++++++++++++++++++
.../org/apache/hudi/common/util/BaseFileUtils.java | 133 +++-
.../apache/hudi/common/util/OrcReaderIterator.java | 118 +++
.../java/org/apache/hudi/common/util/OrcUtils.java | 235 ++++++
.../org/apache/hudi/common/util/ParquetUtils.java | 60 +-
.../hudi/io/storage/HoodieFileReaderFactory.java | 8 +
.../apache/hudi/io/storage/HoodieOrcReader.java | 91 +++
.../apache/hudi/common/util/TestAvroOrcUtils.java | 76 ++
.../hudi/common/util/TestOrcReaderIterator.java | 92 +++
.../io/storage/TestHoodieFileReaderFactory.java | 7 +-
.../hudi/hadoop/utils/HoodieInputFormatUtils.java | 9 +
.../main/scala/org/apache/hudi/DefaultSource.scala | 13 +-
pom.xml | 2 +
29 files changed, 2268 insertions(+), 91 deletions(-)
diff --git a/LICENSE b/LICENSE
index 385191d..28222a7 100644
--- a/LICENSE
+++ b/LICENSE
@@ -333,3 +333,15 @@ Copyright (c) 2005, European Commission project OneLab under contract 034819 (ht
Home page: https://commons.apache.org/proper/commons-lang/
License: http://www.apache.org/licenses/LICENSE-2.0
+
+ -------------------------------------------------------------------------------
+
+ This product includes code from StreamSets Data Collector
+
+ * com.streamsets.pipeline.lib.util.avroorc.AvroToOrcRecordConverter copied and modified to org.apache.hudi.common.util.AvroOrcUtils
+ * com.streamsets.pipeline.lib.util.avroorc.AvroToOrcSchemaConverter copied and modified to org.apache.hudi.common.util.AvroOrcUtils
+
+ Copyright 2018 StreamSets Inc.
+
+ Home page: https://github.com/streamsets/datacollector-oss
+ License: http://www.apache.org/licenses/LICENSE-2.0
diff --git a/NOTICE b/NOTICE
index 2f1aee6..9b24933 100644
--- a/NOTICE
+++ b/NOTICE
@@ -147,3 +147,15 @@ its NOTICE file:
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
+
+--------------------------------------------------------------------------------
+
+This product includes code from StreamSets Data Collector, which includes the following in
+its NOTICE file:
+
+ StreamSets datacollector-oss
+ Copyright 2018 StreamSets Inc.
+
+ This product includes software developed at
+ StreamSets (http://www.streamsets.com/).
+
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieStorageConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieStorageConfig.java
index 50b45f3..3cd8817 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieStorageConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieStorageConfig.java
@@ -39,10 +39,21 @@ public class HoodieStorageConfig extends DefaultHoodieConfig {
public static final String DEFAULT_PARQUET_BLOCK_SIZE_BYTES = DEFAULT_PARQUET_FILE_MAX_BYTES;
public static final String PARQUET_PAGE_SIZE_BYTES = "hoodie.parquet.page.size";
public static final String DEFAULT_PARQUET_PAGE_SIZE_BYTES = String.valueOf(1 * 1024 * 1024);
+
public static final String HFILE_FILE_MAX_BYTES = "hoodie.hfile.max.file.size";
public static final String HFILE_BLOCK_SIZE_BYTES = "hoodie.hfile.block.size";
public static final String DEFAULT_HFILE_BLOCK_SIZE_BYTES = String.valueOf(1 * 1024 * 1024);
public static final String DEFAULT_HFILE_FILE_MAX_BYTES = String.valueOf(120 * 1024 * 1024);
+
+ public static final String ORC_FILE_MAX_BYTES = "hoodie.orc.max.file.size";
+ public static final String DEFAULT_ORC_FILE_MAX_BYTES = String.valueOf(120 * 1024 * 1024);
+ // size of the memory buffer in bytes for writing
+ public static final String ORC_STRIPE_SIZE = "hoodie.orc.stripe.size";
+ public static final String DEFAULT_ORC_STRIPE_SIZE = String.valueOf(64 * 1024 * 1024);
+ // file system block size
+ public static final String ORC_BLOCK_SIZE = "hoodie.orc.block.size";
+ public static final String DEFAULT_ORC_BLOCK_SIZE = DEFAULT_ORC_FILE_MAX_BYTES;
+
// used to size log files
public static final String LOGFILE_SIZE_MAX_BYTES = "hoodie.logfile.max.size";
public static final String DEFAULT_LOGFILE_SIZE_MAX_BYTES = String.valueOf(1024 * 1024 * 1024); // 1 GB
@@ -54,9 +65,11 @@ public class HoodieStorageConfig extends DefaultHoodieConfig {
public static final String DEFAULT_STREAM_COMPRESSION_RATIO = String.valueOf(0.1);
public static final String PARQUET_COMPRESSION_CODEC = "hoodie.parquet.compression.codec";
public static final String HFILE_COMPRESSION_ALGORITHM = "hoodie.hfile.compression.algorithm";
+ public static final String ORC_COMPRESSION_CODEC = "hoodie.orc.compression.codec";
// Default compression codec for parquet
public static final String DEFAULT_PARQUET_COMPRESSION_CODEC = "gzip";
public static final String DEFAULT_HFILE_COMPRESSION_ALGORITHM = "GZ";
+ public static final String DEFAULT_ORC_COMPRESSION_CODEC = "ZLIB";
public static final String LOGFILE_TO_PARQUET_COMPRESSION_RATIO = "hoodie.logfile.to.parquet.compression.ratio";
// Default compression ratio for log file to parquet, general 3x
public static final String DEFAULT_LOGFILE_TO_PARQUET_COMPRESSION_RATIO = String.valueOf(0.35);
@@ -140,6 +153,26 @@ public class HoodieStorageConfig extends DefaultHoodieConfig {
return this;
}
+ public Builder orcMaxFileSize(long maxFileSize) {
+ props.setProperty(ORC_FILE_MAX_BYTES, String.valueOf(maxFileSize));
+ return this;
+ }
+
+ public Builder orcStripeSize(int orcStripeSize) {
+ props.setProperty(ORC_STRIPE_SIZE, String.valueOf(orcStripeSize));
+ return this;
+ }
+
+ public Builder orcBlockSize(int orcBlockSize) {
+ props.setProperty(ORC_BLOCK_SIZE, String.valueOf(orcBlockSize));
+ return this;
+ }
+
+ public Builder orcCompressionCodec(String orcCompressionCodec) {
+ props.setProperty(ORC_COMPRESSION_CODEC, orcCompressionCodec);
+ return this;
+ }
+
public HoodieStorageConfig build() {
HoodieStorageConfig config = new HoodieStorageConfig(props);
setDefaultOnCondition(props, !props.containsKey(PARQUET_FILE_MAX_BYTES), PARQUET_FILE_MAX_BYTES,
@@ -166,6 +199,15 @@ public class HoodieStorageConfig extends DefaultHoodieConfig {
setDefaultOnCondition(props, !props.containsKey(HFILE_FILE_MAX_BYTES), HFILE_FILE_MAX_BYTES,
DEFAULT_HFILE_FILE_MAX_BYTES);
+ setDefaultOnCondition(props, !props.containsKey(ORC_FILE_MAX_BYTES), ORC_FILE_MAX_BYTES,
+ DEFAULT_ORC_FILE_MAX_BYTES);
+ setDefaultOnCondition(props, !props.containsKey(ORC_STRIPE_SIZE), ORC_STRIPE_SIZE,
+ DEFAULT_ORC_STRIPE_SIZE);
+ setDefaultOnCondition(props, !props.containsKey(ORC_BLOCK_SIZE), ORC_BLOCK_SIZE,
+ DEFAULT_ORC_BLOCK_SIZE);
+ setDefaultOnCondition(props, !props.containsKey(ORC_COMPRESSION_CODEC), ORC_COMPRESSION_CODEC,
+ DEFAULT_ORC_COMPRESSION_CODEC);
+
return config;
}
}
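
The new storage properties are read back through the HoodieWriteConfig getters further down (getOrcMaxFileSize, getOrcStripeSize, getOrcBlockSize, getOrcCompressionCodec). A minimal sketch of overriding them with plain java.util.Properties, using only the keys and default values introduced in the hunks above; how the Properties reach the write config is engine-specific and not shown here:

    import java.util.Properties;

    public class OrcStorageConfigSketch {
      public static void main(String[] args) {
        Properties props = new Properties();
        // Keys and values below come from the HoodieStorageConfig hunks in this commit.
        props.setProperty("hoodie.orc.max.file.size", String.valueOf(120 * 1024 * 1024));
        props.setProperty("hoodie.orc.stripe.size", String.valueOf(64 * 1024 * 1024));   // in-memory write buffer
        props.setProperty("hoodie.orc.block.size", String.valueOf(120 * 1024 * 1024));   // file system block size
        props.setProperty("hoodie.orc.compression.codec", "ZLIB"); // parsed with CompressionKind.valueOf(...)
        props.forEach((k, v) -> System.out.println(k + "=" + v));
      }
    }
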
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index cf5ac5c..9e89e0e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -42,6 +42,7 @@ import org.apache.hudi.metrics.MetricsReporterType;
import org.apache.hudi.metrics.datadog.DatadogHttpClient.ApiSite;
import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
+import org.apache.orc.CompressionKind;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import javax.annotation.concurrent.Immutable;
@@ -784,6 +785,22 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return Compression.Algorithm.valueOf(props.getProperty(HoodieStorageConfig.HFILE_COMPRESSION_ALGORITHM));
}
+ public long getOrcMaxFileSize() {
+ return Long.parseLong(props.getProperty(HoodieStorageConfig.ORC_FILE_MAX_BYTES));
+ }
+
+ public int getOrcStripeSize() {
+ return Integer.parseInt(props.getProperty(HoodieStorageConfig.ORC_STRIPE_SIZE));
+ }
+
+ public int getOrcBlockSize() {
+ return Integer.parseInt(props.getProperty(HoodieStorageConfig.ORC_BLOCK_SIZE));
+ }
+
+ public CompressionKind getOrcCompressionCodec() {
+ return CompressionKind.valueOf(props.getProperty(HoodieStorageConfig.ORC_COMPRESSION_CODEC));
+ }
+
/**
* metrics properties.
*/
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java
index 1aaa389..a579234 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java
@@ -18,6 +18,9 @@
package org.apache.hudi.io.storage;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.avro.generic.IndexedRecord;
@@ -35,4 +38,11 @@ public interface HoodieFileWriter<R extends IndexedRecord> {
void writeAvro(String key, R oldRecord) throws IOException;
long getBytesWritten();
+
+ default void prepRecordWithMetadata(R avroRecord, HoodieRecord record, String instantTime, Integer partitionId, AtomicLong recordIndex, String fileName) {
+ String seqId = HoodieRecord.generateSequenceId(instantTime, partitionId, recordIndex.getAndIncrement());
+ HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord, record.getRecordKey(), record.getPartitionPath(), fileName);
+ HoodieAvroUtils.addCommitMetadataToRecord((GenericRecord) avroRecord, instantTime, seqId);
+ return;
+ }
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
index 23701b0..96f19ca 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
@@ -34,6 +34,7 @@ import org.apache.parquet.avro.AvroSchemaConverter;
import java.io.IOException;
+import static org.apache.hudi.common.model.HoodieFileFormat.ORC;
import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
import static org.apache.hudi.common.model.HoodieFileFormat.HFILE;
@@ -49,6 +50,9 @@ public class HoodieFileWriterFactory {
if (HFILE.getFileExtension().equals(extension)) {
return newHFileFileWriter(instantTime, path, config, schema, hoodieTable, taskContextSupplier);
}
+ if (ORC.getFileExtension().equals(extension)) {
+ return newOrcFileWriter(instantTime, path, config, schema, hoodieTable, taskContextSupplier);
+ }
throw new UnsupportedOperationException(extension + " format not supported yet.");
}
@@ -77,6 +81,15 @@ public class HoodieFileWriterFactory {
return new HoodieHFileWriter<>(instantTime, path, hfileConfig, schema, taskContextSupplier);
}
+ private static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFileWriter<R> newOrcFileWriter(
+ String instantTime, Path path, HoodieWriteConfig config, Schema schema, HoodieTable hoodieTable,
+ TaskContextSupplier taskContextSupplier) throws IOException {
+ BloomFilter filter = createBloomFilter(config);
+ HoodieOrcConfig orcConfig = new HoodieOrcConfig(hoodieTable.getHadoopConf(), config.getOrcCompressionCodec(),
+ config.getOrcStripeSize(), config.getOrcBlockSize(), config.getOrcMaxFileSize(), filter);
+ return new HoodieOrcWriter<>(instantTime, path, orcConfig, schema, taskContextSupplier);
+ }
+
private static BloomFilter createBloomFilter(HoodieWriteConfig config) {
return BloomFilterFactory.createBloomFilter(config.getBloomFilterNumEntries(), config.getBloomFilterFPP(),
config.getDynamicBloomFilterMaxNumEntries(),
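
With the branch above in place, HoodieFileWriterFactory picks the writer from the base-file extension, so a path ending in ".orc" yields a HoodieOrcWriter. A sketch of that call, mirroring TestHoodieFileWriterFactory further down; the table, write config, schema and TaskContextSupplier are assumed to come from the engine-specific write client:

    import java.io.IOException;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.IndexedRecord;
    import org.apache.hadoop.fs.Path;
    import org.apache.hudi.common.engine.TaskContextSupplier;
    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.hudi.io.storage.HoodieFileWriter;
    import org.apache.hudi.io.storage.HoodieFileWriterFactory;
    import org.apache.hudi.table.HoodieTable;

    public class OrcWriterFactorySketch {
      // table, cfg, schema and supplier are assumed to be provided by the write client.
      static HoodieFileWriter<IndexedRecord> orcWriterFor(String instantTime, String basePath, HoodieTable table,
          HoodieWriteConfig cfg, Schema schema, TaskContextSupplier supplier) throws IOException {
        // Any base-file path ending in ".orc" now dispatches to newOrcFileWriter(...) above.
        Path orcPath = new Path(basePath + "/partition/path/f1_1-0-1_000.orc");
        return HoodieFileWriterFactory.getFileWriter(instantTime, orcPath, table, cfg, schema, supplier);
      }
    }
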
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java
index 352c51c..6747c4a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java
@@ -99,13 +99,9 @@ public class HoodieHFileWriter<T extends HoodieRecordPayload, R extends IndexedRecord
@Override
public void writeAvroWithMetadata(R avroRecord, HoodieRecord record) throws IOException {
- String seqId =
- HoodieRecord.generateSequenceId(instantTime, taskContextSupplier.getPartitionIdSupplier().get(), recordIndex.getAndIncrement());
- HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord, record.getRecordKey(), record.getPartitionPath(),
- file.getName());
- HoodieAvroUtils.addCommitMetadataToRecord((GenericRecord) avroRecord, instantTime, seqId);
-
- writeAvro(record.getRecordKey(), (IndexedRecord)avroRecord);
+ prepRecordWithMetadata(avroRecord, record, instantTime,
+ taskContextSupplier.getPartitionIdSupplier().get(), recordIndex, file.getName());
+ writeAvro(record.getRecordKey(), (IndexedRecord) avroRecord);
}
@Override
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcConfig.java
new file mode 100644
index 0000000..c45e024
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcConfig.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.orc.CompressionKind;
+
+public class HoodieOrcConfig {
+ static final String AVRO_SCHEMA_METADATA_KEY = "orc.avro.schema";
+
+ private final CompressionKind compressionKind;
+ private final int stripeSize;
+ private final int blockSize;
+ private final long maxFileSize;
+ private final Configuration hadoopConf;
+ private final BloomFilter bloomFilter;
+
+ public HoodieOrcConfig(Configuration hadoopConf, CompressionKind compressionKind, int stripeSize,
+ int blockSize, long maxFileSize, BloomFilter bloomFilter) {
+ this.hadoopConf = hadoopConf;
+ this.compressionKind = compressionKind;
+ this.stripeSize = stripeSize;
+ this.blockSize = blockSize;
+ this.maxFileSize = maxFileSize;
+ this.bloomFilter = bloomFilter;
+ }
+
+ public Configuration getHadoopConf() {
+ return hadoopConf;
+ }
+
+ public CompressionKind getCompressionKind() {
+ return compressionKind;
+ }
+
+ public int getStripeSize() {
+ return stripeSize;
+ }
+
+ public int getBlockSize() {
+ return blockSize;
+ }
+
+ public long getMaxFileSize() {
+ return maxFileSize;
+ }
+
+ public boolean useBloomFilter() {
+ return bloomFilter != null;
+ }
+
+ public BloomFilter getBloomFilter() {
+ return bloomFilter;
+ }
+}
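
AVRO_SCHEMA_METADATA_KEY names the ORC footer entry ("orc.avro.schema") under which the writer below persists the Avro schema, alongside the bloom filter and min/max record keys. A minimal read-back sketch using the standard ORC Reader API, assuming an already-written Hudi ORC base file at args[0]:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.orc.OrcFile;
    import org.apache.orc.Reader;

    public class OrcFooterSketch {
      public static void main(String[] args) throws Exception {
        // Path to an ORC base file written by HoodieOrcWriter (assumed to exist).
        Path path = new Path(args[0]);
        Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(new Configuration()));
        // The writer stores the Avro schema under HoodieOrcConfig.AVRO_SCHEMA_METADATA_KEY.
        ByteBuffer schemaBytes = reader.getMetadataValue("orc.avro.schema");
        System.out.println(StandardCharsets.UTF_8.decode(schemaBytes));
      }
    }
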
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcWriter.java
new file mode 100644
index 0000000..f076842
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcWriter.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY;
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE;
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER;
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.orc.storage.ql.exec.vector.ColumnVector;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.AvroOrcUtils;
+
+public class HoodieOrcWriter<T extends HoodieRecordPayload, R extends IndexedRecord>
+ implements HoodieFileWriter<R> {
+ private static final AtomicLong RECORD_INDEX = new AtomicLong(1);
+
+ private final long maxFileSize;
+ private final Schema avroSchema;
+ private final List<TypeDescription> fieldTypes;
+ private final List<String> fieldNames;
+ private final VectorizedRowBatch batch;
+ private final Writer writer;
+
+ private final Path file;
+ private final HoodieWrapperFileSystem fs;
+ private final String instantTime;
+ private final TaskContextSupplier taskContextSupplier;
+
+ private HoodieOrcConfig orcConfig;
+ private String minRecordKey;
+ private String maxRecordKey;
+
+ public HoodieOrcWriter(String instantTime, Path file, HoodieOrcConfig config, Schema schema,
+ TaskContextSupplier taskContextSupplier) throws IOException {
+
+ Configuration conf = FSUtils.registerFileSystem(file, config.getHadoopConf());
+ this.file = HoodieWrapperFileSystem.convertToHoodiePath(file, conf);
+ this.fs = (HoodieWrapperFileSystem) this.file.getFileSystem(conf);
+ this.instantTime = instantTime;
+ this.taskContextSupplier = taskContextSupplier;
+
+ this.avroSchema = schema;
+ final TypeDescription orcSchema = AvroOrcUtils.createOrcSchema(avroSchema);
+ this.fieldTypes = orcSchema.getChildren();
+ this.fieldNames = orcSchema.getFieldNames();
+ this.maxFileSize = config.getMaxFileSize();
+ this.batch = orcSchema.createRowBatch();
+ OrcFile.WriterOptions writerOptions = OrcFile.writerOptions(conf)
+ .blockSize(config.getBlockSize())
+ .stripeSize(config.getStripeSize())
+ .compress(config.getCompressionKind())
+ .bufferSize(config.getBlockSize())
+ .fileSystem(fs)
+ .setSchema(orcSchema);
+ this.writer = OrcFile.createWriter(this.file, writerOptions);
+ this.orcConfig = config;
+ }
+
+ @Override
+ public void writeAvroWithMetadata(R avroRecord, HoodieRecord record) throws IOException {
+ prepRecordWithMetadata(avroRecord, record, instantTime,
+ taskContextSupplier.getPartitionIdSupplier().get(), RECORD_INDEX, file.getName());
+ writeAvro(record.getRecordKey(), avroRecord);
+ }
+
+ @Override
+ public boolean canWrite() {
+ return fs.getBytesWritten(file) < maxFileSize;
+ }
+
+ @Override
+ public void writeAvro(String recordKey, IndexedRecord object) throws IOException {
+ for (int col = 0; col < batch.numCols; col++) {
+ ColumnVector colVector = batch.cols[col];
+ final String thisField = fieldNames.get(col);
+ final TypeDescription type = fieldTypes.get(col);
+
+ Object fieldValue = ((GenericRecord) object).get(thisField);
+ Schema.Field avroField = avroSchema.getField(thisField);
+ AvroOrcUtils.addToVector(type, colVector, avroField.schema(), fieldValue, batch.size);
+ }
+
+ batch.size++;
+
+ // Batch size corresponds to the number of written rows out of 1024 total rows (by default)
+ // in the row batch, add the batch to file once all rows are filled and reset.
+ if (batch.size == batch.getMaxSize()) {
+ writer.addRowBatch(batch);
+ batch.reset();
+ batch.size = 0;
+ }
+
+ if (orcConfig.useBloomFilter()) {
+ orcConfig.getBloomFilter().add(recordKey);
+ if (minRecordKey != null) {
+ minRecordKey = minRecordKey.compareTo(recordKey) <= 0 ? minRecordKey : recordKey;
+ } else {
+ minRecordKey = recordKey;
+ }
+
+ if (maxRecordKey != null) {
+ maxRecordKey = maxRecordKey.compareTo(recordKey) >= 0 ? maxRecordKey : recordKey;
+ } else {
+ maxRecordKey = recordKey;
+ }
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (batch.size != 0) {
+ writer.addRowBatch(batch);
+ batch.reset();
+ }
+
+ if (orcConfig.useBloomFilter()) {
+ final BloomFilter bloomFilter = orcConfig.getBloomFilter();
+ writer.addUserMetadata(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY, ByteBuffer.wrap(bloomFilter.serializeToString().getBytes()));
+ if (minRecordKey != null && maxRecordKey != null) {
+ writer.addUserMetadata(HOODIE_MIN_RECORD_KEY_FOOTER, ByteBuffer.wrap(minRecordKey.getBytes()));
+ writer.addUserMetadata(HOODIE_MAX_RECORD_KEY_FOOTER, ByteBuffer.wrap(maxRecordKey.getBytes()));
+ }
+ if (bloomFilter.getBloomFilterTypeCode().name().contains(HoodieDynamicBoundedBloomFilter.TYPE_CODE_PREFIX)) {
+ writer.addUserMetadata(HOODIE_BLOOM_FILTER_TYPE_CODE, ByteBuffer.wrap(bloomFilter.getBloomFilterTypeCode().name().getBytes()));
+ }
+ }
+ writer.addUserMetadata(HoodieOrcConfig.AVRO_SCHEMA_METADATA_KEY, ByteBuffer.wrap(avroSchema.toString().getBytes()));
+
+ writer.close();
+ }
+
+ @Override
+ public long getBytesWritten() {
+ return fs.getBytesWritten(file);
+ }
+}
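
HoodieOrcWriter fills a VectorizedRowBatch and hands it to the ORC Writer whenever the batch reaches its maximum size (1024 rows by default); close() flushes any partial batch and adds the bloom filter, min/max record keys and Avro schema as footer metadata. A minimal write-and-close sketch modeled on TestHoodieOrcReaderWriter further down, assuming a local temp path, a hypothetical three-field schema, and a mocked TaskContextSupplier (only writeAvroWithMetadata consults it):

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hudi.common.bloom.BloomFilter;
    import org.apache.hudi.common.bloom.BloomFilterFactory;
    import org.apache.hudi.common.bloom.BloomFilterTypeCode;
    import org.apache.hudi.common.engine.TaskContextSupplier;
    import org.apache.hudi.io.storage.HoodieOrcConfig;
    import org.apache.hudi.io.storage.HoodieOrcWriter;
    import org.apache.orc.CompressionKind;
    import org.mockito.Mockito;

    public class OrcWriterSketch {
      public static void main(String[] args) throws Exception {
        // Hypothetical schema matching the shape used by the tests below.
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"trip\",\"namespace\":\"example.schema\",\"fields\":["
            + "{\"name\":\"_row_key\",\"type\":\"string\"},"
            + "{\"name\":\"time\",\"type\":\"string\"},"
            + "{\"name\":\"number\",\"type\":\"int\"}]}");
        BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.00001, -1,
            BloomFilterTypeCode.SIMPLE.name());
        // Defaults from HoodieStorageConfig: 64 MB stripes, 120 MB blocks, 120 MB max file size.
        HoodieOrcConfig config = new HoodieOrcConfig(new Configuration(), CompressionKind.ZLIB,
            64 * 1024 * 1024, 120 * 1024 * 1024, 120 * 1024 * 1024, filter);
        Path path = new Path(System.getProperty("java.io.tmpdir") + "/f1_1-0-1_000.orc");
        // writeAvro(...) never touches the TaskContextSupplier, so a mock is enough here.
        HoodieOrcWriter writer = new HoodieOrcWriter("000", path, config, schema,
            Mockito.mock(TaskContextSupplier.class));
        for (int i = 0; i < 3; i++) {
          GenericRecord record = new GenericData.Record(schema);
          record.put("_row_key", "key" + i);
          record.put("time", Integer.toString(i));
          record.put("number", i);
          writer.writeAvro("key" + i, record);
        }
        writer.close(); // flushes the partial batch and writes footer metadata
      }
    }
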
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java
index c3939d7..b6e77bc 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java
@@ -18,7 +18,6 @@
package org.apache.hudi.io.storage;
-import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.HoodieAvroWriteSupport;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
@@ -27,7 +26,6 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileWriter;
@@ -75,11 +73,8 @@ public class HoodieParquetWriter<T extends HoodieRecordPayload, R extends IndexedRecord
@Override
public void writeAvroWithMetadata(R avroRecord, HoodieRecord record) throws IOException {
- String seqId =
- HoodieRecord.generateSequenceId(instantTime, taskContextSupplier.getPartitionIdSupplier().get(), recordIndex.getAndIncrement());
- HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord, record.getRecordKey(), record.getPartitionPath(),
- file.getName());
- HoodieAvroUtils.addCommitMetadataToRecord((GenericRecord) avroRecord, instantTime, seqId);
+ prepRecordWithMetadata(avroRecord, record, instantTime,
+ taskContextSupplier.getPartitionIdSupplier().get(), recordIndex, file.getName());
super.write(avroRecord);
writeSupport.add(record.getRecordKey());
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 512518c..0ff2093 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -656,6 +656,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
public HoodieLogBlockType getLogDataBlockFormat() {
switch (getBaseFileFormat()) {
case PARQUET:
+ case ORC:
return HoodieLogBlockType.AVRO_DATA_BLOCK;
case HFILE:
return HoodieLogBlockType.HFILE_DATA_BLOCK;
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieOrcReaderWriter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieOrcReaderWriter.java
new file mode 100644
index 0000000..d69bc70
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieOrcReaderWriter.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterFactory;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.config.HoodieStorageConfig;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY;
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER;
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER;
+import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
+import static org.apache.hudi.io.storage.HoodieOrcConfig.AVRO_SCHEMA_METADATA_KEY;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestHoodieOrcReaderWriter {
+ private final Path filePath = new Path(System.getProperty("java.io.tmpdir") + "/f1_1-0-1_000.orc");
+
+ @BeforeEach
+ @AfterEach
+ public void clearTempFile() {
+ File file = new File(filePath.toString());
+ if (file.exists()) {
+ file.delete();
+ }
+ }
+
+ private HoodieOrcWriter createOrcWriter(Schema avroSchema) throws Exception {
+ BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.00001, -1, BloomFilterTypeCode.SIMPLE.name());
+ Configuration conf = new Configuration();
+ int orcStripSize = Integer.parseInt(HoodieStorageConfig.DEFAULT_ORC_STRIPE_SIZE);
+ int orcBlockSize = Integer.parseInt(HoodieStorageConfig.DEFAULT_ORC_BLOCK_SIZE);
+ int maxFileSize = Integer.parseInt(HoodieStorageConfig.DEFAULT_ORC_FILE_MAX_BYTES);
+ HoodieOrcConfig config = new HoodieOrcConfig(conf, CompressionKind.ZLIB, orcStripSize, orcBlockSize, maxFileSize, filter);
+ TaskContextSupplier mockTaskContextSupplier = Mockito.mock(TaskContextSupplier.class);
+ String instantTime = "000";
+ return new HoodieOrcWriter(instantTime, filePath, config, avroSchema, mockTaskContextSupplier);
+ }
+
+ @Test
+ public void testWriteReadMetadata() throws Exception {
+ Schema avroSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleSchema.avsc");
+ HoodieOrcWriter writer = createOrcWriter(avroSchema);
+ for (int i = 0; i < 3; i++) {
+ GenericRecord record = new GenericData.Record(avroSchema);
+ record.put("_row_key", "key" + i);
+ record.put("time", Integer.toString(i));
+ record.put("number", i);
+ writer.writeAvro("key" + i, record);
+ }
+ writer.close();
+
+ Configuration conf = new Configuration();
+ Reader orcReader = OrcFile.createReader(filePath, OrcFile.readerOptions(conf));
+ assertEquals(4, orcReader.getMetadataKeys().size());
+ assertTrue(orcReader.getMetadataKeys().contains(HOODIE_MIN_RECORD_KEY_FOOTER));
+ assertTrue(orcReader.getMetadataKeys().contains(HOODIE_MAX_RECORD_KEY_FOOTER));
+ assertTrue(orcReader.getMetadataKeys().contains(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY));
+ assertTrue(orcReader.getMetadataKeys().contains(AVRO_SCHEMA_METADATA_KEY));
+ assertEquals(CompressionKind.ZLIB.name(), orcReader.getCompressionKind().toString());
+
+ HoodieFileReader<GenericRecord> hoodieReader = HoodieFileReaderFactory.getFileReader(conf, filePath);
+ BloomFilter filter = hoodieReader.readBloomFilter();
+ for (int i = 0; i < 3; i++) {
+ assertTrue(filter.mightContain("key" + i));
+ }
+ assertFalse(filter.mightContain("non-existent-key"));
+ assertEquals(3, hoodieReader.getTotalRecords());
+ String[] minMaxRecordKeys = hoodieReader.readMinMaxRecordKeys();
+ assertEquals(2, minMaxRecordKeys.length);
+ assertEquals("key0", minMaxRecordKeys[0]);
+ assertEquals("key2", minMaxRecordKeys[1]);
+ }
+
+ @Test
+ public void testWriteReadPrimitiveRecord() throws Exception {
+ Schema avroSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleSchema.avsc");
+ HoodieOrcWriter writer = createOrcWriter(avroSchema);
+ for (int i = 0; i < 3; i++) {
+ GenericRecord record = new GenericData.Record(avroSchema);
+ record.put("_row_key", "key" + i);
+ record.put("time", Integer.toString(i));
+ record.put("number", i);
+ writer.writeAvro("key" + i, record);
+ }
+ writer.close();
+
+ Configuration conf = new Configuration();
+ Reader orcReader = OrcFile.createReader(filePath, OrcFile.readerOptions(conf));
+ assertEquals("struct<_row_key:string,time:string,number:int>", orcReader.getSchema().toString());
+ assertEquals(3, orcReader.getNumberOfRows());
+
+ HoodieFileReader<GenericRecord> hoodieReader = HoodieFileReaderFactory.getFileReader(conf, filePath);
+ Iterator<GenericRecord> iter = hoodieReader.getRecordIterator();
+ int index = 0;
+ while (iter.hasNext()) {
+ GenericRecord record = iter.next();
+ assertEquals("key" + index, record.get("_row_key").toString());
+ assertEquals(Integer.toString(index), record.get("time").toString());
+ assertEquals(index, record.get("number"));
+ index++;
+ }
+ }
+
+ @Test
+ public void testWriteReadComplexRecord() throws Exception {
+ Schema avroSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleSchemaWithUDT.avsc");
+ Schema udtSchema = avroSchema.getField("driver").schema().getTypes().get(1);
+ HoodieOrcWriter writer = createOrcWriter(avroSchema);
+ for (int i = 0; i < 3; i++) {
+ GenericRecord record = new GenericData.Record(avroSchema);
+ record.put("_row_key", "key" + i);
+ record.put("time", Integer.toString(i));
+ record.put("number", i);
+ GenericRecord innerRecord = new GenericData.Record(udtSchema);
+ innerRecord.put("driver_name", "driver" + i);
+ innerRecord.put("list", Collections.singletonList(i));
+ innerRecord.put("map", Collections.singletonMap("key" + i, "value" + i));
+ record.put("driver", innerRecord);
+ writer.writeAvro("key" + i, record);
+ }
+ writer.close();
+
+ Configuration conf = new Configuration();
+ Reader reader = OrcFile.createReader(filePath, OrcFile.readerOptions(conf));
+ assertEquals("struct<_row_key:string,time:string,number:int,driver:struct<driver_name:string,list:array<int>,map:map<string,string>>>",
+ reader.getSchema().toString());
+ assertEquals(3, reader.getNumberOfRows());
+
+ HoodieFileReader<GenericRecord> hoodieReader = HoodieFileReaderFactory.getFileReader(conf, filePath);
+ Iterator<GenericRecord> iter = hoodieReader.getRecordIterator();
+ int index = 0;
+ while (iter.hasNext()) {
+ GenericRecord record = iter.next();
+ assertEquals("key" + index, record.get("_row_key").toString());
+ assertEquals(Integer.toString(index), record.get("time").toString());
+ assertEquals(index, record.get("number"));
+ GenericRecord innerRecord = (GenericRecord) record.get("driver");
+ assertEquals("driver" + index, innerRecord.get("driver_name").toString());
+ assertEquals(1, ((List<?>)innerRecord.get("list")).size());
+ assertEquals(index, ((List<?>)innerRecord.get("list")).get(0));
+ assertEquals("value" + index, ((Map<?,?>)innerRecord.get("map")).get("key" + index).toString());
+ index++;
+ }
+ }
+
+ @Test
+ public void testWriteReadWithEvolvedSchema() throws Exception {
+ Schema avroSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleSchema.avsc");
+ HoodieOrcWriter writer = createOrcWriter(avroSchema);
+ for (int i = 0; i < 3; i++) {
+ GenericRecord record = new GenericData.Record(avroSchema);
+ record.put("_row_key", "key" + i);
+ record.put("time", Integer.toString(i));
+ record.put("number", i);
+ writer.writeAvro("key" + i, record);
+ }
+ writer.close();
+
+ Configuration conf = new Configuration();
+ HoodieFileReader<GenericRecord> hoodieReader = HoodieFileReaderFactory.getFileReader(conf, filePath);
+ Schema evolvedSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleEvolvedSchema.avsc");
+ Iterator<GenericRecord> iter = hoodieReader.getRecordIterator(evolvedSchema);
+ int index = 0;
+ while (iter.hasNext()) {
+ GenericRecord record = iter.next();
+ assertEquals("key" + index, record.get("_row_key").toString());
+ assertEquals(Integer.toString(index), record.get("time").toString());
+ assertEquals(index, record.get("number"));
+ assertNull(record.get("added_field"));
+ index++;
+ }
+
+ evolvedSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleEvolvedSchemaChangeOrder.avsc");
+ iter = hoodieReader.getRecordIterator(evolvedSchema);
+ index = 0;
+ while (iter.hasNext()) {
+ GenericRecord record = iter.next();
+ assertEquals("key" + index, record.get("_row_key").toString());
+ assertEquals(Integer.toString(index), record.get("time").toString());
+ assertEquals(index, record.get("number"));
+ assertNull(record.get("added_field"));
+ index++;
+ }
+
+ evolvedSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleEvolvedSchemaColumnRequire.avsc");
+ iter = hoodieReader.getRecordIterator(evolvedSchema);
+ index = 0;
+ while (iter.hasNext()) {
+ GenericRecord record = iter.next();
+ assertEquals("key" + index, record.get("_row_key").toString());
+ assertEquals(Integer.toString(index), record.get("time").toString());
+ assertEquals(index, record.get("number"));
+ assertNull(record.get("added_field"));
+ index++;
+ }
+
+ evolvedSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleEvolvedSchemaColumnType.avsc");
+ iter = hoodieReader.getRecordIterator(evolvedSchema);
+ index = 0;
+ while (iter.hasNext()) {
+ GenericRecord record = iter.next();
+ assertEquals("key" + index, record.get("_row_key").toString());
+ assertEquals(Integer.toString(index), record.get("time").toString());
+ assertEquals(Integer.toString(index), record.get("number").toString());
+ assertNull(record.get("added_field"));
+ index++;
+ }
+
+ evolvedSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleEvolvedSchemaDeleteColumn.avsc");
+ iter = hoodieReader.getRecordIterator(evolvedSchema);
+ index = 0;
+ while (iter.hasNext()) {
+ GenericRecord record = iter.next();
+ assertEquals("key" + index, record.get("_row_key").toString());
+ assertEquals(Integer.toString(index), record.get("time").toString());
+ assertNull(record.get("number"));
+ assertNull(record.get("added_field"));
+ index++;
+ }
+ }
+}
diff --git a/hudi-client/hudi-client-common/src/test/resources/exampleSchemaWithUDT.avsc b/hudi-client/hudi-client-common/src/test/resources/exampleSchemaWithUDT.avsc
new file mode 100644
index 0000000..4c40fb2
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/test/resources/exampleSchemaWithUDT.avsc
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+ "namespace": "example.schema",
+ "type": "record",
+ "name": "trip",
+ "fields": [
+ {
+ "name": "_row_key",
+ "type": "string"
+ },
+ {
+ "name": "time",
+ "type": "string"
+ },
+ {
+ "name": "number",
+ "type": ["null", "int"]
+ },
+ {
+ "name": "driver",
+ "type": [
+ "null",
+ {
+ "name": "person",
+ "type": "record",
+ "fields": [
+ {
+ "default": null,
+ "name": "driver_name",
+ "type": ["null", "string"]
+ },
+ {
+ "name": "list",
+ "type": {
+ "type": "array",
+ "items": "int"
+ }
+ },
+ {
+ "name": "map",
+ "type": {
+ "type": "map",
+ "values": "string"
+ }
+ }
+ ]
+ }
+ ]
+ }
+ ]
+}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieFileWriterFactory.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieFileWriterFactory.java
index 26f431a..b7f34ab 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieFileWriterFactory.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieFileWriterFactory.java
@@ -51,11 +51,18 @@ public class TestHoodieFileWriterFactory extends HoodieClientTestBase {
parquetPath, table, cfg, HoodieTestDataGenerator.AVRO_SCHEMA, supplier);
assertTrue(parquetWriter instanceof HoodieParquetWriter);
+ // hfile format.
final Path hfilePath = new Path(basePath + "/partition/path/f1_1-0-1_000.hfile");
HoodieFileWriter<IndexedRecord> hfileWriter = HoodieFileWriterFactory.getFileWriter(instantTime,
hfilePath, table, cfg, HoodieTestDataGenerator.AVRO_SCHEMA, supplier);
assertTrue(hfileWriter instanceof HoodieHFileWriter);
+ // orc file format.
+ final Path orcPath = new Path(basePath + "/partition/path/f1_1-0-1_000.orc");
+ HoodieFileWriter<IndexedRecord> orcFileWriter = HoodieFileWriterFactory.getFileWriter(instantTime,
+ orcPath, table, cfg, HoodieTestDataGenerator.AVRO_SCHEMA, supplier);
+ assertTrue(orcFileWriter instanceof HoodieOrcWriter);
+
// other file format exception.
final Path logPath = new Path(basePath + "/partition/path/f.b51192a8-574b-4a85-b246-bcfec03ac8bf_100.log.2_1-0-1");
final Throwable thrown = assertThrows(UnsupportedOperationException.class, () -> {
diff --git a/hudi-common/pom.xml b/hudi-common/pom.xml
index 6ec0d95..a41e73e 100644
--- a/hudi-common/pom.xml
+++ b/hudi-common/pom.xml
@@ -119,6 +119,14 @@
<artifactId>parquet-avro</artifactId>
</dependency>
+ <!-- ORC -->
+ <dependency>
+ <groupId>org.apache.orc</groupId>
+ <artifactId>orc-core</artifactId>
+ <version>${orc.version}</version>
+ <classifier>nohive</classifier>
+ </dependency>
+
<!-- Httpcomponents -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java
index 552c38f..f7fdcd0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java
@@ -24,7 +24,8 @@ package org.apache.hudi.common.model;
public enum HoodieFileFormat {
PARQUET(".parquet"),
HOODIE_LOG(".log"),
- HFILE(".hfile");
+ HFILE(".hfile"),
+ ORC(".orc");
private final String extension;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/AvroOrcUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/AvroOrcUtils.java
new file mode 100644
index 0000000..0f1f49f
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/AvroOrcUtils.java
@@ -0,0 +1,799 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.sql.Timestamp;
+import java.util.Base64;
+import java.util.Date;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.avro.Conversions;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.generic.GenericData;
+import java.nio.charset.StandardCharsets;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData.StringType;
+import org.apache.avro.util.Utf8;
+import org.apache.orc.storage.common.type.HiveDecimal;
+import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ColumnVector;
+import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
+import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
+import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
+import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
+import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
+import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;
+import org.apache.orc.storage.ql.exec.vector.UnionColumnVector;
+import org.apache.orc.storage.serde2.io.DateWritable;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.orc.TypeDescription;
+
+/**
+ * Methods including addToVector, addUnionValue, createOrcSchema are originally from
+ * https://github.com/streamsets/datacollector.
+ * Source classes:
+ * - com.streamsets.pipeline.lib.util.avroorc.AvroToOrcRecordConverter
+ * - com.streamsets.pipeline.lib.util.avroorc.AvroToOrcSchemaConverter
+ *
+ * Changes made:
+ * 1. Flatten nullable Avro schema type when the value is not null in `addToVector`.
+ * 2. Use getLogicalType(), constants from LogicalTypes instead of getJsonProp() to handle Avro logical types.
+ */
+public class AvroOrcUtils {
+
+ private static final int MICROS_PER_MILLI = 1000;
+ private static final int NANOS_PER_MICRO = 1000;
+
+ /**
+ * Add an object (of a given ORC type) to the column vector at a given position.
+ *
+ * @param type ORC schema of the value Object.
+ * @param colVector The column vector to store the value Object.
+ * @param avroSchema Avro schema of the value Object.
+ * Only used to check logical types for timestamp unit conversion.
+ * @param value Object to be added to the column vector
+ * @param vectorPos The position in the vector where value will be stored at.
+ */
+ public static void addToVector(TypeDescription type, ColumnVector colVector, Schema avroSchema, Object value, int vectorPos) {
+
+ final int currentVecLength = colVector.isNull.length;
+ if (vectorPos >= currentVecLength) {
+ colVector.ensureSize(2 * currentVecLength, true);
+ }
+ if (value == null) {
+ colVector.isNull[vectorPos] = true;
+ colVector.noNulls = false;
+ return;
+ }
+
+ if (avroSchema.getType().equals(Schema.Type.UNION)) {
+ avroSchema = getActualSchemaType(avroSchema);
+ }
+
+ LogicalType logicalType = avroSchema != null ? avroSchema.getLogicalType() : null;
+
+ switch (type.getCategory()) {
+ case BOOLEAN:
+ LongColumnVector boolVec = (LongColumnVector) colVector;
+ boolVec.vector[vectorPos] = (boolean) value ? 1 : 0;
+ break;
+ case BYTE:
+ LongColumnVector byteColVec = (LongColumnVector) colVector;
+ byteColVec.vector[vectorPos] = (byte) value;
+ break;
+ case SHORT:
+ LongColumnVector shortColVec = (LongColumnVector) colVector;
+ shortColVec.vector[vectorPos] = (short) value;
+ break;
+ case INT:
+ // the Avro logical type could be AvroTypeUtil.LOGICAL_TYPE_TIME_MILLIS, but we will ignore that fact here
+ // since Orc has no way to represent a time in the way Avro defines it; we will simply preserve the int value
+ LongColumnVector intColVec = (LongColumnVector) colVector;
+ intColVec.vector[vectorPos] = (int) value;
+ break;
+ case LONG:
+ // the Avro logical type could be AvroTypeUtil.LOGICAL_TYPE_TIME_MICROS, but we will ignore that fact here
+ // since Orc has no way to represent a time in the way Avro defines it; we will simply preserve the long value
+ LongColumnVector longColVec = (LongColumnVector) colVector;
+ longColVec.vector[vectorPos] = (long) value;
+ break;
+ case FLOAT:
+ DoubleColumnVector floatColVec = (DoubleColumnVector) colVector;
+ floatColVec.vector[vectorPos] = (float) value;
+ break;
+ case DOUBLE:
+ DoubleColumnVector doubleColVec = (DoubleColumnVector) colVector;
+ doubleColVec.vector[vectorPos] = (double) value;
+ break;
+ case VARCHAR:
+ case CHAR:
+ case STRING:
+ BytesColumnVector bytesColVec = (BytesColumnVector) colVector;
+ byte[] bytes = null;
+
+ if (value instanceof String) {
+ bytes = ((String) value).getBytes(StandardCharsets.UTF_8);
+ } else if (value instanceof Utf8) {
+ final Utf8 utf8 = (Utf8) value;
+ bytes = utf8.getBytes();
+ } else if (value instanceof GenericData.EnumSymbol) {
+ bytes = ((GenericData.EnumSymbol) value).toString().getBytes(StandardCharsets.UTF_8);
+ } else {
+ throw new IllegalStateException(String.format(
+ "Unrecognized type for Avro %s field value, which has type %s, value %s",
+ type.getCategory().getName(),
+ value.getClass().getName(),
+ value.toString()
+ ));
+ }
+
+ if (bytes == null) {
+ bytesColVec.isNull[vectorPos] = true;
+ bytesColVec.noNulls = false;
+ } else {
+ bytesColVec.setRef(vectorPos, bytes, 0, bytes.length);
+ }
+ break;
+ case DATE:
+ LongColumnVector dateColVec = (LongColumnVector) colVector;
+ int daysSinceEpoch;
+ if (logicalType instanceof LogicalTypes.Date) {
+ daysSinceEpoch = (int) value;
+ } else if (value instanceof java.sql.Date) {
+ daysSinceEpoch = DateWritable.dateToDays((java.sql.Date) value);
+ } else if (value instanceof Date) {
+ daysSinceEpoch = DateWritable.millisToDays(((Date) value).getTime());
+ } else {
+ throw new IllegalStateException(String.format(
+ "Unrecognized type for Avro DATE field value, which has type %s, value %s",
+ value.getClass().getName(),
+ value.toString()
+ ));
+ }
+ dateColVec.vector[vectorPos] = daysSinceEpoch;
+ break;
+ case TIMESTAMP:
+ TimestampColumnVector tsColVec = (TimestampColumnVector) colVector;
+
+ long time;
+ int nanos = 0;
+
+ // The unit for Timestamp in ORC is millis, convert timestamp to millis if needed
+ if (logicalType instanceof LogicalTypes.TimestampMillis) {
+ time = (long) value;
+ } else if (logicalType instanceof LogicalTypes.TimestampMicros) {
+ final long logicalTsValue = (long) value;
+ time = logicalTsValue / MICROS_PER_MILLI;
+ nanos = NANOS_PER_MICRO * ((int) (logicalTsValue % MICROS_PER_MILLI));
+ } else if (value instanceof Timestamp) {
+ Timestamp tsValue = (Timestamp) value;
+ time = tsValue.getTime();
+ nanos = tsValue.getNanos();
+ } else if (value instanceof java.sql.Date) {
+ java.sql.Date sqlDateValue = (java.sql.Date) value;
+ time = sqlDateValue.getTime();
+ } else if (value instanceof Date) {
+ Date dateValue = (Date) value;
+ time = dateValue.getTime();
+ } else {
+ throw new IllegalStateException(String.format(
+ "Unrecognized type for Avro TIMESTAMP field value, which has type %s, value %s",
+ value.getClass().getName(),
+ value.toString()
+ ));
+ }
+
+ tsColVec.time[vectorPos] = time;
+ tsColVec.nanos[vectorPos] = nanos;
+ break;
+ case BINARY:
+ BytesColumnVector binaryColVec = (BytesColumnVector) colVector;
+
+ byte[] binaryBytes;
+ if (value instanceof GenericData.Fixed) {
+ binaryBytes = ((GenericData.Fixed)value).bytes();
+ } else if (value instanceof ByteBuffer) {
+ final ByteBuffer byteBuffer = (ByteBuffer) value;
+ binaryBytes = new byte[byteBuffer.remaining()];
+ byteBuffer.get(binaryBytes);
+ } else if (value instanceof byte[]) {
+ binaryBytes = (byte[]) value;
+ } else {
+ throw new IllegalStateException(String.format(
+ "Unrecognized type for Avro BINARY field value, which has type %s, value %s",
+ value.getClass().getName(),
+ value.toString()
+ ));
+ }
+ binaryColVec.setRef(vectorPos, binaryBytes, 0, binaryBytes.length);
+ break;
+ case DECIMAL:
+ DecimalColumnVector decimalColVec = (DecimalColumnVector) colVector;
+ HiveDecimal decimalValue;
+ if (value instanceof BigDecimal) {
+ final BigDecimal decimal = (BigDecimal) value;
+ decimalValue = HiveDecimal.create(decimal);
+ } else if (value instanceof ByteBuffer) {
+ final ByteBuffer byteBuffer = (ByteBuffer) value;
+ final byte[] decimalBytes = new byte[byteBuffer.remaining()];
+ byteBuffer.get(decimalBytes);
+ final BigInteger bigInt = new BigInteger(decimalBytes);
+ final int scale = type.getScale();
+ BigDecimal bigDecVal = new BigDecimal(bigInt, scale);
+
+ decimalValue = HiveDecimal.create(bigDecVal);
+ if (decimalValue == null && decimalBytes.length > 0) {
+ throw new IllegalStateException(
+ "Unexpected read null HiveDecimal from bytes (base-64 encoded): "
+ + Base64.getEncoder().encodeToString(decimalBytes)
+ );
+ }
+ } else if (value instanceof GenericData.Fixed) {
+ final BigDecimal decimal = new Conversions.DecimalConversion()
+ .fromFixed((GenericData.Fixed) value, avroSchema, logicalType);
+ decimalValue = HiveDecimal.create(decimal);
+ } else {
+ throw new IllegalStateException(String.format(
+ "Unexpected type for decimal (%s), cannot convert from Avro value",
+ value.getClass().getCanonicalName()
+ ));
+ }
+ if (decimalValue == null) {
+ decimalColVec.isNull[vectorPos] = true;
+ decimalColVec.noNulls = false;
+ } else {
+ decimalColVec.set(vectorPos, decimalValue);
+ }
+ break;
+ case LIST:
+ List<?> list = (List<?>) value;
+ ListColumnVector listColVec = (ListColumnVector) colVector;
+ listColVec.offsets[vectorPos] = listColVec.childCount;
+ listColVec.lengths[vectorPos] = list.size();
+
+ TypeDescription listType = type.getChildren().get(0);
+ for (Object listItem : list) {
+ addToVector(listType, listColVec.child, avroSchema.getElementType(), listItem, listColVec.childCount++);
+ }
+ break;
+ case MAP:
+ Map<String, ?> mapValue = (Map<String, ?>) value;
+
+ MapColumnVector mapColumnVector = (MapColumnVector) colVector;
+ mapColumnVector.offsets[vectorPos] = mapColumnVector.childCount;
+ mapColumnVector.lengths[vectorPos] = mapValue.size();
+
+ // keys are always strings
+ Schema keySchema = Schema.create(Schema.Type.STRING);
+ for (Map.Entry<String, ?> entry : mapValue.entrySet()) {
+ addToVector(
+ type.getChildren().get(0),
+ mapColumnVector.keys,
+ keySchema,
+ entry.getKey(),
+ mapColumnVector.childCount
+ );
+
+ addToVector(
+ type.getChildren().get(1),
+ mapColumnVector.values,
+ avroSchema.getValueType(),
+ entry.getValue(),
+ mapColumnVector.childCount
+ );
+
+ mapColumnVector.childCount++;
+ }
+
+ break;
+ case STRUCT:
+ StructColumnVector structColVec = (StructColumnVector) colVector;
+
+ GenericData.Record record = (GenericData.Record) value;
+
+ for (int i = 0; i < type.getFieldNames().size(); i++) {
+ String fieldName = type.getFieldNames().get(i);
+ Object fieldValue = record.get(fieldName);
+ TypeDescription fieldType = type.getChildren().get(i);
+ addToVector(fieldType, structColVec.fields[i], avroSchema.getFields().get(i).schema(), fieldValue, vectorPos);
+ }
+
+ break;
+ case UNION:
+ UnionColumnVector unionColVec = (UnionColumnVector) colVector;
+
+ List<TypeDescription> childTypes = type.getChildren();
+ boolean added = addUnionValue(unionColVec, childTypes, avroSchema, value, vectorPos);
+
+ if (!added) {
+ throw new IllegalStateException(String.format(
+ "Failed to add value %s to union with type %s",
+ value == null ? "null" : value.toString(),
+ type.toString()
+ ));
+ }
+
+ break;
+ default:
+ throw new IllegalArgumentException("Invalid TypeDescription " + type.toString() + ".");
+ }
+ }
+
+ /**
+ * Match value with its ORC type and add to the union vector at a given position.
+ *
+ * @param unionVector The vector to store value.
+ * @param unionChildTypes All possible types for the value Object.
+ * @param avroSchema Avro union schema for the value Object.
+ * @param value Object to be added to the unionVector
+ * @param vectorPos The position in the vector where value will be stored at.
+ * @return succeeded or failed
+ */
+ public static boolean addUnionValue(
+ UnionColumnVector unionVector,
+ List<TypeDescription> unionChildTypes,
+ Schema avroSchema,
+ Object value,
+ int vectorPos
+ ) {
+ int matchIndex = -1;
+ TypeDescription matchType = null;
+ Object matchValue = null;
+
+ for (int t = 0; t < unionChildTypes.size(); t++) {
+ TypeDescription childType = unionChildTypes.get(t);
+ boolean matches = false;
+
+ switch (childType.getCategory()) {
+ case BOOLEAN:
+ matches = value instanceof Boolean;
+ break;
+ case BYTE:
+ matches = value instanceof Byte;
+ break;
+ case SHORT:
+ matches = value instanceof Short;
+ break;
+ case INT:
+ matches = value instanceof Integer;
+ break;
+ case LONG:
+ matches = value instanceof Long;
+ break;
+ case FLOAT:
+ matches = value instanceof Float;
+ break;
+ case DOUBLE:
+ matches = value instanceof Double;
+ break;
+ case STRING:
+ case VARCHAR:
+ case CHAR:
+ if (value instanceof String) {
+ matches = true;
+ matchValue = ((String) value).getBytes(StandardCharsets.UTF_8);
+ } else if (value instanceof Utf8) {
+ matches = true;
+ matchValue = ((Utf8) value).getBytes();
+ }
+ break;
+ case DATE:
+ matches = value instanceof Date;
+ break;
+ case TIMESTAMP:
+ matches = value instanceof Timestamp;
+ break;
+ case BINARY:
+ matches = value instanceof byte[] || value instanceof GenericData.Fixed;
+ break;
+ case DECIMAL:
+ matches = value instanceof BigDecimal;
+ break;
+ case LIST:
+ matches = value instanceof List;
+ break;
+ case MAP:
+ matches = value instanceof Map;
+ break;
+ case STRUCT:
+ throw new UnsupportedOperationException("Cannot handle STRUCT within UNION.");
+ case UNION:
+ List<TypeDescription> children = childType.getChildren();
+ if (value == null) {
+ matches = children == null || children.size() == 0;
+ } else {
+ matches = addUnionValue(unionVector, children, avroSchema, value, vectorPos);
+ }
+ break;
+ default:
+ throw new IllegalArgumentException("Invalid TypeDescription " + childType.getCategory().toString() + ".");
+ }
+
+ if (matches) {
+ matchIndex = t;
+ matchType = childType;
+ break;
+ }
+ }
+
+ if (value == null && matchValue != null) {
+ value = matchValue;
+ }
+
+ if (matchIndex >= 0) {
+ unionVector.tags[vectorPos] = matchIndex;
+ if (value == null) {
+ unionVector.isNull[vectorPos] = true;
+ unionVector.noNulls = false;
+ } else {
+ addToVector(matchType, unionVector.fields[matchIndex], avroSchema.getTypes().get(matchIndex), value, vectorPos);
+ }
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ /**
+ * Read the value at a given position from a column vector, conforming to a given ORC schema.
+ *
+ * @param type ORC schema of the object to read.
+ * @param colVector The column vector to read from.
+ * @param avroSchema Avro schema of the object to read.
+ * Only used to check logical types for timestamp unit conversion.
+ * @param vectorPos The position in the vector where the value to read is stored.
+ * @return The object being read.
+ */
+ public static Object readFromVector(TypeDescription type, ColumnVector colVector, Schema avroSchema, int vectorPos) {
+
+ if (colVector.isRepeating) {
+ vectorPos = 0;
+ }
+
+ if (colVector.isNull[vectorPos]) {
+ return null;
+ }
+
+ if (avroSchema.getType().equals(Schema.Type.UNION)) {
+ avroSchema = getActualSchemaType(avroSchema);
+ }
+ LogicalType logicalType = avroSchema != null ? avroSchema.getLogicalType() : null;
+
+ switch (type.getCategory()) {
+ case BOOLEAN:
+ return ((LongColumnVector) colVector).vector[vectorPos] != 0;
+ case BYTE:
+ return (byte) ((LongColumnVector) colVector).vector[vectorPos];
+ case SHORT:
+ return (short) ((LongColumnVector) colVector).vector[vectorPos];
+ case INT:
+ return (int) ((LongColumnVector) colVector).vector[vectorPos];
+ case LONG:
+ return ((LongColumnVector) colVector).vector[vectorPos];
+ case FLOAT:
+ return (float) ((DoubleColumnVector) colVector).vector[vectorPos];
+ case DOUBLE:
+ return ((DoubleColumnVector) colVector).vector[vectorPos];
+ case VARCHAR:
+ case CHAR:
+ int maxLength = type.getMaxLength();
+ String result = ((BytesColumnVector) colVector).toString(vectorPos);
+ if (result.length() <= maxLength) {
+ return result;
+ } else {
+ throw new HoodieIOException("CHAR/VARCHAR has length " + result.length() + " greater than Max Length allowed");
+ }
+ case STRING:
+ String stringType = avroSchema.getProp(GenericData.STRING_PROP);
+ if (stringType == null || !stringType.equals(StringType.String.name())) {
+ int stringLength = ((BytesColumnVector) colVector).length[vectorPos];
+ int stringOffset = ((BytesColumnVector) colVector).start[vectorPos];
+ byte[] stringBytes = new byte[stringLength];
+ System.arraycopy(((BytesColumnVector) colVector).vector[vectorPos], stringOffset, stringBytes, 0, stringLength);
+ return new Utf8(stringBytes);
+ } else {
+ return ((BytesColumnVector) colVector).toString(vectorPos);
+ }
+ case DATE:
+ // convert to daysSinceEpoch for LogicalType.Date
+ return (int) ((LongColumnVector) colVector).vector[vectorPos];
+ case TIMESTAMP:
+ // The unit of time in ORC is millis. Convert (time,nanos) to the desired unit per logicalType
+ long time = ((TimestampColumnVector) colVector).time[vectorPos];
+ int nanos = ((TimestampColumnVector) colVector).nanos[vectorPos];
+ if (logicalType instanceof LogicalTypes.TimestampMillis) {
+ return time;
+ } else if (logicalType instanceof LogicalTypes.TimestampMicros) {
+ return time * MICROS_PER_MILLI + nanos / NANOS_PER_MICRO;
+ } else {
+ return ((TimestampColumnVector) colVector).getTimestampAsLong(vectorPos);
+ }
+ case BINARY:
+ int binaryLength = ((BytesColumnVector) colVector).length[vectorPos];
+ int binaryOffset = ((BytesColumnVector) colVector).start[vectorPos];
+ byte[] binaryBytes = new byte[binaryLength];
+ System.arraycopy(((BytesColumnVector) colVector).vector[vectorPos], binaryOffset, binaryBytes, 0, binaryLength);
+ // return a ByteBuffer to be consistent with AvroRecordConverter
+ return ByteBuffer.wrap(binaryBytes);
+ case DECIMAL:
+ // HiveDecimal strips trailing zeros, which implicitly changes the scale,
+ // so the scale from the Avro logical type must be re-applied here.
+ BigDecimal bigDecimal = ((DecimalColumnVector) colVector).vector[vectorPos]
+ .getHiveDecimal().bigDecimalValue()
+ .setScale(((LogicalTypes.Decimal) logicalType).getScale());
+ Schema.Type baseType = avroSchema.getType();
+ if (baseType.equals(Schema.Type.FIXED)) {
+ return new Conversions.DecimalConversion().toFixed(bigDecimal, avroSchema, logicalType);
+ } else if (baseType.equals(Schema.Type.BYTES)) {
+ return bigDecimal.unscaledValue().toByteArray();
+ } else {
+ throw new HoodieIOException(baseType.getName() + " is not a valid type for LogicalTypes.DECIMAL.");
+ }
+ case LIST:
+ ArrayList<Object> list = new ArrayList<>();
+ ListColumnVector listVector = (ListColumnVector) colVector;
+ int listLength = (int) listVector.lengths[vectorPos];
+ int listOffset = (int) listVector.offsets[vectorPos];
+ list.ensureCapacity(listLength);
+ TypeDescription childType = type.getChildren().get(0);
+ for (int i = 0; i < listLength; i++) {
+ list.add(readFromVector(childType, listVector.child, avroSchema.getElementType(), listOffset + i));
+ }
+ return list;
+ case MAP:
+ Map<String, Object> map = new HashMap<String, Object>();
+ MapColumnVector mapVector = (MapColumnVector) colVector;
+ int mapLength = (int) mapVector.lengths[vectorPos];
+ int mapOffset = (int) mapVector.offsets[vectorPos];
+ // keys are always strings for maps in Avro
+ Schema keySchema = Schema.create(Schema.Type.STRING);
+ for (int i = 0; i < mapLength; i++) {
+ map.put(
+ readFromVector(type.getChildren().get(0), mapVector.keys, keySchema, i + mapOffset).toString(),
+ readFromVector(type.getChildren().get(1), mapVector.values,
+ avroSchema.getValueType(), i + mapOffset));
+ }
+ return map;
+ case STRUCT:
+ StructColumnVector structVector = (StructColumnVector) colVector;
+ List<TypeDescription> children = type.getChildren();
+ GenericData.Record record = new GenericData.Record(avroSchema);
+ for (int i = 0; i < children.size(); i++) {
+ record.put(i, readFromVector(children.get(i), structVector.fields[i],
+ avroSchema.getFields().get(i).schema(), vectorPos));
+ }
+ return record;
+ case UNION:
+ UnionColumnVector unionVector = (UnionColumnVector) colVector;
+ int tag = unionVector.tags[vectorPos];
+ ColumnVector fieldVector = unionVector.fields[tag];
+ return readFromVector(type.getChildren().get(tag), fieldVector, avroSchema.getTypes().get(tag), vectorPos);
+ default:
+ throw new HoodieIOException("Unrecognized TypeDescription " + type.toString());
+ }
+ }
+
+ public static TypeDescription createOrcSchema(Schema avroSchema) {
+
+ LogicalType logicalType = avroSchema.getLogicalType();
+
+ if (logicalType != null) {
+ if (logicalType instanceof LogicalTypes.Decimal) {
+ return TypeDescription.createDecimal()
+ .withPrecision(((LogicalTypes.Decimal) logicalType).getPrecision())
+ .withScale(((LogicalTypes.Decimal) logicalType).getScale());
+ } else if (logicalType instanceof LogicalTypes.Date) {
+ // The date logical type represents a date within the calendar, with no reference to a particular time zone
+ // or time of day.
+ //
+ // A date logical type annotates an Avro int, where the int stores the number of days from the unix epoch, 1
+ // January 1970 (ISO calendar).
+ return TypeDescription.createDate();
+ } else if (logicalType instanceof LogicalTypes.TimeMillis) {
+ // The time-millis logical type represents a time of day, with no reference to a particular calendar, time
+ // zone or date, with a precision of one millisecond.
+ //
+ // A time-millis logical type annotates an Avro int, where the int stores the number of milliseconds after
+ // midnight, 00:00:00.000.
+ return TypeDescription.createInt();
+ } else if (logicalType instanceof LogicalTypes.TimeMicros) {
+ // The time-micros logical type represents a time of day, with no reference to a particular calendar, time
+ // zone or date, with a precision of one microsecond.
+ //
+ // A time-micros logical type annotates an Avro long, where the long stores the number of microseconds after
+ // midnight, 00:00:00.000000.
+ return TypeDescription.createLong();
+ } else if (logicalType instanceof LogicalTypes.TimestampMillis) {
+ // The timestamp-millis logical type represents an instant on the global timeline, independent of a
+ // particular time zone or calendar, with a precision of one millisecond.
+ //
+ // A timestamp-millis logical type annotates an Avro long, where the long stores the number of milliseconds
+ // from the unix epoch, 1 January 1970 00:00:00.000 UTC.
+ return TypeDescription.createTimestamp();
+ } else if (logicalType instanceof LogicalTypes.TimestampMicros) {
+ // The timestamp-micros logical type represents an instant on the global timeline, independent of a
+ // particular time zone or calendar, with a precision of one microsecond.
+ //
+ // A timestamp-micros logical type annotates an Avro long, where the long stores the number of microseconds
+ // from the unix epoch, 1 January 1970 00:00:00.000000 UTC.
+ return TypeDescription.createTimestamp();
+ }
+ }
+
+ final Schema.Type type = avroSchema.getType();
+ switch (type) {
+ case NULL:
+ // empty union represents null type
+ final TypeDescription nullUnion = TypeDescription.createUnion();
+ return nullUnion;
+ case LONG:
+ return TypeDescription.createLong();
+ case INT:
+ return TypeDescription.createInt();
+ case BYTES:
+ return TypeDescription.createBinary();
+ case ARRAY:
+ return TypeDescription.createList(createOrcSchema(avroSchema.getElementType()));
+ case RECORD:
+ final TypeDescription recordStruct = TypeDescription.createStruct();
+ for (Schema.Field field : avroSchema.getFields()) {
+ final Schema fieldSchema = field.schema();
+ final TypeDescription fieldType = createOrcSchema(fieldSchema);
+ if (fieldType != null) {
+ recordStruct.addField(field.name(), fieldType);
+ }
+ }
+ return recordStruct;
+ case MAP:
+ return TypeDescription.createMap(
+ // in Avro maps, keys are always strings
+ TypeDescription.createString(),
+ createOrcSchema(avroSchema.getValueType())
+ );
+ case UNION:
+ final List<Schema> nonNullMembers = avroSchema.getTypes().stream().filter(
+ schema -> !Schema.Type.NULL.equals(schema.getType())
+ ).collect(Collectors.toList());
+
+ if (nonNullMembers.isEmpty()) {
+ // no non-null union members; represent as an ORC empty union
+ return TypeDescription.createUnion();
+ } else if (nonNullMembers.size() == 1) {
+ // a single non-null union member
+ // this is how Avro represents "nullable" types; as a union of the NULL type with another
+ // since ORC already supports nullability of all types, just use the child type directly
+ return createOrcSchema(nonNullMembers.get(0));
+ } else {
+ // more than one non-null type; represent as an actual ORC union of them
+ final TypeDescription union = TypeDescription.createUnion();
+ for (final Schema childSchema : nonNullMembers) {
+ union.addUnionChild(createOrcSchema(childSchema));
+ }
+ return union;
+ }
+ case STRING:
+ return TypeDescription.createString();
+ case FLOAT:
+ return TypeDescription.createFloat();
+ case DOUBLE:
+ return TypeDescription.createDouble();
+ case BOOLEAN:
+ return TypeDescription.createBoolean();
+ case ENUM:
+ // represent as String for now
+ return TypeDescription.createString();
+ case FIXED:
+ return TypeDescription.createBinary();
+ default:
+ throw new IllegalStateException(String.format("Unrecognized Avro type: %s", type.getName()));
+ }
+ }
+
+ public static Schema createAvroSchema(TypeDescription orcSchema) {
+ switch (orcSchema.getCategory()) {
+ case BOOLEAN:
+ return Schema.create(Schema.Type.BOOLEAN);
+ case BYTE:
+ // tinyint (8 bit), use int to hold it
+ return Schema.create(Schema.Type.INT);
+ case SHORT:
+ // smallint (16 bit), use int to hold it
+ return Schema.create(Schema.Type.INT);
+ case INT:
+ // the Avro logical type could be time-millis, but a plain ORC int gives no way to distinguish
+ return Schema.create(Schema.Type.INT);
+ case LONG:
+ // the Avro logical type could be time-micros, but a plain ORC long gives no way to distinguish
+ return Schema.create(Schema.Type.LONG);
+ case FLOAT:
+ return Schema.create(Schema.Type.FLOAT);
+ case DOUBLE:
+ return Schema.create(Schema.Type.DOUBLE);
+ case VARCHAR:
+ case CHAR:
+ case STRING:
+ return Schema.create(Schema.Type.STRING);
+ case DATE:
+ Schema date = Schema.create(Schema.Type.INT);
+ LogicalTypes.date().addToSchema(date);
+ return date;
+ case TIMESTAMP:
+ // Cannot distinguish between TIMESTAMP_MILLIS and TIMESTAMP_MICROS
+ // Assume TIMESTAMP_MILLIS because Timestamp in ORC is in millis
+ Schema timestamp = Schema.create(Schema.Type.LONG);
+ LogicalTypes.timestampMillis().addToSchema(timestamp);
+ return timestamp;
+ case BINARY:
+ return Schema.create(Schema.Type.BYTES);
+ case DECIMAL:
+ Schema decimal = Schema.create(Schema.Type.BYTES);
+ LogicalTypes.decimal(orcSchema.getPrecision(), orcSchema.getScale()).addToSchema(decimal);
+ return decimal;
+ case LIST:
+ return Schema.createArray(createAvroSchema(orcSchema.getChildren().get(0)));
+ case MAP:
+ return Schema.createMap(createAvroSchema(orcSchema.getChildren().get(1)));
+ case STRUCT:
+ List<Field> childFields = new ArrayList<>();
+ for (int i = 0; i < orcSchema.getChildren().size(); i++) {
+ TypeDescription childType = orcSchema.getChildren().get(i);
+ String childName = orcSchema.getFieldNames().get(i);
+ childFields.add(new Field(childName, createAvroSchema(childType), "", null));
+ }
+ return Schema.createRecord(childFields);
+ case UNION:
+ return Schema.createUnion(orcSchema.getChildren().stream()
+ .map(AvroOrcUtils::createAvroSchema)
+ .collect(Collectors.toList()));
+ default:
+ throw new IllegalStateException(String.format("Unrecognized ORC type: %s", orcSchema.getCategory().getName()));
+ }
+ }
+
+ /**
+ * Returns the actual schema of a field.
+ *
+ * All types in ORC are nullable, whereas Avro expresses nullability as a union that contains
+ * the NULL type. To keep the Avro and ORC schemas consistent, the non-NULL types are
+ * extracted from the union type.
+ * @param unionSchema A schema of union type.
+ * @return The NULL schema if the union has no non-null members, the single non-null member's
+ * schema if there is exactly one, or a union of all non-null members otherwise.
+ */
+ private static Schema getActualSchemaType(Schema unionSchema) {
+ final List<Schema> nonNullMembers = unionSchema.getTypes().stream().filter(
+ schema -> !Schema.Type.NULL.equals(schema.getType())
+ ).collect(Collectors.toList());
+ if (nonNullMembers.isEmpty()) {
+ return Schema.create(Schema.Type.NULL);
+ } else if (nonNullMembers.size() == 1) {
+ return nonNullMembers.get(0);
+ } else {
+ return Schema.createUnion(nonNullMembers);
+ }
+ }
+}
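
For illustration only (not part of this patch): a minimal sketch of how the two schema converters above behave, assuming a hypothetical two-field record schema. Class and field names below are illustrative, not from this commit.

    import org.apache.avro.Schema;
    import org.apache.hudi.common.util.AvroOrcUtils;
    import org.apache.orc.TypeDescription;

    public class AvroOrcSchemaExample {
      public static void main(String[] args) {
        // Hypothetical Avro schema with a nullable string field.
        Schema avroSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"rec\",\"fields\":["
                + "{\"name\":\"id\",\"type\":\"long\"},"
                + "{\"name\":\"name\",\"type\":[\"null\",\"string\"]}]}");

        // Nullable unions collapse to the child type, since every ORC type is already nullable.
        TypeDescription orcSchema = AvroOrcUtils.createOrcSchema(avroSchema);
        System.out.println(orcSchema);  // struct<id:bigint,name:string>

        // The reverse mapping is lossy: logical types such as time-millis cannot be recovered.
        Schema roundTripped = AvroOrcUtils.createAvroSchema(orcSchema);
        System.out.println(roundTripped.getFields().size() + " fields");  // 2
      }
    }
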
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
index c52d700..9b95e16 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
@@ -18,6 +18,7 @@
package org.apache.hudi.common.util;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -25,16 +26,22 @@ import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroWriteSupport;
import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterFactory;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.exception.HoodieException;
public abstract class BaseFileUtils {
public static BaseFileUtils getInstance(String path) {
if (path.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
return new ParquetUtils();
+ } else if (path.endsWith(HoodieFileFormat.ORC.getFileExtension())) {
+ return new OrcUtils();
}
throw new UnsupportedOperationException("The format for file " + path + " is not supported yet.");
}
@@ -42,6 +49,8 @@ public abstract class BaseFileUtils {
public static BaseFileUtils getInstance(HoodieFileFormat fileFormat) {
if (HoodieFileFormat.PARQUET.equals(fileFormat)) {
return new ParquetUtils();
+ } else if (HoodieFileFormat.ORC.equals(fileFormat)) {
+ return new OrcUtils();
}
throw new UnsupportedOperationException(fileFormat.name() + " format not supported yet.");
}
@@ -50,24 +59,122 @@ public abstract class BaseFileUtils {
return getInstance(metaClient.getTableConfig().getBaseFileFormat());
}
- public abstract Set<String> readRowKeys(Configuration configuration, Path filePath);
-
- public abstract Set<String> filterRowKeys(Configuration configuration, Path filePath, Set<String> filter);
-
- public abstract List<HoodieKey> fetchRecordKeyPartitionPath(Configuration configuration, Path filePath);
-
- public abstract Schema readAvroSchema(Configuration configuration, Path filePath);
+ /**
+ * Read the list of row keys from the given data file.
+ * @param configuration configuration to build fs object
+ * @param filePath The data file path
+ * @return Set of row keys
+ */
+ public Set<String> readRowKeys(Configuration configuration, Path filePath) {
+ return filterRowKeys(configuration, filePath, new HashSet<>());
+ }
- public abstract BloomFilter readBloomFilterFromMetadata(Configuration configuration, Path filePath);
+ /**
+ * Read the bloom filter from the metadata of the given data file.
+ * @param configuration Configuration
+ * @param filePath The data file path
+ * @return a BloomFilter object
+ */
+ public BloomFilter readBloomFilterFromMetadata(Configuration configuration, Path filePath) {
+ Map<String, String> footerVals =
+ readFooter(configuration, false, filePath,
+ HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY,
+ HoodieAvroWriteSupport.OLD_HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY,
+ HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE);
+ String footerVal = footerVals.get(HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY);
+ if (null == footerVal) {
+ // We use old style key "com.uber.hoodie.bloomfilter"
+ footerVal = footerVals.get(HoodieAvroWriteSupport.OLD_HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY);
+ }
+ BloomFilter toReturn = null;
+ if (footerVal != null) {
+ if (footerVals.containsKey(HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE)) {
+ toReturn = BloomFilterFactory.fromString(footerVal,
+ footerVals.get(HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE));
+ } else {
+ toReturn = BloomFilterFactory.fromString(footerVal, BloomFilterTypeCode.SIMPLE.name());
+ }
+ }
+ return toReturn;
+ }
- public abstract String[] readMinMaxRecordKeys(Configuration configuration, Path filePath);
+ /**
+ * Read the min and max record key from the metadata of the given data file.
+ * @param configuration Configuration
+ * @param filePath The data file path
+ * @return An array of two strings where the first is the min record key and the second is the max record key
+ */
+ public String[] readMinMaxRecordKeys(Configuration configuration, Path filePath) {
+ Map<String, String> minMaxKeys = readFooter(configuration, true, filePath,
+ HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER, HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER);
+ if (minMaxKeys.size() != 2) {
+ throw new HoodieException(
+ String.format("Could not read min/max record key out of footer correctly from %s. read) : %s",
+ filePath, minMaxKeys));
+ }
+ return new String[] {minMaxKeys.get(HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER),
+ minMaxKeys.get(HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER)};
+ }
+ /**
+ * Read all records from the data file.
+ * NOTE: This literally reads the entire file contents, thus should be used with caution.
+ * @param configuration Configuration
+ * @param filePath The data file path
+ * @return A list of GenericRecord
+ */
public abstract List<GenericRecord> readAvroRecords(Configuration configuration, Path filePath);
+ /**
+ * Read all records from the data file using the given schema.
+ * NOTE: This literally reads the entire file contents, thus should be used with caution.
+ * @param configuration Configuration
+ * @param filePath The data file path
+ * @return A list of GenericRecord
+ */
public abstract List<GenericRecord> readAvroRecords(Configuration configuration, Path filePath, Schema schema);
- public abstract Map<String, String> readFooter(Configuration conf, boolean required, Path orcFilePath,
- String... footerNames);
+ /**
+ * Read the footer data of the given data file.
+ * @param configuration Configuration
+ * @param required Whether to fail if any requested footer entry is missing from the data file
+ * @param filePath The data file path
+ * @param footerNames The footer names to read
+ * @return A map where the key is the footer name and the value is the footer value
+ */
+ public abstract Map<String, String> readFooter(Configuration configuration, boolean required, Path filePath,
+ String... footerNames);
+
+ /**
+ * Returns the number of records in the data file.
+ * @param configuration Configuration
+ * @param filePath The data file path
+ */
+ public abstract long getRowCount(Configuration configuration, Path filePath);
+
+ /**
+ * Read the rowKey list matching the given filter, from the given data file.
+ * If the filter is empty, then this will return all the row keys.
+ * @param filePath The data file path
+ * @param configuration configuration to build fs object
+ * @param filter record keys filter
+ * @return Set of row keys matching the given filter
+ */
+ public abstract Set<String> filterRowKeys(Configuration configuration, Path filePath, Set<String> filter);
+
+ /**
+ * Fetch {@link HoodieKey}s from the given data file.
+ * @param configuration configuration to build fs object
+ * @param filePath The data file path
+ * @return {@link List} of {@link HoodieKey}s fetched from the data file
+ */
+ public abstract List<HoodieKey> fetchRecordKeyPartitionPath(Configuration configuration, Path filePath);
- public abstract long getRowCount(Configuration conf, Path filePath);
-}
\ No newline at end of file
+ /**
+ * Read the Avro schema of the data file.
+ * @param configuration Configuration
+ * @param filePath The data file path
+ * @return The Avro schema of the data file
+ */
+ public abstract Schema readAvroSchema(Configuration configuration, Path filePath);
+}
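
For illustration only (not part of this patch): a short usage sketch of the format dispatch that BaseFileUtils performs. The file path and configuration below are hypothetical.

    import org.apache.avro.Schema;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hudi.common.util.BaseFileUtils;

    public class BaseFileUtilsExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // A hypothetical ORC base file; a path ending in ".orc" resolves to OrcUtils,
        // ".parquet" resolves to ParquetUtils, anything else throws UnsupportedOperationException.
        Path orcFile = new Path("/tmp/hudi_table/2021/06/15/f1_1-0-1_000.orc");

        BaseFileUtils utils = BaseFileUtils.getInstance(orcFile.toString());
        Schema schema = utils.readAvroSchema(conf, orcFile);
        long rowCount = utils.getRowCount(conf, orcFile);
        System.out.println(schema + " rows=" + rowCount);
      }
    }
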
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/OrcReaderIterator.java b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcReaderIterator.java
new file mode 100644
index 0000000..4b3caa7
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcReaderIterator.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * This class wraps an ORC reader and provides an iterator-based API to read from an ORC file.
+ */
+public class OrcReaderIterator<T> implements Iterator<T> {
+
+ private final RecordReader recordReader;
+ private final Schema avroSchema;
+ List<String> fieldNames;
+ List<TypeDescription> orcFieldTypes;
+ Schema[] avroFieldSchemas;
+ private VectorizedRowBatch batch;
+ private int rowInBatch;
+ private T next;
+
+ public OrcReaderIterator(RecordReader recordReader, Schema schema, TypeDescription orcSchema) {
+ this.recordReader = recordReader;
+ this.avroSchema = schema;
+ this.fieldNames = orcSchema.getFieldNames();
+ this.orcFieldTypes = orcSchema.getChildren();
+ this.avroFieldSchemas = fieldNames.stream()
+ .map(fieldName -> avroSchema.getField(fieldName).schema())
+ .toArray(size -> new Schema[size]);
+ this.batch = orcSchema.createRowBatch();
+ this.rowInBatch = 0;
+ }
+
+ /**
+ * If the current batch is empty, get a new one.
+ * @return true if we have rows available.
+ * @throws IOException
+ */
+ private boolean ensureBatch() throws IOException {
+ if (rowInBatch >= batch.size) {
+ rowInBatch = 0;
+ return recordReader.nextBatch(batch);
+ }
+ return true;
+ }
+
+ @Override
+ public boolean hasNext() {
+ try {
+ ensureBatch();
+ if (this.next == null) {
+ this.next = (T) readRecordFromBatch();
+ }
+ return this.next != null;
+ } catch (IOException io) {
+ throw new HoodieIOException("unable to read next record from ORC file ", io);
+ }
+ }
+
+ @Override
+ public T next() {
+ try {
+ // To handle case when next() is called before hasNext()
+ if (this.next == null) {
+ if (!hasNext()) {
+ throw new HoodieIOException("No more records left to read from ORC file");
+ }
+ }
+ T retVal = this.next;
+ this.next = (T) readRecordFromBatch();
+ return retVal;
+ } catch (IOException io) {
+ throw new HoodieIOException("unable to read next record from ORC file ", io);
+ }
+ }
+
+ private GenericData.Record readRecordFromBatch() throws IOException {
+ // No more records left to read from ORC file
+ if (!ensureBatch()) {
+ return null;
+ }
+
+ GenericData.Record record = new Record(avroSchema);
+ int numFields = orcFieldTypes.size();
+ for (int i = 0; i < numFields; i++) {
+ Object data = AvroOrcUtils.readFromVector(orcFieldTypes.get(i), batch.cols[i], avroFieldSchemas[i], rowInBatch);
+ record.put(fieldNames.get(i), data);
+ }
+ rowInBatch++;
+ return record;
+ }
+}
\ No newline at end of file
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
new file mode 100644
index 0000000..9fc49a3
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.MetadataNotFoundException;
+import org.apache.orc.OrcFile;
+import org.apache.orc.OrcProto.UserMetadataItem;
+import org.apache.orc.Reader;
+import org.apache.orc.Reader.Options;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+
+/**
+ * Utility functions for ORC files.
+ */
+public class OrcUtils extends BaseFileUtils {
+
+ /**
+ * Fetch {@link HoodieKey}s from the given ORC file.
+ *
+ * @param filePath The ORC file path.
+ * @param configuration configuration to build fs object
+ * @return {@link List} of {@link HoodieKey}s fetched from the ORC file
+ */
+ @Override
+ public List<HoodieKey> fetchRecordKeyPartitionPath(Configuration configuration, Path filePath) {
+ List<HoodieKey> hoodieKeys = new ArrayList<>();
+ try {
+ if (!filePath.getFileSystem(configuration).exists(filePath)) {
+ return new ArrayList<>();
+ }
+
+ Configuration conf = new Configuration(configuration);
+ conf.addResource(FSUtils.getFs(filePath.toString(), conf).getConf());
+ Reader reader = OrcFile.createReader(filePath, OrcFile.readerOptions(conf));
+
+ Schema readSchema = HoodieAvroUtils.getRecordKeyPartitionPathSchema();
+ TypeDescription orcSchema = AvroOrcUtils.createOrcSchema(readSchema);
+ List<String> fieldNames = orcSchema.getFieldNames();
+ VectorizedRowBatch batch = orcSchema.createRowBatch();
+ RecordReader recordReader = reader.rows(new Options(conf).schema(orcSchema));
+
+ // column indices for the RECORD_KEY_METADATA_FIELD, PARTITION_PATH_METADATA_FIELD fields
+ int keyCol = -1;
+ int partitionCol = -1;
+ for (int i = 0; i < fieldNames.size(); i++) {
+ if (fieldNames.get(i).equals(HoodieRecord.RECORD_KEY_METADATA_FIELD)) {
+ keyCol = i;
+ }
+ if (fieldNames.get(i).equals(HoodieRecord.PARTITION_PATH_METADATA_FIELD)) {
+ partitionCol = i;
+ }
+ }
+ if (keyCol == -1 || partitionCol == -1) {
+ throw new HoodieException(String.format("Couldn't find row keys or partition path in %s.", filePath));
+ }
+ while (recordReader.nextBatch(batch)) {
+ BytesColumnVector rowKeys = (BytesColumnVector) batch.cols[keyCol];
+ BytesColumnVector partitionPaths = (BytesColumnVector) batch.cols[partitionCol];
+ for (int i = 0; i < batch.size; i++) {
+ String rowKey = rowKeys.toString(i);
+ String partitionPath = partitionPaths.toString(i);
+ hoodieKeys.add(new HoodieKey(rowKey, partitionPath));
+ }
+ }
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to read from ORC file:" + filePath, e);
+ }
+ return hoodieKeys;
+ }
+
+ /**
+ * NOTE: This literally reads the entire file contents, thus should be used with caution.
+ */
+ @Override
+ public List<GenericRecord> readAvroRecords(Configuration configuration, Path filePath) {
+ Schema avroSchema;
+ try {
+ Reader reader = OrcFile.createReader(filePath, OrcFile.readerOptions(configuration));
+ avroSchema = AvroOrcUtils.createAvroSchema(reader.getSchema());
+ } catch (IOException io) {
+ throw new HoodieIOException("Unable to read Avro records from an ORC file:" + filePath, io);
+ }
+ return readAvroRecords(configuration, filePath, avroSchema);
+ }
+
+ /**
+ * NOTE: This literally reads the entire file contents, thus should be used with caution.
+ */
+ @Override
+ public List<GenericRecord> readAvroRecords(Configuration configuration, Path filePath, Schema avroSchema) {
+ List<GenericRecord> records = new ArrayList<>();
+ try {
+ Reader reader = OrcFile.createReader(filePath, OrcFile.readerOptions(configuration));
+ TypeDescription orcSchema = reader.getSchema();
+ RecordReader recordReader = reader.rows(new Options(configuration).schema(orcSchema));
+ OrcReaderIterator<GenericRecord> iterator = new OrcReaderIterator<>(recordReader, avroSchema, orcSchema);
+ while (iterator.hasNext()) {
+ GenericRecord record = iterator.next();
+ records.add(record);
+ }
+ } catch (IOException io) {
+ throw new HoodieIOException("Unable to create an ORC reader for ORC file:" + filePath, io);
+ }
+ return records;
+ }
+
+ /**
+ * Read the rowKey list matching the given filter, from the given ORC file. If the filter is empty, then this will
+ * return all the row keys.
+ *
+ * @param conf configuration to build fs object.
+ * @param filePath The ORC file path.
+ * @param filter record keys filter
+ * @return Set of row keys matching the given filter
+ */
+ @Override
+ public Set<String> filterRowKeys(Configuration conf, Path filePath, Set<String> filter)
+ throws HoodieIOException {
+ try {
+ Reader reader = OrcFile.createReader(filePath, OrcFile.readerOptions(conf));
+ Set<String> filteredRowKeys = new HashSet<>();
+ TypeDescription schema = reader.getSchema();
+ List<String> fieldNames = schema.getFieldNames();
+ VectorizedRowBatch batch = schema.createRowBatch();
+ RecordReader recordReader = reader.rows(new Options(conf).schema(schema));
+
+ // column index for the RECORD_KEY_METADATA_FIELD field
+ int colIndex = -1;
+ for (int i = 0; i < fieldNames.size(); i++) {
+ if (fieldNames.get(i).equals(HoodieRecord.RECORD_KEY_METADATA_FIELD)) {
+ colIndex = i;
+ break;
+ }
+ }
+ if (colIndex == -1) {
+ throw new HoodieException(String.format("Couldn't find row keys in %s.", filePath));
+ }
+ while (recordReader.nextBatch(batch)) {
+ BytesColumnVector rowKeys = (BytesColumnVector) batch.cols[colIndex];
+ for (int i = 0; i < batch.size; i++) {
+ String rowKey = rowKeys.toString(i);
+ if (filter.isEmpty() || filter.contains(rowKey)) {
+ filteredRowKeys.add(rowKey);
+ }
+ }
+ }
+ return filteredRowKeys;
+ } catch (IOException io) {
+ throw new HoodieIOException("Unable to read row keys for ORC file:" + filePath, io);
+ }
+ }
+
+ @Override
+ public Map<String, String> readFooter(Configuration conf, boolean required,
+ Path orcFilePath, String... footerNames) {
+ try {
+ Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(conf));
+ Map<String, String> footerVals = new HashMap<>();
+ List<UserMetadataItem> metadataItemList = reader.getFileTail().getFooter().getMetadataList();
+ Map<String, String> metadata = metadataItemList.stream().collect(Collectors.toMap(
+ UserMetadataItem::getName,
+ metadataItem -> metadataItem.getValue().toStringUtf8()));
+ for (String footerName : footerNames) {
+ if (metadata.containsKey(footerName)) {
+ footerVals.put(footerName, metadata.get(footerName));
+ } else if (required) {
+ throw new MetadataNotFoundException(
+ "Could not find index in ORC footer. Looked for key " + footerName + " in "
+ + orcFilePath);
+ }
+ }
+ return footerVals;
+ } catch (IOException io) {
+ throw new HoodieIOException("Unable to read footer for ORC file:" + orcFilePath, io);
+ }
+ }
+
+ @Override
+ public Schema readAvroSchema(Configuration conf, Path orcFilePath) {
+ try {
+ Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(conf));
+ TypeDescription orcSchema = reader.getSchema();
+ return AvroOrcUtils.createAvroSchema(orcSchema);
+ } catch (IOException io) {
+ throw new HoodieIOException("Unable to get Avro schema for ORC file:" + orcFilePath, io);
+ }
+ }
+
+ @Override
+ public long getRowCount(Configuration conf, Path orcFilePath) {
+ try {
+ Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(conf));
+ return reader.getNumberOfRows();
+ } catch (IOException io) {
+ throw new HoodieIOException("Unable to get row count for ORC file:" + orcFilePath, io);
+ }
+ }
+}
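
For illustration only (not part of this patch): a sketch of the key-extraction helpers defined above. Paths and key values are hypothetical.

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hudi.common.model.HoodieKey;
    import org.apache.hudi.common.util.OrcUtils;

    public class OrcUtilsExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        Path orcFile = new Path("/tmp/hudi_table/2021/06/15/f1_1-0-1_000.orc");  // hypothetical base file
        OrcUtils orcUtils = new OrcUtils();

        // Which of these candidate record keys are present in the file?
        Set<String> candidates = new HashSet<>(Arrays.asList("key-001", "key-002"));
        Set<String> present = orcUtils.filterRowKeys(conf, orcFile, candidates);

        // Record key and partition path pairs, read from the Hudi metadata columns.
        List<HoodieKey> keys = orcUtils.fetchRecordKeyPartitionPath(conf, orcFile);
        System.out.println(present.size() + " matched, " + keys.size() + " keys in file");
      }
    }
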
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
index c7b3a3f..bd44724 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
@@ -19,14 +19,9 @@
package org.apache.hudi.common.util;
import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.avro.HoodieAvroWriteSupport;
-import org.apache.hudi.common.bloom.BloomFilter;
-import org.apache.hudi.common.bloom.BloomFilterFactory;
-import org.apache.hudi.common.bloom.BloomFilterTypeCode;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.MetadataNotFoundException;
@@ -58,18 +53,6 @@ import java.util.function.Function;
public class ParquetUtils extends BaseFileUtils {
/**
- * Read the rowKey list from the given parquet file.
- *
- * @param filePath The parquet file path.
- * @param configuration configuration to build fs object
- * @return Set Set of row keys
- */
- @Override
- public Set<String> readRowKeys(Configuration configuration, Path filePath) {
- return filterRowKeys(configuration, filePath, new HashSet<>());
- }
-
- /**
* Read the rowKey list matching the given filter, from the given parquet file. If the filter is empty, then this will
* return all the rowkeys.
*
@@ -196,47 +179,8 @@ public class ParquetUtils extends BaseFileUtils {
@Override
public Schema readAvroSchema(Configuration configuration, Path parquetFilePath) {
- return new AvroSchemaConverter(configuration).convert(readSchema(configuration, parquetFilePath));
- }
-
- /**
- * Read out the bloom filter from the parquet file meta data.
- */
- @Override
- public BloomFilter readBloomFilterFromMetadata(Configuration configuration, Path parquetFilePath) {
- Map<String, String> footerVals =
- readFooter(configuration, false, parquetFilePath,
- HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY,
- HoodieAvroWriteSupport.OLD_HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY,
- HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE);
- String footerVal = footerVals.get(HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY);
- if (null == footerVal) {
- // We use old style key "com.uber.hoodie.bloomfilter"
- footerVal = footerVals.get(HoodieAvroWriteSupport.OLD_HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY);
- }
- BloomFilter toReturn = null;
- if (footerVal != null) {
- if (footerVals.containsKey(HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE)) {
- toReturn = BloomFilterFactory.fromString(footerVal,
- footerVals.get(HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE));
- } else {
- toReturn = BloomFilterFactory.fromString(footerVal, BloomFilterTypeCode.SIMPLE.name());
- }
- }
- return toReturn;
- }
-
- @Override
- public String[] readMinMaxRecordKeys(Configuration configuration, Path parquetFilePath) {
- Map<String, String> minMaxKeys = readFooter(configuration, true, parquetFilePath,
- HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER, HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER);
- if (minMaxKeys.size() != 2) {
- throw new HoodieException(
- String.format("Could not read min/max record key out of footer correctly from %s. read) : %s",
- parquetFilePath, minMaxKeys));
- }
- return new String[] {minMaxKeys.get(HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER),
- minMaxKeys.get(HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER)};
+ MessageType parquetSchema = readSchema(configuration, parquetFilePath);
+ return new AvroSchemaConverter(configuration).convert(parquetSchema);
}
/**
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
index ff559c5..f913df7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import java.io.IOException;
+import static org.apache.hudi.common.model.HoodieFileFormat.ORC;
import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
import static org.apache.hudi.common.model.HoodieFileFormat.HFILE;
@@ -40,6 +41,9 @@ public class HoodieFileReaderFactory {
if (HFILE.getFileExtension().equals(extension)) {
return newHFileFileReader(conf, path);
}
+ if (ORC.getFileExtension().equals(extension)) {
+ return newOrcFileReader(conf, path);
+ }
throw new UnsupportedOperationException(extension + " format not supported yet.");
}
@@ -52,4 +56,8 @@ public class HoodieFileReaderFactory {
CacheConfig cacheConfig = new CacheConfig(conf);
return new HoodieHFileReader<>(conf, path, cacheConfig);
}
+
+ private static <R extends IndexedRecord> HoodieFileReader<R> newOrcFileReader(Configuration conf, Path path) {
+ return new HoodieOrcReader<>(conf, path);
+ }
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcReader.java
new file mode 100644
index 0000000..319f8d7
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcReader.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Set;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.util.AvroOrcUtils;
+import org.apache.hudi.common.util.BaseFileUtils;
+import org.apache.hudi.common.util.OrcReaderIterator;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.Reader.Options;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+
+public class HoodieOrcReader<R extends IndexedRecord> implements HoodieFileReader {
+ private Path path;
+ private Configuration conf;
+ private final BaseFileUtils orcUtils;
+
+ public HoodieOrcReader(Configuration configuration, Path path) {
+ this.conf = configuration;
+ this.path = path;
+ this.orcUtils = BaseFileUtils.getInstance(HoodieFileFormat.ORC);
+ }
+
+ @Override
+ public String[] readMinMaxRecordKeys() {
+ return orcUtils.readMinMaxRecordKeys(conf, path);
+ }
+
+ @Override
+ public BloomFilter readBloomFilter() {
+ return orcUtils.readBloomFilterFromMetadata(conf, path);
+ }
+
+ @Override
+ public Set<String> filterRowKeys(Set candidateRowKeys) {
+ return orcUtils.filterRowKeys(conf, path, candidateRowKeys);
+ }
+
+ @Override
+ public Iterator<R> getRecordIterator(Schema schema) throws IOException {
+ try {
+ Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
+ TypeDescription orcSchema = AvroOrcUtils.createOrcSchema(schema);
+ RecordReader recordReader = reader.rows(new Options(conf).schema(orcSchema));
+ return new OrcReaderIterator<>(recordReader, schema, orcSchema);
+ } catch (IOException io) {
+ throw new HoodieIOException("Unable to create an ORC reader.", io);
+ }
+ }
+
+ @Override
+ public Schema getSchema() {
+ return orcUtils.readAvroSchema(conf, path);
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public long getTotalRecords() {
+ return orcUtils.getRowCount(conf, path);
+ }
+}
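
For illustration only (not part of this patch): a sketch of how this reader would typically be obtained and consumed through HoodieFileReaderFactory, following the signatures shown above. The path is hypothetical.

    import java.io.IOException;
    import java.util.Iterator;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hudi.io.storage.HoodieFileReader;
    import org.apache.hudi.io.storage.HoodieFileReaderFactory;

    public class HoodieOrcReaderExample {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        Path orcFile = new Path("/tmp/hudi_table/2021/06/15/f1_1-0-1_000.orc");  // hypothetical base file

        // The factory dispatches on the ".orc" extension and returns a HoodieOrcReader.
        HoodieFileReader<GenericRecord> reader = HoodieFileReaderFactory.getFileReader(conf, orcFile);
        Schema schema = reader.getSchema();
        Iterator<GenericRecord> records = reader.getRecordIterator(schema);
        while (records.hasNext()) {
          GenericRecord record = records.next();
          // process the record
        }
        System.out.println(reader.getTotalRecords() + " records total");
      }
    }
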
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestAvroOrcUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestAvroOrcUtils.java
new file mode 100644
index 0000000..b775a37
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestAvroOrcUtils.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_SCHEMA;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.apache.orc.TypeDescription;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestAvroOrcUtils extends HoodieCommonTestHarness {
+
+ public static List<Arguments> testCreateOrcSchemaArgs() {
+ // the ORC schema is constructed in the order as AVRO_SCHEMA:
+ // TRIP_SCHEMA_PREFIX, EXTRA_TYPE_SCHEMA, MAP_TYPE_SCHEMA, FARE_NESTED_SCHEMA, TIP_NESTED_SCHEMA, TRIP_SCHEMA_SUFFIX
+ // The following types are tested:
+ // DATE, DECIMAL, LONG, INT, BYTES, ARRAY, RECORD, MAP, STRING, FLOAT, DOUBLE
+ TypeDescription orcSchema = TypeDescription.fromString("struct<"
+ + "timestamp:bigint,_row_key:string,rider:string,driver:string,begin_lat:double,"
+ + "begin_lon:double,end_lat:double,end_lon:double,"
+ + "distance_in_meters:int,seconds_since_epoch:bigint,weight:float,nation:binary,"
+ + "current_date:date,current_ts:bigint,height:decimal(10,6),"
+ + "city_to_state:map<string,string>,"
+ + "fare:struct<amount:double,currency:string>,"
+ + "tip_history:array<struct<amount:double,currency:string>>,"
+ + "_hoodie_is_deleted:boolean>");
+
+ // Tests the types FIXED, UNION
+ String structField = "{\"type\":\"record\", \"name\":\"fare\",\"fields\": "
+ + "[{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}";
+ Schema avroSchemaWithMoreTypes = new Schema.Parser().parse(
+ "{\"type\": \"record\"," + "\"name\": \"triprec\"," + "\"fields\": [ "
+ + "{\"name\" : \"age\", \"type\":{\"type\": \"fixed\", \"size\": 16, \"name\": \"fixedField\" }},"
+ + "{\"name\" : \"height\", \"type\": [\"int\", \"null\"] },"
+ + "{\"name\" : \"id\", \"type\": [\"int\", \"string\"] },"
+ + "{\"name\" : \"fare\", \"type\": [" + structField + ", \"null\"] }]}");
+ TypeDescription orcSchemaWithMoreTypes = TypeDescription.fromString(
+ "struct<age:binary,height:int,id:uniontype<int,string>,fare:struct<amount:double,currency:string>>");
+
+ return Arrays.asList(
+ Arguments.of(AVRO_SCHEMA, orcSchema),
+ Arguments.of(avroSchemaWithMoreTypes, orcSchemaWithMoreTypes)
+ );
+ }
+
+ @ParameterizedTest
+ @MethodSource("testCreateOrcSchemaArgs")
+ public void testCreateOrcSchema(Schema avroSchema, TypeDescription orcSchema) {
+ TypeDescription convertedSchema = AvroOrcUtils.createOrcSchema(avroSchema);
+ assertEquals(orcSchema, convertedSchema);
+ }
+}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestOrcReaderIterator.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestOrcReaderIterator.java
new file mode 100644
index 0000000..b55995c
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestOrcReaderIterator.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
+import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+
+import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestOrcReaderIterator {
+ private final Path filePath = new Path(System.getProperty("java.io.tmpdir") + "/f1_1-0-1_000.orc");
+
+ @BeforeEach
+ @AfterEach
+ public void clearTempFile() {
+ File file = new File(filePath.toString());
+ if (file.exists()) {
+ file.delete();
+ }
+ }
+
+ @Test
+ public void testOrcIteratorReadData() throws Exception {
+ final Configuration conf = new Configuration();
+ Schema avroSchema = getSchemaFromResource(TestOrcReaderIterator.class, "/simple-test.avsc");
+ TypeDescription orcSchema = AvroOrcUtils.createOrcSchema(avroSchema);
+ OrcFile.WriterOptions options = OrcFile.writerOptions(conf).setSchema(orcSchema).compress(CompressionKind.ZLIB);
+ Writer writer = OrcFile.createWriter(filePath, options);
+ VectorizedRowBatch batch = orcSchema.createRowBatch();
+ BytesColumnVector nameColumns = (BytesColumnVector) batch.cols[0];
+ LongColumnVector numberColumns = (LongColumnVector) batch.cols[1];
+ BytesColumnVector colorColumns = (BytesColumnVector) batch.cols[2];
+ for (int r = 0; r < 5; ++r) {
+ int row = batch.size++;
+ byte[] name = ("name" + r).getBytes(StandardCharsets.UTF_8);
+ nameColumns.setVal(row, name);
+ byte[] color = ("color" + r).getBytes(StandardCharsets.UTF_8);
+ colorColumns.setVal(row, color);
+ numberColumns.vector[row] = r;
+ }
+ writer.addRowBatch(batch);
+ writer.close();
+
+ Reader reader = OrcFile.createReader(filePath, OrcFile.readerOptions(conf));
+ RecordReader recordReader = reader.rows(new Reader.Options(conf).schema(orcSchema));
+ Iterator<GenericRecord> iterator = new OrcReaderIterator<>(recordReader, avroSchema, orcSchema);
+ int recordCount = 0;
+ while (iterator.hasNext()) {
+ GenericRecord record = iterator.next();
+ assertEquals("name" + recordCount, record.get("name").toString());
+ assertEquals("color" + recordCount, record.get("favorite_color").toString());
+ assertEquals(recordCount, record.get("favorite_number"));
+ recordCount++;
+ }
+ assertEquals(5, recordCount);
+ }
+}
diff --git a/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieFileReaderFactory.java b/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieFileReaderFactory.java
index 13971d5..ec334bd 100644
--- a/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieFileReaderFactory.java
+++ b/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieFileReaderFactory.java
@@ -44,11 +44,16 @@ public class TestHoodieFileReaderFactory {
HoodieFileReader<IndexedRecord> parquetReader = HoodieFileReaderFactory.getFileReader(hadoopConf, parquetPath);
assertTrue(parquetReader instanceof HoodieParquetReader);
- // other file format exception.
+ // log file format.
final Path logPath = new Path("/partition/path/f.b51192a8-574b-4a85-b246-bcfec03ac8bf_100.log.2_1-0-1");
final Throwable thrown = assertThrows(UnsupportedOperationException.class, () -> {
HoodieFileReader<IndexedRecord> logWriter = HoodieFileReaderFactory.getFileReader(hadoopConf, logPath);
}, "should fail since log storage reader is not supported yet.");
assertTrue(thrown.getMessage().contains("format not supported yet."));
+
+ // Orc file format.
+ final Path orcPath = new Path("/partition/path/f1_1-0-1_000.orc");
+ HoodieFileReader<IndexedRecord> orcReader = HoodieFileReaderFactory.getFileReader(hadoopConf, orcPath);
+ assertTrue(orcReader instanceof HoodieOrcReader);
}
}
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
index e49d012..b39ee34 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
@@ -18,6 +18,9 @@
package org.apache.hudi.hadoop.utils;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.fs.FSUtils;
@@ -123,6 +126,8 @@ public class HoodieInputFormatUtils {
} else {
return HoodieHFileInputFormat.class.getName();
}
+ case ORC:
+ return OrcInputFormat.class.getName();
default:
throw new HoodieIOException("Hoodie InputFormat not implemented for base file format " + baseFileFormat);
}
@@ -134,6 +139,8 @@ public class HoodieInputFormatUtils {
return MapredParquetOutputFormat.class.getName();
case HFILE:
return MapredParquetOutputFormat.class.getName();
+ case ORC:
+ return OrcOutputFormat.class.getName();
default:
throw new HoodieIOException("No OutputFormat for base file format " + baseFileFormat);
}
@@ -145,6 +152,8 @@ public class HoodieInputFormatUtils {
return ParquetHiveSerDe.class.getName();
case HFILE:
return ParquetHiveSerDe.class.getName();
+ case ORC:
+ return OrcSerde.class.getName();
default:
throw new HoodieIOException("No SerDe for base file format " + baseFileFormat);
}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
index 0b8234d..32bd9a4 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -19,7 +19,7 @@ package org.apache.hudi
import org.apache.hadoop.fs.Path
import org.apache.hudi.DataSourceReadOptions._
-import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, OPERATION_OPT_KEY}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_READ}
@@ -28,6 +28,7 @@ import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.HoodieROTablePathFilter
import org.apache.log4j.LogManager
import org.apache.spark.sql.execution.datasources.{DataSource, FileStatusCache, HadoopFsRelation}
+import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.hudi.streaming.HoodieStreamSource
@@ -186,6 +187,10 @@ class DefaultSource extends RelationProvider
extraReadPaths: Seq[String],
metaClient: HoodieTableMetaClient): BaseRelation = {
log.info("Loading Base File Only View with options :" + optParams)
+ val (tableFileFormat, formatClassName) = metaClient.getTableConfig.getBaseFileFormat match {
+ case HoodieFileFormat.PARQUET => (new ParquetFileFormat, "parquet")
+ case HoodieFileFormat.ORC => (new OrcFileFormat, "orc")
+ }
if (useHoodieFileIndex) {
@@ -198,7 +203,7 @@ class DefaultSource extends RelationProvider
fileIndex.partitionSchema,
fileIndex.dataSchema,
bucketSpec = None,
- fileFormat = new ParquetFileFormat,
+ fileFormat = tableFileFormat,
optParams)(sqlContext.sparkSession)
} else {
// this is just effectively RO view only, where `path` can contain a mix of
@@ -208,12 +213,12 @@ class DefaultSource extends RelationProvider
classOf[HoodieROTablePathFilter],
classOf[org.apache.hadoop.fs.PathFilter])
- // simply return as a regular parquet relation
+ // simply return as a regular relation
DataSource.apply(
sparkSession = sqlContext.sparkSession,
paths = extraReadPaths,
userSpecifiedSchema = Option(schema),
- className = "parquet",
+ className = formatClassName,
options = optParams)
.resolveRelation()
}
diff --git a/pom.xml b/pom.xml
index a8054c2..c1b4a99 100644
--- a/pom.xml
+++ b/pom.xml
@@ -103,6 +103,8 @@
<hive.version>2.3.1</hive.version>
<hive.exec.classifier>core</hive.exec.classifier>
<metrics.version>4.1.1</metrics.version>
+ <orc.version>1.6.0</orc.version>
+ <airlift.version>0.16</airlift.version>
<prometheus.version>0.8.0</prometheus.version>
<http.version>4.4.1</http.version>
<spark.version>${spark2.version}</spark.version>