Posted to commits@hudi.apache.org by ak...@apache.org on 2022/10/17 23:09:11 UTC
[hudi] branch release-feature-rfc46 updated: [MINOR] Additional fixes for #6745 (#6947)
This is an automated email from the ASF dual-hosted git repository.
akudinkin pushed a commit to branch release-feature-rfc46
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/release-feature-rfc46 by this push:
new 0036519120 [MINOR] Additional fixes for #6745 (#6947)
0036519120 is described below
commit 00365191206955b868b2675f2345b6f650b291a0
Author: Alexey Kudinkin <al...@gmail.com>
AuthorDate: Mon Oct 17 16:09:02 2022 -0700
[MINOR] Additional fixes for #6745 (#6947)
* Tidying up
* Tidying up more
* Cleaning up duplication
* Tidying up
* Revisited legacy operating mode configuration
* Tidying up
* Cleaned up `projectUnsafe` API
* Fixing compilation
* Cleaning up `HoodieSparkRecord` ctors;
Revisited mandatory unsafe-projection
* Fixing compilation
* Cleaned up `ParquetReader` initialization
* Revisited `HoodieSparkRecord` to accept either `UnsafeRow` or `HoodieInternalRow`, and avoid unnecessary copying after unsafe-projection
* Cleaning up redundant exception spec
* Make sure `updateMetadataFields` properly wraps `InternalRow` into `HoodieInternalRow` if necessary;
Cleaned up `MetadataValues`
* Fixed meta-fields extraction and `HoodieInternalRow` composition w/in `HoodieSparkRecord`
* De-duplicate `HoodieSparkRecord` ctors;
Make sure either only `UnsafeRow` or `HoodieInternalRow` are permitted inside `HoodieSparkRecord`
* Removed unnecessary copying
* Cleaned up projection for `HoodieSparkRecord` (dropping partition columns);
Removed unnecessary copying
* Fixing compilation
* Fixing compilation (for Flink)
* Cleaned up File Readers' interfaces:
- Extracted `HoodieSeekingFileReader` interface (for key-ranged reads)
- Pushed down concrete implementation methods into `HoodieAvroFileReaderBase` from the interfaces
* Cleaned up File Reader impls (in line with the new interfaces)
* Rebased `HoodieBackedTableMetadata` onto new `HoodieSeekingFileReader`
* Tidying up
* Missing licenses
* Re-instate custom override for `HoodieAvroParquetReader`;
Tidying up
* Fixed missing cloning w/in `HoodieLazyInsertIterable`
* Fixed missing cloning in deduplication flow
* Allow `HoodieSparkRecord` to hold `ColumnarBatchRow`
* Missing licenses
* Fixing compilation
* Missing changes
* Fixed Spark 2.x validation of whether the row was read as a batch
---
.../hudi/execution/HoodieLazyInsertIterable.java | 28 ++--
.../org/apache/hudi/io/HoodieCreateHandle.java | 3 +-
.../java/org/apache/hudi/io/HoodieMergeHandle.java | 7 +-
.../hudi/io/HoodieMergeHandleWithChangeLog.java | 28 ++--
.../java/org/apache/hudi/io/HoodieWriteHandle.java | 11 ++
.../hudi/table/action/commit/BaseMergeHelper.java | 14 +-
.../hudi/table/action/commit/BaseWriteHelper.java | 14 +-
.../table/action/commit/HoodieWriteHelper.java | 11 +-
.../io/storage/TestHoodieHFileReaderWriter.java | 4 +-
.../hudi/execution/FlinkLazyInsertIterable.java | 2 +-
.../FlinkMergeAndReplaceHandleWithChangeLog.java | 15 +--
.../hudi/io/FlinkMergeHandleWithChangeLog.java | 15 +--
.../hudi/table/action/commit/FlinkWriteHelper.java | 9 +-
.../hudi/execution/JavaLazyInsertIterable.java | 2 +-
.../hudi/table/action/commit/JavaWriteHelper.java | 9 +-
.../hudi/commmon/model/HoodieSparkRecord.java | 142 +++++++++++++--------
.../hudi/execution/SparkLazyInsertIterable.java | 2 +-
.../bulkinsert/RDDSpatialCurveSortPartitioner.java | 17 +--
.../hudi/io/storage/HoodieSparkFileReader.java | 36 +-----
.../hudi/io/storage/HoodieSparkParquetReader.java | 35 +++--
.../bootstrap/ParquetBootstrapMetadataHandler.java | 3 +-
.../org/apache/hudi/HoodieInternalRowUtils.scala | 9 --
.../org/apache/spark/sql/hudi/SparkAdapter.scala | 5 +
.../TestBoundedInMemoryExecutorInSpark.java | 8 +-
.../hudi/execution/TestBoundedInMemoryQueue.java | 16 +--
.../hudi/common/model/HoodieAvroIndexedRecord.java | 3 +-
.../apache/hudi/common/model/HoodieAvroRecord.java | 3 +-
.../hudi/common/model/HoodieAvroRecordMerger.java | 60 ++++++---
.../hudi/common/model/HoodieEmptyRecord.java | 3 +-
.../org/apache/hudi/common/model/HoodieRecord.java | 2 +-
.../hudi/common/model/HoodieRecordMerger.java | 7 +-
.../apache/hudi/common/model/MetadataValues.java | 28 ++--
.../table/log/AbstractHoodieLogRecordReader.java | 16 +--
.../table/log/HoodieMergedLogRecordScanner.java | 3 +-
.../common/table/log/block/HoodieDataBlock.java | 2 +-
.../table/log/block/HoodieHFileDataBlock.java | 16 +--
.../hudi/common/util/ParquetReaderIterator.java | 4 +-
.../apache/hudi/common/util/VisibleForTesting.java | 25 ++++
.../hudi/io/storage/HoodieAvroFileReader.java | 71 +----------
.../hudi/io/storage/HoodieAvroFileReaderBase.java | 48 +++++++
.../hudi/io/storage/HoodieAvroHFileReader.java | 57 ++++++---
.../hudi/io/storage/HoodieAvroOrcReader.java | 26 ++--
.../hudi/io/storage/HoodieAvroParquetReader.java | 22 +++-
.../apache/hudi/io/storage/HoodieFileReader.java | 46 +++----
...ileReader.java => HoodieSeekingFileReader.java} | 23 +---
.../hudi/metadata/HoodieBackedTableMetadata.java | 61 ++++-----
.../hudi/sink/clustering/ClusteringOperator.java | 8 +-
.../table/format/mor/MergeOnReadInputFormat.java | 3 +-
.../hudi/hadoop/HoodieHFileRecordReader.java | 7 +-
.../reader/DFSHoodieDatasetInputReader.java | 3 +-
.../org/apache/hudi/HoodieSparkRecordMerger.java | 9 +-
.../scala/org/apache/hudi/HoodieBaseRelation.scala | 4 +-
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 40 +++---
.../scala/org/apache/hudi/LogFileIterator.scala | 42 +++---
.../SparkFullBootstrapDataProviderBase.java | 9 +-
.../apache/hudi/functional/TestCOWDataSource.scala | 6 +-
.../apache/spark/sql/adapter/Spark2Adapter.scala | 7 +
.../apache/spark/sql/adapter/Spark3_1Adapter.scala | 3 +
.../spark/sql/vectorized/ColumnarUtils.scala | 32 +++++
.../apache/spark/sql/adapter/Spark3_2Adapter.scala | 3 +
.../spark/sql/vectorized/ColumnarUtils.scala | 32 +++++
.../apache/spark/sql/adapter/Spark3_3Adapter.scala | 3 +
62 files changed, 635 insertions(+), 547 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java
index b0831f0bc9..20f75f63c5 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java
@@ -21,9 +21,9 @@ package org.apache.hudi.execution;
import org.apache.avro.Schema;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.LazyIterableIterator;
+import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.CreateHandleFactory;
import org.apache.hudi.io.WriteHandleFactory;
@@ -90,18 +90,24 @@ public abstract class HoodieLazyInsertIterable<T>
}
}
- /**
- * Transformer function to help transform a HoodieRecord. This transformer is used by BufferedIterator to offload some
- * expensive operations of transformation to the reader thread.
- */
- static <T> Function<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>> getTransformFunction(
- Schema schema, HoodieWriteConfig config) {
- return hoodieRecord -> new HoodieInsertValueGenResult(hoodieRecord, schema, config.getProps());
+ static <T> Function<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>> getCloningTransformer(Schema schema,
+ HoodieWriteConfig config) {
+ return getCloningTransformerInternal(schema, config.getProps());
}
- static <T> Function<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>> getTransformFunction(
- Schema schema) {
- return hoodieRecord -> new HoodieInsertValueGenResult(hoodieRecord, schema, CollectionUtils.emptyProps());
+ static <T> Function<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>> getCloningTransformer(Schema schema) {
+ return getCloningTransformerInternal(schema, new TypedProperties());
+ }
+
+ private static <T> Function<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>> getCloningTransformerInternal(Schema schema,
+ TypedProperties props) {
+ return record -> {
+ // NOTE: The record has to be cloned here to make sure that, if it holds a low-level
+ // engine-specific payload pointing into a shared, mutable (underlying) buffer, we get
+ // a clean copy of it, since these records will subsequently be buffered (w/in the in-memory queue)
+ HoodieRecord<T> clonedRecord = record.copy();
+ return new HoodieInsertValueGenResult(clonedRecord, schema, props);
+ };
}
@Override
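To illustrate why the clone above matters, here is a minimal standalone sketch (plain JDK types, hypothetical names; not part of this commit). Engine-level readers may reuse one mutable row instance across next() calls, so buffering references without copying leaves every buffered entry pointing at the last row:

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    class ReusingIterator implements Iterator<int[]> {
      private final int[] shared = new int[1]; // shared, mutable backing buffer
      private int i = 0;
      public boolean hasNext() { return i < 3; }
      public int[] next() { shared[0] = i++; return shared; } // same instance every call
    }

    public class WhyCopy {
      public static void main(String[] args) {
        List<int[]> buffered = new ArrayList<>();
        Iterator<int[]> it = new ReusingIterator();
        while (it.hasNext()) {
          buffered.add(it.next().clone()); // without clone(): three references to [2]
        }
        buffered.forEach(r -> System.out.println(r[0])); // prints 0, 1, 2
      }
    }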
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
index d3782e9f20..6f6d96394f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
@@ -143,8 +143,7 @@ public class HoodieCreateHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
} else {
rewriteRecord = record.rewriteRecord(schema, config.getProps(), writeSchemaWithMetaFields);
}
- MetadataValues metadataValues = new MetadataValues();
- metadataValues.setFileName(path.getName());
+ MetadataValues metadataValues = new MetadataValues().setFileName(path.getName());
rewriteRecord = rewriteRecord.updateMetadataValues(writeSchemaWithMetaFields, config.getProps(), metadataValues);
if (preserveMetadata) {
fileWriter.write(record.getRecordKey(), rewriteRecord, writeSchemaWithMetaFields);
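The single-expression construction above works because the `MetadataValues` setters now return the instance itself. A hedged sketch of chaining more than one field (`setRecordKey` is exercised later in this diff; this particular combination is illustrative, not from the commit):

    MetadataValues metadataValues = new MetadataValues()
        .setFileName(path.getName())
        .setRecordKey(recordKey);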
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 28377918b4..276b318890 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -388,12 +388,11 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
} else {
rewriteRecord = record.rewriteRecord(schema, prop, writeSchemaWithMetaFields);
}
- MetadataValues metadataValues = new MetadataValues();
- metadataValues.setFileName(newFilePath.getName());
+ // NOTE: `FILENAME_METADATA_FIELD` has to be rewritten to correctly point to the
+ // file holding this record even in cases when overall metadata is preserved
+ MetadataValues metadataValues = new MetadataValues().setFileName(newFilePath.getName());
rewriteRecord = rewriteRecord.updateMetadataValues(writeSchemaWithMetaFields, prop, metadataValues);
if (shouldPreserveRecordMetadata) {
- // NOTE: `FILENAME_METADATA_FIELD` has to be rewritten to correctly point to the
- // file holding this record even in cases when overall metadata is preserved
fileWriter.write(key.getRecordKey(), rewriteRecord, writeSchemaWithMetaFields);
} else {
fileWriter.writeWithMetadata(key, rewriteRecord, writeSchemaWithMetaFields);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
index fda1435345..e39a60d390 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
@@ -18,6 +18,9 @@
package org.apache.hudi.io;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
@@ -33,10 +36,6 @@ import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.avro.generic.GenericRecord;
-
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
@@ -80,25 +79,16 @@ public class HoodieMergeHandleWithChangeLog<T, I, K, O> extends HoodieMergeHandl
IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
}
- protected boolean writeUpdateRecord(HoodieRecord<T> newRecord, HoodieRecord<T> oldRecord, Option<HoodieRecord> combineRecordOpt, Schema writerSchema)
+ protected boolean writeUpdateRecord(HoodieRecord<T> newRecord, HoodieRecord<T> oldRecord, Option<HoodieRecord> combinedRecordOpt, Schema writerSchema)
throws IOException {
// TODO [HUDI-5019] Remove these unnecessary newInstance invocations
- Option<HoodieRecord> savedCombineRecordOp = combineRecordOpt.map(HoodieRecord::newInstance);
- HoodieRecord<T> savedOldRecord = oldRecord.newInstance();
- final boolean result = super.writeUpdateRecord(newRecord, oldRecord, combineRecordOpt, writerSchema);
+ Option<HoodieRecord> savedCombineRecordOp = combinedRecordOpt.map(HoodieRecord::newInstance);
+ final boolean result = super.writeUpdateRecord(newRecord, oldRecord, combinedRecordOpt, writerSchema);
if (result) {
boolean isDelete = HoodieOperation.isDelete(newRecord.getOperation());
- Option<IndexedRecord> avroOpt = savedCombineRecordOp
- .flatMap(r -> {
- try {
- return r.toIndexedRecord(writerSchema, config.getPayloadConfig().getProps())
- .map(HoodieAvroIndexedRecord::getData);
- } catch (IOException e) {
- LOG.error("Fail to get indexRecord from " + savedCombineRecordOp, e);
- return Option.empty();
- }
- });
- cdcLogger.put(newRecord, (GenericRecord) savedOldRecord.getData(), isDelete ? Option.empty() : avroOpt);
+ Option<IndexedRecord> avroRecordOpt = savedCombineRecordOp.flatMap(r ->
+ toAvroRecord(r, writerSchema, config.getPayloadConfig().getProps()));
+ cdcLogger.put(newRecord, (GenericRecord) oldRecord.getData(), isDelete ? Option.empty() : avroRecordOpt);
}
return result;
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index 2066e9c241..3e0691cdf0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -19,6 +19,7 @@
package org.apache.hudi.io;
import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
@@ -27,6 +28,7 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
@@ -279,4 +281,13 @@ public abstract class HoodieWriteHandle<T, I, K, O> extends HoodieIOHandle<T, I,
+ "file suffix: " + fileSuffix + " error");
}
}
+
+ protected static Option<IndexedRecord> toAvroRecord(HoodieRecord record, Schema writerSchema, TypedProperties props) {
+ try {
+ return record.toIndexedRecord(writerSchema, props).map(HoodieAvroIndexedRecord::getData);
+ } catch (IOException e) {
+ LOG.error("Fail to get indexRecord from " + record, e);
+ return Option.empty();
+ }
+ }
}
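The new `toAvroRecord` helper centralizes a pattern repeated across the CDC handles below: a throwing conversion is lifted into an `Option`, logging on failure instead of propagating. A generic JDK-only sketch of the same shape (illustration only; Hudi uses its own Option type):

    import java.util.Optional;
    import java.util.concurrent.Callable;

    final class TryOption {
      // Lift a throwing call into Optional, swallowing (and, in real code,
      // logging) the failure -- analogous to toAvroRecord() above
      static <T> Optional<T> tryGet(Callable<T> call) {
        try {
          return Optional.ofNullable(call.call());
        } catch (Exception e) {
          return Optional.empty();
        }
      }
    }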
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java
index 4c5db03d35..c1bf6de1fd 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java
@@ -23,7 +23,6 @@ import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
-import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
@@ -33,8 +32,6 @@ import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import javax.annotation.Nonnull;
-
import java.io.IOException;
import java.util.Iterator;
@@ -77,16 +74,7 @@ public abstract class BaseMergeHelper<T, I, K, O> {
return new MergingIterator<>(
(Iterator<HoodieRecord>) reader.getRecordIterator(readerSchema),
(Iterator<HoodieRecord>) bootstrapReader.getRecordIterator(bootstrapReadSchema),
- (oneRecord, otherRecord) -> mergeRecords(oneRecord, otherRecord, mergeHandle.getWriterSchemaWithMetaFields()));
- }
-
- @Nonnull
- private static HoodieRecord mergeRecords(HoodieRecord left, HoodieRecord right, Schema targetSchema) {
- try {
- return left.joinWith(right, targetSchema);
- } catch (IOException e) {
- throw new HoodieIOException("Failed to merge records", e);
- }
+ (oneRecord, otherRecord) -> oneRecord.joinWith(otherRecord, mergeHandle.getWriterSchemaWithMetaFields()));
}
/**
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
index 622ed4573e..adef1c4459 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
@@ -18,6 +18,7 @@
package org.apache.hudi.table.action.commit;
+import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieAvroRecordMerger;
import org.apache.hudi.common.model.HoodieRecordMerger;
@@ -82,17 +83,16 @@ public abstract class BaseWriteHelper<T, I, K, O, R> {
* @param parallelism parallelism or partitions to be used while reducing/deduplicating
* @return Collection of HoodieRecord already be deduplicated
*/
- public I deduplicateRecords(
- I records, HoodieTable<T, I, K, O> table, int parallelism) {
+ public I deduplicateRecords(I records, HoodieTable<T, I, K, O> table, int parallelism) {
HoodieRecordMerger recordMerger = table.getConfig().getRecordMerger();
return deduplicateRecords(records, table.getIndex(), parallelism, table.getConfig().getSchema(), table.getConfig().getProps(), recordMerger);
}
- public I deduplicateRecords(
- I records, HoodieIndex<?, ?> index, int parallelism, String schema, Properties props, HoodieRecordMerger merger) {
- return deduplicateRecordsInternal(records, index, parallelism, schema, HoodieAvroRecordMerger.withDeDuping(props), merger);
+ public I deduplicateRecords(I records, HoodieIndex<?, ?> index, int parallelism, String schema, Properties props, HoodieRecordMerger merger) {
+ TypedProperties updatedProps = HoodieAvroRecordMerger.Config.withLegacyOperatingModePreCombining(props);
+ return doDeduplicateRecords(records, index, parallelism, schema, updatedProps, merger);
}
- protected abstract I deduplicateRecordsInternal(
- I records, HoodieIndex<?, ?> index, int parallelism, String schema, Properties props, HoodieRecordMerger merger);
+ protected abstract I doDeduplicateRecords(
+ I records, HoodieIndex<?, ?> index, int parallelism, String schema, TypedProperties props, HoodieRecordMerger merger);
}
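`HoodieAvroRecordMerger.Config.withLegacyOperatingModePreCombining` takes the caller's `Properties` and returns a `TypedProperties` with the legacy pre-combining mode applied, scoping that mode to the dedupe path. A minimal sketch of that "derive, don't mutate" shape (assuming derivation; plain java.util.Properties and a hypothetical key name):

    import java.util.Properties;

    final class MergerConfigSketch {
      // Copy the caller's props and overlay the mode flag (hypothetical key),
      // leaving the original Properties untouched
      static Properties withPreCombiningMode(Properties props) {
        Properties derived = new Properties();
        derived.putAll(props);
        derived.setProperty("hoodie.merger.operating.mode", "PRE_COMBINING");
        return derived;
      }
    }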
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
index 1893132e7d..6557f83b24 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table.action.commit;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.SerializableSchema;
+import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
@@ -31,7 +32,6 @@ import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;
import java.io.IOException;
-import java.util.Properties;
public class HoodieWriteHelper<T, R> extends BaseWriteHelper<T, HoodieData<HoodieRecord<T>>,
HoodieData<HoodieKey>, HoodieData<WriteStatus>, R> {
@@ -53,8 +53,8 @@ public class HoodieWriteHelper<T, R> extends BaseWriteHelper<T, HoodieData<Hoodi
}
@Override
- public HoodieData<HoodieRecord<T>> deduplicateRecordsInternal(
- HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String schemaStr, Properties props, HoodieRecordMerger merger) {
+ protected HoodieData<HoodieRecord<T>> doDeduplicateRecords(
+ HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String schemaStr, TypedProperties props, HoodieRecordMerger merger) {
boolean isIndexingGlobal = index.isGlobal();
final SerializableSchema schema = new SerializableSchema(schemaStr);
// Auto-tunes the parallelism for reduce transformation based on the number of data partitions
@@ -64,7 +64,10 @@ public class HoodieWriteHelper<T, R> extends BaseWriteHelper<T, HoodieData<Hoodi
HoodieKey hoodieKey = record.getKey();
// If index used is global, then records are expected to differ in their partitionPath
Object key = isIndexingGlobal ? hoodieKey.getRecordKey() : hoodieKey;
- return Pair.of(key, record);
+ // NOTE: PLEASE READ CAREFULLY BEFORE CHANGING
+ // Here we have to make a copy of the incoming record, since it might be holding
+ // an instance of [[InternalRow]] pointing into a shared, mutable buffer
+ return Pair.of(key, record.copy());
}).reduceByKey((rec1, rec2) -> {
HoodieRecord<T> reducedRecord;
try {
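Outside of Spark, the dedupe flow above maps onto a familiar group-and-merge shape; a minimal JDK-only analogue (hypothetical types; the real merge step is pluggable via `HoodieRecordMerger`, and the copy before pairing is essential for the shared-buffer reason noted above):

    import java.util.Map;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    final class DedupeSketch {
      record Rec(String key, long ts) {}

      static Map<String, Rec> dedupe(Stream<Rec> records) {
        return records.collect(Collectors.toMap(
            Rec::key,                                   // record key (vs. full HoodieKey when index is non-global)
            r -> r,                                     // JDK records are immutable; Hudi must call copy() here
            (r1, r2) -> r1.ts() >= r2.ts() ? r1 : r2)); // stand-in merger: keep the latest
      }
    }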
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java
index 5fb7065a8a..815b04c9db 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java
@@ -228,14 +228,14 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
IntStream.concat(IntStream.range(40, NUM_RECORDS * 2), IntStream.range(10, 20))
.mapToObj(i -> "key" + String.format("%02d", i)).collect(Collectors.toList());
Schema avroSchema = getSchemaFromResource(TestHoodieReaderWriterBase.class, "/exampleSchema.avsc");
- Iterator<IndexedRecord> iterator = hfileReader.getIndexedRecordsByKeysIterator(keys, avroSchema);
+ Iterator<HoodieRecord<IndexedRecord>> iterator = hfileReader.getRecordsByKeysIterator(keys, avroSchema);
List<Integer> expectedIds =
IntStream.concat(IntStream.range(40, NUM_RECORDS), IntStream.range(10, 20))
.boxed().collect(Collectors.toList());
int index = 0;
while (iterator.hasNext()) {
- GenericRecord record = (GenericRecord) iterator.next();
+ GenericRecord record = (GenericRecord) iterator.next().getData();
String key = "key" + String.format("%02d", expectedIds.get(index));
assertEquals(key, record.get("_row_key").toString());
assertEquals(Integer.toString(expectedIds.get(index)), record.get("time").toString());
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java
index 5ac00bd805..120fdb7deb 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java
@@ -61,7 +61,7 @@ public class FlinkLazyInsertIterable<T> extends HoodieLazyInsertIterable<T> {
try {
final Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema());
bufferedIteratorExecutor = new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(inputItr),
- Option.of(getExplicitInsertHandler()), getTransformFunction(schema, hoodieConfig));
+ Option.of(getExplicitInsertHandler()), getCloningTransformer(schema, hoodieConfig));
final List<WriteStatus> result = bufferedIteratorExecutor.execute();
assert result != null && !result.isEmpty() && !bufferedIteratorExecutor.isRemaining();
return result;
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java
index 7dde24ec8c..39f55f027f 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java
@@ -70,21 +70,12 @@ public class FlinkMergeAndReplaceHandleWithChangeLog<T, I, K, O>
throws IOException {
// TODO [HUDI-5019] Remove these unnecessary newInstance invocations
Option<HoodieRecord> savedCombineRecordOp = combineRecordOpt.map(HoodieRecord::newInstance);
- HoodieRecord<T> savedOldRecord = oldRecord.newInstance();
final boolean result = super.writeUpdateRecord(newRecord, oldRecord, combineRecordOpt, writerSchema);
if (result) {
boolean isDelete = HoodieOperation.isDelete(newRecord.getOperation());
- Option<IndexedRecord> avroOpt = savedCombineRecordOp
- .flatMap(r -> {
- try {
- return r.toIndexedRecord(writerSchema, config.getPayloadConfig().getProps())
- .map(HoodieAvroIndexedRecord::getData);
- } catch (IOException e) {
- LOG.error("Fail to get indexRecord from " + savedCombineRecordOp, e);
- return Option.empty();
- }
- });
- cdcLogger.put(newRecord, (GenericRecord) savedOldRecord.getData(), isDelete ? Option.empty() : avroOpt);
+ Option<IndexedRecord> avroRecordOpt = savedCombineRecordOp.flatMap(r ->
+ toAvroRecord(r, writerSchema, config.getPayloadConfig().getProps()));
+ cdcLogger.put(newRecord, (GenericRecord) oldRecord.getData(), isDelete ? Option.empty() : avroRecordOpt);
}
return result;
}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
index 4c9a7484c1..ce419f16e2 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
@@ -68,21 +68,12 @@ public class FlinkMergeHandleWithChangeLog<T, I, K, O>
throws IOException {
// TODO [HUDI-5019] Remove these unnecessary newInstance invocations
Option<HoodieRecord> savedCombineRecordOp = combineRecordOpt.map(HoodieRecord::newInstance);
- HoodieRecord<T> savedOldRecord = oldRecord.newInstance();
final boolean result = super.writeUpdateRecord(newRecord, oldRecord, combineRecordOpt, writerSchema);
if (result) {
boolean isDelete = HoodieOperation.isDelete(newRecord.getOperation());
- Option<IndexedRecord> avroOpt = savedCombineRecordOp
- .flatMap(r -> {
- try {
- return r.toIndexedRecord(writerSchema, config.getPayloadConfig().getProps())
- .map(HoodieAvroIndexedRecord::getData);
- } catch (IOException e) {
- LOG.error("Fail to get indexRecord from " + savedCombineRecordOp, e);
- return Option.empty();
- }
- });
- cdcLogger.put(newRecord, (GenericRecord) savedOldRecord.getData(), isDelete ? Option.empty() : avroOpt);
+ Option<IndexedRecord> avroRecordOpt = savedCombineRecordOp.flatMap(r ->
+ toAvroRecord(r, writerSchema, config.getPayloadConfig().getProps()));
+ cdcLogger.put(newRecord, (GenericRecord) oldRecord.getData(), isDelete ? Option.empty() : avroRecordOpt);
}
return result;
}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
index db9422336d..8855457684 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
@@ -18,7 +18,9 @@
package org.apache.hudi.table.action.commit;
+import org.apache.avro.Schema;
import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
@@ -32,15 +34,12 @@ import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.avro.Schema;
-
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Properties;
import java.util.stream.Collectors;
/**
@@ -91,8 +90,8 @@ public class FlinkWriteHelper<T, R> extends BaseWriteHelper<T, List<HoodieRecord
}
@Override
- public List<HoodieRecord<T>> deduplicateRecordsInternal(
- List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String schemaStr, Properties props, HoodieRecordMerger merger) {
+ protected List<HoodieRecord<T>> doDeduplicateRecords(
+ List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String schemaStr, TypedProperties props, HoodieRecordMerger merger) {
// If index used is global, then records are expected to differ in their partitionPath
Map<Object, List<HoodieRecord<T>>> keyedRecords = records.stream()
.collect(Collectors.groupingBy(record -> record.getKey().getRecordKey()));
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java
index ddf6345926..1e430cd699 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java
@@ -64,7 +64,7 @@ public class JavaLazyInsertIterable<T> extends HoodieLazyInsertIterable<T> {
try {
final Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema());
bufferedIteratorExecutor =
- new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(inputItr), Option.of(getInsertHandler()), getTransformFunction(schema));
+ new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(inputItr), Option.of(getInsertHandler()), getCloningTransformer(schema));
final List<WriteStatus> result = bufferedIteratorExecutor.execute();
assert result != null && !result.isEmpty() && !bufferedIteratorExecutor.isRemaining();
return result;
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
index 14016cb5c0..dc109f8103 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
@@ -18,7 +18,9 @@
package org.apache.hudi.table.action.commit;
+import org.apache.avro.Schema;
import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
@@ -29,13 +31,10 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;
-import org.apache.avro.Schema;
-
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Properties;
import java.util.stream.Collectors;
public class JavaWriteHelper<T,R> extends BaseWriteHelper<T, List<HoodieRecord<T>>,
@@ -58,8 +57,8 @@ public class JavaWriteHelper<T,R> extends BaseWriteHelper<T, List<HoodieRecord<T
}
@Override
- public List<HoodieRecord<T>> deduplicateRecordsInternal(
- List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String schemaStr, Properties props, HoodieRecordMerger merger) {
+ protected List<HoodieRecord<T>> doDeduplicateRecords(
+ List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String schemaStr, TypedProperties props, HoodieRecordMerger merger) {
boolean isIndexingGlobal = index.isGlobal();
Map<Object, List<Pair<Object, HoodieRecord<T>>>> keyedRecords = records.stream().map(record -> {
HoodieKey hoodieKey = record.getKey();
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java
index e111679842..9cdccbe407 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java
@@ -18,7 +18,9 @@
package org.apache.hudi.commmon.model;
+import org.apache.avro.Schema;
import org.apache.hudi.HoodieInternalRowUtils;
+import org.apache.hudi.SparkAdapterSupport$;
import org.apache.hudi.client.model.HoodieInternalRow;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieKey;
@@ -28,18 +30,19 @@ import org.apache.hudi.common.model.MetadataValues;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.SparkKeyGeneratorInterface;
import org.apache.hudi.util.HoodieSparkRecordUtils;
-
-import org.apache.avro.Schema;
import org.apache.spark.sql.HoodieCatalystExpressionUtils$;
import org.apache.spark.sql.HoodieUnsafeRowUtils;
import org.apache.spark.sql.HoodieUnsafeRowUtils.NestedFieldPath;
import org.apache.spark.sql.catalyst.CatalystTypeConverters;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.JoinedRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;
@@ -55,7 +58,19 @@ import static org.apache.spark.sql.types.DataTypes.BooleanType;
import static org.apache.spark.sql.types.DataTypes.StringType;
/**
- * Spark Engine-specific Implementations of `HoodieRecord`.
+ * Spark Engine-specific Implementations of `HoodieRecord`
+ *
+ * NOTE: [[HoodieSparkRecord]] is expected to hold either [[UnsafeRow]] or [[HoodieInternalRow]]:
+ *
+ * <ul>
+ * <li>[[UnsafeRow]] is held to make sure a) we don't deserialize raw bytes payload
+ * into JVM types unnecessarily, b) we don't incur penalty of ser/de during shuffling,
+ * c) we don't add strain on GC</li>
+ * <li>[[HoodieInternalRow]] is held in cases when the underlying [[UnsafeRow]]'s metadata fields
+ * need to be updated (i.e. serving as a mutable overlay on top of [[UnsafeRow]])</li>
+ * </ul>
*/
public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
@@ -68,48 +83,38 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
* We should use this construction method when we read internalRow from file.
* The record constructed by this method must be used in iter.
*/
- public HoodieSparkRecord(InternalRow data, StructType schema) {
+ public HoodieSparkRecord(InternalRow data) {
super(null, data);
- this.data = HoodieInternalRowUtils.projectUnsafe(data, schema, false);
+ validateRow(data);
this.copy = false;
}
- public HoodieSparkRecord(HoodieKey key, InternalRow data, StructType schema) {
+ public HoodieSparkRecord(HoodieKey key, InternalRow data, boolean copy) {
super(key, data);
- this.data = HoodieInternalRowUtils.projectUnsafe(data, schema, true);
- this.copy = true;
+ validateRow(data);
+ this.copy = copy;
}
- public HoodieSparkRecord(HoodieKey key, InternalRow data, StructType schema, HoodieOperation operation) {
+ private HoodieSparkRecord(HoodieKey key, InternalRow data, HoodieOperation operation, boolean copy) {
super(key, data, operation);
- this.data = HoodieInternalRowUtils.projectUnsafe(data, schema, true);
- this.copy = true;
- }
+ validateRow(data);
- public HoodieSparkRecord(HoodieKey key, InternalRow data, StructType schema, HoodieOperation operation, boolean copy) {
- super(key, data, operation);
- this.data = HoodieInternalRowUtils.projectUnsafe(data, schema, true);
this.copy = copy;
}
- public HoodieSparkRecord(HoodieSparkRecord record) {
- super(record);
- this.copy = record.copy;
- }
-
@Override
public HoodieSparkRecord newInstance() {
- return new HoodieSparkRecord(this);
+ return new HoodieSparkRecord(this.key, this.data, this.operation, this.copy);
}
@Override
public HoodieSparkRecord newInstance(HoodieKey key, HoodieOperation op) {
- return new HoodieSparkRecord(key, data, null, op);
+ return new HoodieSparkRecord(key, this.data, op, this.copy);
}
@Override
public HoodieSparkRecord newInstance(HoodieKey key) {
- return new HoodieSparkRecord(key, data, null);
+ return new HoodieSparkRecord(key, this.data, this.operation, this.copy);
}
@Override
@@ -145,50 +150,56 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
}
@Override
- public HoodieRecord joinWith(HoodieRecord other, Schema targetSchema) throws IOException {
+ public HoodieRecord joinWith(HoodieRecord other, Schema targetSchema) {
StructType targetStructType = HoodieInternalRowUtils.getCachedSchema(targetSchema);
InternalRow mergeRow = new JoinedRow(data, (InternalRow) other.getData());
- return new HoodieSparkRecord(getKey(), mergeRow, targetStructType, getOperation(), copy);
+ UnsafeProjection projection =
+ HoodieInternalRowUtils.getCachedUnsafeProjection(targetStructType, targetStructType);
+ return new HoodieSparkRecord(getKey(), projection.apply(mergeRow), getOperation(), copy);
}
@Override
public HoodieRecord rewriteRecord(Schema recordSchema, Properties props, Schema targetSchema) throws IOException {
StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
StructType targetStructType = HoodieInternalRowUtils.getCachedSchema(targetSchema);
- UTF8String[] metaFields = extractMetaField(structType, targetStructType);
- if (metaFields.length == 0) {
- throw new UnsupportedOperationException();
- }
- boolean containMetaFields = hasMetaField(structType);
- InternalRow resultRow = new HoodieInternalRow(metaFields, data, containMetaFields);
- return new HoodieSparkRecord(getKey(), resultRow, targetStructType, getOperation(), copy);
+ boolean containMetaFields = hasMetaFields(structType);
+ UTF8String[] metaFields = tryExtractMetaFields(data, structType);
+
+ // TODO add actual rewriting
+ InternalRow finalRow = new HoodieInternalRow(metaFields, data, containMetaFields);
+
+ return new HoodieSparkRecord(getKey(), finalRow, getOperation(), copy);
}
@Override
public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema, Map<String, String> renameCols) throws IOException {
StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
StructType newStructType = HoodieInternalRowUtils.getCachedSchema(newSchema);
- InternalRow rewriteRow = HoodieInternalRowUtils.rewriteRecordWithNewSchema(data, structType, newStructType, renameCols);
- UTF8String[] metaFields = extractMetaField(structType, newStructType);
- if (metaFields.length > 0) {
- rewriteRow = new HoodieInternalRow(metaFields, data, true);
- }
- return new HoodieSparkRecord(getKey(), rewriteRow, newStructType, getOperation(), copy);
+ boolean containMetaFields = hasMetaFields(structType);
+ UTF8String[] metaFields = tryExtractMetaFields(data, structType);
+
+ InternalRow rewrittenRow =
+ HoodieInternalRowUtils.rewriteRecordWithNewSchema(data, structType, newStructType, renameCols);
+ HoodieInternalRow finalRow = new HoodieInternalRow(metaFields, rewrittenRow, containMetaFields);
+
+ return new HoodieSparkRecord(getKey(), finalRow, getOperation(), copy);
}
@Override
public HoodieRecord updateMetadataValues(Schema recordSchema, Properties props, MetadataValues metadataValues) throws IOException {
StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
+ HoodieInternalRow updatableRow = wrapIntoUpdatableOverlay(data, structType);
+
metadataValues.getKv().forEach((key, value) -> {
int pos = structType.fieldIndex(key);
if (value != null) {
- data.update(pos, CatalystTypeConverters.convertToCatalyst(value));
+ updatableRow.update(pos, CatalystTypeConverters.convertToCatalyst(value));
}
});
- return new HoodieSparkRecord(getKey(), data, structType, getOperation(), copy);
+ return new HoodieSparkRecord(getKey(), updatableRow, getOperation(), copy);
}
@Override
@@ -242,7 +253,9 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
String key;
String partition;
- if (keyGen.isPresent() && !Boolean.parseBoolean(props.getOrDefault(POPULATE_META_FIELDS.key(), POPULATE_META_FIELDS.defaultValue().toString()).toString())) {
+ boolean populateMetaFields = Boolean.parseBoolean(props.getOrDefault(POPULATE_META_FIELDS.key(),
+ POPULATE_META_FIELDS.defaultValue().toString()).toString());
+ if (!populateMetaFields && keyGen.isPresent()) {
SparkKeyGeneratorInterface keyGenerator = (SparkKeyGeneratorInterface) keyGen.get();
key = keyGenerator.getRecordKey(data, structType).toString();
partition = keyGenerator.getPartitionPath(data, structType).toString();
@@ -251,7 +264,7 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
partition = data.get(HoodieMetadataField.PARTITION_PATH_METADATA_FIELD.ordinal(), StringType).toString();
}
HoodieKey hoodieKey = new HoodieKey(key, partition);
- return new HoodieSparkRecord(hoodieKey, data, structType, getOperation(), copy);
+ return new HoodieSparkRecord(hoodieKey, data, getOperation(), copy);
}
@Override
@@ -268,7 +281,7 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
public HoodieSparkRecord copy() {
if (!copy) {
this.data = this.data.copy();
- copy = true;
+ this.copy = true;
}
return this;
}
@@ -286,20 +299,29 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
}
}
- private UTF8String[] extractMetaField(StructType recordStructType, StructType structTypeWithMetaField) {
- return HOODIE_META_COLUMNS_WITH_OPERATION.stream()
- .filter(f -> HoodieCatalystExpressionUtils$.MODULE$.existField(structTypeWithMetaField, f))
- .map(field -> {
- if (HoodieCatalystExpressionUtils$.MODULE$.existField(recordStructType, field)) {
- return data.getUTF8String(HOODIE_META_COLUMNS_NAME_TO_POS.get(field));
- } else {
- return UTF8String.EMPTY_UTF8;
- }
- }).toArray(UTF8String[]::new);
+ private static HoodieInternalRow wrapIntoUpdatableOverlay(InternalRow data, StructType structType) {
+ if (data instanceof HoodieInternalRow) {
+ return (HoodieInternalRow) data;
+ }
+
+ boolean containsMetaFields = hasMetaFields(structType);
+ UTF8String[] metaFields = tryExtractMetaFields(data, structType);
+ return new HoodieInternalRow(metaFields, data, containsMetaFields);
}
- private static boolean hasMetaField(StructType structType) {
- return HoodieCatalystExpressionUtils$.MODULE$.existField(structType, COMMIT_TIME_METADATA_FIELD);
+ private static UTF8String[] tryExtractMetaFields(InternalRow row, StructType structType) {
+ boolean containsMetaFields = hasMetaFields(structType);
+ if (containsMetaFields) {
+ return HoodieRecord.HOODIE_META_COLUMNS.stream()
+ .map(col -> row.getUTF8String(HOODIE_META_COLUMNS_NAME_TO_POS.get(col)))
+ .toArray(UTF8String[]::new);
+ } else {
+ return new UTF8String[HoodieRecord.HOODIE_META_COLUMNS.size()];
+ }
+ }
+
+ private static boolean hasMetaFields(StructType structType) {
+ return structType.getFieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD).isDefined();
}
/**
@@ -329,6 +351,14 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
HoodieOperation operation = withOperationField
? HoodieOperation.fromName(getNullableValAsString(structType, record.data, HoodieRecord.OPERATION_METADATA_FIELD)) : null;
- return new HoodieSparkRecord(new HoodieKey(recKey, partitionPath), record.data, structType, operation, record.copy);
+ return new HoodieSparkRecord(new HoodieKey(recKey, partitionPath), record.data, operation, record.copy);
+ }
+
+ private static void validateRow(InternalRow data) {
+ // NOTE: [[HoodieSparkRecord]] is expected to hold either
+ // - Instance of [[UnsafeRow]] or
+ // - Instance of [[HoodieInternalRow]] or
+ // - Instance of [[ColumnarBatchRow]]
+ ValidationUtils.checkState(data instanceof UnsafeRow || data instanceof HoodieInternalRow || SparkAdapterSupport$.MODULE$.sparkAdapter().isColumnarBatchRow(data));
}
}
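The shape relied on throughout this class is an "updatable overlay": meta fields live in a small mutable array layered over an immutable base row, so updating a meta column never rewrites the UnsafeRow's raw bytes. A generic sketch of the idea (illustration only; the real `HoodieInternalRow` implements Spark's `InternalRow`):

    final class OverlayRow {
      private final String[] metaFields; // mutable overlay (commit time, file name, ...)
      private final Object[] base;       // immutable base payload

      OverlayRow(String[] metaFields, Object[] base) {
        this.metaFields = metaFields;
        this.base = base;
      }

      Object get(int ordinal) {
        return ordinal < metaFields.length
            ? metaFields[ordinal]
            : base[ordinal - metaFields.length];
      }

      void updateMeta(int ordinal, String value) {
        metaFields[ordinal] = value; // only the overlay is mutated
      }
    }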
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
index f1ae56d70b..b6bb893e02 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
@@ -85,7 +85,7 @@ public class SparkLazyInsertIterable<T> extends HoodieLazyInsertIterable<T> {
}
bufferedIteratorExecutor =
new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), inputItr, getInsertHandler(),
- getTransformFunction(schema, hoodieConfig), hoodieTable.getPreExecuteRunnable());
+ getCloningTransformer(schema, hoodieConfig), hoodieTable.getPreExecuteRunnable());
final List<WriteStatus> result = bufferedIteratorExecutor.execute();
assert result != null && !result.isEmpty() && !bufferedIteratorExecutor.isRemaining();
return result;
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java
index 0a042d3362..8673d2f5ba 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java
@@ -18,6 +18,8 @@
package org.apache.hudi.execution.bulkinsert;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.HoodieInternalRowUtils;
import org.apache.hudi.HoodieSparkUtils;
@@ -33,13 +35,9 @@ import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.model.RewriteAvroPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieClusteringConfig;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
-import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;
import java.util.Properties;
@@ -80,7 +78,6 @@ public class RDDSpatialCurveSortPartitioner<T>
schema.toString(),
sparkEngineContext.getSqlContext().sparkSession()
);
-
Dataset<Row> sortedDataset = reorder(sourceDataset, outputSparkPartitions);
return HoodieSparkUtils.createRdd(sortedDataset, schema.get().getName(), schema.get().getNamespace(), false, Option.empty())
@@ -94,19 +91,17 @@ public class RDDSpatialCurveSortPartitioner<T>
});
} else if (recordType == HoodieRecordType.SPARK) {
StructType structType = HoodieInternalRowUtils.getCachedSchema(schema.get());
- Dataset<Row> sourceDataset = SparkConversionUtils.createDataFrame(records.rdd(), sparkEngineContext.getSqlContext().sparkSession(), structType);
-
+ Dataset<Row> sourceDataset = SparkConversionUtils.createDataFrame(records.rdd(),
+ sparkEngineContext.getSqlContext().sparkSession(), structType);
Dataset<Row> sortedDataset = reorder(sourceDataset, outputSparkPartitions);
return sortedDataset.queryExecution().toRdd()
.toJavaRDD()
- .map(row -> {
- InternalRow internalRow = row.copy();
+ .map(internalRow -> {
String key = internalRow.getString(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.ordinal());
String partition = internalRow.getString(HoodieMetadataField.PARTITION_PATH_METADATA_FIELD.ordinal());
HoodieKey hoodieKey = new HoodieKey(key, partition);
- HoodieRecord hoodieRecord = new HoodieSparkRecord(hoodieKey, internalRow, structType);
- return hoodieRecord;
+ return (HoodieRecord) new HoodieSparkRecord(hoodieKey, internalRow, false);
});
} else {
throw new UnsupportedOperationException(recordType.name());
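The eager `row.copy()` above goes away in favor of the copy-on-demand flag seen earlier in `HoodieSparkRecord.copy()`: the record is created with `copy = false` and materializes a private copy at most once, when `copy()` is first called. A minimal sketch of that flag (generic Java, hypothetical names):

    import java.util.function.UnaryOperator;

    final class LazyCopied<T> {
      private T data;
      private boolean copied;
      private final UnaryOperator<T> cloner; // e.g. InternalRow::copy

      LazyCopied(T data, boolean alreadyCopied, UnaryOperator<T> cloner) {
        this.data = data;
        this.copied = alreadyCopied;
        this.cloner = cloner;
      }

      T copyOnce() {
        if (!copied) {
          data = cloner.apply(data); // materialize a private copy exactly once
          copied = true;
        }
        return data;
      }
    }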
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileReader.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileReader.java
index 86134eda37..553b084e29 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileReader.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileReader.java
@@ -18,36 +18,10 @@
package org.apache.hudi.io.storage;
-import org.apache.hudi.HoodieInternalRowUtils;
-import org.apache.hudi.commmon.model.HoodieSparkRecord;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.util.ClosableIterator;
-import org.apache.hudi.common.util.MappingIterator;
-
-import org.apache.avro.Schema;
import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.types.StructType;
-
-import java.io.IOException;
-
-import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
-
-public interface HoodieSparkFileReader extends HoodieFileReader<InternalRow> {
- ClosableIterator<InternalRow> getInternalRowIterator(Schema readerSchema) throws IOException;
-
- ClosableIterator<InternalRow> getInternalRowIterator(Schema readerSchema, Schema requestedSchema) throws IOException;
-
- default ClosableIterator<HoodieRecord<InternalRow>> getRecordIterator(Schema readerSchema) throws IOException {
- ClosableIterator<InternalRow> iterator = getInternalRowIterator(readerSchema);
- StructType structType = HoodieInternalRowUtils.getCachedSchema(readerSchema);
- return new MappingIterator<>(iterator, data -> unsafeCast(new HoodieSparkRecord(data, structType)));
- }
-
- @Override
- default ClosableIterator<HoodieRecord<InternalRow>> getRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException {
- ClosableIterator<InternalRow> iterator = getInternalRowIterator(readerSchema, requestedSchema);
- StructType structType = HoodieInternalRowUtils.getCachedSchema(requestedSchema);
- return new MappingIterator<>(iterator, data -> unsafeCast(new HoodieSparkRecord(data, structType)));
- }
-}
+/**
+ * Marker interface for every {@link HoodieFileReader} reading in Catalyst format (Spark-native
+ * types, i.e. producing {@link InternalRow}s)
+ */
+public interface HoodieSparkFileReader extends HoodieFileReader<InternalRow> {}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
index a46174cbae..b162cbbadc 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
@@ -22,16 +22,19 @@ import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.HoodieInternalRowUtils;
+import org.apache.hudi.commmon.model.HoodieSparkRecord;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.ClosableIterator;
+import org.apache.hudi.common.util.MappingIterator;
import org.apache.hudi.common.util.ParquetReaderIterator;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.api.ReadSupport;
-import org.apache.parquet.hadoop.util.HadoopInputFile;
-import org.apache.parquet.io.InputFile;
import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.types.StructType;
@@ -41,6 +44,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Set;
+import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
+
public class HoodieSparkParquetReader implements HoodieSparkFileReader {
private final Path path;
@@ -70,12 +75,20 @@ public class HoodieSparkParquetReader implements HoodieSparkFileReader {
}
@Override
- public ClosableIterator<InternalRow> getInternalRowIterator(Schema schema) throws IOException {
- return getInternalRowIterator(schema, null);
+ public ClosableIterator<HoodieRecord<InternalRow>> getRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException {
+ ClosableIterator<InternalRow> iterator = getInternalRowIterator(readerSchema, requestedSchema);
+ StructType structType = HoodieInternalRowUtils.getCachedSchema(requestedSchema);
+ UnsafeProjection projection = HoodieInternalRowUtils.getCachedUnsafeProjection(structType, structType);
+
+ return new MappingIterator<>(iterator, data -> {
+ // NOTE: We have to apply [[UnsafeProjection]] to the incoming [[InternalRow]] to convert
+ // it to an [[UnsafeRow]] holding just raw bytes
+ UnsafeRow unsafeRow = projection.apply(data);
+ return unsafeCast(new HoodieSparkRecord(unsafeRow));
+ });
}
- @Override
- public ClosableIterator<InternalRow> getInternalRowIterator(Schema readerSchema, Schema requestedSchema) throws IOException {
+ private ClosableIterator<InternalRow> getInternalRowIterator(Schema readerSchema, Schema requestedSchema) throws IOException {
if (requestedSchema == null) {
requestedSchema = readerSchema;
}
@@ -85,13 +98,9 @@ public class HoodieSparkParquetReader implements HoodieSparkFileReader {
conf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA(), requestedStructType.json());
conf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING().key(), (Boolean) SQLConf.get().getConf(SQLConf.PARQUET_BINARY_AS_STRING()));
conf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), (Boolean) SQLConf.get().getConf(SQLConf.PARQUET_INT96_AS_TIMESTAMP()));
- InputFile inputFile = HadoopInputFile.fromPath(path, conf);
- ParquetReader<InternalRow> reader = new ParquetReader.Builder<InternalRow>(inputFile) {
- @Override
- protected ReadSupport getReadSupport() {
- return new ParquetReadSupport();
- }
- }.withConf(conf).build();
+ ParquetReader<InternalRow> reader = ParquetReader.<InternalRow>builder((ReadSupport) new ParquetReadSupport(), path)
+ .withConf(conf)
+ .build();
ParquetReaderIterator<InternalRow> parquetReaderIterator = new ParquetReaderIterator<>(reader);
readerIterators.add(parquetReaderIterator);
return parquetReaderIterator;
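For illustration, the new getRecordIterator above composes an UnsafeProjection with a mapping iterator, so each InternalRow is flattened to raw bytes and wrapped one element at a time. A minimal, self-contained sketch of that mapping-iterator shape (the class and names below are illustrative, not Hudi's actual MappingIterator):

import java.util.Iterator;
import java.util.function.Function;

// Illustrative sketch only: lazily applies a transform to each element of a
// source iterator, mirroring how the reader wraps projected rows one by one
// instead of materializing an intermediate collection.
final class SimpleMappingIterator<I, O> implements Iterator<O> {
  private final Iterator<I> source;
  private final Function<I, O> mapper;

  SimpleMappingIterator(Iterator<I> source, Function<I, O> mapper) {
    this.source = source;
    this.mapper = mapper;
  }

  @Override
  public boolean hasNext() {
    return source.hasNext();
  }

  @Override
  public O next() {
    // Transform is applied per element; no intermediate collection is built
    return mapper.apply(source.next());
  }
}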
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
index 3828f63564..86a984dd83 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
@@ -74,8 +74,7 @@ class ParquetBootstrapMetadataHandler extends BaseBootstrapMetadataHandler {
HoodieRecord recordCopy = record.copy();
String recKey = recordCopy.getRecordKey(reader.getSchema(), Option.of(keyGenerator));
HoodieRecord hoodieRecord = recordCopy.rewriteRecord(reader.getSchema(), config.getProps(), HoodieAvroUtils.RECORD_KEY_SCHEMA);
- MetadataValues metadataValues = new MetadataValues();
- metadataValues.setRecordKey(recKey);
+ MetadataValues metadataValues = new MetadataValues().setRecordKey(recKey);
return hoodieRecord
.updateMetadataValues(HoodieAvroUtils.RECORD_KEY_SCHEMA, new Properties(), metadataValues)
.newInstance(new HoodieKey(recKey, partitionPath));
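With the setters on MetadataValues now returning `this` (see the MetadataValues diff further below), call sites like the one above can compose the object in a single expression. A hedged usage sketch, with placeholder values:

import java.util.Map;
import org.apache.hudi.common.model.MetadataValues;

public class MetadataValuesExample {
  public static void main(String[] args) {
    // Placeholder values; each fluent setter returns `this`
    Map<String, String> kv = new MetadataValues()
        .setRecordKey("key-001")
        .setPartitionPath("2022/10/17")
        .setFileName("file-0001.parquet")
        .getKv();
    System.out.println(kv); // prints the three populated meta-field entries
  }
}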
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieInternalRowUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieInternalRowUtils.scala
index 169ccd61c3..b6c331cbeb 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieInternalRowUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieInternalRowUtils.scala
@@ -219,15 +219,6 @@ object HoodieInternalRowUtils {
schemaMap.get(schema)
}
- def projectUnsafe(row: InternalRow, structType: StructType, copy: Boolean = true): InternalRow = {
- if (row == null || row.isInstanceOf[UnsafeRow] || row.isInstanceOf[HoodieInternalRow]) {
- row
- } else {
- val unsafeRow = HoodieInternalRowUtils.getCachedUnsafeProjection(structType, structType).apply(row)
- if (copy) unsafeRow.copy() else unsafeRow
- }
- }
-
private def rewritePrimaryType(oldValue: Any, oldSchema: DataType, newSchema: DataType) = {
if (oldSchema.equals(newSchema) || (oldSchema.isInstanceOf[DecimalType] && newSchema.isInstanceOf[DecimalType])) {
oldSchema match {
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
index 6f9616b669..4b692222ed 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
@@ -46,6 +46,11 @@ import java.util.Locale
*/
trait SparkAdapter extends Serializable {
+ /**
+ * Checks whether the provided instance of [[InternalRow]] is actually an instance of [[ColumnarBatchRow]]
+ */
+ def isColumnarBatchRow(r: InternalRow): Boolean
+
/**
* Returns an instance of [[HoodieCatalogUtils]] providing for common utils operating on Spark's
* [[TableCatalog]]s
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java
index a714d60d00..2b19904407 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java
@@ -41,7 +41,7 @@ import java.util.List;
import scala.Tuple2;
-import static org.apache.hudi.execution.HoodieLazyInsertIterable.getTransformFunction;
+import static org.apache.hudi.execution.HoodieLazyInsertIterable.getCloningTransformer;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -98,7 +98,7 @@ public class TestBoundedInMemoryExecutorInSpark extends HoodieClientTestHarness
BoundedInMemoryExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> executor = null;
try {
executor = new BoundedInMemoryExecutor(hoodieWriteConfig.getWriteBufferLimitBytes(), hoodieRecords.iterator(), consumer,
- getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA), getPreExecuteRunnable());
+ getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA), getPreExecuteRunnable());
int result = executor.execute();
// It should buffer and write 100 records
assertEquals(100, result);
@@ -145,7 +145,7 @@ public class TestBoundedInMemoryExecutorInSpark extends HoodieClientTestHarness
BoundedInMemoryExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> executor = null;
try {
executor = new BoundedInMemoryExecutor(hoodieWriteConfig.getWriteBufferLimitBytes(), hoodieRecords.iterator(), consumer,
- getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA), getPreExecuteRunnable());
+ getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA), getPreExecuteRunnable());
BoundedInMemoryExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> finalExecutor = executor;
Thread.currentThread().interrupt();
@@ -194,7 +194,7 @@ public class TestBoundedInMemoryExecutorInSpark extends HoodieClientTestHarness
BoundedInMemoryExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> executor =
new BoundedInMemoryExecutor(hoodieWriteConfig.getWriteBufferLimitBytes(), unboundedRecordIter,
- consumer, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA),
+ consumer, getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA),
getPreExecuteRunnable());
executor.shutdownNow();
boolean terminatedGracefully = executor.awaitTermination();
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java
index a4f723ff01..b4fe688256 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java
@@ -54,7 +54,7 @@ import java.util.stream.IntStream;
import scala.Tuple2;
-import static org.apache.hudi.execution.HoodieLazyInsertIterable.getTransformFunction;
+import static org.apache.hudi.execution.HoodieLazyInsertIterable.getCloningTransformer;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -85,7 +85,7 @@ public class TestBoundedInMemoryQueue extends HoodieClientTestHarness {
final int numRecords = 128;
final List<HoodieRecord> hoodieRecords = dataGen.generateInserts(instantTime, numRecords);
final BoundedInMemoryQueue<HoodieRecord, HoodieLazyInsertIterable.HoodieInsertValueGenResult> queue =
- new BoundedInMemoryQueue(FileIOUtils.KB, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA));
+ new BoundedInMemoryQueue(FileIOUtils.KB, getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA));
// Produce
Future<Boolean> resFuture = executorService.submit(() -> {
new IteratorBasedQueueProducer<>(hoodieRecords.iterator()).produce(queue);
@@ -125,7 +125,7 @@ public class TestBoundedInMemoryQueue extends HoodieClientTestHarness {
final List<List<HoodieRecord>> recs = new ArrayList<>();
final BoundedInMemoryQueue<HoodieRecord, HoodieLazyInsertIterable.HoodieInsertValueGenResult> queue =
- new BoundedInMemoryQueue(FileIOUtils.KB, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA));
+ new BoundedInMemoryQueue(FileIOUtils.KB, getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA));
// Record Key to <Producer Index, Rec Index within a producer>
Map<String, Tuple2<Integer, Integer>> keyToProducerAndIndexMap = new HashMap<>();
@@ -220,11 +220,11 @@ public class TestBoundedInMemoryQueue extends HoodieClientTestHarness {
final int recordLimit = 5;
final SizeEstimator<HoodieLazyInsertIterable.HoodieInsertValueGenResult> sizeEstimator = new DefaultSizeEstimator<>();
HoodieLazyInsertIterable.HoodieInsertValueGenResult genResult =
- getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA).apply((HoodieAvroRecord) hoodieRecords.get(0));
+ getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA).apply((HoodieAvroRecord) hoodieRecords.get(0));
final long objSize = sizeEstimator.sizeEstimate(genResult);
final long memoryLimitInBytes = recordLimit * objSize;
final BoundedInMemoryQueue<HoodieRecord, HoodieLazyInsertIterable.HoodieInsertValueGenResult> queue =
- new BoundedInMemoryQueue(memoryLimitInBytes, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA));
+ new BoundedInMemoryQueue(memoryLimitInBytes, getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA));
// Produce
executorService.submit(() -> {
@@ -269,7 +269,7 @@ public class TestBoundedInMemoryQueue extends HoodieClientTestHarness {
final SizeEstimator<Tuple2<HoodieRecord, Option<IndexedRecord>>> sizeEstimator = new DefaultSizeEstimator<>();
// queue memory limit
HoodieLazyInsertIterable.HoodieInsertValueGenResult genResult =
- getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA).apply(hoodieRecords.get(0));
+ getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA).apply(hoodieRecords.get(0));
final long objSize = sizeEstimator.sizeEstimate(new Tuple2(genResult.getResult(), genResult.getResult().toIndexedRecord(HoodieTestDataGenerator.AVRO_SCHEMA, new Properties())));
final long memoryLimitInBytes = 4 * objSize;
@@ -277,7 +277,7 @@ public class TestBoundedInMemoryQueue extends HoodieClientTestHarness {
// stops and throws
// correct exception back.
BoundedInMemoryQueue<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>> queue1 =
- new BoundedInMemoryQueue(memoryLimitInBytes, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA));
+ new BoundedInMemoryQueue(memoryLimitInBytes, getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA));
// Produce
Future<Boolean> resFuture = executorService.submit(() -> {
@@ -305,7 +305,7 @@ public class TestBoundedInMemoryQueue extends HoodieClientTestHarness {
when(mockHoodieRecordsIterator.hasNext()).thenReturn(true);
when(mockHoodieRecordsIterator.next()).thenThrow(expectedException);
BoundedInMemoryQueue<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>> queue2 =
- new BoundedInMemoryQueue(memoryLimitInBytes, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA));
+ new BoundedInMemoryQueue(memoryLimitInBytes, getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA));
// Produce
Future<Boolean> res = executorService.submit(() -> {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
index 913bbcb97c..479c8eb9d6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
@@ -98,8 +98,7 @@ public class HoodieAvroIndexedRecord extends HoodieRecord<IndexedRecord> {
}
@Override
- public HoodieRecord joinWith(HoodieRecord other,
- Schema targetSchema) throws IOException {
+ public HoodieRecord joinWith(HoodieRecord other, Schema targetSchema) {
GenericRecord record = HoodieAvroUtils.stitchRecords((GenericRecord) data, (GenericRecord) other.getData(), targetSchema);
return new HoodieAvroIndexedRecord(record);
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
index dfd2b4ba33..5cbadece6b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
@@ -101,8 +101,7 @@ public class HoodieAvroRecord<T extends HoodieRecordPayload> extends HoodieRecor
}
@Override
- public HoodieRecord joinWith(HoodieRecord other,
- Schema targetSchema) throws IOException {
+ public HoodieRecord joinWith(HoodieRecord other, Schema targetSchema) {
throw new UnsupportedOperationException();
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java
index e57a18b592..b9e29787f8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java
@@ -21,6 +21,8 @@ package org.apache.hudi.common.model;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
@@ -34,28 +36,33 @@ import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
public class HoodieAvroRecordMerger implements HoodieRecordMerger {
- public static String DE_DUPING = "de_duping";
-
@Override
public String getMergingStrategy() {
return HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
}
@Override
- public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, Properties props) throws IOException {
+ public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws IOException {
ValidationUtils.checkArgument(older.getRecordType() == HoodieRecordType.AVRO);
ValidationUtils.checkArgument(newer.getRecordType() == HoodieRecordType.AVRO);
- boolean deDuping = Boolean.parseBoolean(props.getOrDefault(DE_DUPING, "false").toString());
- if (deDuping) {
- HoodieRecord res = preCombine(older, newer);
- if (res == older) {
- return Option.of(Pair.of(res, oldSchema));
- } else {
- return Option.of(Pair.of(res, newSchema));
- }
- } else {
- return combineAndGetUpdateValue(older, newer, newSchema, props)
- .map(r -> Pair.of(r, (((HoodieAvroIndexedRecord) r).getData()).getSchema()));
+ Config.LegacyOperationMode legacyOperatingMode = Config.LegacyOperationMode.valueOf(
+ props.getString(Config.LEGACY_OPERATING_MODE.key(), Config.LEGACY_OPERATING_MODE.defaultValue()));
+
+ switch (legacyOperatingMode) {
+ case PRE_COMBINING:
+ HoodieRecord res = preCombine(older, newer);
+ if (res == older) {
+ return Option.of(Pair.of(res, oldSchema));
+ } else {
+ return Option.of(Pair.of(res, newSchema));
+ }
+
+ case COMBINING:
+ return combineAndGetUpdateValue(older, newer, newSchema, props)
+ .map(r -> Pair.of(r, (((HoodieAvroIndexedRecord) r).getData()).getSchema()));
+
+ default:
+ throw new UnsupportedOperationException(String.format("Unsupported legacy operating mode (%s)", legacyOperatingMode));
}
}
@@ -83,10 +90,25 @@ public class HoodieAvroRecordMerger implements HoodieRecordMerger {
.map(combinedAvroPayload -> new HoodieAvroIndexedRecord((IndexedRecord) combinedAvroPayload));
}
- public static Properties withDeDuping(Properties props) {
- Properties newProps = new Properties();
- newProps.putAll(props);
- newProps.setProperty(HoodieAvroRecordMerger.DE_DUPING, "true");
- return newProps;
+ public static class Config {
+
+ public enum LegacyOperationMode {
+ PRE_COMBINING,
+ COMBINING
+ }
+
+ public static ConfigProperty<String> LEGACY_OPERATING_MODE =
+ ConfigProperty.key("hoodie.datasource.write.merger.legacy.operation")
+ .defaultValue(LegacyOperationMode.COMBINING.name())
+ .withDocumentation("Controls the mode of the merging operation performed by `HoodieAvroRecordMerger`. "
+ + "This is required to maintain backward-compatibility w/ the existing semantic of `HoodieRecordPayload` "
+ + "implementations providing `preCombine` and `combineAndGetUpdateValue` methods.");
+
+ public static TypedProperties withLegacyOperatingModePreCombining(Properties props) {
+ TypedProperties newProps = new TypedProperties();
+ newProps.putAll(props);
+ newProps.setProperty(Config.LEGACY_OPERATING_MODE.key(), Config.LegacyOperationMode.PRE_COMBINING.name());
+ return newProps;
+ }
}
}
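The ad-hoc DE_DUPING flag is replaced above by an explicit config; callers opt into the legacy pre-combining semantics through the new helper. A hedged sketch of how the resulting properties would be built and inspected (assuming TypedProperties extends java.util.Properties, as in Hudi):

import java.util.Properties;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieAvroRecordMerger;

public class LegacyModeExample {
  public static void main(String[] args) {
    // Build merger props pinned to legacy pre-combining mode, as
    // AbstractHoodieLogRecordReader now does; these props would then
    // be handed to merge()
    TypedProperties props =
        HoodieAvroRecordMerger.Config.withLegacyOperatingModePreCombining(new Properties());
    String key = HoodieAvroRecordMerger.Config.LEGACY_OPERATING_MODE.key();
    System.out.println(props.getProperty(key)); // prints PRE_COMBINING
  }
}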
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java
index 74954a5e63..eca1ad9a3b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java
@@ -92,8 +92,7 @@ public class HoodieEmptyRecord<T> extends HoodieRecord<T> {
}
@Override
- public HoodieRecord joinWith(HoodieRecord other,
- Schema targetSchema) throws IOException {
+ public HoodieRecord joinWith(HoodieRecord other, Schema targetSchema) {
throw new UnsupportedOperationException();
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
index 1acefe2204..363092409d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
@@ -291,7 +291,7 @@ public abstract class HoodieRecord<T> implements HoodieRecordCompatibilityInterf
/**
* Support bootstrap.
*/
- public abstract HoodieRecord joinWith(HoodieRecord other, Schema targetSchema) throws IOException;
+ public abstract HoodieRecord joinWith(HoodieRecord other, Schema targetSchema);
/**
* Rewrite record into new schema(add meta columns)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java
index 4077752831..da413592ab 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java
@@ -19,16 +19,15 @@
package org.apache.hudi.common.model;
import org.apache.avro.Schema;
-
import org.apache.hudi.ApiMaturityLevel;
import org.apache.hudi.PublicAPIClass;
+import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
-import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
import java.io.IOException;
import java.io.Serializable;
-import java.util.Properties;
/**
* HoodieRecordMerger defines how to merge two records. It is a stateless component.
@@ -45,7 +44,7 @@ public interface HoodieRecordMerger extends Serializable {
* It should be an associative operation: f(a, f(b, c)) = f(f(a, b), c) (i.e., given 3 versions A, B, C
* of a single record, both orders of application have to yield the same result)
*/
- Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, Properties props) throws IOException;
+ Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws IOException;
/**
* The record type handled by the current merger.
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/MetadataValues.java b/hudi-common/src/main/java/org/apache/hudi/common/model/MetadataValues.java
index a4ff9a1896..baee081633 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/MetadataValues.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/MetadataValues.java
@@ -18,46 +18,44 @@
package org.apache.hudi.common.model;
-import static org.apache.hudi.common.model.HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION;
-
import java.util.HashMap;
import java.util.Map;
-import org.apache.hudi.common.util.ValidationUtils;
public class MetadataValues {
- private Map<String, String> kv;
-
- public MetadataValues(Map<String, String> kv) {
- ValidationUtils.checkArgument(HOODIE_META_COLUMNS_WITH_OPERATION.containsAll(kv.values()));
- this.kv = kv;
- }
+ private final Map<String, String> kv;
public MetadataValues() {
this.kv = new HashMap<>();
}
- public void setCommitTime(String value) {
+ public MetadataValues setCommitTime(String value) {
this.kv.put(HoodieRecord.COMMIT_TIME_METADATA_FIELD, value);
+ return this;
}
- public void setCommitSeqno(String value) {
+ public MetadataValues setCommitSeqno(String value) {
this.kv.put(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, value);
+ return this;
}
- public void setRecordKey(String value) {
+ public MetadataValues setRecordKey(String value) {
this.kv.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, value);
+ return this;
}
- public void setPartitionPath(String value) {
+ public MetadataValues setPartitionPath(String value) {
this.kv.put(HoodieRecord.PARTITION_PATH_METADATA_FIELD, value);
+ return this;
}
- public void setFileName(String value) {
+ public MetadataValues setFileName(String value) {
this.kv.put(HoodieRecord.FILENAME_METADATA_FIELD, value);
+ return this;
}
- public void setOperation(String value) {
+ public MetadataValues setOperation(String value) {
this.kv.put(HoodieRecord.OPERATION_METADATA_FIELD, value);
+ return this;
}
public Map<String, String> getKv() {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
index 8a4901019c..862cea8f79 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
@@ -18,6 +18,7 @@
package org.apache.hudi.common.table.log;
+import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.DeleteRecord;
import org.apache.hudi.common.model.HoodieAvroRecordMerger;
import org.apache.hudi.common.model.HoodieLogFile;
@@ -96,7 +97,7 @@ public abstract class AbstractHoodieLogRecordReader {
protected final String preCombineField;
// Stateless component for merging records
protected final HoodieRecordMerger recordMerger;
- private final Properties payloadProps = new Properties();
+ private final TypedProperties payloadProps;
// simple key gen fields
private Option<Pair<String, String>> simpleKeyGenFields = Option.empty();
// Log File Paths
@@ -164,10 +165,11 @@ public abstract class AbstractHoodieLogRecordReader {
this.payloadClassFQN = tableConfig.getPayloadClass();
this.preCombineField = tableConfig.getPreCombineField();
// Log scanner merge log with precombine
- this.payloadProps.setProperty(HoodieAvroRecordMerger.DE_DUPING, "true");
+ TypedProperties props = HoodieAvroRecordMerger.Config.withLegacyOperatingModePreCombining(new Properties());
if (this.preCombineField != null) {
- this.payloadProps.setProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, this.preCombineField);
+ props.setProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, this.preCombineField);
}
+ this.payloadProps = props;
this.recordMerger = recordMerger;
this.totalLogFiles.addAndGet(logFilePaths.size());
this.logFilePaths = logFilePaths;
@@ -475,14 +477,12 @@ public abstract class AbstractHoodieLogRecordReader {
}
private ClosableIterator<HoodieRecord> getRecordsIterator(HoodieDataBlock dataBlock, Option<KeySpec> keySpecOpt, HoodieRecordType type) throws IOException {
- ClosableIterator<HoodieRecord> iter;
if (keySpecOpt.isPresent()) {
KeySpec keySpec = keySpecOpt.get();
- iter = unsafeCast(dataBlock.getRecordIterator(keySpec.keys, keySpec.fullKey, type));
+ return unsafeCast(dataBlock.getRecordIterator(keySpec.keys, keySpec.fullKey, type));
} else {
- iter = unsafeCast(dataBlock.getRecordIterator(type));
+ return unsafeCast(dataBlock.getRecordIterator(type));
}
- return iter;
}
/**
@@ -524,7 +524,7 @@ public abstract class AbstractHoodieLogRecordReader {
return withOperationField;
}
- protected Properties getPayloadProps() {
+ protected TypedProperties getPayloadProps() {
return payloadProps;
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index 42294c3d11..7d2e2a654a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -155,7 +155,8 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
HoodieRecord<T> oldRecord = records.get(key);
T oldValue = oldRecord.getData();
- HoodieRecord<T> combinedRecord = (HoodieRecord<T>) recordMerger.merge(oldRecord, readerSchema, newRecord, readerSchema, this.getPayloadProps()).get().getLeft();
+ HoodieRecord<T> combinedRecord = (HoodieRecord<T>) recordMerger.merge(oldRecord, readerSchema,
+ newRecord, readerSchema, this.getPayloadProps()).get().getLeft();
// If the combined record's data is the old value, there is no need to re-put the old record
if (combinedRecord.getData() != oldValue) {
records.put(key, combinedRecord.copy());
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
index 19ecdbf9ee..2e499f4ae9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
@@ -179,7 +179,7 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
}
HashSet<String> keySet = new HashSet<>(keys);
- return FilteringIterator.getInstance(allRecords, keySet, fullKey, record -> getRecordKey(record));
+ return FilteringIterator.getInstance(allRecords, keySet, fullKey, this::getRecordKey);
}
protected <T> ClosableIterator<HoodieRecord<T>> readRecordsFromBlockPayload(HoodieRecordType type) throws IOException {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
index faa7856e43..8f23dce02a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
@@ -18,7 +18,6 @@
package org.apache.hudi.common.table.log.block;
-import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
@@ -34,19 +33,15 @@ import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.fs.inline.InLineFSUtils;
import org.apache.hudi.common.fs.inline.InLineFileSystem;
-import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
-import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
-import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockContentLocation;
-import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.MappingIterator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.io.storage.HoodieHBaseKVComparator;
import org.apache.hudi.io.storage.HoodieAvroHFileReader;
+import org.apache.hudi.io.storage.HoodieHBaseKVComparator;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -58,10 +53,11 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.TreeMap;
-import static org.apache.hudi.common.util.ValidationUtils.checkState;
import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
/**
* HoodieHFileDataBlock contains a list of records stored inside an HFile format. It is used with the HFile
@@ -205,10 +201,10 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
new HoodieAvroHFileReader(inlineConf, inlinePath, new CacheConfig(inlineConf), inlinePath.getFileSystem(inlineConf));
// Get writer's schema from the header
- final ClosableIterator<IndexedRecord> recordIterator =
- fullKey ? reader.getIndexedRecordsByKeysIterator(sortedKeys, readerSchema) : reader.getIndexedRecordsByKeyPrefixIterator(sortedKeys, readerSchema);
+ final ClosableIterator<HoodieRecord<IndexedRecord>> recordIterator =
+ fullKey ? reader.getRecordsByKeysIterator(sortedKeys, readerSchema) : reader.getRecordsByKeyPrefixIterator(sortedKeys, readerSchema);
- return new MappingIterator<>(recordIterator, data -> (HoodieRecord<T>) new HoodieAvroIndexedRecord((data)));
+ return new MappingIterator<>(recordIterator, data -> (HoodieRecord<T>) data);
}
private byte[] serializeRecord(HoodieRecord<?> record, Schema schema) throws IOException {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java
index 42ea766eb9..f681bdfe84 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java
@@ -32,11 +32,11 @@ import java.io.IOException;
public class ParquetReaderIterator<T> implements ClosableIterator<T> {
// Parquet reader for an existing parquet file
- private final ParquetReader<T> parquetReader;
+ private final ParquetReader<? extends T> parquetReader;
// Holds the next entry returned by the parquet reader
private T next;
- public ParquetReaderIterator(ParquetReader<T> parquetReader) {
+ public ParquetReaderIterator(ParquetReader<? extends T> parquetReader) {
this.parquetReader = parquetReader;
}
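Widening the constructor above to ParquetReader<? extends T> lets a reader typed to a subtype back an iterator typed to the supertype, e.g. UnsafeRow rows surfaced as InternalRow. A toy illustration of the same use-site covariance (not Hudi code):

import java.util.Iterator;
import java.util.List;

// Toy illustration: a source of a subtype can back a consumer
// typed to the supertype via `? extends T`.
final class CovariantIterator<T> implements Iterator<T> {
  private final Iterator<? extends T> source;

  CovariantIterator(Iterator<? extends T> source) {
    this.source = source;
  }

  @Override
  public boolean hasNext() {
    return source.hasNext();
  }

  @Override
  public T next() {
    return source.next();
  }

  public static void main(String[] args) {
    List<Integer> ints = List.of(1, 2, 3);
    // An Iterator<Integer> is accepted where Iterator<? extends Number> is required
    Iterator<Number> numbers = new CovariantIterator<>(ints.iterator());
    while (numbers.hasNext()) {
      System.out.println(numbers.next());
    }
  }
}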
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/VisibleForTesting.java b/hudi-common/src/main/java/org/apache/hudi/common/util/VisibleForTesting.java
new file mode 100644
index 0000000000..c6e4e488f0
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/VisibleForTesting.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+/**
+ * Annotation designating a field or a method as visible for testing purposes
+ */
+public @interface VisibleForTesting {
+}
\ No newline at end of file
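A hypothetical usage sketch of the new annotation (member name and body are placeholders): it documents members whose visibility is wider than production callers need, purely so tests can reach them. As defined, the annotation carries Java's default (CLASS) retention and no @Target restriction, so it can be applied to any declaration.

import org.apache.hudi.common.util.VisibleForTesting;

public class VisibleForTestingExample {
  // Hypothetical member: kept protected (rather than private) only so
  // that unit tests can reach it; the annotation records that intent
  @VisibleForTesting
  protected int bufferedRecordCount() {
    return 0; // placeholder body
  }
}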
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReader.java
index c2ddfa319f..a829880d5f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReader.java
@@ -18,71 +18,10 @@
package org.apache.hudi.io.storage;
-import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.util.ClosableIterator;
-import org.apache.hudi.common.util.MappingIterator;
-import org.apache.hudi.common.util.Option;
-
-import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.util.List;
-
-import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
-
-public interface HoodieAvroFileReader extends HoodieFileReader<IndexedRecord>, AutoCloseable {
-
- ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema) throws IOException;
-
- ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException;
-
- default Option<IndexedRecord> getIndexedRecordByKey(String key, Schema readerSchema) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- default ClosableIterator<IndexedRecord> getIndexedRecordsByKeysIterator(List<String> keys, Schema schema) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- default ClosableIterator<IndexedRecord> getIndexedRecordsByKeysIterator(List<String> keys) throws IOException {
- return getIndexedRecordsByKeysIterator(keys, getSchema());
- }
-
- default ClosableIterator<IndexedRecord> getIndexedRecordsByKeyPrefixIterator(List<String> keyPrefixes, Schema schema) throws IOException {
- throw new UnsupportedEncodingException();
- }
-
- default ClosableIterator<IndexedRecord> getIndexedRecordsByKeyPrefixIterator(List<String> keyPrefixes) throws IOException {
- return getIndexedRecordsByKeyPrefixIterator(keyPrefixes, getSchema());
- }
-
- default ClosableIterator<HoodieRecord<IndexedRecord>> getRecordsByKeysIterator(List<String> keys, Schema schema) throws IOException {
- ClosableIterator<IndexedRecord> iterator = getIndexedRecordsByKeysIterator(keys, schema);
- return new MappingIterator<>(iterator, data -> unsafeCast(new HoodieAvroIndexedRecord(data)));
- }
-
- default ClosableIterator<HoodieRecord<IndexedRecord>> getRecordsByKeyPrefixIterator(List<String> keyPrefixes, Schema schema) throws IOException {
- ClosableIterator<IndexedRecord> iterator = getIndexedRecordsByKeyPrefixIterator(keyPrefixes, schema);
- return new MappingIterator<>(iterator, data -> unsafeCast(new HoodieAvroIndexedRecord(data)));
- }
-
- @Override
- default ClosableIterator<HoodieRecord<IndexedRecord>> getRecordIterator(Schema schema) throws IOException {
- ClosableIterator<IndexedRecord> iterator = getIndexedRecordIterator(schema);
- return new MappingIterator<>(iterator, data -> unsafeCast(new HoodieAvroIndexedRecord(data)));
- }
-
- @Override
- default ClosableIterator<HoodieRecord<IndexedRecord>> getRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException {
- ClosableIterator<IndexedRecord> iterator = getIndexedRecordIterator(readerSchema, requestedSchema);
- return new MappingIterator<>(iterator, data -> unsafeCast(new HoodieAvroIndexedRecord(data)));
- }
-
- default Option<HoodieRecord<IndexedRecord>> getRecordByKey(String key, Schema readerSchema) throws IOException {
- return getIndexedRecordByKey(key, readerSchema)
- .map(data -> unsafeCast(new HoodieAvroIndexedRecord(data)));
- }
-}
+/**
+ * Marker interface for every {@link HoodieFileReader} reading records in Avro
+ * (i.e. producing {@link IndexedRecord}s)
+ */
+public interface HoodieAvroFileReader extends HoodieFileReader<IndexedRecord> {}
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReaderBase.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReaderBase.java
new file mode 100644
index 0000000000..5cee50449a
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReaderBase.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.ClosableIterator;
+import org.apache.hudi.common.util.MappingIterator;
+
+import java.io.IOException;
+
+import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
+
+/**
+ * Base class for every {@link HoodieAvroFileReader}
+ */
+abstract class HoodieAvroFileReaderBase implements HoodieAvroFileReader {
+
+ @Override
+ public ClosableIterator<HoodieRecord<IndexedRecord>> getRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException {
+ ClosableIterator<IndexedRecord> iterator = getIndexedRecordIterator(readerSchema, requestedSchema);
+ return new MappingIterator<>(iterator, data -> unsafeCast(new HoodieAvroIndexedRecord(data)));
+ }
+
+ protected ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema) throws IOException {
+ return getIndexedRecordIterator(readerSchema, readerSchema);
+ }
+
+ protected abstract ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException;
+}
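HoodieAvroFileReaderBase above pushes the shared record-wrapping into a base class and leaves only the raw iterator abstract, a textbook template method. A toy sketch of the same shape (not Hudi code):

import java.util.Iterator;
import java.util.function.Function;

// Toy sketch: the base class owns the generic wrapping step; subclasses
// supply only the format-specific raw iterator.
abstract class RecordReaderBase<R, W> {
  // Shared wrapping step owned by the base class
  public final Iterator<W> wrappedIterator(Function<R, W> wrapper) {
    Iterator<R> raw = rawIterator();
    return new Iterator<W>() {
      @Override
      public boolean hasNext() {
        return raw.hasNext();
      }

      @Override
      public W next() {
        return wrapper.apply(raw.next());
      }
    };
  }

  // Format-specific scan supplied by each subclass
  protected abstract Iterator<R> rawIterator();
}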
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReader.java
index 90323d8883..7a70a1f2a3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReader.java
@@ -37,8 +37,12 @@ import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.BloomFilterFactory;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.ClosableIterator;
+import org.apache.hudi.common.util.MappingIterator;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.io.ByteBufferBackedInputStream;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
@@ -53,10 +57,11 @@ import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
-import java.util.stream.Collectors;
import java.util.TreeSet;
+import java.util.stream.Collectors;
import static org.apache.hudi.common.util.CollectionUtils.toStream;
+import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
import static org.apache.hudi.common.util.ValidationUtils.checkState;
/**
@@ -64,7 +69,7 @@ import static org.apache.hudi.common.util.ValidationUtils.checkState;
* <p>
* {@link HoodieFileReader} implementation allowing to read from {@link HFile}.
*/
-public class HoodieAvroHFileReader implements HoodieAvroFileReader {
+public class HoodieAvroHFileReader extends HoodieAvroFileReaderBase implements HoodieSeekingFileReader<IndexedRecord> {
// TODO HoodieHFileReader right now tightly coupled to MT, we should break that coupling
public static final String SCHEMA_KEY = "schema";
@@ -114,6 +119,30 @@ public class HoodieAvroHFileReader implements HoodieAvroFileReader {
.orElseGet(() -> Lazy.lazily(() -> fetchSchema(reader)));
}
+ @Override
+ public Option<HoodieRecord<IndexedRecord>> getRecordByKey(String key, Schema readerSchema) throws IOException {
+ synchronized (sharedScannerLock) {
+ return fetchRecordByKeyInternal(sharedScanner, key, getSchema(), readerSchema)
+ .map(data -> unsafeCast(new HoodieAvroIndexedRecord(data)));
+ }
+ }
+
+ @Override
+ public ClosableIterator<HoodieRecord<IndexedRecord>> getRecordsByKeysIterator(List<String> keys, Schema schema) throws IOException {
+ // We're caching blocks for this scanner to minimize the amount of traffic
+ // to the underlying storage as we fetch (potentially) sparsely distributed
+ // keys
+ HFileScanner scanner = getHFileScanner(reader, true);
+ ClosableIterator<IndexedRecord> iterator = new RecordByKeyIterator(scanner, keys, getSchema(), schema);
+ return new MappingIterator<>(iterator, data -> unsafeCast(new HoodieAvroIndexedRecord(data)));
+ }
+
+ @Override
+ public ClosableIterator<HoodieRecord<IndexedRecord>> getRecordsByKeyPrefixIterator(List<String> keyPrefixes, Schema schema) throws IOException {
+ ClosableIterator<IndexedRecord> iterator = getIndexedRecordsByKeyPrefixIterator(keyPrefixes, schema);
+ return new MappingIterator<>(iterator, data -> unsafeCast(new HoodieAvroIndexedRecord(data)));
+ }
+
@Override
public String[] readMinMaxRecordKeys() {
// NOTE: This access to reader is thread-safe
@@ -169,28 +198,19 @@ public class HoodieAvroHFileReader implements HoodieAvroFileReader {
}
}
- @SuppressWarnings("unchecked")
@Override
- public Option<IndexedRecord> getIndexedRecordByKey(String key, Schema readerSchema) throws IOException {
- synchronized (sharedScannerLock) {
- return fetchRecordByKeyInternal(sharedScanner, key, getSchema(), readerSchema);
+ protected ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) {
+ if (!Objects.equals(readerSchema, requestedSchema)) {
+ throw new UnsupportedOperationException("Schema projections are not supported in HFile reader");
}
- }
- public ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema) throws IOException {
// TODO eval whether seeking scanner would be faster than pread
HFileScanner scanner = getHFileScanner(reader, false);
return new RecordIterator(scanner, getSchema(), readerSchema);
}
- @Override
- public ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public ClosableIterator<IndexedRecord> getIndexedRecordsByKeysIterator(List<String> keys, Schema readerSchema) throws IOException {
+ @VisibleForTesting
+ protected ClosableIterator<IndexedRecord> getIndexedRecordsByKeysIterator(List<String> keys, Schema readerSchema) throws IOException {
// We're caching blocks for this scanner to minimize the amount of traffic
// to the underlying storage as we fetch (potentially) sparsely distributed
// keys
@@ -198,9 +218,8 @@ public class HoodieAvroHFileReader implements HoodieAvroFileReader {
return new RecordByKeyIterator(scanner, keys, getSchema(), readerSchema);
}
- @SuppressWarnings("unchecked")
- @Override
- public ClosableIterator<IndexedRecord> getIndexedRecordsByKeyPrefixIterator(List<String> keyPrefixes, Schema readerSchema) throws IOException {
+ @VisibleForTesting
+ protected ClosableIterator<IndexedRecord> getIndexedRecordsByKeyPrefixIterator(List<String> keyPrefixes, Schema readerSchema) throws IOException {
// We're caching blocks for this scanner to minimize the amount of traffic
// to the underlying storage as we fetch (potentially) sparsely distributed
// keys
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcReader.java
index 051add1102..77b86fe1f4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcReader.java
@@ -36,11 +36,13 @@ import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import java.io.IOException;
+import java.util.Objects;
import java.util.Set;
-public class HoodieAvroOrcReader implements HoodieAvroFileReader {
- private Path path;
- private Configuration conf;
+public class HoodieAvroOrcReader extends HoodieAvroFileReaderBase {
+
+ private final Path path;
+ private final Configuration conf;
private final BaseFileUtils orcUtils;
public HoodieAvroOrcReader(Configuration configuration, Path path) {
@@ -65,30 +67,28 @@ public class HoodieAvroOrcReader implements HoodieAvroFileReader {
}
@Override
- public ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema schema) throws IOException {
+ protected ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException {
+ if (!Objects.equals(readerSchema, requestedSchema)) {
+ throw new UnsupportedOperationException("Schema projections are not supported in HFile reader");
+ }
+
try {
Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
- TypeDescription orcSchema = AvroOrcUtils.createOrcSchema(schema);
+ TypeDescription orcSchema = AvroOrcUtils.createOrcSchema(readerSchema);
RecordReader recordReader = reader.rows(new Options(conf).schema(orcSchema));
- return new OrcReaderIterator<>(recordReader, schema, orcSchema);
+ return new OrcReaderIterator<>(recordReader, readerSchema, orcSchema);
} catch (IOException io) {
throw new HoodieIOException("Unable to create an ORC reader.", io);
}
}
- @Override
- public ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException {
- throw new UnsupportedOperationException();
- }
-
@Override
public Schema getSchema() {
return orcUtils.readAvroSchema(conf, path);
}
@Override
- public void close() {
- }
+ public void close() {}
@Override
public long getTotalRecords() {
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java
index 769ef391b4..e249da0b8a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java
@@ -23,9 +23,12 @@ import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.ClosableIterator;
+import org.apache.hudi.common.util.MappingIterator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ParquetReaderIterator;
import org.apache.parquet.avro.AvroParquetReader;
@@ -37,12 +40,14 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Set;
-public class HoodieAvroParquetReader implements HoodieAvroFileReader {
+import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
+
+public class HoodieAvroParquetReader extends HoodieAvroFileReaderBase {
private final Path path;
private final Configuration conf;
private final BaseFileUtils parquetUtils;
- private List<ParquetReaderIterator> readerIterators = new ArrayList<>();
+ private final List<ParquetReaderIterator> readerIterators = new ArrayList<>();
public HoodieAvroParquetReader(Configuration configuration, Path path) {
this.conf = configuration;
@@ -50,6 +55,15 @@ public class HoodieAvroParquetReader implements HoodieAvroFileReader {
this.parquetUtils = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET);
}
+ @Override
+ public ClosableIterator<HoodieRecord<IndexedRecord>> getRecordIterator(Schema readerSchema) throws IOException {
+ // TODO(HUDI-4588): remove this workaround once resolved
+ // NOTE: This is a workaround to avoid leveraging projection within [[AvroParquetReader]],
+ // until schema handling issues (nullability canonicalization, etc.) are resolved
+ ClosableIterator<IndexedRecord> iterator = getIndexedRecordIterator(readerSchema);
+ return new MappingIterator<>(iterator, data -> unsafeCast(new HoodieAvroIndexedRecord(data)));
+ }
+
@Override
public String[] readMinMaxRecordKeys() {
return parquetUtils.readMinMaxRecordKeys(conf, path);
@@ -66,12 +80,12 @@ public class HoodieAvroParquetReader implements HoodieAvroFileReader {
}
@Override
- public ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema schema) throws IOException {
+ protected ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema schema) throws IOException {
return getIndexedRecordIteratorInternal(schema, Option.empty());
}
@Override
- public ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException {
+ protected ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException {
return getIndexedRecordIteratorInternal(readerSchema, Option.of(requestedSchema));
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java
index 8152a176a0..30dbc5b94e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java
@@ -22,13 +22,23 @@ import org.apache.avro.Schema;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.ClosableIterator;
-import org.apache.hudi.common.util.Option;
import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.util.List;
import java.util.Set;
+/**
+ * Hudi's File Reader interface providing a common set of APIs to fetch
+ *
+ * <ul>
+ * <li>{@link HoodieRecord}s</li>
+ * <li>Metadata (statistics, bloom-filters, etc)</li>
+ * </ul>
+ *
+ * from a file persisted in storage.
+ *
+ * @param <T> target engine-specific representation of the raw data ({@code IndexedRecord} for Avro,
+ * {@code InternalRow} for Spark, etc.)
+ */
public interface HoodieFileReader<T> extends AutoCloseable {
String[] readMinMaxRecordKeys();
@@ -37,36 +47,14 @@ public interface HoodieFileReader<T> extends AutoCloseable {
Set<String> filterRowKeys(Set<String> candidateRowKeys);
- ClosableIterator<HoodieRecord<T>> getRecordIterator(Schema readerSchema) throws IOException;
-
ClosableIterator<HoodieRecord<T>> getRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException;
- default ClosableIterator<HoodieRecord<T>> getRecordIterator() throws IOException {
- return getRecordIterator(getSchema());
+ default ClosableIterator<HoodieRecord<T>> getRecordIterator(Schema readerSchema) throws IOException {
+ return getRecordIterator(readerSchema, readerSchema);
}
- default Option<HoodieRecord<T>> getRecordByKey(String key, Schema readerSchema) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- default Option<HoodieRecord<T>> getRecordByKey(String key) throws IOException {
- return getRecordByKey(key, getSchema());
- }
-
- default ClosableIterator<HoodieRecord<T>> getRecordsByKeysIterator(List<String> keys, Schema schema) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- default ClosableIterator<HoodieRecord<T>> getRecordsByKeysIterator(List<String> keys) throws IOException {
- return getRecordsByKeysIterator(keys, getSchema());
- }
-
- default ClosableIterator<HoodieRecord<T>> getRecordsByKeyPrefixIterator(List<String> keyPrefixes, Schema schema) throws IOException {
- throw new UnsupportedEncodingException();
- }
-
- default ClosableIterator<HoodieRecord<T>> getRecordsByKeyPrefixIterator(List<String> keyPrefixes) throws IOException {
- return getRecordsByKeyPrefixIterator(keyPrefixes, getSchema());
+ default ClosableIterator<HoodieRecord<T>> getRecordIterator() throws IOException {
+ return getRecordIterator(getSchema());
}
Schema getSchema();
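The interface now funnels every overload into the single two-argument entry point via default methods, so implementations override exactly one method. A toy sketch of the same pattern (not Hudi code):

import java.util.Iterator;

// Toy sketch: one abstract entry point, with narrower overloads
// defaulting into it.
interface RecordSource<T> {
  Iterator<T> records(String readerSchema, String requestedSchema);

  String defaultSchema();

  default Iterator<T> records(String readerSchema) {
    return records(readerSchema, readerSchema);
  }

  default Iterator<T> records() {
    return records(defaultSchema());
  }
}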
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieSeekingFileReader.java
similarity index 75%
copy from hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java
copy to hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieSeekingFileReader.java
index 8152a176a0..ee4a5f5e7a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieSeekingFileReader.java
@@ -19,7 +19,6 @@
package org.apache.hudi.io.storage;
import org.apache.avro.Schema;
-import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.Option;
@@ -27,23 +26,8 @@ import org.apache.hudi.common.util.Option;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.List;
-import java.util.Set;
-public interface HoodieFileReader<T> extends AutoCloseable {
-
- String[] readMinMaxRecordKeys();
-
- BloomFilter readBloomFilter();
-
- Set<String> filterRowKeys(Set<String> candidateRowKeys);
-
- ClosableIterator<HoodieRecord<T>> getRecordIterator(Schema readerSchema) throws IOException;
-
- ClosableIterator<HoodieRecord<T>> getRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException;
-
- default ClosableIterator<HoodieRecord<T>> getRecordIterator() throws IOException {
- return getRecordIterator(getSchema());
- }
+public interface HoodieSeekingFileReader<T> extends HoodieFileReader<T> {
default Option<HoodieRecord<T>> getRecordByKey(String key, Schema readerSchema) throws IOException {
throw new UnsupportedOperationException();
@@ -69,9 +53,4 @@ public interface HoodieFileReader<T> extends AutoCloseable {
return getRecordsByKeyPrefixIterator(keyPrefixes, getSchema());
}
- Schema getSchema();
-
- void close();
-
- long getTotalRecords();
}
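Extracting HoodieSeekingFileReader separates sequential scans from key-ranged lookups, so callers like HoodieBackedTableMetadata (next hunk) can require the stronger capability in their types. A toy sketch of the split (not Hudi code):

import java.util.Iterator;
import java.util.List;

// Toy sketch: the base interface covers full scans; the refinement adds
// point and prefix lookups for readers that can seek (e.g. HFile).
interface ScanReader<T> extends AutoCloseable {
  Iterator<T> scan();
}

interface SeekReader<T> extends ScanReader<T> {
  Iterator<T> byKeys(List<String> keys);

  Iterator<T> byKeyPrefixes(List<String> prefixes);
}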
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index f777c55e89..b22ebc1ad7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -18,6 +18,9 @@
package org.apache.hudi.metadata;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieMetadataRecord;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
@@ -50,12 +53,8 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.exception.TableNotFoundException;
-import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.fs.Path;
+import org.apache.hudi.io.storage.HoodieSeekingFileReader;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -97,7 +96,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
private final boolean reuse;
// Readers for the latest file slice corresponding to file groups in the metadata partition
- private Map<Pair<String, String>, Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader>> partitionReaders =
+ private final Map<Pair<String, String>, Pair<HoodieSeekingFileReader<?>, HoodieMetadataMergedLogRecordReader>> partitionReaders =
new ConcurrentHashMap<>();
public HoodieBackedTableMetadata(HoodieEngineContext engineContext, HoodieMetadataConfig metadataConfig,
@@ -164,12 +163,12 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
(SerializableFunction<FileSlice, Iterator<HoodieRecord<HoodieMetadataPayload>>>) fileSlice -> {
// NOTE: Since this will be executed by executors, we can't access previously cached
// readers, and therefore have to always open new ones
- Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers =
+ Pair<HoodieSeekingFileReader<?>, HoodieMetadataMergedLogRecordReader> readers =
openReaders(partitionName, fileSlice);
try {
List<Long> timings = new ArrayList<>();
- HoodieFileReader baseFileReader = readers.getKey();
+ HoodieSeekingFileReader<?> baseFileReader = readers.getKey();
HoodieMetadataMergedLogRecordReader logRecordScanner = readers.getRight();
if (baseFileReader == null && logRecordScanner == null) {
@@ -210,11 +209,11 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> result = new ArrayList<>();
AtomicInteger fileSlicesKeysCount = new AtomicInteger();
partitionFileSliceToKeysMap.forEach((partitionFileSlicePair, fileSliceKeys) -> {
- Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers =
+ Pair<HoodieSeekingFileReader<?>, HoodieMetadataMergedLogRecordReader> readers =
getOrCreateReaders(partitionName, partitionFileSlicePair.getRight());
try {
List<Long> timings = new ArrayList<>();
- HoodieFileReader baseFileReader = readers.getKey();
+ HoodieSeekingFileReader<?> baseFileReader = readers.getKey();
HoodieMetadataMergedLogRecordReader logRecordScanner = readers.getRight();
if (baseFileReader == null && logRecordScanner == null) {
return;
@@ -281,7 +280,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
return logRecords;
}
- private List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> readFromBaseAndMergeWithLogRecords(HoodieFileReader baseFileReader,
+ private List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> readFromBaseAndMergeWithLogRecords(HoodieSeekingFileReader<?> reader,
List<String> keys,
boolean fullKeys,
Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords,
@@ -290,7 +289,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
HoodieTimer timer = new HoodieTimer().startTimer();
timer.startTimer();
- if (baseFileReader == null) {
+ if (reader == null) {
// No base file at all
timings.add(timer.endTimer());
if (fullKeys) {
@@ -310,7 +309,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
readTimer.startTimer();
Map<String, HoodieRecord<HoodieMetadataPayload>> records =
- fetchBaseFileRecordsByKeys(baseFileReader, keys, fullKeys, partitionName);
+ fetchBaseFileRecordsByKeys(reader, keys, fullKeys, partitionName);
metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BASEFILE_READ_STR, readTimer.endTimer()));
@@ -342,12 +341,14 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
}
}
- private Map<String, HoodieRecord<HoodieMetadataPayload>> fetchBaseFileRecordsByKeys(HoodieFileReader baseFileReader,
+ @SuppressWarnings("unchecked")
+ private Map<String, HoodieRecord<HoodieMetadataPayload>> fetchBaseFileRecordsByKeys(HoodieSeekingFileReader reader,
List<String> keys,
boolean fullKeys,
String partitionName) throws IOException {
- ClosableIterator<HoodieRecord> records = fullKeys ? baseFileReader.getRecordsByKeysIterator(keys)
- : baseFileReader.getRecordsByKeyPrefixIterator(keys);
+ ClosableIterator<HoodieRecord<?>> records = fullKeys
+ ? reader.getRecordsByKeysIterator(keys)
+ : reader.getRecordsByKeyPrefixIterator(keys);
return toStream(records)
.map(record -> {
@@ -402,21 +403,21 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
* @param slice - The file slice to open readers for
* @return File reader and the record scanner pair for the requested file slice
*/
- private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> getOrCreateReaders(String partitionName, FileSlice slice) {
+ private Pair<HoodieSeekingFileReader<?>, HoodieMetadataMergedLogRecordReader> getOrCreateReaders(String partitionName, FileSlice slice) {
if (reuse) {
- return partitionReaders.computeIfAbsent(Pair.of(partitionName, slice.getFileId()), k -> {
- return openReaders(partitionName, slice); });
+ Pair<String, String> key = Pair.of(partitionName, slice.getFileId());
+ return partitionReaders.computeIfAbsent(key, ignored -> openReaders(partitionName, slice));
} else {
return openReaders(partitionName, slice);
}
}
- private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> openReaders(String partitionName, FileSlice slice) {
+ private Pair<HoodieSeekingFileReader<?>, HoodieMetadataMergedLogRecordReader> openReaders(String partitionName, FileSlice slice) {
try {
HoodieTimer timer = new HoodieTimer().startTimer();
// Open base file reader
- Pair<HoodieFileReader, Long> baseFileReaderOpenTimePair = getBaseFileReader(slice, timer);
- HoodieFileReader baseFileReader = baseFileReaderOpenTimePair.getKey();
+ Pair<HoodieSeekingFileReader<?>, Long> baseFileReaderOpenTimePair = getBaseFileReader(slice, timer);
+ HoodieSeekingFileReader<?> baseFileReader = baseFileReaderOpenTimePair.getKey();
final long baseFileOpenMs = baseFileReaderOpenTimePair.getValue();
// Open the log record scanner using the log files from the latest file slice
@@ -434,18 +435,20 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
}
}
- private Pair<HoodieFileReader, Long> getBaseFileReader(FileSlice slice, HoodieTimer timer) throws IOException {
- HoodieFileReader baseFileReader = null;
+ private Pair<HoodieSeekingFileReader<?>, Long> getBaseFileReader(FileSlice slice, HoodieTimer timer) throws IOException {
+ HoodieSeekingFileReader<?> baseFileReader;
Long baseFileOpenMs;
// If the base file is present then create a reader
Option<HoodieBaseFile> basefile = slice.getBaseFile();
if (basefile.isPresent()) {
- String basefilePath = basefile.get().getPath();
- baseFileReader = HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO).getFileReader(hadoopConf.get(), new Path(basefilePath));
+ String baseFilePath = basefile.get().getPath();
+ baseFileReader = (HoodieSeekingFileReader<?>) HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
+ .getFileReader(hadoopConf.get(), new Path(baseFilePath));
baseFileOpenMs = timer.endTimer();
- LOG.info(String.format("Opened metadata base file from %s at instant %s in %d ms", basefilePath,
+ LOG.info(String.format("Opened metadata base file from %s at instant %s in %d ms", baseFilePath,
basefile.get().getCommitTime(), baseFileOpenMs));
} else {
+ baseFileReader = null;
baseFileOpenMs = 0L;
timer.endTimer();
}
@@ -572,7 +575,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
* @param partitionFileSlicePair - Partition and FileSlice
*/
private synchronized void close(Pair<String, String> partitionFileSlicePair) {
- Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers =
+ Pair<HoodieSeekingFileReader<?>, HoodieMetadataMergedLogRecordReader> readers =
partitionReaders.remove(partitionFileSlicePair);
closeReader(readers);
}
@@ -587,7 +590,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
partitionReaders.clear();
}
- private void closeReader(Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers) {
+ private void closeReader(Pair<HoodieSeekingFileReader<?>, HoodieMetadataMergedLogRecordReader> readers) {
if (readers != null) {
try {
if (readers.getKey() != null) {
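
The getOrCreateReaders change above flattens the nested computeIfAbsent lambda and makes the partitionReaders map final. The reuse pattern itself is worth noting: ConcurrentHashMap.computeIfAbsent opens a reader at most once per key even when callers race. A self-contained sketch of the same idea, with a String key and AutoCloseable standing in for the (partition, fileId) Pair and the reader pair used by the real class:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class ReaderCacheSketch {
      // Keyed by "partition/fileId"; computeIfAbsent runs the factory at most
      // once per key, even under concurrent access.
      private final Map<String, AutoCloseable> readers = new ConcurrentHashMap<>();

      AutoCloseable getOrCreate(String partition, String fileId, boolean reuse) {
        if (!reuse) {
          return open(partition, fileId); // no caching: the caller owns and closes it
        }
        return readers.computeIfAbsent(partition + "/" + fileId,
            ignored -> open(partition, fileId));
      }

      private AutoCloseable open(String partition, String fileId) {
        return () -> { }; // stand-in for openReaders(partitionName, slice)
      }
    }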
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
index 1b3bc83547..459dff0b45 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
@@ -25,9 +25,11 @@ import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.ConcatenatingIterator;
import org.apache.hudi.common.model.ClusteringGroupInfo;
import org.apache.hudi.common.model.ClusteringOperation;
+import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.log.HoodieFileSliceReader;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.util.MappingIterator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
@@ -301,8 +303,10 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven
List<Iterator<RowData>> iteratorsForPartition = clusteringOps.stream().map(clusteringOp -> {
Iterable<IndexedRecord> indexedRecords = () -> {
try {
- return ((HoodieAvroFileReader)HoodieFileReaderFactory.getReaderFactory(table.getConfig().getRecordMerger().getRecordType())
- .getFileReader(table.getHadoopConf(), new Path(clusteringOp.getDataFilePath()))).getIndexedRecordIterator(readerSchema);
+ HoodieFileReaderFactory fileReaderFactory = HoodieFileReaderFactory.getReaderFactory(table.getConfig().getRecordMerger().getRecordType());
+ HoodieAvroFileReader fileReader = (HoodieAvroFileReader) fileReaderFactory.getFileReader(table.getHadoopConf(), new Path(clusteringOp.getDataFilePath()));
+
+ return new MappingIterator<>(fileReader.getRecordIterator(readerSchema), HoodieRecord::getData);
} catch (IOException e) {
throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath()
+ " and " + clusteringOp.getDeltaFilePaths(), e);
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index e9accb0a86..4daf8df523 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -18,6 +18,7 @@
package org.apache.hudi.table.format.mor;
+import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieAvroRecord;
@@ -715,7 +716,7 @@ public class MergeOnReadInputFormat
private final Set<String> keyToSkip = new HashSet<>();
- private final Properties payloadProps;
+ private final TypedProperties payloadProps;
private RowData currentRecord;
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java
index 5b7e7fbc67..a3b4a6c166 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
import org.apache.hudi.io.storage.HoodieAvroHFileReader;
@@ -41,7 +42,7 @@ public class HoodieHFileRecordReader implements RecordReader<NullWritable, Array
private long count = 0;
private ArrayWritable valueObj;
private HoodieAvroHFileReader reader;
- private Iterator<IndexedRecord> recordIterator;
+ private Iterator<HoodieRecord<IndexedRecord>> recordIterator;
private Schema schema;
public HoodieHFileRecordReader(Configuration conf, InputSplit split, JobConf job) throws IOException {
@@ -56,14 +57,14 @@ public class HoodieHFileRecordReader implements RecordReader<NullWritable, Array
@Override
public boolean next(NullWritable key, ArrayWritable value) throws IOException {
if (recordIterator == null) {
- recordIterator = reader.getIndexedRecordIterator(schema);
+ recordIterator = reader.getRecordIterator(schema);
}
if (!recordIterator.hasNext()) {
return false;
}
- IndexedRecord record = recordIterator.next();
+ IndexedRecord record = recordIterator.next().getData();
ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(record, schema);
value.set(aWritable.get());
count++;
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
index 0644656ce4..7d8aa9f504 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
@@ -39,6 +39,7 @@ import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.util.HoodieRecordUtils;
+import org.apache.hudi.common.util.MappingIterator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.TypeUtils;
import org.apache.hudi.common.util.ValidationUtils;
@@ -269,7 +270,7 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader {
Schema schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr));
HoodieAvroFileReader reader = TypeUtils.unsafeCast(HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO).getFileReader(metaClient.getHadoopConf(),
new Path(fileSlice.getBaseFile().get().getPath())));
- return reader.getIndexedRecordIterator(schema);
+ return new MappingIterator<>(reader.getRecordIterator(schema), HoodieRecord::getData);
} else {
// If there is no data file, fall back to reading log files
HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkRecordMerger.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkRecordMerger.java
index 04daafbab0..ff51e02702 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkRecordMerger.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkRecordMerger.java
@@ -18,17 +18,16 @@
package org.apache.hudi;
+import org.apache.avro.Schema;
+import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.model.HoodieRecordMerger;
-import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
-
-import org.apache.avro.Schema;
+import org.apache.hudi.common.util.collection.Pair;
import java.io.IOException;
-import java.util.Properties;
public class HoodieSparkRecordMerger implements HoodieRecordMerger {
@@ -38,7 +37,7 @@ public class HoodieSparkRecordMerger implements HoodieRecordMerger {
}
@Override
- public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, Properties props) throws IOException {
+ public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws IOException {
ValidationUtils.checkArgument(older.getRecordType() == HoodieRecordType.SPARK);
ValidationUtils.checkArgument(newer.getRecordType() == HoodieRecordType.SPARK);
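
merge now takes TypedProperties rather than raw java.util.Properties, matching the payload-props plumbing in the LogFileIterator hunk further down. A hedged usage sketch; the records and schemas are assumed inputs, and the ordering-field key shown is illustrative (the real key comes from Hudi's payload config constants):

    import java.io.IOException;

    import org.apache.avro.Schema;
    import org.apache.hudi.HoodieSparkRecordMerger;
    import org.apache.hudi.common.config.TypedProperties;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.common.util.collection.Pair;

    public class MergerUsageSketch {
      // Both records must be SPARK-typed, or the checkArgument calls above reject them.
      static Option<Pair<HoodieRecord, Schema>> mergeLatest(
          HoodieRecord older, Schema oldSchema,
          HoodieRecord newer, Schema newSchema) throws IOException {
        TypedProperties props = new TypedProperties();
        props.setProperty("hoodie.payload.ordering.field", "ts"); // illustrative key
        return new HoodieSparkRecordMerger().merge(older, oldSchema, newer, newSchema, props);
      }
    }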
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index 53380c9aa6..9bbf0a9414 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -752,9 +752,9 @@ object HoodieBaseRelation extends SparkAdapterSupport {
val requiredAvroSchema = new Schema.Parser().parse(requiredDataSchema.avroSchemaStr)
val avroToRowConverter = AvroConversionUtils.createAvroToInternalRowConverter(requiredAvroSchema, requiredRowSchema)
- reader.getIndexedRecordIterator(requiredAvroSchema).asScala
+ reader.getRecordIterator(requiredAvroSchema).asScala
.map(record => {
- avroToRowConverter.apply(record.asInstanceOf[GenericRecord]).get
+ avroToRowConverter.apply(record.getData.asInstanceOf[GenericRecord]).get
})
}
}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 1b8cb57936..99313884c3 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -355,18 +355,7 @@ object HoodieSparkSqlWriter {
processedRecord
}
- def getSparkProcessedRecord(partitionParam: String, record: InternalRow,
- dropPartitionColumns: Boolean, schema: StructType): (InternalRow, StructType) = {
- var processedRecord = record
- var writeSchema = schema
- if (dropPartitionColumns) {
- writeSchema = generateSparkSchemaWithoutPartitionColumns(partitionParam, schema)
- processedRecord = HoodieInternalRowUtils.rewriteRecord(record, schema, writeSchema)
- }
- (processedRecord, writeSchema)
- }
-
- def addSchemaEvolutionParameters(parameters: Map[String, String], internalSchemaOpt: Option[InternalSchema]): Map[String, String] = {
+ private def addSchemaEvolutionParameters(parameters: Map[String, String], internalSchemaOpt: Option[InternalSchema]): Map[String, String] = {
val schemaEvolutionEnable = if (internalSchemaOpt.isDefined) "true" else "false"
parameters ++ Map(HoodieWriteConfig.INTERNAL_SCHEMA_STRING.key() -> SerDeHelper.toJson(internalSchemaOpt.getOrElse(null)),
HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key() -> schemaEvolutionEnable)
@@ -874,16 +863,23 @@ object HoodieSparkSqlWriter {
// unit tests will use AvroKeyGenerator, so it needs to be cast to SparkKeyGeneratorInterface for the Spark record path
val sparkKeyGenerator = keyGenerator.asInstanceOf[SparkKeyGeneratorInterface]
val structType = HoodieInternalRowUtils.getCachedSchema(schema)
- df.queryExecution.toRdd.map(row => {
- val internalRow = row.copy()
- val (processedRow, writeSchema) = getSparkProcessedRecord(partitionCols, internalRow, dropPartitionColumns, structType)
- val recordKey = sparkKeyGenerator.getRecordKey(internalRow, structType)
- val partitionPath = sparkKeyGenerator.getPartitionPath(internalRow, structType)
- val key = new HoodieKey(recordKey.toString, partitionPath.toString)
-
- new HoodieSparkRecord(key, processedRow, writeSchema)
- }).toJavaRDD().asInstanceOf[JavaRDD[HoodieRecord[_]]]
+ df.queryExecution.toRdd.mapPartitions { iter =>
+ val projection: Function[InternalRow, InternalRow] = if (dropPartitionColumns) {
+ val newSchema = generateSparkSchemaWithoutPartitionColumns(partitionCols, structType)
+ HoodieInternalRowUtils.getCachedUnsafeProjection(structType, newSchema)
+ } else {
+ identity
+ }
+
+ iter.map { internalRow =>
+ val processedRow = projection(internalRow)
+ val recordKey = sparkKeyGenerator.getRecordKey(internalRow, structType)
+ val partitionPath = sparkKeyGenerator.getPartitionPath(internalRow, structType)
+ val key = new HoodieKey(recordKey.toString, partitionPath.toString)
+
+ new HoodieSparkRecord(key, processedRow, false)
+ }
+ }.toJavaRDD().asInstanceOf[JavaRDD[HoodieRecord[_]]]
}
}
-
}
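
The rewrite above is the substantive part of this file: projection setup moves out of the per-row map into mapPartitions, so the unsafe projection is resolved once per partition instead of once per record (falling back to identity when partition columns are kept), and the per-row copy() disappears. A Java-flavored sketch of the same pattern, with buildProjection standing in for HoodieInternalRowUtils.getCachedUnsafeProjection:

    import java.util.Iterator;
    import java.util.function.Function;

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.sql.catalyst.InternalRow;

    public class PerPartitionProjectionSketch {
      static JavaRDD<InternalRow> project(JavaRDD<InternalRow> rows, boolean dropPartitionColumns) {
        return rows.mapPartitions(iter -> {
          // resolved once per partition, not once per row
          Function<InternalRow, InternalRow> projection =
              dropPartitionColumns ? buildProjection() : Function.identity();
          return new Iterator<InternalRow>() {
            @Override public boolean hasNext() { return iter.hasNext(); }
            @Override public InternalRow next() { return projection.apply(iter.next()); }
          };
        });
      }

      // stand-in for HoodieInternalRowUtils.getCachedUnsafeProjection(from, to)
      private static Function<InternalRow, InternalRow> buildProjection() {
        return row -> row;
      }
    }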
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/LogFileIterator.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/LogFileIterator.scala
index f5a3834304..256bc14e82 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/LogFileIterator.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/LogFileIterator.scala
@@ -18,36 +18,36 @@
package org.apache.hudi
+import org.apache.avro.Schema
+import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.JobConf
import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, generateUnsafeProjection}
import org.apache.hudi.HoodieConversionUtils.{toJavaOption, toScalaOption}
import org.apache.hudi.HoodieDataSourceHelper.AvroDeserializerSupport
-import org.apache.hudi.common.model.{HoodieAvroIndexedRecord, HoodieEmptyRecord, HoodieLogFile, HoodieRecord}
-import org.apache.hudi.config.HoodiePayloadConfig
-import org.apache.hudi.commmon.model.HoodieSparkRecord
-import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
import org.apache.hudi.LogFileIterator._
-import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig}
+import org.apache.hudi.commmon.model.HoodieSparkRecord
+import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig, TypedProperties}
import org.apache.hudi.common.engine.{EngineType, HoodieLocalEngineContext}
-import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath
+import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
+import org.apache.hudi.common.model.{HoodieAvroIndexedRecord, HoodieEmptyRecord, HoodieLogFile, HoodieRecord}
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner
import org.apache.hudi.common.util.HoodieRecordUtils
+import org.apache.hudi.config.HoodiePayloadConfig
import org.apache.hudi.hadoop.config.HoodieRealtimeConfig
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
import org.apache.hudi.internal.schema.InternalSchema
-import org.apache.hudi.metadata.{HoodieBackedTableMetadata, HoodieTableMetadata}
import org.apache.hudi.metadata.HoodieTableMetadata.getDataTableBasePathFromMetadataTable
-import org.apache.avro.Schema
-import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapred.JobConf
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.hudi.metadata.{HoodieBackedTableMetadata, HoodieTableMetadata}
import org.apache.spark.sql.HoodieCatalystExpressionUtils
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.types.StructType
+
import java.io.Closeable
-import java.util.Properties
-import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.util.Try
@@ -65,13 +65,14 @@ class LogFileIterator(split: HoodieMergeOnReadFileSplit,
protected val maxCompactionMemoryInBytes: Long = getMaxCompactionMemoryInBytes(new JobConf(config))
- protected val payloadProps = tableState.preCombineFieldOpt
+ protected val payloadProps: TypedProperties = tableState.preCombineFieldOpt
.map { preCombineField =>
HoodiePayloadConfig.newBuilder
.withPayloadOrderingField(preCombineField)
.build
.getProps
- }.getOrElse(new Properties())
+ }
+ .getOrElse(new TypedProperties())
protected override val avroSchema: Schema = new Schema.Parser().parse(requiredSchema.avroSchemaStr)
protected override val structTypeSchema: StructType = requiredSchema.structTypeSchema
@@ -82,8 +83,7 @@ class LogFileIterator(split: HoodieMergeOnReadFileSplit,
protected var recordToLoad: InternalRow = _
protected val requiredSchemaSafeAvroProjection: SafeAvroProjection = SafeAvroProjection.create(logFileReaderAvroSchema, avroSchema)
-
- protected val requiredSchemaSafeRowProjection: UnsafeProjection = HoodieCatalystExpressionUtils.generateUnsafeProjection(logFileReaderStructType, structTypeSchema)
+ protected val requiredSchemaUnsafeRowProjection: UnsafeProjection = HoodieCatalystExpressionUtils.generateUnsafeProjection(logFileReaderStructType, structTypeSchema)
// TODO: the log scanner already supports column projection via internalSchema, so projectAvroUnsafe may no longer be needed
private var logScanner = {
@@ -122,7 +122,7 @@ class LogFileIterator(split: HoodieMergeOnReadFileSplit,
recordToLoad = deserialize(projectedAvroRecord)
true
case Some(r: HoodieSparkRecord) =>
- recordToLoad = requiredSchemaSafeRowProjection(r.getData)
+ recordToLoad = requiredSchemaUnsafeRowProjection(r.getData)
true
case None => this.hasNextInternal
}
@@ -238,7 +238,7 @@ class RecordMergingFileIterator(split: HoodieMergeOnReadFileSplit,
// on the record from the Delta Log
recordMerger.getRecordType match {
case HoodieRecordType.SPARK =>
- val curRecord = new HoodieSparkRecord(curRow, baseFileReader.schema)
+ val curRecord = new HoodieSparkRecord(curRow)
val result = recordMerger.merge(curRecord, baseFileReaderAvroSchema, newRecord, logFileReaderAvroSchema, payloadProps)
toScalaOption(result)
.map(r => {
diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java
index 371a2d7b41..dc408ee919 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java
+++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java
@@ -18,6 +18,7 @@
package org.apache.hudi.bootstrap;
+import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
@@ -38,13 +39,10 @@ import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.SparkKeyGeneratorInterface;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
-
-import org.apache.avro.generic.GenericRecord;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;
import java.io.IOException;
@@ -94,12 +92,11 @@ public abstract class SparkFullBootstrapDataProviderBase extends FullRecordBoots
} else if (recordType == HoodieRecordType.SPARK) {
SparkKeyGeneratorInterface sparkKeyGenerator = (SparkKeyGeneratorInterface) keyGenerator;
StructType structType = inputDataset.schema();
- return inputDataset.queryExecution().toRdd().toJavaRDD().map(row -> {
- InternalRow internalRow = row.copy();
+ return inputDataset.queryExecution().toRdd().toJavaRDD().map(internalRow -> {
String recordKey = sparkKeyGenerator.getRecordKey(internalRow, structType).toString();
String partitionPath = sparkKeyGenerator.getPartitionPath(internalRow, structType).toString();
HoodieKey key = new HoodieKey(recordKey, partitionPath);
- return new HoodieSparkRecord(key, internalRow, structType);
+ return new HoodieSparkRecord(key, internalRow, false);
});
} else {
throw new UnsupportedOperationException(recordType.name());
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 03dcd2db4d..a77a14047d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -646,7 +646,8 @@ class TestCOWDataSource extends HoodieClientTestBase {
var recordsReadDF = spark.read.format("org.apache.hudi")
.options(readOpts)
.load(basePath + "/*/*")
- assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= col("current_ts").cast("string")).count() == 0)
+
+ assertEquals(0L, recordsReadDF.filter(col("_hoodie_partition_path") =!= col("current_ts").cast("string")).count())
// Specify fieldType as TIMESTAMP
writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName, writeOpts)
@@ -659,7 +660,8 @@ class TestCOWDataSource extends HoodieClientTestBase {
.options(readOpts)
.load(basePath + "/*/*")
val udf_date_format = udf((data: Long) => new DateTime(data).toString(DateTimeFormat.forPattern("yyyyMMdd")))
- assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= udf_date_format(col("current_ts"))).count() == 0)
+
+ assertEquals(0L, recordsReadDF.filter(col("_hoodie_partition_path") =!= udf_date_format(col("current_ts"))).count())
// Mixed fieldType
writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName, writeOpts)
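
Replacing assertTrue(count == 0) with assertEquals(0L, count) in the two hunks above is a small test-hygiene improvement: on failure the assertion reports the offending count instead of a bare boolean. A minimal sketch, assuming the JUnit 5 assertions these tests appear to use:

    import static org.junit.jupiter.api.Assertions.assertEquals;

    import org.junit.jupiter.api.Test;

    class AssertionStyleSketch {
      @Test
      void reportsActualCountOnFailure() {
        long violations = 0L; // stand-in for recordsReadDF.filter(...).count()
        // a failing run prints "expected: <0> but was: <N>" rather than just "false"
        assertEquals(0L, violations);
      }
    }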
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
index d4ca8bc402..d537dcef4b 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.hudi.parser.HoodieSpark2ExtendedSqlParser
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql._
+import org.apache.spark.sql.execution.vectorized.MutableColumnarRow
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel._
@@ -48,6 +49,12 @@ import scala.collection.mutable.ArrayBuffer
*/
class Spark2Adapter extends SparkAdapter {
+ override def isColumnarBatchRow(r: InternalRow): Boolean = {
+ // NOTE: In Spark 2.x there's no [[ColumnarBatchRow]]; [[MutableColumnarRow]] is used
+ // for vectorized reads instead
+ r.isInstanceOf[MutableColumnarRow]
+ }
+
override def getCatalogUtils: HoodieCatalogUtils = {
throw new UnsupportedOperationException("Catalog utilities are not supported in Spark 2.x");
}
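
Each Spark adapter now answers isColumnarBatchRow with the version-specific row class: MutableColumnarRow on 2.x, ColumnarBatchRow on 3.x. The check matters because rows produced by vectorized readers are views over reused column-vector buffers and must be copied before being retained; presumably that is why callers such as HoodieSparkRecord need this hook. A hedged Java sketch of how a caller might use it (the toRetainable helper is hypothetical):

    import org.apache.spark.sql.catalyst.InternalRow;
    import org.apache.spark.sql.hudi.SparkAdapter;

    public class ColumnarRowHandling {
      // Columnar-batch rows are views over shared buffers and must be copied
      // before a reference is held past the current iteration.
      static InternalRow toRetainable(SparkAdapter adapter, InternalRow row) {
        return adapter.isColumnarBatchRow(row) ? row.copy() : row;
      }
    }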
diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala
index bed6ff3350..0a4bc289b3 100644
--- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, P
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark31HoodieParquetFileFormat}
import org.apache.spark.sql.hudi.SparkAdapter
import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.vectorized.ColumnarUtils
import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieCatalystPlansUtils, HoodieSpark31CatalogUtils, HoodieSpark31CatalystExpressionUtils, HoodieSpark31CatalystPlanUtils, HoodieSpark3CatalogUtils, SparkSession}
/**
@@ -37,6 +38,8 @@ import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieCatalystPlansU
*/
class Spark3_1Adapter extends BaseSpark3Adapter {
+ override def isColumnarBatchRow(r: InternalRow): Boolean = ColumnarUtils.isColumnarBatchRow(r)
+
override def getCatalogUtils: HoodieSpark3CatalogUtils = HoodieSpark31CatalogUtils
override def getCatalystExpressionUtils: HoodieCatalystExpressionUtils = HoodieSpark31CatalystExpressionUtils
diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/vectorized/ColumnarUtils.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/vectorized/ColumnarUtils.scala
new file mode 100644
index 0000000000..e6015a65cb
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/vectorized/ColumnarUtils.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.vectorized
+
+import org.apache.spark.sql.catalyst.InternalRow
+
+object ColumnarUtils {
+
+ /**
+ * Utility verifying whether the provided [[InternalRow]] instance is actually
+ * an instance of [[ColumnarBatchRow]]
+ *
+ * NOTE: This utility is required since, prior to Spark 3.3, [[ColumnarBatchRow]] is package-private
+ */
+ def isColumnarBatchRow(r: InternalRow): Boolean = r.isInstanceOf[ColumnarBatchRow]
+
+}
diff --git a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala
index 8e15745d57..4c58f6b119 100644
--- a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala
@@ -29,12 +29,15 @@ import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Sp
import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile}
import org.apache.spark.sql.parser.HoodieSpark3_2ExtendedSqlParser
import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.vectorized.ColumnarUtils
/**
* Implementation of [[SparkAdapter]] for Spark 3.2.x branch
*/
class Spark3_2Adapter extends BaseSpark3Adapter {
+ override def isColumnarBatchRow(r: InternalRow): Boolean = ColumnarUtils.isColumnarBatchRow(r)
+
override def getCatalogUtils: HoodieSpark3CatalogUtils = HoodieSpark32CatalogUtils
override def getCatalystExpressionUtils: HoodieCatalystExpressionUtils = HoodieSpark32CatalystExpressionUtils
diff --git a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/vectorized/ColumnarUtils.scala b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/vectorized/ColumnarUtils.scala
new file mode 100644
index 0000000000..e6015a65cb
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/vectorized/ColumnarUtils.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.vectorized
+
+import org.apache.spark.sql.catalyst.InternalRow
+
+object ColumnarUtils {
+
+ /**
+ * Utility verifying whether the provided [[InternalRow]] instance is actually
+ * an instance of [[ColumnarBatchRow]]
+ *
+ * NOTE: This utility is required since, prior to Spark 3.3, [[ColumnarBatchRow]] is package-private
+ */
+ def isColumnarBatchRow(r: InternalRow): Boolean = r.isInstanceOf[ColumnarBatchRow]
+
+}
diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
index 72bc1d19f4..5f5414eb7e 100644
--- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, P
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark32PlusHoodieParquetFileFormat}
import org.apache.spark.sql.parser.HoodieSpark3_3ExtendedSqlParser
import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.vectorized.ColumnarBatchRow
import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieCatalystPlansUtils, HoodieSpark33CatalogUtils, HoodieSpark33CatalystExpressionUtils, HoodieSpark33CatalystPlanUtils, HoodieSpark3CatalogUtils, SparkSession}
/**
@@ -35,6 +36,8 @@ import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieCatalystPlansU
*/
class Spark3_3Adapter extends BaseSpark3Adapter {
+ override def isColumnarBatchRow(r: InternalRow): Boolean = r.isInstanceOf[ColumnarBatchRow]
+
override def getCatalogUtils: HoodieSpark3CatalogUtils = HoodieSpark33CatalogUtils
override def getCatalystExpressionUtils: HoodieCatalystExpressionUtils = HoodieSpark33CatalystExpressionUtils