You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vb...@apache.org on 2020/06/26 06:47:06 UTC
[hudi] branch master updated: [HUDI-684] Introduced abstraction for
writing and reading different types of base file formats. (#1687)
This is an automated email from the ASF dual-hosted git repository.
vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 2603cfb [HUDI-684] Introduced abstraction for writing and reading different types of base file formats. (#1687)
2603cfb is described below
commit 2603cfb33e272632d7f36a53e1b13fe86dbb8627
Author: Prashant Wason <pw...@uber.com>
AuthorDate: Thu Jun 25 23:46:55 2020 -0700
[HUDI-684] Introduced abstraction for writing and reading different types of base file formats. (#1687)
Notable changes:
1. HoodieFileWriter and HoodieFileReader abstractions for writer/reader side of a base file format
2. HoodieDataBlock abstraction for creation specific data blocks for base file formats. (e.g. Parquet has HoodieAvroDataBlock)
3. All hardocded references to Parquet / Parquet based classes have been abstracted to call methods which accept a base file format
4. HiveSyncTool accepts the base file format as a CLI parameter
5. HoodieDeltaStreamer accepts the base file format as a CLI parameter
6. HoodieSparkSqlWriter accepts the base file format as a parameter
---
.../hudi/cli/commands/HoodieLogFileCommand.java | 10 +-
.../org/apache/hudi/io/HoodieAppendHandle.java | 4 +-
.../org/apache/hudi/io/HoodieCreateHandle.java | 15 +-
.../org/apache/hudi/io/HoodieKeyLookupHandle.java | 17 +-
.../java/org/apache/hudi/io/HoodieMergeHandle.java | 19 +-
.../org/apache/hudi/io/HoodieRangeInfoHandle.java | 9 +-
.../java/org/apache/hudi/io/HoodieReadHandle.java | 10 +
.../java/org/apache/hudi/io/HoodieWriteHandle.java | 10 +-
...dieStorageWriter.java => HoodieFileWriter.java} | 2 +-
...erFactory.java => HoodieFileWriterFactory.java} | 12 +-
.../hudi/io/storage/HoodieParquetWriter.java | 17 +-
.../apache/hudi/table/HoodieCopyOnWriteTable.java | 16 +-
.../java/org/apache/hudi/table/HoodieTable.java | 36 ++-
.../table/action/commit/CommitActionExecutor.java | 15 +-
.../deltacommit/DeltaCommitActionExecutor.java | 2 +-
.../hudi/table/action/rollback/RollbackHelper.java | 6 +-
.../hudi/index/bloom/TestHoodieBloomIndex.java | 6 +-
...ctory.java => TestHoodieFileWriterFactory.java} | 10 +-
.../hudi/table/TestHoodieMergeOnReadTable.java | 286 ++++++++++++---------
.../hudi/testutils/HoodieClientTestUtils.java | 14 +-
.../hudi/testutils/HoodieMergeOnReadTestUtils.java | 35 +--
.../java/org/apache/hudi/common/fs/FSUtils.java | 26 +-
.../hudi/common/table/HoodieTableMetaClient.java | 33 ++-
.../hudi/common/table/TableSchemaResolver.java | 23 +-
.../table/log/AbstractHoodieLogRecordScanner.java | 5 +-
.../hudi/common/table/log/HoodieLogFileReader.java | 2 +-
.../hudi/common/table/log/LogReaderUtils.java | 6 +-
.../table/log/block/HoodieAvroDataBlock.java | 98 ++-----
.../common/table/log/block/HoodieDataBlock.java | 132 ++++++++++
.../common/table/log/block/HoodieLogBlock.java | 4 +-
.../table/view/AbstractTableFileSystemView.java | 3 +-
.../hudi/common/util}/ParquetReaderIterator.java | 2 +-
.../apache/hudi/io/storage/HoodieFileReader.java | 22 +-
.../hudi/io/storage/HoodieFileReaderFactory.java | 47 ++++
.../hudi/io/storage/HoodieParquetReader.java | 80 ++++++
.../common/functional/TestHoodieAvroLogFormat.java | 24 +-
.../common/functional/TestHoodieLogFormat.java | 124 +++++----
.../hudi/common/testutils/HoodieTestUtils.java | 50 +++-
.../common/util}/TestParquetReaderIterator.java | 2 +-
.../io/storage/TestHoodieFileReaderFactory.java | 54 ++++
.../hudi/hadoop/HoodieParquetInputFormat.java | 4 +-
.../hadoop/hive/HoodieCombineHiveInputFormat.java | 8 +-
.../realtime/AbstractRealtimeRecordReader.java | 6 +-
.../hudi/hadoop/utils/HoodieInputFormatUtils.java | 63 ++++-
.../utils/HoodieRealtimeRecordReaderUtils.java | 15 +-
.../hudi/hadoop/TestHoodieParquetInputFormat.java | 31 ++-
.../hudi/hadoop/testutils/InputFormatTestUtil.java | 23 +-
.../java/org/apache/hudi/hive/HiveSyncConfig.java | 3 +
.../java/org/apache/hudi/hive/HiveSyncTool.java | 37 +--
.../org/apache/hudi/integ/ITTestHoodieDemo.java | 10 +-
.../main/java/org/apache/hudi/DataSourceUtils.java | 3 +-
.../scala/org/apache/hudi/DataSourceOptions.scala | 2 +
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 2 +
.../hudi/utilities/deltastreamer/DeltaSync.java | 6 +-
.../deltastreamer/HoodieDeltaStreamer.java | 15 ++
55 files changed, 1032 insertions(+), 484 deletions(-)
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
index f7c4d65..a5fe4fe 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
@@ -30,8 +30,8 @@ import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
-import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieCorruptBlock;
+import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
@@ -118,8 +118,8 @@ public class HoodieLogFileCommand implements CommandMarker {
dummyInstantTimeCount++;
instantTime = "dummy_instant_time_" + dummyInstantTimeCount;
}
- if (n instanceof HoodieAvroDataBlock) {
- recordCount = ((HoodieAvroDataBlock) n).getRecords().size();
+ if (n instanceof HoodieDataBlock) {
+ recordCount = ((HoodieDataBlock) n).getRecords().size();
}
}
if (commitCountAndMetadata.containsKey(instantTime)) {
@@ -215,8 +215,8 @@ public class HoodieLogFileCommand implements CommandMarker {
// read the avro blocks
while (reader.hasNext()) {
HoodieLogBlock n = reader.next();
- if (n instanceof HoodieAvroDataBlock) {
- HoodieAvroDataBlock blk = (HoodieAvroDataBlock) n;
+ if (n instanceof HoodieDataBlock) {
+ HoodieDataBlock blk = (HoodieDataBlock) n;
List<IndexedRecord> records = blk.getRecords();
for (IndexedRecord record : records) {
if (allRecords.size() < limit) {
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index cc91e89..12e3a07 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -34,7 +34,7 @@ import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
-import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
@@ -207,7 +207,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieWri
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, instantTime);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, writerSchema.toString());
if (recordList.size() > 0) {
- writer = writer.appendBlock(new HoodieAvroDataBlock(recordList, header));
+ writer = writer.appendBlock(HoodieDataBlock.getBlock(hoodieTable.getLogDataBlockFormat(), recordList, header));
recordList.clear();
}
if (keysToDelete.size() > 0) {
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
index 5d64c84..c437f54 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
@@ -30,8 +30,7 @@ import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieInsertException;
-import org.apache.hudi.io.storage.HoodieStorageWriter;
-import org.apache.hudi.io.storage.HoodieStorageWriterFactory;
+import org.apache.hudi.io.storage.HoodieFileWriter;
import org.apache.hudi.table.HoodieTable;
import org.apache.avro.generic.GenericRecord;
@@ -47,7 +46,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri
private static final Logger LOG = LogManager.getLogger(HoodieCreateHandle.class);
- private final HoodieStorageWriter<IndexedRecord> storageWriter;
+ private final HoodieFileWriter<IndexedRecord> fileWriter;
private final Path path;
private long recordsWritten = 0;
private long insertRecordsWritten = 0;
@@ -68,8 +67,8 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri
new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath));
partitionMetadata.trySave(getPartitionId());
createMarkerFile(partitionPath);
- this.storageWriter =
- HoodieStorageWriterFactory.getStorageWriter(instantTime, path, hoodieTable, config, writerSchema, this.sparkTaskContextSupplier);
+ this.fileWriter = createNewFileWriter(instantTime, path, hoodieTable, config, writerSchema,
+ this.sparkTaskContextSupplier);
} catch (IOException e) {
throw new HoodieInsertException("Failed to initialize HoodieStorageWriter for path " + path, e);
}
@@ -88,7 +87,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri
@Override
public boolean canWrite(HoodieRecord record) {
- return storageWriter.canWrite() && record.getPartitionPath().equals(writeStatus.getPartitionPath());
+ return fileWriter.canWrite() && record.getPartitionPath().equals(writeStatus.getPartitionPath());
}
/**
@@ -101,7 +100,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri
if (avroRecord.isPresent()) {
// Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
IndexedRecord recordWithMetadataInSchema = rewriteRecord((GenericRecord) avroRecord.get());
- storageWriter.writeAvroWithMetadata(recordWithMetadataInSchema, record);
+ fileWriter.writeAvroWithMetadata(recordWithMetadataInSchema, record);
// update the new location of record, so we know where to find it next
record.unseal();
record.setNewLocation(new HoodieRecordLocation(instantTime, writeStatus.getFileId()));
@@ -156,7 +155,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri
.info("Closing the file " + writeStatus.getFileId() + " as we are done with all the records " + recordsWritten);
try {
- storageWriter.close();
+ fileWriter.close();
HoodieWriteStat stat = new HoodieWriteStat();
stat.setPartitionPath(writeStatus.getPartitionPath());
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java
index 02f351e..4002b5a 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java
@@ -23,7 +23,6 @@ import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.util.HoodieTimer;
-import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIndexException;
@@ -34,6 +33,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@@ -61,23 +61,26 @@ public class HoodieKeyLookupHandle<T extends HoodieRecordPayload> extends Hoodie
this.candidateRecordKeys = new ArrayList<>();
this.totalKeysChecked = 0;
HoodieTimer timer = new HoodieTimer().startTimer();
- this.bloomFilter = ParquetUtils.readBloomFilterFromParquetMetadata(hoodieTable.getHadoopConf(),
- new Path(getLatestDataFile().getPath()));
+
+ try {
+ this.bloomFilter = createNewFileReader().readBloomFilter();
+ } catch (IOException e) {
+ throw new HoodieIndexException(String.format("Error reading bloom filter from %s: %s", partitionPathFilePair, e));
+ }
LOG.info(String.format("Read bloom filter from %s in %d ms", partitionPathFilePair, timer.endTimer()));
}
/**
* Given a list of row keys and one file, return only row keys existing in that file.
*/
- public static List<String> checkCandidatesAgainstFile(Configuration configuration, List<String> candidateRecordKeys,
- Path filePath) throws HoodieIndexException {
+ public List<String> checkCandidatesAgainstFile(Configuration configuration, List<String> candidateRecordKeys,
+ Path filePath) throws HoodieIndexException {
List<String> foundRecordKeys = new ArrayList<>();
try {
// Load all rowKeys from the file, to double-confirm
if (!candidateRecordKeys.isEmpty()) {
HoodieTimer timer = new HoodieTimer().startTimer();
- Set<String> fileRowKeys =
- ParquetUtils.filterParquetRowKeys(configuration, filePath, new HashSet<>(candidateRecordKeys));
+ Set<String> fileRowKeys = createNewFileReader().filterRowKeys(new HashSet<>(candidateRecordKeys));
foundRecordKeys.addAll(fileRowKeys);
LOG.info(String.format("Checked keys against file %s, in %d ms. #candidates (%d) #found (%d)", filePath,
timer.endTimer(), candidateRecordKeys.size(), foundRecordKeys.size()));
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 3e6af34..0eaf3f2 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -37,8 +37,7 @@ import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieUpsertException;
-import org.apache.hudi.io.storage.HoodieStorageWriter;
-import org.apache.hudi.io.storage.HoodieStorageWriterFactory;
+import org.apache.hudi.io.storage.HoodieFileWriter;
import org.apache.hudi.table.HoodieTable;
import org.apache.avro.Schema;
@@ -61,7 +60,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
private Map<String, HoodieRecord<T>> keyToNewRecords;
private Set<String> writtenRecordKeys;
- private HoodieStorageWriter<IndexedRecord> storageWriter;
+ private HoodieFileWriter<IndexedRecord> fileWriter;
private Path newFilePath;
private Path oldFilePath;
private long recordsWritten = 0;
@@ -115,7 +114,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
oldFilePath = new Path(config.getBasePath() + "/" + partitionPath + "/" + latestValidFilePath);
String relativePath = new Path((partitionPath.isEmpty() ? "" : partitionPath + "/")
- + FSUtils.makeDataFileName(instantTime, writeToken, fileId)).toString();
+ + FSUtils.makeDataFileName(instantTime, writeToken, fileId, hoodieTable.getBaseFileExtension())).toString();
newFilePath = new Path(config.getBasePath(), relativePath);
LOG.info(String.format("Merging new data into oldPath %s, as newPath %s", oldFilePath.toString(),
@@ -131,8 +130,8 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
createMarkerFile(partitionPath);
// Create the writer for writing the new version file
- storageWriter =
- HoodieStorageWriterFactory.getStorageWriter(instantTime, newFilePath, hoodieTable, config, writerSchema, sparkTaskContextSupplier);
+ fileWriter = createNewFileWriter(instantTime, newFilePath, hoodieTable, config, writerSchema, sparkTaskContextSupplier);
+
} catch (IOException io) {
LOG.error("Error in update task at commit " + instantTime, io);
writeStatus.setGlobalError(io);
@@ -190,7 +189,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
if (indexedRecord.isPresent()) {
// Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
IndexedRecord recordWithMetadataInSchema = rewriteRecord((GenericRecord) indexedRecord.get());
- storageWriter.writeAvroWithMetadata(recordWithMetadataInSchema, hoodieRecord);
+ fileWriter.writeAvroWithMetadata(recordWithMetadataInSchema, hoodieRecord);
recordsWritten++;
} else {
recordsDeleted++;
@@ -243,7 +242,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
String errMsg = "Failed to merge old record into new file for key " + key + " from old file " + getOldFilePath()
+ " to new file " + newFilePath;
try {
- storageWriter.writeAvro(key, oldRecord);
+ fileWriter.writeAvro(key, oldRecord);
} catch (ClassCastException e) {
LOG.error("Schema mismatch when rewriting old record " + oldRecord + " from file " + getOldFilePath()
+ " to file " + newFilePath + " with writerSchema " + writerSchema.toString(true));
@@ -277,8 +276,8 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
keyToNewRecords.clear();
writtenRecordKeys.clear();
- if (storageWriter != null) {
- storageWriter.close();
+ if (fileWriter != null) {
+ fileWriter.close();
}
long fileSizeInBytes = FSUtils.getFileSize(fs, newFilePath);
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieRangeInfoHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieRangeInfoHandle.java
index e84b0fa..2b58583 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieRangeInfoHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieRangeInfoHandle.java
@@ -18,14 +18,12 @@
package org.apache.hudi.io;
-import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
-import org.apache.hadoop.fs.Path;
+import java.io.IOException;
/**
* Extract range information for a given file slice.
@@ -37,8 +35,7 @@ public class HoodieRangeInfoHandle<T extends HoodieRecordPayload> extends Hoodie
super(config, null, hoodieTable, partitionPathFilePair);
}
- public String[] getMinMaxKeys() {
- HoodieBaseFile dataFile = getLatestDataFile();
- return ParquetUtils.readMinMaxRecordKeys(hoodieTable.getHadoopConf(), new Path(dataFile.getPath()));
+ public String[] getMinMaxKeys() throws IOException {
+ return createNewFileReader().readMinMaxRecordKeys();
}
}
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieReadHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieReadHandle.java
index 6662d00..8f2d4de 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieReadHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieReadHandle.java
@@ -22,9 +22,14 @@ import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.table.HoodieTable;
+import java.io.IOException;
+
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
/**
* Base class for read operations done logically on the file group.
@@ -56,4 +61,9 @@ public abstract class HoodieReadHandle<T extends HoodieRecordPayload> extends Ho
return hoodieTable.getBaseFileOnlyView()
.getLatestBaseFile(partitionPathFilePair.getLeft(), partitionPathFilePair.getRight()).get();
}
+
+ protected HoodieFileReader createNewFileReader() throws IOException {
+ return HoodieFileReaderFactory.getFileReader(hoodieTable.getHadoopConf(),
+ new Path(getLatestDataFile().getPath()));
+ }
}
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index 08bb06e..4470b76 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -30,6 +30,8 @@ import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.storage.HoodieFileWriter;
+import org.apache.hudi.io.storage.HoodieFileWriterFactory;
import org.apache.hudi.table.HoodieTable;
import org.apache.avro.Schema;
@@ -86,7 +88,8 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload> extends H
throw new HoodieIOException("Failed to make dir " + path, e);
}
- return new Path(path.toString(), FSUtils.makeDataFileName(instantTime, writeToken, fileId));
+ return new Path(path.toString(), FSUtils.makeDataFileName(instantTime, writeToken, fileId,
+ hoodieTable.getMetaClient().getTableConfig().getBaseFileFormat().getFileExtension()));
}
/**
@@ -180,4 +183,9 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload> extends H
protected long getAttemptId() {
return sparkTaskContextSupplier.getAttemptIdSupplier().get();
}
+
+ protected HoodieFileWriter createNewFileWriter(String instantTime, Path path, HoodieTable<T> hoodieTable,
+ HoodieWriteConfig config, Schema schema, SparkTaskContextSupplier sparkTaskContextSupplier) throws IOException {
+ return HoodieFileWriterFactory.getFileWriter(instantTime, path, hoodieTable, config, schema, sparkTaskContextSupplier);
+ }
}
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieStorageWriter.java b/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java
similarity index 94%
copy from hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieStorageWriter.java
copy to hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java
index d8cdb0f..ea9ecad 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieStorageWriter.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java
@@ -24,7 +24,7 @@ import org.apache.avro.generic.IndexedRecord;
import java.io.IOException;
-public interface HoodieStorageWriter<R extends IndexedRecord> {
+public interface HoodieFileWriter<R extends IndexedRecord> {
void writeAvroWithMetadata(R newRecord, HoodieRecord record) throws IOException;
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieStorageWriterFactory.java b/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
similarity index 85%
rename from hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieStorageWriterFactory.java
rename to hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
index cba4b05..90566fb 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieStorageWriterFactory.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
@@ -34,23 +34,21 @@ import org.apache.parquet.avro.AvroSchemaConverter;
import java.io.IOException;
-import static org.apache.hudi.common.model.HoodieFileFormat.HOODIE_LOG;
import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
-public class HoodieStorageWriterFactory {
+public class HoodieFileWriterFactory {
- public static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieStorageWriter<R> getStorageWriter(
+ public static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFileWriter<R> getFileWriter(
String instantTime, Path path, HoodieTable<T> hoodieTable, HoodieWriteConfig config, Schema schema,
SparkTaskContextSupplier sparkTaskContextSupplier) throws IOException {
- final String name = path.getName();
- final String extension = FSUtils.isLogFile(path) ? HOODIE_LOG.getFileExtension() : FSUtils.getFileExtension(name);
+ final String extension = FSUtils.getFileExtension(path.getName());
if (PARQUET.getFileExtension().equals(extension)) {
- return newParquetStorageWriter(instantTime, path, config, schema, hoodieTable, sparkTaskContextSupplier);
+ return newParquetFileWriter(instantTime, path, config, schema, hoodieTable, sparkTaskContextSupplier);
}
throw new UnsupportedOperationException(extension + " format not supported yet.");
}
- private static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieStorageWriter<R> newParquetStorageWriter(
+ private static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFileWriter<R> newParquetFileWriter(
String instantTime, Path path, HoodieWriteConfig config, Schema schema, HoodieTable hoodieTable,
SparkTaskContextSupplier sparkTaskContextSupplier) throws IOException {
BloomFilter filter = BloomFilterFactory
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java b/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java
index 35258ff..8c4c7e6 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java
@@ -29,7 +29,6 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
@@ -42,7 +41,7 @@ import java.util.concurrent.atomic.AtomicLong;
* the current file can take more records with the <code>canWrite()</code>
*/
public class HoodieParquetWriter<T extends HoodieRecordPayload, R extends IndexedRecord>
- extends ParquetWriter<IndexedRecord> implements HoodieStorageWriter<R> {
+ extends ParquetWriter<IndexedRecord> implements HoodieFileWriter<R> {
private static AtomicLong recordIndex = new AtomicLong(1);
@@ -51,7 +50,6 @@ public class HoodieParquetWriter<T extends HoodieRecordPayload, R extends Indexe
private final long maxFileSize;
private final HoodieAvroWriteSupport writeSupport;
private final String instantTime;
- private final Schema schema;
private final SparkTaskContextSupplier sparkTaskContextSupplier;
public HoodieParquetWriter(String instantTime, Path file, HoodieParquetConfig parquetConfig,
@@ -60,10 +58,10 @@ public class HoodieParquetWriter<T extends HoodieRecordPayload, R extends Indexe
ParquetFileWriter.Mode.CREATE, parquetConfig.getWriteSupport(), parquetConfig.getCompressionCodecName(),
parquetConfig.getBlockSize(), parquetConfig.getPageSize(), parquetConfig.getPageSize(),
ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED, ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED,
- ParquetWriter.DEFAULT_WRITER_VERSION, registerFileSystem(file, parquetConfig.getHadoopConf()));
+ ParquetWriter.DEFAULT_WRITER_VERSION, FSUtils.registerFileSystem(file, parquetConfig.getHadoopConf()));
this.file = HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf());
this.fs =
- (HoodieWrapperFileSystem) this.file.getFileSystem(registerFileSystem(file, parquetConfig.getHadoopConf()));
+ (HoodieWrapperFileSystem) this.file.getFileSystem(FSUtils.registerFileSystem(file, parquetConfig.getHadoopConf()));
// We cannot accurately measure the snappy compressed output file size. We are choosing a
// conservative 10%
// TODO - compute this compression ratio dynamically by looking at the bytes written to the
@@ -72,18 +70,9 @@ public class HoodieParquetWriter<T extends HoodieRecordPayload, R extends Indexe
+ Math.round(parquetConfig.getMaxFileSize() * parquetConfig.getCompressionRatio());
this.writeSupport = parquetConfig.getWriteSupport();
this.instantTime = instantTime;
- this.schema = schema;
this.sparkTaskContextSupplier = sparkTaskContextSupplier;
}
- public static Configuration registerFileSystem(Path file, Configuration conf) {
- Configuration returnConf = new Configuration(conf);
- String scheme = FSUtils.getFs(file.toString(), conf).getScheme();
- returnConf.set("fs." + HoodieWrapperFileSystem.getHoodieScheme(scheme) + ".impl",
- HoodieWrapperFileSystem.class.getName());
- return returnConf;
- }
-
@Override
public void writeAvroWithMetadata(R avroRecord, HoodieRecord record) throws IOException {
String seqId =
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
index 64b3bf0..9f3bb82 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
@@ -27,7 +27,6 @@ import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.utils.ParquetReaderIterator;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
@@ -44,6 +43,8 @@ import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.execution.SparkBoundedInMemoryExecutor;
import org.apache.hudi.io.HoodieCreateHandle;
import org.apache.hudi.io.HoodieMergeHandle;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.table.action.clean.CleanActionExecutor;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.commit.BulkInsertCommitActionExecutor;
@@ -58,9 +59,6 @@ import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
import org.apache.hudi.table.action.savepoint.SavepointActionExecutor;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-import org.apache.parquet.avro.AvroParquetReader;
-import org.apache.parquet.avro.AvroReadSupport;
-import org.apache.parquet.hadoop.ParquetReader;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -150,11 +148,13 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
throw new HoodieUpsertException(
"Error in finding the old file path at commit " + instantTime + " for fileId: " + fileId);
} else {
- AvroReadSupport.setAvroReadSchema(getHadoopConf(), upsertHandle.getWriterSchema());
BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
- try (ParquetReader<IndexedRecord> reader =
- AvroParquetReader.<IndexedRecord>builder(upsertHandle.getOldFilePath()).withConf(getHadoopConf()).build()) {
- wrapper = new SparkBoundedInMemoryExecutor(config, new ParquetReaderIterator(reader),
+ HoodieFileReader<IndexedRecord> storageReader =
+ HoodieFileReaderFactory.getFileReader(getHadoopConf(), upsertHandle.getOldFilePath());
+
+ try {
+ wrapper =
+ new SparkBoundedInMemoryExecutor(config, storageReader.getRecordIterator(upsertHandle.getWriterSchema()),
new UpdateHandler(upsertHandle), x -> x);
wrapper.execute();
} catch (Exception e) {
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
index 0584dbf..49d8858 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -34,12 +34,14 @@ import org.apache.hudi.common.fs.ConsistencyGuard;
import org.apache.hudi.common.fs.ConsistencyGuard.FileVisibility;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.FailSafeConsistencyGuard;
+import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -310,7 +312,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
/**
* Schedule compaction for the instant time.
- *
+ *
* @param jsc Spark Context
* @param instantTime Instant Time for scheduling compaction
* @param extraMetadata additional metadata to write into plan
@@ -381,7 +383,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
/**
* Delete Marker directory corresponding to an instant.
- *
+ *
* @param instantTs Instant Time
*/
public void deleteMarkerDir(String instantTs) {
@@ -422,9 +424,11 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
return;
}
- List<String> invalidDataPaths = FSUtils.getAllDataFilesForMarkers(fs, basePath, instantTs, markerDir.toString());
+ final String baseFileExtension = getBaseFileFormat().getFileExtension();
+ List<String> invalidDataPaths = FSUtils.getAllDataFilesForMarkers(fs, basePath, instantTs, markerDir.toString(),
+ baseFileExtension);
List<String> validDataPaths = stats.stream().map(w -> String.format("%s/%s", basePath, w.getPath()))
- .filter(p -> p.endsWith(".parquet")).collect(Collectors.toList());
+ .filter(p -> p.endsWith(baseFileExtension)).collect(Collectors.toList());
// Contains list of partially created files. These needs to be cleaned up.
invalidDataPaths.removeAll(validDataPaths);
if (!invalidDataPaths.isEmpty()) {
@@ -478,7 +482,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
/**
* Ensures all files passed either appear or disappear.
- *
+ *
* @param jsc JavaSparkContext
* @param groupByPartition Files grouped by partition
* @param visibility Appear/Disappear
@@ -562,4 +566,26 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
throw new HoodieInsertException("Failed insert schema compability check.", e);
}
}
+
+ public HoodieFileFormat getBaseFileFormat() {
+ return metaClient.getTableConfig().getBaseFileFormat();
+ }
+
+ public HoodieFileFormat getLogFileFormat() {
+ return metaClient.getTableConfig().getLogFileFormat();
+ }
+
+ public HoodieLogBlockType getLogDataBlockFormat() {
+ switch (getBaseFileFormat()) {
+ case PARQUET:
+ return HoodieLogBlockType.AVRO_DATA_BLOCK;
+ default:
+ throw new HoodieException("Base file format " + getBaseFileFormat()
+ + " does not have associated log block format");
+ }
+ }
+
+ public String getBaseFileExtension() {
+ return getBaseFileFormat().getFileExtension();
+ }
}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/CommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/CommitActionExecutor.java
index 5208c12..c07d4c9 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/CommitActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/CommitActionExecutor.java
@@ -19,7 +19,6 @@
package org.apache.hudi.table.action.commit;
import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.utils.ParquetReaderIterator;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -32,6 +31,8 @@ import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.execution.LazyInsertIterable;
import org.apache.hudi.execution.SparkBoundedInMemoryExecutor;
import org.apache.hudi.io.HoodieMergeHandle;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
@@ -39,9 +40,6 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-import org.apache.parquet.avro.AvroParquetReader;
-import org.apache.parquet.avro.AvroReadSupport;
-import org.apache.parquet.hadoop.ParquetReader;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaSparkContext;
@@ -89,11 +87,12 @@ public abstract class CommitActionExecutor<T extends HoodieRecordPayload<T>>
throw new HoodieUpsertException(
"Error in finding the old file path at commit " + instantTime + " for fileId: " + fileId);
} else {
- AvroReadSupport.setAvroReadSchema(table.getHadoopConf(), upsertHandle.getWriterSchema());
BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
- try (ParquetReader<IndexedRecord> reader =
- AvroParquetReader.<IndexedRecord>builder(upsertHandle.getOldFilePath()).withConf(table.getHadoopConf()).build()) {
- wrapper = new SparkBoundedInMemoryExecutor(config, new ParquetReaderIterator(reader),
+ try {
+ HoodieFileReader<IndexedRecord> storageReader =
+ HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), upsertHandle.getOldFilePath());
+ wrapper =
+ new SparkBoundedInMemoryExecutor(config, storageReader.getRecordIterator(upsertHandle.getWriterSchema()),
new UpdateHandler(upsertHandle), x -> x);
wrapper.execute();
} catch (Exception e) {
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/DeltaCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/DeltaCommitActionExecutor.java
index be3806e..8c24afd 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/DeltaCommitActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/DeltaCommitActionExecutor.java
@@ -83,7 +83,7 @@ public abstract class DeltaCommitActionExecutor<T extends HoodieRecordPayload<T>
@Override
public Iterator<List<WriteStatus>> handleInsert(String idPfx, Iterator<HoodieRecord<T>> recordItr)
throws Exception {
- // If canIndexLogFiles, write inserts to log files else write inserts to parquet files
+ // If canIndexLogFiles, write inserts to log files else write inserts to base files
if (table.getIndex().canIndexLogFiles()) {
return new LazyInsertIterable<>(recordItr, config, instantTime, (HoodieTable<T>)table, idPfx,
sparkTaskContextSupplier, new AppendHandleFactory<>());
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelper.java b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelper.java
index f64483a..4a9c20a 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelper.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelper.java
@@ -71,8 +71,9 @@ public class RollbackHelper implements Serializable {
*/
public List<HoodieRollbackStat> performRollback(JavaSparkContext jsc, HoodieInstant instantToRollback, List<RollbackRequest> rollbackRequests) {
+ String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
SerializablePathFilter filter = (path) -> {
- if (path.toString().contains(".parquet")) {
+ if (path.toString().contains(basefileExtension)) {
String fileCommitTime = FSUtils.getCommitTime(path.getName());
return instantToRollback.getTimestamp().equals(fileCommitTime);
} else if (path.toString().contains(".log")) {
@@ -184,8 +185,9 @@ public class RollbackHelper implements Serializable {
Map<FileStatus, Boolean> results, String commit, String partitionPath) throws IOException {
LOG.info("Cleaning path " + partitionPath);
FileSystem fs = metaClient.getFs();
+ String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
PathFilter filter = (path) -> {
- if (path.toString().contains(".parquet")) {
+ if (path.toString().contains(basefileExtension)) {
String fileCommitTime = FSUtils.getCommitTime(path.getName());
return commit.equals(fileCommitTime);
}
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
index 97acf03..2e91a84 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
@@ -257,7 +257,11 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
List<String> uuids =
Arrays.asList(record1.getRecordKey(), record2.getRecordKey(), record3.getRecordKey(), record4.getRecordKey());
- List<String> results = HoodieKeyLookupHandle.checkCandidatesAgainstFile(hadoopConf, uuids,
+ HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build();
+ HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);
+ HoodieKeyLookupHandle keyHandle = new HoodieKeyLookupHandle<>(config, table,
+ Pair.of("2016/01/31/", FSUtils.getFileId(filename)));
+ List<String> results = keyHandle.checkCandidatesAgainstFile(hadoopConf, uuids,
new Path(basePath + "/2016/01/31/" + filename));
assertEquals(results.size(), 2);
assertTrue(results.get(0).equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0")
diff --git a/hudi-client/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageWriterFactory.java b/hudi-client/src/test/java/org/apache/hudi/io/storage/TestHoodieFileWriterFactory.java
old mode 100755
new mode 100644
similarity index 85%
rename from hudi-client/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageWriterFactory.java
rename to hudi-client/src/test/java/org/apache/hudi/io/storage/TestHoodieFileWriterFactory.java
index a82756a..2584643
--- a/hudi-client/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageWriterFactory.java
+++ b/hudi-client/src/test/java/org/apache/hudi/io/storage/TestHoodieFileWriterFactory.java
@@ -34,26 +34,26 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
- * Tests for {@link HoodieStorageWriterFactory}.
+ * Tests for {@link HoodieFileWriterFactory}.
*/
-public class TestHoodieStorageWriterFactory extends HoodieClientTestBase {
+public class TestHoodieFileWriterFactory extends HoodieClientTestBase {
@Test
- public void testGetStorageWriter() throws IOException {
+ public void testGetFileWriter() throws IOException {
// parquet file format.
final String instantTime = "100";
final Path parquetPath = new Path(basePath + "/partition/path/f1_1-0-1_000.parquet");
final HoodieWriteConfig cfg = getConfig();
HoodieTable table = HoodieTable.create(metaClient, cfg, hadoopConf);
SparkTaskContextSupplier supplier = new SparkTaskContextSupplier();
- HoodieStorageWriter<IndexedRecord> parquetWriter = HoodieStorageWriterFactory.getStorageWriter(instantTime,
+ HoodieFileWriter<IndexedRecord> parquetWriter = HoodieFileWriterFactory.getFileWriter(instantTime,
parquetPath, table, cfg, HoodieTestDataGenerator.AVRO_SCHEMA, supplier);
assertTrue(parquetWriter instanceof HoodieParquetWriter);
// other file format exception.
final Path logPath = new Path(basePath + "/partition/path/f.b51192a8-574b-4a85-b246-bcfec03ac8bf_100.log.2_1-0-1");
final Throwable thrown = assertThrows(UnsupportedOperationException.class, () -> {
- HoodieStorageWriter<IndexedRecord> logWriter = HoodieStorageWriterFactory.getStorageWriter(instantTime, logPath,
+ HoodieFileWriter<IndexedRecord> logWriter = HoodieFileWriterFactory.getFileWriter(instantTime, logPath,
table, cfg, HoodieTestDataGenerator.AVRO_SCHEMA, supplier);
}, "should fail since log storage writer is not supported yet.");
assertTrue(thrown.getMessage().contains("format not supported yet."));
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
index 6092fd8..0e94025 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
@@ -24,6 +24,7 @@ import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
@@ -46,7 +47,9 @@ import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
import org.apache.hudi.index.HoodieIndex;
@@ -66,9 +69,9 @@ import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
import java.io.File;
import java.io.IOException;
@@ -90,37 +93,32 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
- private HoodieParquetInputFormat roSnapshotInputFormat;
private JobConf roSnapshotJobConf;
-
- private HoodieParquetInputFormat roInputFormat;
private JobConf roJobConf;
-
- private HoodieParquetRealtimeInputFormat rtInputFormat;
private JobConf rtJobConf;
- @BeforeEach
- public void init() throws IOException {
+ @TempDir
+ public java.nio.file.Path tempFolder;
+ private HoodieFileFormat baseFileFormat;
+
+ static Stream<HoodieFileFormat> argumentsProvider() {
+ return Stream.of(HoodieFileFormat.PARQUET);
+ }
+
+ public void init(HoodieFileFormat baseFileFormat) throws IOException {
+ this.baseFileFormat = baseFileFormat;
+
initDFS();
initSparkContexts("TestHoodieMergeOnReadTable");
hadoopConf.addResource(dfs.getConf());
initPath();
dfs.mkdirs(new Path(basePath));
- HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
+ metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ, baseFileFormat);
initTestDataGenerator();
- // initialize parquet input format
- roSnapshotInputFormat = new HoodieParquetInputFormat();
- roSnapshotJobConf = new JobConf(jsc.hadoopConfiguration());
- roSnapshotInputFormat.setConf(roSnapshotJobConf);
-
- roInputFormat = new HoodieParquetInputFormat();
+ roSnapshotJobConf = new JobConf(hadoopConf);
roJobConf = new JobConf(hadoopConf);
- roInputFormat.setConf(roJobConf);
-
- rtInputFormat = new HoodieParquetRealtimeInputFormat();
rtJobConf = new JobConf(hadoopConf);
- rtInputFormat.setConf(rtJobConf);
}
@AfterEach
@@ -128,8 +126,11 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
cleanupResources();
}
- @Test
- public void testSimpleInsertAndUpdate() throws Exception {
+ @ParameterizedTest
+ @MethodSource("argumentsProvider")
+ public void testSimpleInsertAndUpdate(HoodieFileFormat baseFileFormat) throws Exception {
+ init(baseFileFormat);
+
HoodieWriteConfig cfg = getConfig(true);
try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
@@ -153,9 +154,10 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
String compactionCommitTime = client.scheduleCompaction(Option.empty()).get().toString();
client.compact(compactionCommitTime);
- FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(dfs, cfg.getBasePath());
- hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);
+ HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);
+ FileStatus[] allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+ HoodieTableFileSystemView roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
assertTrue(dataFilesToRead.findAny().isPresent());
@@ -174,8 +176,11 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
// test incremental read does not go past compaction instant for RO views
// For RT views, incremental read can go past compaction
- @Test
- public void testIncrementalReadsWithCompaction() throws Exception {
+ @ParameterizedTest
+ @MethodSource("argumentsProvider")
+ public void testIncrementalReadsWithCompaction(HoodieFileFormat baseFileFormat) throws Exception {
+ init(baseFileFormat);
+
String partitionPath = "2020/02/20"; // use only one partition for this test
dataGen = new HoodieTestDataGenerator(new String[] { partitionPath });
HoodieWriteConfig cfg = getConfig(true);
@@ -190,19 +195,17 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
List<HoodieRecord> records001 = dataGen.generateInserts(commitTime1, 200);
insertAndGetFilePaths(records001, client, cfg, commitTime1);
- // verify only one parquet file shows up with commit time 001
+ // verify only one base file shows up with commit time 001
FileStatus[] snapshotROFiles = getROSnapshotFiles(partitionPath);
- validateFiles(partitionPath,1, snapshotROFiles, roSnapshotInputFormat,
- roSnapshotJobConf,200, commitTime1);
+ validateFiles(partitionPath, 1, snapshotROFiles, false, roSnapshotJobConf, 200, commitTime1);
FileStatus[] incrementalROFiles = getROIncrementalFiles(partitionPath, true);
- validateFiles(partitionPath, 1, incrementalROFiles, roInputFormat,
- roJobConf,200, commitTime1);
+ validateFiles(partitionPath, 1, incrementalROFiles, false, roJobConf, 200, commitTime1);
Path firstFilePath = incrementalROFiles[0].getPath();
FileStatus[] incrementalRTFiles = getRTIncrementalFiles(partitionPath);
- validateFiles(partitionPath, 1, incrementalRTFiles, rtInputFormat,
- rtJobConf,200, commitTime1);
+ validateFiles(partitionPath, 1, incrementalRTFiles, true, rtJobConf,200, commitTime1);
+
assertEquals(firstFilePath, incrementalRTFiles[0].getPath());
/**
@@ -215,14 +218,12 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
// verify RO incremental reads - only one parquet file shows up because updates to into log files
incrementalROFiles = getROIncrementalFiles(partitionPath, false);
- validateFiles(partitionPath, 1, incrementalROFiles, roInputFormat,
- roJobConf, 200, commitTime1);
+ validateFiles(partitionPath, 1, incrementalROFiles, false, roJobConf, 200, commitTime1);
assertEquals(firstFilePath, incrementalROFiles[0].getPath());
// verify RT incremental reads includes updates also
incrementalRTFiles = getRTIncrementalFiles(partitionPath);
- validateFiles(partitionPath, 1, incrementalRTFiles, rtInputFormat,
- rtJobConf, 200, commitTime1, updateTime);
+ validateFiles(partitionPath, 1, incrementalRTFiles, true, rtJobConf, 200, commitTime1, updateTime);
// request compaction, but do not perform compaction
String compactionCommitTime = "005";
@@ -230,13 +231,11 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
// verify RO incremental reads - only one parquet file shows up because updates go into log files
incrementalROFiles = getROIncrementalFiles(partitionPath, true);
- validateFiles(partitionPath,1, incrementalROFiles, roInputFormat,
- roJobConf, 200, commitTime1);
+ validateFiles(partitionPath,1, incrementalROFiles, false, roJobConf, 200, commitTime1);
// verify RT incremental reads includes updates also
incrementalRTFiles = getRTIncrementalFiles(partitionPath);
- validateFiles(partitionPath, 1, incrementalRTFiles, rtInputFormat,
- rtJobConf, 200, commitTime1, updateTime);
+ validateFiles(partitionPath, 1, incrementalRTFiles, true, rtJobConf, 200, commitTime1, updateTime);
// write 3 - more inserts
String insertsTime = "006";
@@ -246,44 +245,44 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
// verify new write shows up in snapshot mode even though there is pending compaction
snapshotROFiles = getROSnapshotFiles(partitionPath);
- validateFiles(partitionPath, 2, snapshotROFiles, roSnapshotInputFormat,
- roSnapshotJobConf,400, commitTime1, insertsTime);
+ validateFiles(partitionPath, 2, snapshotROFiles, false, roSnapshotJobConf,400, commitTime1, insertsTime);
incrementalROFiles = getROIncrementalFiles(partitionPath, true);
assertEquals(firstFilePath, incrementalROFiles[0].getPath());
// verify 006 does not show up in RO mode because of pending compaction
- validateFiles(partitionPath, 1, incrementalROFiles, roInputFormat,
- roJobConf, 200, commitTime1);
+
+ validateFiles(partitionPath, 1, incrementalROFiles, false, roJobConf, 200, commitTime1);
// verify that if stopAtCompaction is disabled, inserts from "insertsTime" show up
incrementalROFiles = getROIncrementalFiles(partitionPath, false);
- validateFiles(partitionPath,2, incrementalROFiles, roInputFormat,
- roJobConf, 400, commitTime1, insertsTime);
+ validateFiles(partitionPath,2, incrementalROFiles, false, roJobConf, 400, commitTime1, insertsTime);
// verify 006 shows up in RT views
incrementalRTFiles = getRTIncrementalFiles(partitionPath);
- validateFiles(partitionPath, 2, incrementalRTFiles, rtInputFormat,
- rtJobConf, 400, commitTime1, updateTime, insertsTime);
+ validateFiles(partitionPath, 2, incrementalRTFiles, true, rtJobConf, 400, commitTime1, updateTime, insertsTime);
// perform the scheduled compaction
client.compact(compactionCommitTime);
// verify new write shows up in snapshot mode after compaction is complete
snapshotROFiles = getROSnapshotFiles(partitionPath);
- validateFiles(partitionPath,2, snapshotROFiles, roSnapshotInputFormat,
- roSnapshotJobConf,400, commitTime1, compactionCommitTime, insertsTime);
+ validateFiles(partitionPath,2, snapshotROFiles, false, roSnapshotJobConf,400, commitTime1, compactionCommitTime,
+ insertsTime);
incrementalROFiles = getROIncrementalFiles(partitionPath, "002", -1, true);
assertTrue(incrementalROFiles.length == 2);
// verify 006 shows up because of pending compaction
- validateFiles(partitionPath, 2, incrementalROFiles, roInputFormat,
- roJobConf, 400, commitTime1, compactionCommitTime, insertsTime);
+ validateFiles(partitionPath, 2, incrementalROFiles, false, roJobConf, 400, commitTime1, compactionCommitTime,
+ insertsTime);
}
}
// Check if record level metadata is aggregated properly at the end of write.
- @Test
- public void testMetadataAggregateFromWriteStatus() throws Exception {
+ @ParameterizedTest
+ @MethodSource("argumentsProvider")
+ public void testMetadataAggregateFromWriteStatus(HoodieFileFormat baseFileFormat) throws Exception {
+ init(baseFileFormat);
+
HoodieWriteConfig cfg = getConfigBuilder(false).withWriteStatusClass(MetadataMergeWriteStatus.class).build();
try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
@@ -305,8 +304,11 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
}
}
- @Test
- public void testSimpleInsertUpdateAndDelete() throws Exception {
+ @ParameterizedTest
+ @MethodSource("argumentsProvider")
+ public void testSimpleInsertUpdateAndDelete(HoodieFileFormat baseFileFormat) throws Exception {
+ init(baseFileFormat);
+
HoodieWriteConfig cfg = getConfig(true);
try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
@@ -332,7 +334,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
assertFalse(commit.isPresent());
- FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
+ FileStatus[] allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
assertFalse(dataFilesToRead.findAny().isPresent());
@@ -373,23 +375,25 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
assertFalse(commit.isPresent());
- allFiles = HoodieTestUtils.listAllDataFilesInPath(dfs, cfg.getBasePath());
+ allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
dataFilesToRead = tableView.getLatestBaseFiles();
assertTrue(dataFilesToRead.findAny().isPresent());
List<String> dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
- List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
+ List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf, dataFiles, basePath);
// Wrote 20 records and deleted 20 records, so remaining 20-20 = 0
assertEquals(0, recordsRead.size(), "Must contain 0 records");
}
}
- @Test
- public void testCOWToMORConvertedTableRollback() throws Exception {
+ @ParameterizedTest
+ @MethodSource("argumentsProvider")
+ public void testCOWToMORConvertedTableRollback(HoodieFileFormat baseFileFormat) throws Exception {
+ init(baseFileFormat);
// Set TableType to COW
- HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.COPY_ON_WRITE);
+ HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.COPY_ON_WRITE, baseFileFormat);
HoodieWriteConfig cfg = getConfig(true);
try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
@@ -425,14 +429,14 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
assertNoWriteErrors(statuses);
// Set TableType to MOR
- HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
+ HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ, baseFileFormat);
// rollback a COW commit when TableType is MOR
client.rollback(newCommitTime);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);
- FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
+ FileStatus[] allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
final String absentCommit = newCommitTime;
@@ -440,8 +444,10 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
}
}
- @Test
- public void testRollbackWithDeltaAndCompactionCommit() throws Exception {
+ @ParameterizedTest
+ @MethodSource("argumentsProvider")
+ public void testRollbackWithDeltaAndCompactionCommit(HoodieFileFormat baseFileFormat) throws Exception {
+ init(baseFileFormat);
HoodieWriteConfig cfg = getConfig(false);
try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
@@ -471,7 +477,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
assertFalse(commit.isPresent());
- FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
+ FileStatus[] allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
tableView =
getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
@@ -495,7 +501,8 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
copyOfRecords.addAll(dataGen.generateInserts(commitTime1, 200));
List<String> dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
- List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
+ List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf, dataFiles,
+ basePath);
assertEquals(recordsRead.size(), 200);
statuses = secondClient.upsert(jsc.parallelize(copyOfRecords, 1), commitTime1).collect();
@@ -504,12 +511,12 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
// Test failed delta commit rollback
secondClient.rollback(commitTime1);
- allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
- // After rollback, there should be no parquet file with the failed commit time
+ allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
+ // After rollback, there should be no base file with the failed commit time
assertEquals(0, Arrays.stream(allFiles)
.filter(file -> file.getPath().getName().contains(commitTime1)).count());
dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
- recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
+ recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf, dataFiles, basePath);
assertEquals(200, recordsRead.size());
}
@@ -525,7 +532,8 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
copyOfRecords.addAll(dataGen.generateInserts(commitTime2, 200));
List<String> dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
- List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
+ List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf, dataFiles,
+ basePath);
assertEquals(200, recordsRead.size());
writeRecords = jsc.parallelize(copyOfRecords, 1);
@@ -537,7 +545,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
// Test successful delta commit rollback
thirdClient.rollback(commitTime2);
- allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
+ allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
// After rollback, there should be no parquet file with the failed commit time
assertEquals(0, Arrays.stream(allFiles)
.filter(file -> file.getPath().getName().contains(commitTime2)).count());
@@ -546,7 +554,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);
tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
- recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
+ recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf, dataFiles, basePath);
// check that the number of records read is still correct after rollback operation
assertEquals(200, recordsRead.size());
@@ -569,7 +577,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
JavaRDD<WriteStatus> ws = thirdClient.compact(compactionInstantTime);
thirdClient.commitCompaction(compactionInstantTime, ws, Option.empty());
- allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
+ allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
metaClient = HoodieTableMetaClient.reload(metaClient);
tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
@@ -580,7 +588,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
thirdClient.rollback(compactedCommitTime);
- allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
+ allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
metaClient = HoodieTableMetaClient.reload(metaClient);
tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
@@ -589,8 +597,10 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
}
}
- @Test
- public void testMultiRollbackWithDeltaAndCompactionCommit() throws Exception {
+ @ParameterizedTest
+ @MethodSource("argumentsProvider")
+ public void testMultiRollbackWithDeltaAndCompactionCommit(HoodieFileFormat baseFileFormat) throws Exception {
+ init(baseFileFormat);
HoodieWriteConfig cfg = getConfig(false);
try (final HoodieWriteClient client = getHoodieWriteClient(cfg);) {
@@ -618,7 +628,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
assertFalse(commit.isPresent());
- FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
+ FileStatus[] allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
assertFalse(dataFilesToRead.findAny().isPresent());
@@ -641,7 +651,8 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
copyOfRecords.addAll(dataGen.generateInserts(newCommitTime, 200));
List<String> dataFiles = tableView.getLatestBaseFiles().map(hf -> hf.getPath()).collect(Collectors.toList());
- List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
+ List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf, dataFiles,
+ basePath);
assertEquals(200, recordsRead.size());
statuses = nClient.upsert(jsc.parallelize(copyOfRecords, 1), newCommitTime).collect();
@@ -696,7 +707,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
JavaRDD<WriteStatus> ws = client.compact(compactionInstantTime);
client.commitCompaction(compactionInstantTime, ws, Option.empty());
- allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
+ allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
metaClient = HoodieTableMetaClient.reload(metaClient);
tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
@@ -724,7 +735,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
client.restoreToInstant("000");
metaClient = HoodieTableMetaClient.reload(metaClient);
- allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
+ allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
dataFilesToRead = tableView.getLatestBaseFiles();
assertFalse(dataFilesToRead.findAny().isPresent());
@@ -751,8 +762,11 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
.build();
}
- @Test
- public void testUpsertPartitioner() throws Exception {
+ @ParameterizedTest
+ @MethodSource("argumentsProvider")
+ public void testUpsertPartitioner(HoodieFileFormat baseFileFormat) throws Exception {
+ init(baseFileFormat);
+
HoodieWriteConfig cfg = getConfig(true);
try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
@@ -778,7 +792,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
assertFalse(commit.isPresent());
- FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
+ FileStatus[] allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
BaseFileOnlyView roView = getHoodieTableFileSystemView(metaClient,
metaClient.getCommitsTimeline().filterCompletedInstants(), allFiles);
Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
@@ -812,7 +826,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
assertFalse(commit.isPresent());
- allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
+ allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
roView = getHoodieTableFileSystemView(metaClient,
hoodieTable.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants(), allFiles);
dataFilesToRead = roView.getLatestBaseFiles();
@@ -823,14 +837,18 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
assertTrue(parquetFileIdToNewSize.entrySet().stream().anyMatch(entry -> parquetFileIdToSize.get(entry.getKey()) < entry.getValue()));
List<String> dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
- List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
+ List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf, dataFiles,
+ basePath);
// Wrote 20 records in 2 batches
assertEquals(40, recordsRead.size(), "Must contain 40 records");
}
}
- @Test
- public void testLogFileCountsAfterCompaction() throws Exception {
+ @ParameterizedTest
+ @MethodSource("argumentsProvider")
+ public void testLogFileCountsAfterCompaction(HoodieFileFormat baseFileFormat) throws Exception {
+ init(baseFileFormat);
+
// insert 100 records
HoodieWriteConfig config = getConfig(true);
try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
@@ -902,8 +920,11 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
}
}
- @Test
- public void testSimpleInsertsGeneratedIntoLogFiles() throws Exception {
+ @ParameterizedTest
+ @MethodSource("argumentsProvider")
+ public void testSimpleInsertsGeneratedIntoLogFiles(HoodieFileFormat baseFileFormat) throws Exception {
+ init(baseFileFormat);
+
// insert 100 records
// Setting IndexType to be InMemory to simulate Global Index nature
HoodieWriteConfig config = getConfigBuilder(false, IndexType.INMEMORY).build();
@@ -939,8 +960,11 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
}
}
- @Test
- public void testInsertsGeneratedIntoLogFilesRollback(@TempDir java.nio.file.Path tempFolder) throws Exception {
+ @ParameterizedTest
+ @MethodSource("argumentsProvider")
+ public void testInsertsGeneratedIntoLogFilesRollback(HoodieFileFormat baseFileFormat) throws Exception {
+ init(baseFileFormat);
+
// insert 100 records
// Setting IndexType to be InMemory to simulate Global Index nature
HoodieWriteConfig config = getConfigBuilder(false, IndexType.INMEMORY).build();
@@ -1010,8 +1034,11 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
}
}
- @Test
- public void testInsertsGeneratedIntoLogFilesRollbackAfterCompaction() throws Exception {
+ @ParameterizedTest
+ @MethodSource("argumentsProvider")
+ public void testInsertsGeneratedIntoLogFilesRollbackAfterCompaction(HoodieFileFormat baseFileFormat) throws Exception {
+ init(baseFileFormat);
+
// insert 100 records
// Setting IndexType to be InMemory to simulate Global Index nature
HoodieWriteConfig config = getConfigBuilder(false, IndexType.INMEMORY).build();
@@ -1063,8 +1090,10 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
/**
* Test to ensure rolling stats are correctly written to metadata file.
*/
- @Test
- public void testRollingStatsInMetadata() throws Exception {
+ @ParameterizedTest
+ @MethodSource("argumentsProvider")
+ public void testRollingStatsInMetadata(HoodieFileFormat baseFileFormat) throws Exception {
+ init(baseFileFormat);
HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build();
try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
@@ -1163,8 +1192,11 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
/**
* Test to ensure rolling stats are correctly written to the metadata file, identifies small files and corrects them.
*/
- @Test
- public void testRollingStatsWithSmallFileHandling() throws Exception {
+ @ParameterizedTest
+ @MethodSource("argumentsProvider")
+ public void testRollingStatsWithSmallFileHandling(HoodieFileFormat baseFileFormat) throws Exception {
+ init(baseFileFormat);
+
HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build();
try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
Map<String, Long> fileIdToInsertsMap = new HashMap<>();
@@ -1296,8 +1328,11 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
/**
* Test to validate invoking table.handleUpdate() with input records from multiple partitions will fail.
*/
- @Test
- public void testHandleUpdateWithMultiplePartitions() throws Exception {
+ @ParameterizedTest
+ @MethodSource("argumentsProvider")
+ public void testHandleUpdateWithMultiplePartitions(HoodieFileFormat baseFileFormat) throws Exception {
+ init(baseFileFormat);
+
HoodieWriteConfig cfg = getConfig(true);
try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
@@ -1323,7 +1358,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
assertFalse(commit.isPresent());
- FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
+ FileStatus[] allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
BaseFileOnlyView roView =
getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
@@ -1401,7 +1436,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
assertFalse(status.hasErrors(), "Errors found in write of " + status.getFileId());
}
}
-
+
private FileStatus[] insertAndGetFilePaths(List<HoodieRecord> records, HoodieWriteClient client,
HoodieWriteConfig cfg, String commitTime) throws IOException {
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
@@ -1419,7 +1454,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().lastInstant();
assertFalse(commit.isPresent());
- FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
+ FileStatus[] allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
BaseFileOnlyView roView =
getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
@@ -1452,14 +1487,14 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
assertFalse(commit.isPresent());
- return HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
+ HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);
+ return listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
}
private FileStatus[] getROSnapshotFiles(String partitionPath)
throws Exception {
- HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
FileInputFormat.setInputPaths(roSnapshotJobConf, basePath + "/" + partitionPath);
- return roSnapshotInputFormat.listStatus(roSnapshotJobConf);
+ return listStatus(roSnapshotJobConf, false);
}
private FileStatus[] getROIncrementalFiles(String partitionPath, boolean stopAtCompaction)
@@ -1469,10 +1504,9 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
private FileStatus[] getROIncrementalFiles(String partitionPath, String startCommitTime, int numCommitsToPull, boolean stopAtCompaction)
throws Exception {
- HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
setupIncremental(roJobConf, startCommitTime, numCommitsToPull, stopAtCompaction);
FileInputFormat.setInputPaths(roJobConf, Paths.get(basePath, partitionPath).toString());
- return roInputFormat.listStatus(roJobConf);
+ return listStatus(roJobConf, false);
}
private FileStatus[] getRTIncrementalFiles(String partitionPath)
@@ -1482,10 +1516,9 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
private FileStatus[] getRTIncrementalFiles(String partitionPath, String startCommitTime, int numCommitsToPull)
throws Exception {
- HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
setupIncremental(rtJobConf, startCommitTime, numCommitsToPull, false);
FileInputFormat.setInputPaths(rtJobConf, Paths.get(basePath, partitionPath).toString());
- return rtInputFormat.listStatus(rtJobConf);
+ return listStatus(rtJobConf, true);
}
private void setupIncremental(JobConf jobConf, String startCommit, int numberOfCommitsToPull, boolean stopAtCompaction) {
@@ -1507,16 +1540,37 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
}
private void validateFiles(String partitionPath, int expectedNumFiles,
- FileStatus[] files, HoodieParquetInputFormat inputFormat,
- JobConf jobConf, int expectedRecords, String... expectedCommits) {
+ FileStatus[] files, boolean realtime, JobConf jobConf,
+ int expectedRecords, String... expectedCommits) {
assertEquals(expectedNumFiles, files.length);
Set<String> expectedCommitsSet = Arrays.stream(expectedCommits).collect(Collectors.toSet());
- List<GenericRecord> records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
- Collections.singletonList(Paths.get(basePath, partitionPath).toString()), basePath, jobConf, inputFormat);
+ List<GenericRecord> records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf,
+ Collections.singletonList(Paths.get(basePath, partitionPath).toString()), basePath, jobConf, realtime);
assertEquals(expectedRecords, records.size());
Set<String> actualCommits = records.stream().map(r ->
r.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString()).collect(Collectors.toSet());
assertEquals(expectedCommitsSet, actualCommits);
}
+
+ private FileStatus[] listAllDataFilesInPath(HoodieTable table, String basePath) throws IOException {
+ return HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), basePath, table.getBaseFileExtension());
+ }
+
+ private FileStatus[] listStatus(JobConf jobConf, boolean realtime) throws IOException {
+ // This is required as Hoodie InputFormats do not extend a common base class and FileInputFormat's
+ // listStatus() is protected.
+ FileInputFormat inputFormat = HoodieInputFormatUtils.getInputFormat(baseFileFormat, realtime, jobConf);
+ switch (baseFileFormat) {
+ case PARQUET:
+ if (realtime) {
+ return ((HoodieParquetRealtimeInputFormat)inputFormat).listStatus(jobConf);
+ } else {
+ return ((HoodieParquetInputFormat)inputFormat).listStatus(jobConf);
+ }
+ default:
+ throw new HoodieIOException("Hoodie InputFormat not implemented for base file format " + baseFileFormat);
+ }
+ }
}
+
diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
index 698f901..145cde5 100644
--- a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
+++ b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.bloom.BloomFilterTypeCode;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -181,15 +182,20 @@ public class HoodieClientTestUtils {
/**
* Obtain all new data written into the Hoodie table since the given timestamp.
*/
- public static Dataset<Row> readSince(String basePath, SQLContext sqlContext, HoodieTimeline commitTimeline,
- String lastCommitTime) {
+ public static Dataset<Row> readSince(String basePath, SQLContext sqlContext,
+ HoodieTimeline commitTimeline, String lastCommitTime) {
List<HoodieInstant> commitsToReturn =
commitTimeline.findInstantsAfter(lastCommitTime, Integer.MAX_VALUE).getInstants().collect(Collectors.toList());
try {
// Go over the commit metadata, and obtain the new files that need to be read.
HashMap<String, String> fileIdToFullPath = getLatestFileIDsToFullPath(basePath, commitTimeline, commitsToReturn);
- return sqlContext.read().parquet(fileIdToFullPath.values().toArray(new String[fileIdToFullPath.size()]))
- .filter(String.format("%s >'%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, lastCommitTime));
+ String[] paths = fileIdToFullPath.values().toArray(new String[fileIdToFullPath.size()]);
+ Dataset<Row> rows = null;
+ if (paths[0].endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+ rows = sqlContext.read().parquet(paths);
+ }
+
+ return rows.filter(String.format("%s >'%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, lastCommitTime));
} catch (IOException e) {
throw new HoodieException("Error pulling data incrementally from commitTimestamp :" + lastCommitTime, e);
}
diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java
index 23bd333..20bcafd 100644
--- a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java
+++ b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java
@@ -19,18 +19,19 @@
package org.apache.hudi.testutils;
import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.testutils.HoodieTestUtils;
-import org.apache.hudi.hadoop.HoodieParquetInputFormat;
-import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
-
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
@@ -45,20 +46,19 @@ import java.util.stream.Collectors;
* Utility methods to aid in testing MergeOnRead (workaround for HoodieReadClient for MOR).
*/
public class HoodieMergeOnReadTestUtils {
- public static List<GenericRecord> getRecordsUsingInputFormat(List<String> inputPaths, String basePath) {
- return getRecordsUsingInputFormat(inputPaths, basePath, new Configuration());
- }
-
- public static List<GenericRecord> getRecordsUsingInputFormat(List<String> inputPaths, String basePath,
- Configuration conf) {
- JobConf jobConf = new JobConf(conf);
- return getRecordsUsingInputFormat(inputPaths, basePath, jobConf, new HoodieParquetRealtimeInputFormat());
+ public static List<GenericRecord> getRecordsUsingInputFormat(Configuration conf, List<String> inputPaths,
+ String basePath) {
+ return getRecordsUsingInputFormat(conf, inputPaths, basePath, new JobConf(conf), true);
}
- public static List<GenericRecord> getRecordsUsingInputFormat(List<String> inputPaths,
+ public static List<GenericRecord> getRecordsUsingInputFormat(Configuration conf, List<String> inputPaths,
String basePath,
JobConf jobConf,
- HoodieParquetInputFormat inputFormat) {
+ boolean realtime) {
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(conf, basePath);
+ FileInputFormat inputFormat = HoodieInputFormatUtils.getInputFormat(metaClient.getTableConfig().getBaseFileFormat(),
+ realtime, jobConf);
+
Schema schema = HoodieAvroUtils.addMetadataFields(
new Schema.Parser().parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA));
setPropsForInputFormat(inputFormat, jobConf, schema, basePath);
@@ -93,8 +93,8 @@ public class HoodieMergeOnReadTestUtils {
}).orElse(new ArrayList<GenericRecord>());
}
- private static void setPropsForInputFormat(HoodieParquetInputFormat inputFormat, JobConf jobConf,
- Schema schema, String basePath) {
+ private static void setPropsForInputFormat(FileInputFormat inputFormat, JobConf jobConf, Schema schema,
+ String basePath) {
List<Schema.Field> fields = schema.getFields();
String names = fields.stream().map(f -> f.name().toString()).collect(Collectors.joining(","));
String postions = fields.stream().map(f -> String.valueOf(f.pos())).collect(Collectors.joining(","));
@@ -116,7 +116,10 @@ public class HoodieMergeOnReadTestUtils {
conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, postions);
conf.set(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "datestr");
conf.set(hive_metastoreConstants.META_TABLE_COLUMN_TYPES, hiveColumnTypes);
- inputFormat.setConf(conf);
+
+ // Hoodie Input formats are also configurable
+ Configurable configurable = (Configurable)inputFormat;
+ configurable.setConf(conf);
jobConf.addResource(conf);
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index e7adaa6..fffd43f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -107,15 +107,21 @@ public class FSUtils {
return String.format("%d-%d-%d", taskPartitionId, stageId, taskAttemptId);
}
+ // TODO: this should be removed
public static String makeDataFileName(String instantTime, String writeToken, String fileId) {
- return String.format("%s_%s_%s.parquet", fileId, writeToken, instantTime);
+ return String.format("%s_%s_%s%s", fileId, writeToken, instantTime, HoodieFileFormat.PARQUET.getFileExtension());
+ }
+
+ public static String makeDataFileName(String instantTime, String writeToken, String fileId, String fileExtension) {
+ return String.format("%s_%s_%s%s", fileId, writeToken, instantTime, fileExtension);
}
public static String makeMarkerFile(String instantTime, String writeToken, String fileId) {
return String.format("%s_%s_%s%s", fileId, writeToken, instantTime, HoodieTableMetaClient.MARKER_EXTN);
}
- public static String translateMarkerToDataPath(String basePath, String markerPath, String instantTs) {
+ public static String translateMarkerToDataPath(String basePath, String markerPath, String instantTs,
+ String baseFileExtension) {
ValidationUtils.checkArgument(markerPath.endsWith(HoodieTableMetaClient.MARKER_EXTN));
String markerRootPath = Path.getPathWithoutSchemeAndAuthority(
new Path(String.format("%s/%s/%s", basePath, HoodieTableMetaClient.TEMPFOLDER_NAME, instantTs))).toString();
@@ -123,8 +129,7 @@ public class FSUtils {
ValidationUtils.checkArgument(begin >= 0,
"Not in marker dir. Marker Path=" + markerPath + ", Expected Marker Root=" + markerRootPath);
String rPath = markerPath.substring(begin + markerRootPath.length() + 1);
- return String.format("%s/%s%s", basePath, rPath.replace(HoodieTableMetaClient.MARKER_EXTN, ""),
- HoodieFileFormat.PARQUET.getFileExtension());
+ return String.format("%s/%s%s", basePath, rPath.replace(HoodieTableMetaClient.MARKER_EXTN, ""), baseFileExtension);
}
public static String maskWithoutFileId(String instantTime, int taskPartitionId) {
@@ -195,12 +200,12 @@ public class FSUtils {
}
public static List<String> getAllDataFilesForMarkers(FileSystem fs, String basePath, String instantTs,
- String markerDir) throws IOException {
+ String markerDir, String baseFileExtension) throws IOException {
List<String> dataFiles = new LinkedList<>();
processFiles(fs, markerDir, (status) -> {
String pathStr = status.getPath().toString();
if (pathStr.endsWith(HoodieTableMetaClient.MARKER_EXTN)) {
- dataFiles.add(FSUtils.translateMarkerToDataPath(basePath, pathStr, instantTs));
+ dataFiles.add(FSUtils.translateMarkerToDataPath(basePath, pathStr, instantTs, baseFileExtension));
}
return true;
}, false);
@@ -545,4 +550,13 @@ public class FSUtils {
|| inputStream.getWrappedStream().getClass().getCanonicalName()
.equals("com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream");
}
+
+ public static Configuration registerFileSystem(Path file, Configuration conf) {
+ Configuration returnConf = new Configuration(conf);
+ String scheme = FSUtils.getFs(file.toString(), conf).getScheme();
+ returnConf.set("fs." + HoodieWrapperFileSystem.getHoodieScheme(scheme) + ".impl",
+ HoodieWrapperFileSystem.class.getName());
+ return returnConf;
+ }
+
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 3218ddf..9675b77 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -128,7 +128,8 @@ public class HoodieTableMetaClient implements Serializable {
}
this.timelineLayoutVersion = layoutVersion.orElseGet(() -> tableConfig.getTimelineLayoutVersion().get());
this.loadActiveTimelineOnLoad = loadActiveTimelineOnLoad;
- LOG.info("Finished Loading Table of type " + tableType + "(version=" + timelineLayoutVersion + ") from " + basePath);
+ LOG.info("Finished Loading Table of type " + tableType + "(version=" + timelineLayoutVersion + ", baseFileFormat="
+ + this.tableConfig.getBaseFileFormat() + ") from " + basePath);
if (loadActiveTimelineOnLoad) {
LOG.info("Loading Active commit timeline for " + basePath);
getActiveTimeline();
@@ -299,12 +300,22 @@ public class HoodieTableMetaClient implements Serializable {
}
/**
- * Helper method to initialize a table, with given basePath, tableType, name, archiveFolder.
+ * Helper method to initialize a table, with given basePath, tableType, name, archiveFolder, payloadClass.
*/
public static HoodieTableMetaClient initTableType(Configuration hadoopConf, String basePath, String tableType,
String tableName, String archiveLogFolder, String payloadClassName) throws IOException {
return initTableType(hadoopConf, basePath, HoodieTableType.valueOf(tableType), tableName,
- archiveLogFolder, payloadClassName, null);
+ archiveLogFolder, payloadClassName, null, null);
+ }
+
+ /**
+ * Helper method to initialize a table, with given basePath, tableType, name, archiveFolder, payloadClass and
+ * base file format.
+ */
+ public static HoodieTableMetaClient initTableType(Configuration hadoopConf, String basePath, String tableType,
+ String tableName, String archiveLogFolder, String payloadClassName, String baseFileFormat) throws IOException {
+ return initTableType(hadoopConf, basePath, HoodieTableType.valueOf(tableType), tableName,
+ archiveLogFolder, payloadClassName, null, baseFileFormat);
}
/**
@@ -312,12 +323,20 @@ public class HoodieTableMetaClient implements Serializable {
*/
public static HoodieTableMetaClient initTableType(Configuration hadoopConf, String basePath,
HoodieTableType tableType, String tableName, String payloadClassName) throws IOException {
- return initTableType(hadoopConf, basePath, tableType, tableName, null, payloadClassName, null);
+ return initTableType(hadoopConf, basePath, tableType, tableName, null, payloadClassName, null, null);
+ }
+
+ public static HoodieTableMetaClient initTableType(Configuration hadoopConf, String basePath,
+ HoodieTableType tableType, String tableName,
+ String archiveLogFolder, String payloadClassName,
+ Integer timelineLayoutVersion) throws IOException {
+ return initTableType(hadoopConf, basePath, tableType, tableName, archiveLogFolder, payloadClassName,
+ timelineLayoutVersion, null);
}
public static HoodieTableMetaClient initTableType(Configuration hadoopConf, String basePath,
HoodieTableType tableType, String tableName, String archiveLogFolder, String payloadClassName,
- Integer timelineLayoutVersion) throws IOException {
+ Integer timelineLayoutVersion, String baseFileFormat) throws IOException {
Properties properties = new Properties();
properties.setProperty(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, tableName);
properties.setProperty(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, tableType.name());
@@ -332,6 +351,10 @@ public class HoodieTableMetaClient implements Serializable {
if (null != timelineLayoutVersion) {
properties.put(HoodieTableConfig.HOODIE_TIMELINE_LAYOUT_VERSION, String.valueOf(timelineLayoutVersion));
}
+
+ if (null != baseFileFormat) {
+ properties.setProperty(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP_NAME, baseFileFormat.toUpperCase());
+ }
return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, properties);
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index 129f85f..db68667 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -31,7 +31,7 @@ import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
-import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -356,20 +356,7 @@ public class TableSchemaResolver {
* @return
*/
public MessageType readSchemaFromLogFile(Path path) throws IOException {
- FileSystem fs = metaClient.getRawFs();
- Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), null);
- HoodieAvroDataBlock lastBlock = null;
- while (reader.hasNext()) {
- HoodieLogBlock block = reader.next();
- if (block instanceof HoodieAvroDataBlock) {
- lastBlock = (HoodieAvroDataBlock) block;
- }
- }
- reader.close();
- if (lastBlock != null) {
- return new AvroSchemaConverter().convert(lastBlock.getSchema());
- }
- return null;
+ return readSchemaFromLogFile(metaClient.getRawFs(), path);
}
/**
@@ -394,11 +381,11 @@ public class TableSchemaResolver {
*/
public static MessageType readSchemaFromLogFile(FileSystem fs, Path path) throws IOException {
Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), null);
- HoodieAvroDataBlock lastBlock = null;
+ HoodieDataBlock lastBlock = null;
while (reader.hasNext()) {
HoodieLogBlock block = reader.next();
- if (block instanceof HoodieAvroDataBlock) {
- lastBlock = (HoodieAvroDataBlock) block;
+ if (block instanceof HoodieDataBlock) {
+ lastBlock = (HoodieDataBlock) block;
}
}
reader.close();
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
index a35a9ee..806d55f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
+import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -267,7 +268,7 @@ public abstract class AbstractHoodieLogRecordScanner {
* Iterate over the GenericRecord in the block, read the hoodie key and partition path and call subclass processors to
* handle it.
*/
- private void processAvroDataBlock(HoodieAvroDataBlock dataBlock) throws Exception {
+ private void processDataBlock(HoodieDataBlock dataBlock) throws Exception {
// TODO (NA) - Implement getRecordItr() in HoodieAvroDataBlock and use that here
List<IndexedRecord> recs = dataBlock.getRecords();
totalLogRecords.addAndGet(recs.size());
@@ -302,7 +303,7 @@ public abstract class AbstractHoodieLogRecordScanner {
HoodieLogBlock lastBlock = lastBlocks.pollLast();
switch (lastBlock.getBlockType()) {
case AVRO_DATA_BLOCK:
- processAvroDataBlock((HoodieAvroDataBlock) lastBlock);
+ processDataBlock((HoodieAvroDataBlock) lastBlock);
break;
case DELETE_BLOCK:
Arrays.stream(((HoodieDeleteBlock) lastBlock).getKeysToDelete()).forEach(this::processNextDeletedKey);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
index 1f349ad..38bf83c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
@@ -193,7 +193,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
if (nextBlockVersion.getVersion() == HoodieLogFormatVersion.DEFAULT_VERSION) {
return HoodieAvroDataBlock.getBlock(content, readerSchema);
} else {
- return HoodieAvroDataBlock.getBlock(logFile, inputStream, Option.ofNullable(content), readBlockLazily,
+ return new HoodieAvroDataBlock(logFile, inputStream, Option.ofNullable(content), readBlockLazily,
contentPosition, contentLength, blockEndPos, readerSchema, header, footer);
}
case DELETE_BLOCK:
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java
index bb079de..0662e68 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java
@@ -22,7 +22,7 @@ import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
-import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -49,8 +49,8 @@ public class LogReaderUtils {
HoodieTimeline completedTimeline = activeTimeline.getCommitsTimeline().filterCompletedInstants();
while (reader.hasPrev()) {
HoodieLogBlock block = reader.prev();
- if (block instanceof HoodieAvroDataBlock) {
- HoodieAvroDataBlock lastBlock = (HoodieAvroDataBlock) block;
+ if (block instanceof HoodieDataBlock) {
+ HoodieDataBlock lastBlock = (HoodieDataBlock) block;
if (completedTimeline
.containsOrBeforeTimelineStarts(lastBlock.getLogBlockHeader().get(HeaderMetadataType.INSTANT_TIME))) {
writerSchema = new Schema.Parser().parse(lastBlock.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
index 0b3e822..31fc352 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
@@ -36,8 +36,6 @@ import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.hadoop.fs.FSDataInputStream;
-import javax.annotation.Nonnull;
-
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
@@ -49,56 +47,42 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import javax.annotation.Nonnull;
+
/**
- * DataBlock contains a list of records serialized using Avro. The Datablock contains 1. Data Block version 2. Total
- * number of records in the block 3. Size of a record 4. Actual avro serialized content of the record
+ * HoodieAvroDataBlock contains a list of records serialized using Avro. It is used with the Parquet base file format.
*/
-public class HoodieAvroDataBlock extends HoodieLogBlock {
+public class HoodieAvroDataBlock extends HoodieDataBlock {
- private List<IndexedRecord> records;
- private Schema schema;
private ThreadLocal<BinaryEncoder> encoderCache = new ThreadLocal<>();
private ThreadLocal<BinaryDecoder> decoderCache = new ThreadLocal<>();
- public HoodieAvroDataBlock(@Nonnull List<IndexedRecord> records, @Nonnull Map<HeaderMetadataType, String> header,
- @Nonnull Map<HeaderMetadataType, String> footer) {
- super(header, footer, Option.empty(), Option.empty(), null, false);
- this.records = records;
- this.schema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
+ public HoodieAvroDataBlock(@Nonnull Map<HeaderMetadataType, String> logBlockHeader,
+ @Nonnull Map<HeaderMetadataType, String> logBlockFooter,
+ @Nonnull Option<HoodieLogBlockContentLocation> blockContentLocation, @Nonnull Option<byte[]> content,
+ FSDataInputStream inputStream, boolean readBlockLazily) {
+ super(logBlockHeader, logBlockFooter, blockContentLocation, content, inputStream, readBlockLazily);
}
- public HoodieAvroDataBlock(@Nonnull List<IndexedRecord> records, @Nonnull Map<HeaderMetadataType, String> header) {
- this(records, header, new HashMap<>());
+ public HoodieAvroDataBlock(HoodieLogFile logFile, FSDataInputStream inputStream, Option<byte[]> content,
+ boolean readBlockLazily, long position, long blockSize, long blockEndpos, Schema readerSchema,
+ Map<HeaderMetadataType, String> header, Map<HeaderMetadataType, String> footer) {
+ super(content, inputStream, readBlockLazily,
+ Option.of(new HoodieLogBlockContentLocation(logFile, position, blockSize, blockEndpos)), readerSchema, header,
+ footer);
}
- private HoodieAvroDataBlock(Option<byte[]> content, @Nonnull FSDataInputStream inputStream, boolean readBlockLazily,
- Option<HoodieLogBlockContentLocation> blockContentLocation, Schema readerSchema,
- @Nonnull Map<HeaderMetadataType, String> headers, @Nonnull Map<HeaderMetadataType, String> footer) {
- super(headers, footer, blockContentLocation, content, inputStream, readBlockLazily);
- this.schema = readerSchema;
+ public HoodieAvroDataBlock(@Nonnull List<IndexedRecord> records, @Nonnull Map<HeaderMetadataType, String> header) {
+ super(records, header, new HashMap<>());
}
- public static HoodieLogBlock getBlock(HoodieLogFile logFile, FSDataInputStream inputStream, Option<byte[]> content,
- boolean readBlockLazily, long position, long blockSize, long blockEndpos, Schema readerSchema,
- Map<HeaderMetadataType, String> header, Map<HeaderMetadataType, String> footer) {
-
- return new HoodieAvroDataBlock(content, inputStream, readBlockLazily,
- Option.of(new HoodieLogBlockContentLocation(logFile, position, blockSize, blockEndpos)), readerSchema, header,
- footer);
-
+ @Override
+ public HoodieLogBlockType getBlockType() {
+ return HoodieLogBlockType.AVRO_DATA_BLOCK;
}
@Override
- public byte[] getContentBytes() throws IOException {
-
- // In case this method is called before realizing records from content
- if (getContent().isPresent()) {
- return getContent().get();
- } else if (readBlockLazily && !getContent().isPresent() && records == null) {
- // read block lazily
- createRecordsFromContentBytes();
- }
-
+ protected byte[] serializeRecords() throws IOException {
Schema schema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
GenericDatumWriter<IndexedRecord> writer = new GenericDatumWriter<>(schema);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -137,40 +121,10 @@ public class HoodieAvroDataBlock extends HoodieLogBlock {
return baos.toByteArray();
}
- @Override
- public HoodieLogBlockType getBlockType() {
- return HoodieLogBlockType.AVRO_DATA_BLOCK;
- }
-
- public List<IndexedRecord> getRecords() {
- if (records == null) {
- try {
- // in case records are absent, read content lazily and then convert to IndexedRecords
- createRecordsFromContentBytes();
- } catch (IOException io) {
- throw new HoodieIOException("Unable to convert content bytes to records", io);
- }
- }
- return records;
- }
-
- public Schema getSchema() {
- // if getSchema was invoked before converting byte [] to records
- if (records == null) {
- getRecords();
- }
- return schema;
- }
-
// TODO (na) - Break down content into smaller chunks of byte [] to be GC as they are used
// TODO (na) - Implement a recordItr instead of recordList
- private void createRecordsFromContentBytes() throws IOException {
-
- if (readBlockLazily && !getContent().isPresent()) {
- // read log block contents from disk
- inflate();
- }
-
+ @Override
+ protected void deserializeRecords() throws IOException {
SizeAwareDataInputStream dis =
new SizeAwareDataInputStream(new DataInputStream(new ByteArrayInputStream(getContent().get())));
@@ -212,6 +166,9 @@ public class HoodieAvroDataBlock extends HoodieLogBlock {
//----------------------------------------------------------------------------------------
// DEPRECATED METHODS
+ //
+ // These methods were only supported by HoodieAvroDataBlock and have been deprecated. Hence,
+ // these are only implemented here even though they duplicate the code from HoodieAvroDataBlock.
//----------------------------------------------------------------------------------------
/**
@@ -230,7 +187,7 @@ public class HoodieAvroDataBlock extends HoodieLogBlock {
* HoodieLogFormat V1.
*/
@Deprecated
- public static HoodieLogBlock getBlock(byte[] content, Schema readerSchema) throws IOException {
+ public static HoodieAvroDataBlock getBlock(byte[] content, Schema readerSchema) throws IOException {
SizeAwareDataInputStream dis = new SizeAwareDataInputStream(new DataInputStream(new ByteArrayInputStream(content)));
@@ -302,5 +259,4 @@ public class HoodieAvroDataBlock extends HoodieLogBlock {
output.close();
return baos.toByteArray();
}
-
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
new file mode 100644
index 0000000..1a70fc3
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.log.block;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.FSDataInputStream;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * DataBlock contains a list of records serialized using formats compatible with the base file format.
+ * For each base file format there is a corresponding DataBlock format.
+ *
+ * The Datablock contains:
+ * 1. Data Block version
+ * 2. Total number of records in the block
+ * 3. Actual serialized content of the records
+ */
+public abstract class HoodieDataBlock extends HoodieLogBlock {
+
+ protected List<IndexedRecord> records;
+ protected Schema schema;
+
+ public HoodieDataBlock(@Nonnull Map<HeaderMetadataType, String> logBlockHeader,
+ @Nonnull Map<HeaderMetadataType, String> logBlockFooter,
+ @Nonnull Option<HoodieLogBlockContentLocation> blockContentLocation, @Nonnull Option<byte[]> content,
+ FSDataInputStream inputStream, boolean readBlockLazily) {
+ super(logBlockHeader, logBlockFooter, blockContentLocation, content, inputStream, readBlockLazily);
+ }
+
+ public HoodieDataBlock(@Nonnull List<IndexedRecord> records, @Nonnull Map<HeaderMetadataType, String> header,
+ @Nonnull Map<HeaderMetadataType, String> footer) {
+ super(header, footer, Option.empty(), Option.empty(), null, false);
+ this.records = records;
+ this.schema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
+ }
+
+ public HoodieDataBlock(@Nonnull List<IndexedRecord> records, @Nonnull Map<HeaderMetadataType, String> header) {
+ this(records, header, new HashMap<>());
+ }
+
+ protected HoodieDataBlock(Option<byte[]> content, @Nonnull FSDataInputStream inputStream, boolean readBlockLazily,
+ Option<HoodieLogBlockContentLocation> blockContentLocation, Schema readerSchema,
+ @Nonnull Map<HeaderMetadataType, String> headers, @Nonnull Map<HeaderMetadataType, String> footer) {
+ super(headers, footer, blockContentLocation, content, inputStream, readBlockLazily);
+ this.schema = readerSchema;
+ }
+
+ public static HoodieLogBlock getBlock(HoodieLogBlockType logDataBlockFormat, List<IndexedRecord> recordList,
+ Map<HeaderMetadataType, String> header) {
+ switch (logDataBlockFormat) {
+ case AVRO_DATA_BLOCK:
+ return new HoodieAvroDataBlock(recordList, header);
+ default:
+ throw new HoodieException("Data block format " + logDataBlockFormat + " not implemented");
+ }
+ }
+
+ @Override
+ public byte[] getContentBytes() throws IOException {
+ // In case this method is called before realizing records from content
+ if (getContent().isPresent()) {
+ return getContent().get();
+ } else if (readBlockLazily && !getContent().isPresent() && records == null) {
+ // read block lazily
+ createRecordsFromContentBytes();
+ }
+
+ return serializeRecords();
+ }
+
+ public abstract HoodieLogBlockType getBlockType();
+
+ public List<IndexedRecord> getRecords() {
+ if (records == null) {
+ try {
+ // in case records are absent, read content lazily and then convert to IndexedRecords
+ createRecordsFromContentBytes();
+ } catch (IOException io) {
+ throw new HoodieIOException("Unable to convert content bytes to records", io);
+ }
+ }
+ return records;
+ }
+
+ public Schema getSchema() {
+ // if getSchema was invoked before converting byte [] to records
+ if (records == null) {
+ getRecords();
+ }
+ return schema;
+ }
+
+ private void createRecordsFromContentBytes() throws IOException {
+ if (readBlockLazily && !getContent().isPresent()) {
+ // read log block contents from disk
+ inflate();
+ }
+
+ deserializeRecords();
+ }
+
+ protected abstract byte[] serializeRecords() throws IOException;
+
+ protected abstract void deserializeRecords() throws IOException;
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
index 24144d7..b1a88c1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
@@ -84,9 +84,7 @@ public abstract class HoodieLogBlock {
throw new HoodieException("No implementation was provided");
}
- public HoodieLogBlockType getBlockType() {
- throw new HoodieException("No implementation was provided");
- }
+ public abstract HoodieLogBlockType getBlockType();
public long getLogBlockLength() {
throw new HoodieException("No implementation was provided");
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
index 9babc80..4fdf20b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
@@ -121,7 +121,8 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
}
});
long storePartitionsTs = timer.endTimer();
- LOG.info("addFilesToView: NumFiles=" + statuses.length + ", FileGroupsCreationTime=" + fgBuildTimeTakenMs
+ LOG.info("addFilesToView: NumFiles=" + statuses.length + ", NumFileGroups=" + fileGroups.size()
+ + ", FileGroupsCreationTime=" + fgBuildTimeTakenMs
+ ", StoreTimeTaken=" + storePartitionsTs);
return fileGroups;
}
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/utils/ParquetReaderIterator.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java
similarity index 98%
rename from hudi-client/src/main/java/org/apache/hudi/client/utils/ParquetReaderIterator.java
rename to hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java
index 2bf5c78..20c79dd 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/utils/ParquetReaderIterator.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hudi.client.utils;
+package org.apache.hudi.common.util;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueue;
import org.apache.hudi.exception.HoodieIOException;
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieStorageWriter.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java
similarity index 66%
copy from hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieStorageWriter.java
copy to hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java
index d8cdb0f..0e5ead9 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieStorageWriter.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java
@@ -18,19 +18,27 @@
package org.apache.hudi.io.storage;
-import org.apache.hudi.common.model.HoodieRecord;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Set;
+import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.common.bloom.BloomFilter;
-import java.io.IOException;
+public interface HoodieFileReader<R extends IndexedRecord> {
+
+ public String[] readMinMaxRecordKeys();
+
+ public BloomFilter readBloomFilter();
-public interface HoodieStorageWriter<R extends IndexedRecord> {
+ public Set<String> filterRowKeys(Set<String> candidateRowKeys);
- void writeAvroWithMetadata(R newRecord, HoodieRecord record) throws IOException;
+ public Iterator<R> getRecordIterator(Schema schema) throws IOException;
- boolean canWrite();
+ Schema getSchema();
- void close() throws IOException;
+ void close();
- void writeAvro(String key, R oldRecord) throws IOException;
+ long getTotalRecords();
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
new file mode 100644
index 0000000..1ad85d3
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+
+import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
+
+public class HoodieFileReaderFactory {
+
+ public static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFileReader<R> getFileReader(
+ Configuration conf, Path path) throws IOException {
+ final String extension = FSUtils.getFileExtension(path.toString());
+ if (PARQUET.getFileExtension().equals(extension)) {
+ return newParquetFileReader(conf, path);
+ }
+ throw new UnsupportedOperationException(extension + " format not supported yet.");
+ }
+
+ private static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFileReader<R> newParquetFileReader(
+ Configuration conf, Path path) throws IOException {
+ return new HoodieParquetReader<>(conf, path);
+ }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetReader.java
new file mode 100644
index 0000000..107f503
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetReader.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.util.ParquetReaderIterator;
+import org.apache.hudi.common.util.ParquetUtils;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.hadoop.ParquetReader;
+
+public class HoodieParquetReader<R extends IndexedRecord> implements HoodieFileReader {
+ private Path path;
+ private Configuration conf;
+
+ public HoodieParquetReader(Configuration configuration, Path path) {
+ this.conf = configuration;
+ this.path = path;
+ }
+
+ public String[] readMinMaxRecordKeys() {
+ return ParquetUtils.readMinMaxRecordKeys(conf, path);
+ }
+
+ @Override
+ public BloomFilter readBloomFilter() {
+ return ParquetUtils.readBloomFilterFromParquetMetadata(conf, path);
+ }
+
+ @Override
+ public Set<String> filterRowKeys(Set candidateRowKeys) {
+ return ParquetUtils.filterParquetRowKeys(conf, path, candidateRowKeys);
+ }
+
+ @Override
+ public Iterator<R> getRecordIterator(Schema schema) throws IOException {
+ AvroReadSupport.setAvroReadSchema(conf, schema);
+ ParquetReader<IndexedRecord> reader = AvroParquetReader.<IndexedRecord>builder(path).withConf(conf).build();
+ return new ParquetReaderIterator(reader);
+ }
+
+ @Override
+ public Schema getSchema() {
+ return ParquetUtils.readAvroSchema(conf, path);
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public long getTotalRecords() {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieStorageWriter.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieAvroLogFormat.java
similarity index 65%
rename from hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieStorageWriter.java
rename to hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieAvroLogFormat.java
index d8cdb0f..940ae87 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieStorageWriter.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieAvroLogFormat.java
@@ -16,21 +16,15 @@
* limitations under the License.
*/
-package org.apache.hudi.io.storage;
+package org.apache.hudi.common.functional;
-import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
-import org.apache.avro.generic.IndexedRecord;
-
-import java.io.IOException;
-
-public interface HoodieStorageWriter<R extends IndexedRecord> {
-
- void writeAvroWithMetadata(R newRecord, HoodieRecord record) throws IOException;
-
- boolean canWrite();
-
- void close() throws IOException;
-
- void writeAvro(String key, R oldRecord) throws IOException;
+/**
+ * Tests Avro log format {@link HoodieAvroDataBlock}.
+ */
+public class TestHoodieAvroLogFormat extends TestHoodieLogFormat {
+ public TestHoodieAvroLogFormat() {
+ super(HoodieLogBlockType.AVRO_DATA_BLOCK);
+ }
}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index e9f0ef7..6214af1 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -32,6 +32,7 @@ import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
+import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
@@ -81,12 +82,20 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
* Tests hoodie log format {@link HoodieLogFormat}.
*/
@SuppressWarnings("Duplicates")
-public class TestHoodieLogFormat extends HoodieCommonTestHarness {
+public abstract class TestHoodieLogFormat extends HoodieCommonTestHarness {
private static String BASE_OUTPUT_PATH = "/tmp/";
private FileSystem fs;
private Path partitionPath;
private int bufferSize = 4096;
+ private HoodieLogBlockType dataBlockType;
+
+ public TestHoodieLogFormat(HoodieLogBlockType dataBlockType) {
+ this.dataBlockType = dataBlockType;
+ }
+
+ private TestHoodieLogFormat() {
+ }
@BeforeAll
public static void setUpClass() throws IOException, InterruptedException {
@@ -133,7 +142,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
Map<HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
- HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, header);
+ HoodieDataBlock dataBlock = getDataBlock(records, header);
writer = writer.appendBlock(dataBlock);
long size = writer.getCurrentSize();
assertTrue(size > 0, "We just wrote a block - size should be > 0");
@@ -151,7 +160,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
- HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, header);
+ HoodieDataBlock dataBlock = getDataBlock(records, header);
// Write out a block
writer = writer.appendBlock(dataBlock);
// Get the size of the block
@@ -164,7 +173,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
.withFileId("test-fileid1").overBaseCommit("100").withFs(fs).withSizeThreshold(size - 1).build();
records = SchemaTestUtil.generateTestRecords(0, 100);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
- dataBlock = new HoodieAvroDataBlock(records, header);
+ dataBlock = getDataBlock(records, header);
writer = writer.appendBlock(dataBlock);
assertEquals(0, writer.getCurrentSize(), "This should be a new log file and hence size should be 0");
assertEquals(2, writer.getLogFile().getLogVersion(), "Version should be rolled to 2");
@@ -217,7 +226,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
- HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, header);
+ HoodieDataBlock dataBlock = getDataBlock(records, header);
writer = writer.appendBlock(dataBlock);
writer2 = writer2.appendBlock(dataBlock);
writer.close();
@@ -235,7 +244,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
- HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, header);
+ HoodieDataBlock dataBlock = getDataBlock(records, header);
writer = writer.appendBlock(dataBlock);
long size1 = writer.getCurrentSize();
writer.close();
@@ -245,7 +254,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
.withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
records = SchemaTestUtil.generateTestRecords(0, 100);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
- dataBlock = new HoodieAvroDataBlock(records, header);
+ dataBlock = getDataBlock(records, header);
writer = writer.appendBlock(dataBlock);
long size2 = writer.getCurrentSize();
assertTrue(size2 > size1, "We just wrote a new block - size2 should be > size1");
@@ -259,7 +268,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
.withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
records = SchemaTestUtil.generateTestRecords(0, 100);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
- dataBlock = new HoodieAvroDataBlock(records, header);
+ dataBlock = getDataBlock(records, header);
writer = writer.appendBlock(dataBlock);
long size3 = writer.getCurrentSize();
assertTrue(size3 > size2, "We just wrote a new block - size3 should be > size2");
@@ -289,7 +298,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
* dataBlock = new HoodieAvroDataBlock(records, header); writer = writer.appendBlock(dataBlock); long size1 =
* writer.getCurrentSize(); // do not close this writer - this simulates a data note appending to a log dying
* without closing the file // writer.close();
- *
+ *
* writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
* .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1").overBaseCommit("100")
* .withFs(fs).build(); records = SchemaTestUtil.generateTestRecords(0, 100);
@@ -313,7 +322,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
- HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, header);
+ HoodieDataBlock dataBlock = getDataBlock(records, header);
for (int i = 0; i < 2; i++) {
HoodieLogFormat.newWriterBuilder().onParentPath(testPath)
@@ -338,15 +347,15 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
- HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, header);
+ HoodieDataBlock dataBlock = getDataBlock(records, header);
writer = writer.appendBlock(dataBlock);
writer.close();
Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema());
assertTrue(reader.hasNext(), "We wrote a block, we should be able to read it");
HoodieLogBlock nextBlock = reader.next();
- assertEquals(HoodieLogBlockType.AVRO_DATA_BLOCK, nextBlock.getBlockType(), "The next block should be a data block");
- HoodieAvroDataBlock dataBlockRead = (HoodieAvroDataBlock) nextBlock;
+ assertEquals(dataBlockType, nextBlock.getBlockType(), "The next block should be a data block");
+ HoodieDataBlock dataBlockRead = (HoodieDataBlock) nextBlock;
assertEquals(copyOfRecords.size(), dataBlockRead.getRecords().size(),
"Read records size should be equal to the written records size");
assertEquals(copyOfRecords, dataBlockRead.getRecords(),
@@ -366,7 +375,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
- HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, header);
+ HoodieDataBlock dataBlock = getDataBlock(records1, header);
writer = writer.appendBlock(dataBlock);
writer.close();
@@ -377,7 +386,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
List<IndexedRecord> copyOfRecords2 = records2.stream()
.map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
- dataBlock = new HoodieAvroDataBlock(records2, header);
+ dataBlock = getDataBlock(records2, header);
writer = writer.appendBlock(dataBlock);
writer.close();
@@ -389,14 +398,14 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
List<IndexedRecord> copyOfRecords3 = records3.stream()
.map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
- dataBlock = new HoodieAvroDataBlock(records3, header);
+ dataBlock = getDataBlock(records3, header);
writer = writer.appendBlock(dataBlock);
writer.close();
Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema());
assertTrue(reader.hasNext(), "First block should be available");
HoodieLogBlock nextBlock = reader.next();
- HoodieAvroDataBlock dataBlockRead = (HoodieAvroDataBlock) nextBlock;
+ HoodieDataBlock dataBlockRead = (HoodieDataBlock) nextBlock;
assertEquals(copyOfRecords1.size(), dataBlockRead.getRecords().size(),
"Read records size should be equal to the written records size");
assertEquals(copyOfRecords1, dataBlockRead.getRecords(),
@@ -405,7 +414,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
reader.hasNext();
nextBlock = reader.next();
- dataBlockRead = (HoodieAvroDataBlock) nextBlock;
+ dataBlockRead = (HoodieDataBlock) nextBlock;
assertEquals(copyOfRecords2.size(), dataBlockRead.getRecords().size(),
"Read records size should be equal to the written records size");
assertEquals(copyOfRecords2, dataBlockRead.getRecords(),
@@ -413,7 +422,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
reader.hasNext();
nextBlock = reader.next();
- dataBlockRead = (HoodieAvroDataBlock) nextBlock;
+ dataBlockRead = (HoodieDataBlock) nextBlock;
assertEquals(copyOfRecords3.size(), dataBlockRead.getRecords().size(),
"Read records size should be equal to the written records size");
assertEquals(copyOfRecords3, dataBlockRead.getRecords(),
@@ -443,7 +452,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
.map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
allRecords.add(copyOfRecords1);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
- HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, header);
+ HoodieDataBlock dataBlock = getDataBlock(records1, header);
writer = writer.appendBlock(dataBlock);
}
writer.close();
@@ -472,7 +481,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
- HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, header);
+ HoodieDataBlock dataBlock = getDataBlock(records, header);
writer = writer.appendBlock(dataBlock);
writer.close();
@@ -498,7 +507,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
.withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
records = SchemaTestUtil.generateTestRecords(0, 10);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
- dataBlock = new HoodieAvroDataBlock(records, header);
+ dataBlock = getDataBlock(records, header);
writer = writer.appendBlock(dataBlock);
writer.close();
@@ -536,7 +545,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
.withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
records = SchemaTestUtil.generateTestRecords(0, 100);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
- dataBlock = new HoodieAvroDataBlock(records, header);
+ dataBlock = getDataBlock(records, header);
writer = writer.appendBlock(dataBlock);
writer.close();
@@ -574,7 +583,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
- HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, header);
+ HoodieDataBlock dataBlock = getDataBlock(records1, header);
writer = writer.appendBlock(dataBlock);
// Write 2
@@ -582,7 +591,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
List<IndexedRecord> copyOfRecords2 = records2.stream()
.map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
- dataBlock = new HoodieAvroDataBlock(records2, header);
+ dataBlock = getDataBlock(records2, header);
writer = writer.appendBlock(dataBlock);
writer.close();
@@ -621,14 +630,14 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
- HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, header);
+ HoodieDataBlock dataBlock = getDataBlock(records1, header);
writer = writer.appendBlock(dataBlock);
// Write 2
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
List<IndexedRecord> records2 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
- dataBlock = new HoodieAvroDataBlock(records2, header);
+ dataBlock = getDataBlock(records2, header);
writer = writer.appendBlock(dataBlock);
// Rollback the last write
@@ -644,7 +653,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
List<IndexedRecord> copyOfRecords3 = records3.stream()
.map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
- dataBlock = new HoodieAvroDataBlock(records3, header);
+ dataBlock = getDataBlock(records3, header);
writer = writer.appendBlock(dataBlock);
writer.close();
@@ -681,7 +690,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
- HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, header);
+ HoodieDataBlock dataBlock = getDataBlock(records1, header);
writer = writer.appendBlock(dataBlock);
writer.close();
@@ -723,7 +732,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
.map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
- dataBlock = new HoodieAvroDataBlock(records3, header);
+ dataBlock = getDataBlock(records3, header);
writer = writer.appendBlock(dataBlock);
writer.close();
@@ -761,7 +770,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
- HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, header);
+ HoodieDataBlock dataBlock = getDataBlock(records1, header);
writer = writer.appendBlock(dataBlock);
// Write 2
@@ -769,7 +778,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
List<IndexedRecord> records2 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
List<IndexedRecord> copyOfRecords2 = records2.stream()
.map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
- dataBlock = new HoodieAvroDataBlock(records2, header);
+ dataBlock = getDataBlock(records2, header);
writer = writer.appendBlock(dataBlock);
copyOfRecords1.addAll(copyOfRecords2);
@@ -849,13 +858,13 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
- HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, header);
+ HoodieDataBlock dataBlock = getDataBlock(records1, header);
writer = writer.appendBlock(dataBlock);
// Write 2
List<IndexedRecord> records2 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
- dataBlock = new HoodieAvroDataBlock(records2, header);
+ dataBlock = getDataBlock(records2, header);
writer = writer.appendBlock(dataBlock);
// Delete 50 keys
@@ -916,7 +925,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
- HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, header);
+ HoodieDataBlock dataBlock = getDataBlock(records1, header);
writer = writer.appendBlock(dataBlock);
// Delete 50 keys
@@ -958,7 +967,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
- HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, header);
+ HoodieDataBlock dataBlock = getDataBlock(records1, header);
writer = writer.appendBlock(dataBlock);
// Write invalid rollback for a failed write (possible for in-flight commits)
@@ -1000,7 +1009,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
- HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, header);
+ HoodieDataBlock dataBlock = getDataBlock(records1, header);
writer = writer.appendBlock(dataBlock);
writer = writer.appendBlock(dataBlock);
writer = writer.appendBlock(dataBlock);
@@ -1047,7 +1056,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
- HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, header);
+ HoodieDataBlock dataBlock = getDataBlock(records1, header);
writer = writer.appendBlock(dataBlock);
writer = writer.appendBlock(dataBlock);
writer = writer.appendBlock(dataBlock);
@@ -1149,7 +1158,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
- HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records.subList(0, numRecordsInLog1), header);
+ HoodieDataBlock dataBlock = getDataBlock(records.subList(0, numRecordsInLog1), header);
writer = writer.appendBlock(dataBlock);
// Get the size of the block
long size = writer.getCurrentSize();
@@ -1163,7 +1172,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
Map<HoodieLogBlock.HeaderMetadataType, String> header2 = new HashMap<>();
header2.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header2.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
- HoodieAvroDataBlock dataBlock2 = new HoodieAvroDataBlock(records2.subList(0, numRecordsInLog2), header2);
+ HoodieDataBlock dataBlock2 = getDataBlock(records2.subList(0, numRecordsInLog2), header2);
writer2 = writer2.appendBlock(dataBlock2);
// Get the size of the block
writer2.close();
@@ -1227,7 +1236,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
- HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, header);
+ HoodieDataBlock dataBlock = getDataBlock(records1, header);
writer = writer.appendBlock(dataBlock);
writer.close();
@@ -1237,7 +1246,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
List<IndexedRecord> records2 = SchemaTestUtil.generateTestRecords(0, 100);
List<IndexedRecord> copyOfRecords2 = records2.stream()
.map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
- dataBlock = new HoodieAvroDataBlock(records2, header);
+ dataBlock = getDataBlock(records2, header);
writer = writer.appendBlock(dataBlock);
writer.close();
@@ -1248,7 +1257,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
List<IndexedRecord> records3 = SchemaTestUtil.generateTestRecords(0, 100);
List<IndexedRecord> copyOfRecords3 = records3.stream()
.map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
- dataBlock = new HoodieAvroDataBlock(records3, header);
+ dataBlock = getDataBlock(records3, header);
writer = writer.appendBlock(dataBlock);
writer.close();
@@ -1257,7 +1266,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
assertTrue(reader.hasPrev(), "Last block should be available");
HoodieLogBlock prevBlock = reader.prev();
- HoodieAvroDataBlock dataBlockRead = (HoodieAvroDataBlock) prevBlock;
+ HoodieDataBlock dataBlockRead = (HoodieDataBlock) prevBlock;
assertEquals(copyOfRecords3.size(), dataBlockRead.getRecords().size(),
"Third records size should be equal to the written records size");
@@ -1266,7 +1275,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
assertTrue(reader.hasPrev(), "Second block should be available");
prevBlock = reader.prev();
- dataBlockRead = (HoodieAvroDataBlock) prevBlock;
+ dataBlockRead = (HoodieDataBlock) prevBlock;
assertEquals(copyOfRecords2.size(), dataBlockRead.getRecords().size(),
"Read records size should be equal to the written records size");
assertEquals(copyOfRecords2, dataBlockRead.getRecords(),
@@ -1274,7 +1283,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
assertTrue(reader.hasPrev(), "First block should be available");
prevBlock = reader.prev();
- dataBlockRead = (HoodieAvroDataBlock) prevBlock;
+ dataBlockRead = (HoodieDataBlock) prevBlock;
assertEquals(copyOfRecords1.size(), dataBlockRead.getRecords().size(),
"Read records size should be equal to the written records size");
assertEquals(copyOfRecords1, dataBlockRead.getRecords(),
@@ -1296,7 +1305,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
- HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, header);
+ HoodieDataBlock dataBlock = getDataBlock(records, header);
writer = writer.appendBlock(dataBlock);
writer.close();
@@ -1322,7 +1331,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
.withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
records = SchemaTestUtil.generateTestRecords(0, 100);
- dataBlock = new HoodieAvroDataBlock(records, header);
+ dataBlock = getDataBlock(records, header);
writer = writer.appendBlock(dataBlock);
writer.close();
@@ -1332,7 +1341,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
assertTrue(reader.hasPrev(), "Last block should be available");
HoodieLogBlock block = reader.prev();
- assertTrue(block instanceof HoodieAvroDataBlock, "Last block should be datablock");
+ assertTrue(block instanceof HoodieDataBlock, "Last block should be datablock");
assertTrue(reader.hasPrev(), "Last block should be available");
assertThrows(CorruptedLogFileException.class, () -> {
@@ -1355,7 +1364,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
- HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, header);
+ HoodieDataBlock dataBlock = getDataBlock(records1, header);
writer = writer.appendBlock(dataBlock);
writer.close();
@@ -1363,7 +1372,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
.withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
List<IndexedRecord> records2 = SchemaTestUtil.generateTestRecords(0, 100);
- dataBlock = new HoodieAvroDataBlock(records2, header);
+ dataBlock = getDataBlock(records2, header);
writer = writer.appendBlock(dataBlock);
writer.close();
@@ -1372,7 +1381,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
.withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
List<IndexedRecord> records3 = SchemaTestUtil.generateTestRecords(0, 100);
- dataBlock = new HoodieAvroDataBlock(records3, header);
+ dataBlock = getDataBlock(records3, header);
writer = writer.appendBlock(dataBlock);
writer.close();
@@ -1388,7 +1397,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
// After moving twice, this last reader.prev() should read the First block written
assertTrue(reader.hasPrev(), "First block should be available");
HoodieLogBlock prevBlock = reader.prev();
- HoodieAvroDataBlock dataBlockRead = (HoodieAvroDataBlock) prevBlock;
+ HoodieDataBlock dataBlockRead = (HoodieDataBlock) prevBlock;
assertEquals(copyOfRecords1.size(), dataBlockRead.getRecords().size(),
"Read records size should be equal to the written records size");
assertEquals(copyOfRecords1, dataBlockRead.getRecords(),
@@ -1429,4 +1438,13 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
assertEquals(recordsCopy.get(i), readRecords.get(i));
}
}
+
+ private HoodieDataBlock getDataBlock(List<IndexedRecord> records, Map<HeaderMetadataType, String> header) {
+ switch (dataBlockType) {
+ case AVRO_DATA_BLOCK:
+ return new HoodieAvroDataBlock(records, header);
+ default:
+ throw new RuntimeException("Unknown data block type " + dataBlockType);
+ }
+ }
}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
index a7ef208..caae246 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
@@ -120,21 +121,41 @@ public class HoodieTestUtils {
return init(getDefaultHadoopConf(), basePath, tableType);
}
+ public static HoodieTableMetaClient init(String basePath, HoodieFileFormat baseFileFormat) throws IOException {
+ return init(getDefaultHadoopConf(), basePath, HoodieTableType.COPY_ON_WRITE, baseFileFormat);
+ }
+
public static HoodieTableMetaClient init(Configuration hadoopConf, String basePath) throws IOException {
return init(hadoopConf, basePath, HoodieTableType.COPY_ON_WRITE);
}
public static HoodieTableMetaClient init(Configuration hadoopConf, String basePath, HoodieTableType tableType)
throws IOException {
- return init(hadoopConf, basePath, tableType, RAW_TRIPS_TEST_NAME);
+ return init(hadoopConf, basePath, tableType, new Properties());
}
public static HoodieTableMetaClient init(Configuration hadoopConf, String basePath, HoodieTableType tableType,
- String tableName) throws IOException {
+ String tableName)
+ throws IOException {
Properties properties = new Properties();
properties.setProperty(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, tableName);
- properties.setProperty(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, tableType.name());
- properties.setProperty(HoodieTableConfig.HOODIE_PAYLOAD_CLASS_PROP_NAME, HoodieAvroPayload.class.getName());
+ return init(hadoopConf, basePath, tableType, properties);
+ }
+
+ public static HoodieTableMetaClient init(Configuration hadoopConf, String basePath, HoodieTableType tableType,
+ HoodieFileFormat baseFileFormat)
+ throws IOException {
+ Properties properties = new Properties();
+ properties.setProperty(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP_NAME, baseFileFormat.toString());
+ return init(hadoopConf, basePath, tableType, properties);
+ }
+
+ public static HoodieTableMetaClient init(Configuration hadoopConf, String basePath, HoodieTableType tableType,
+ Properties properties)
+ throws IOException {
+ properties.putIfAbsent(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, RAW_TRIPS_TEST_NAME);
+ properties.putIfAbsent(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, tableType.name());
+ properties.putIfAbsent(HoodieTableConfig.HOODIE_PAYLOAD_CLASS_PROP_NAME, HoodieAvroPayload.class.getName());
return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, properties);
}
@@ -442,12 +463,31 @@ public class HoodieTestUtils {
});
}
+ // TODO: should be removed
public static FileStatus[] listAllDataFilesInPath(FileSystem fs, String basePath) throws IOException {
+ return listAllDataFilesInPath(fs, basePath, ".parquet");
+ }
+
+ public static FileStatus[] listAllDataFilesInPath(FileSystem fs, String basePath, String datafileExtension)
+ throws IOException {
+ RemoteIterator<LocatedFileStatus> itr = fs.listFiles(new Path(basePath), true);
+ List<FileStatus> returns = new ArrayList<>();
+ while (itr.hasNext()) {
+ LocatedFileStatus status = itr.next();
+ if (status.getPath().getName().contains(datafileExtension)) {
+ returns.add(status);
+ }
+ }
+ return returns.toArray(new FileStatus[returns.size()]);
+ }
+
+ public static FileStatus[] listAllLogFilesInPath(FileSystem fs, String basePath, String logfileExtension)
+ throws IOException {
RemoteIterator<LocatedFileStatus> itr = fs.listFiles(new Path(basePath), true);
List<FileStatus> returns = new ArrayList<>();
while (itr.hasNext()) {
LocatedFileStatus status = itr.next();
- if (status.getPath().getName().contains(".parquet")) {
+ if (status.getPath().getName().contains(logfileExtension)) {
returns.add(status);
}
}
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/utils/TestParquetReaderIterator.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetReaderIterator.java
similarity index 98%
rename from hudi-client/src/test/java/org/apache/hudi/client/utils/TestParquetReaderIterator.java
rename to hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetReaderIterator.java
index f20c5f9..799ed24 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/utils/TestParquetReaderIterator.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetReaderIterator.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hudi.client.utils;
+package org.apache.hudi.common.util;
import org.apache.hudi.exception.HoodieIOException;
diff --git a/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieFileReaderFactory.java b/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieFileReaderFactory.java
new file mode 100644
index 0000000..13971d5
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieFileReaderFactory.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link HoodieFileReaderFactory}.
+ */
+public class TestHoodieFileReaderFactory {
+ @TempDir
+ public java.nio.file.Path tempDir;
+
+ @Test
+ public void testGetFileReader() throws IOException {
+ // parquet file format.
+ final Configuration hadoopConf = new Configuration();
+ final Path parquetPath = new Path("/partition/path/f1_1-0-1_000.parquet");
+ HoodieFileReader<IndexedRecord> parquetReader = HoodieFileReaderFactory.getFileReader(hadoopConf, parquetPath);
+ assertTrue(parquetReader instanceof HoodieParquetReader);
+
+ // other file format exception.
+ final Path logPath = new Path("/partition/path/f.b51192a8-574b-4a85-b246-bcfec03ac8bf_100.log.2_1-0-1");
+ final Throwable thrown = assertThrows(UnsupportedOperationException.class, () -> {
+ HoodieFileReader<IndexedRecord> logWriter = HoodieFileReaderFactory.getFileReader(hadoopConf, logPath);
+ }, "should fail since log storage reader is not supported yet.");
+ assertTrue(thrown.getMessage().contains("format not supported yet."));
+ }
+}
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
index 4db928c..266cc6f 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
@@ -18,6 +18,7 @@
package org.apache.hudi.hadoop;
+import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -101,7 +102,8 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
setInputPaths(job, snapshotPaths.toArray(new Path[snapshotPaths.size()]));
FileStatus[] fileStatuses = super.listStatus(job);
Map<HoodieTableMetaClient, List<FileStatus>> groupedFileStatus =
- HoodieInputFormatUtils.groupFileStatusForSnapshotPaths(fileStatuses, tableMetaClientMap.values());
+ HoodieInputFormatUtils.groupFileStatusForSnapshotPaths(fileStatuses,
+ HoodieFileFormat.PARQUET.getFileExtension(), tableMetaClientMap.values());
LOG.info("Found a total of " + groupedFileStatus.size() + " groups");
for (Map.Entry<HoodieTableMetaClient, List<FileStatus>> entry : groupedFileStatus.entrySet()) {
List<FileStatus> result = HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job, entry.getKey(), entry.getValue());
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java
index fd7ffb2..a88d152 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java
@@ -24,7 +24,7 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.realtime.HoodieCombineRealtimeRecordReader;
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
-
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -952,7 +952,11 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
ValidationUtils.checkArgument(split instanceof HoodieCombineRealtimeFileSplit, "Only "
+ HoodieCombineRealtimeFileSplit.class.getName() + " allowed, found " + split.getClass().getName());
for (InputSplit inputSplit : ((HoodieCombineRealtimeFileSplit) split).getRealtimeFileSplits()) {
- recordReaders.add(new HoodieParquetRealtimeInputFormat().getRecordReader(inputSplit, job, reporter));
+ if (split.getPaths().length == 0) {
+ continue;
+ }
+ FileInputFormat inputFormat = HoodieInputFormatUtils.getInputFormat(split.getPath(0).toString(), true, job);
+ recordReaders.add(inputFormat.getRecordReader(inputSplit, job, reporter));
}
return new HoodieCombineRealtimeRecordReader(job, split, recordReaders);
}
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
index b7da749..3758b9b 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
@@ -32,8 +32,6 @@ import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.mapred.JobConf;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-import org.apache.parquet.avro.AvroSchemaConverter;
-import org.apache.parquet.schema.MessageType;
import java.io.IOException;
import java.util.ArrayList;
@@ -50,7 +48,6 @@ public abstract class AbstractRealtimeRecordReader {
protected final HoodieRealtimeFileSplit split;
protected final JobConf jobConf;
- private final MessageType baseFileSchema;
protected final boolean usesCustomPayload;
// Schema handles
private Schema readerSchema;
@@ -66,7 +63,6 @@ public abstract class AbstractRealtimeRecordReader {
try {
this.usesCustomPayload = usesCustomPayload();
LOG.info("usesCustomPayload ==> " + this.usesCustomPayload);
- baseFileSchema = HoodieRealtimeRecordReaderUtils.readSchema(jobConf, split.getPath());
init();
} catch (IOException e) {
throw new HoodieIOException("Could not create HoodieRealtimeRecordReader on path " + this.split.getPath(), e);
@@ -88,7 +84,7 @@ public abstract class AbstractRealtimeRecordReader {
Schema schemaFromLogFile =
LogReaderUtils.readLatestSchemaFromLogFiles(split.getBasePath(), split.getDeltaLogPaths(), jobConf);
if (schemaFromLogFile == null) {
- writerSchema = new AvroSchemaConverter().convert(baseFileSchema);
+ writerSchema = HoodieRealtimeRecordReaderUtils.readSchema(jobConf, split.getPath());
LOG.debug("Writer Schema From Parquet => " + writerSchema.getFields());
} else {
writerSchema = schemaFromLogFile;
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
index d10b664..6d967f3 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
@@ -18,8 +18,10 @@
package org.apache.hudi.hadoop.utils;
+import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
@@ -30,11 +32,15 @@ import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieIOException;
-
+import org.apache.hudi.hadoop.HoodieParquetInputFormat;
+import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
+import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.log4j.LogManager;
@@ -61,6 +67,54 @@ public class HoodieInputFormatUtils {
private static final Logger LOG = LogManager.getLogger(HoodieInputFormatUtils.class);
+ public static FileInputFormat getInputFormat(HoodieFileFormat baseFileFormat, boolean realtime, Configuration conf) {
+ switch (baseFileFormat) {
+ case PARQUET:
+ if (realtime) {
+ HoodieParquetRealtimeInputFormat inputFormat = new HoodieParquetRealtimeInputFormat();
+ inputFormat.setConf(conf);
+ return inputFormat;
+ } else {
+ HoodieParquetInputFormat inputFormat = new HoodieParquetInputFormat();
+ inputFormat.setConf(conf);
+ return inputFormat;
+ }
+ default:
+ throw new HoodieIOException("Hoodie InputFormat not implemented for base file format " + baseFileFormat);
+ }
+ }
+
+ public static String getInputFormatClassName(HoodieFileFormat baseFileFormat, boolean realtime, Configuration conf) {
+ FileInputFormat inputFormat = getInputFormat(baseFileFormat, realtime, conf);
+ return inputFormat.getClass().getName();
+ }
+
+ public static String getOutputFormatClassName(HoodieFileFormat baseFileFormat) {
+ switch (baseFileFormat) {
+ case PARQUET:
+ return MapredParquetOutputFormat.class.getName();
+ default:
+ throw new HoodieIOException("No OutputFormat for base file format " + baseFileFormat);
+ }
+ }
+
+ public static String getSerDeClassName(HoodieFileFormat baseFileFormat) {
+ switch (baseFileFormat) {
+ case PARQUET:
+ return ParquetHiveSerDe.class.getName();
+ default:
+ throw new HoodieIOException("No SerDe for base file format " + baseFileFormat);
+ }
+ }
+
+ public static FileInputFormat getInputFormat(String path, boolean realtime, Configuration conf) {
+ final String extension = FSUtils.getFileExtension(path.toString());
+ if (extension.equals(HoodieFileFormat.PARQUET.getFileExtension())) {
+ return getInputFormat(HoodieFileFormat.PARQUET, realtime, conf);
+ }
+ throw new HoodieIOException("Hoodie InputFormat not implemented for base file of type " + extension);
+ }
+
/**
* Filter any specific instants that we do not want to process.
* example timeline:
@@ -255,19 +309,20 @@ public class HoodieInputFormatUtils {
* Takes in a list of filesStatus and a list of table metadatas. Groups the files status list
* based on given table metadata.
* @param fileStatuses
+ * @param fileExtension
* @param metaClientList
* @return
* @throws IOException
*/
public static Map<HoodieTableMetaClient, List<FileStatus>> groupFileStatusForSnapshotPaths(
- FileStatus[] fileStatuses, Collection<HoodieTableMetaClient> metaClientList) {
+ FileStatus[] fileStatuses, String fileExtension, Collection<HoodieTableMetaClient> metaClientList) {
// This assumes the paths for different tables are grouped together
Map<HoodieTableMetaClient, List<FileStatus>> grouped = new HashMap<>();
HoodieTableMetaClient metadata = null;
for (FileStatus status : fileStatuses) {
Path inputPath = status.getPath();
- if (!inputPath.getName().endsWith(".parquet")) {
- //FIXME(vc): skip non parquet files for now. This wont be needed once log file name start
+ if (!inputPath.getName().endsWith(fileExtension)) {
+ //FIXME(vc): skip non data files for now. This wont be needed once log file name start
// with "."
continue;
}
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
index cd876b4..cec77f9 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
@@ -22,7 +22,8 @@ import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
-
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericArray;
@@ -40,8 +41,6 @@ import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
-import org.apache.parquet.hadoop.ParquetFileReader;
-import org.apache.parquet.schema.MessageType;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -57,14 +56,14 @@ import java.util.stream.Collectors;
public class HoodieRealtimeRecordReaderUtils {
/**
- * Reads the schema from the parquet file. This is different from ParquetUtils as it uses the twitter parquet to
- * support hive 1.1.0
+ * Reads the schema from the base file.
*/
- public static MessageType readSchema(Configuration conf, Path parquetFilePath) {
+ public static Schema readSchema(Configuration conf, Path filePath) {
try {
- return ParquetFileReader.readFooter(conf, parquetFilePath).getFileMetaData().getSchema();
+ HoodieFileReader storageReader = HoodieFileReaderFactory.getFileReader(conf, filePath);
+ return storageReader.getSchema();
} catch (IOException e) {
- throw new HoodieIOException("Failed to read footer for parquet " + parquetFilePath, e);
+ throw new HoodieIOException("Failed to read schema from " + filePath, e);
}
}
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
index 6e413cc..918aade 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
@@ -21,6 +21,7 @@ package org.apache.hudi.hadoop;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -59,6 +60,8 @@ public class TestHoodieParquetInputFormat {
private HoodieParquetInputFormat inputFormat;
private JobConf jobConf;
+ private final HoodieFileFormat baseFileFormat = HoodieFileFormat.PARQUET;
+ private final String baseFileExtension = baseFileFormat.getFileExtension();
public static void ensureFilesInCommit(String msg, FileStatus[] files, String commit, int expected) {
int count = 0;
@@ -145,7 +148,7 @@ public class TestHoodieParquetInputFormat {
@Test
public void testInputFormatLoad() throws IOException {
// initial commit
- File partitionDir = InputFormatTestUtil.prepareTable(basePath, 10, "100");
+ File partitionDir = InputFormatTestUtil.prepareTable(basePath, baseFileFormat, 10, "100");
InputFormatTestUtil.commit(basePath, "100");
// Add the paths
@@ -161,7 +164,7 @@ public class TestHoodieParquetInputFormat {
@Test
public void testInputFormatUpdates() throws IOException {
// initial commit
- File partitionDir = InputFormatTestUtil.prepareTable(basePath, 10, "100");
+ File partitionDir = InputFormatTestUtil.prepareTable(basePath, baseFileFormat, 10, "100");
InputFormatTestUtil.commit(basePath, "100");
// Add the paths
@@ -171,7 +174,7 @@ public class TestHoodieParquetInputFormat {
assertEquals(10, files.length);
// update files
- InputFormatTestUtil.simulateUpdates(partitionDir, "100", 5, "200", true);
+ InputFormatTestUtil.simulateUpdates(partitionDir, baseFileExtension, "100", 5, "200", true);
// Before the commit
files = inputFormat.listStatus(jobConf);
assertEquals(10, files.length);
@@ -188,7 +191,7 @@ public class TestHoodieParquetInputFormat {
@Test
public void testInputFormatWithCompaction() throws IOException {
// initial commit
- File partitionDir = InputFormatTestUtil.prepareTable(basePath, 10, "100");
+ File partitionDir = InputFormatTestUtil.prepareTable(basePath, baseFileFormat, 10, "100");
InputFormatTestUtil.commit(basePath, "100");
// Add the paths
@@ -204,7 +207,7 @@ public class TestHoodieParquetInputFormat {
createCompactionFile(basePath, "125");
// add inserts after compaction timestamp
- InputFormatTestUtil.simulateInserts(partitionDir, "fileId2", 5, "200");
+ InputFormatTestUtil.simulateInserts(partitionDir, baseFileExtension, "fileId2", 5, "200");
InputFormatTestUtil.commit(basePath, "200");
// verify snapshot reads show all new inserts even though there is pending compaction
@@ -221,7 +224,7 @@ public class TestHoodieParquetInputFormat {
@Test
public void testIncrementalSimple() throws IOException {
// initial commit
- File partitionDir = InputFormatTestUtil.prepareTable(basePath, 10, "100");
+ File partitionDir = InputFormatTestUtil.prepareTable(basePath, baseFileFormat, 10, "100");
createCommitFile(basePath, "100", "2016/05/01");
// Add the paths
@@ -266,25 +269,25 @@ public class TestHoodieParquetInputFormat {
@Test
public void testIncrementalWithMultipleCommits() throws IOException {
// initial commit
- File partitionDir = InputFormatTestUtil.prepareTable(basePath, 10, "100");
+ File partitionDir = InputFormatTestUtil.prepareTable(basePath, baseFileFormat, 10, "100");
createCommitFile(basePath, "100", "2016/05/01");
// Add the paths
FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
// update files
- InputFormatTestUtil.simulateUpdates(partitionDir, "100", 5, "200", false);
+ InputFormatTestUtil.simulateUpdates(partitionDir, baseFileExtension, "100", 5, "200", false);
createCommitFile(basePath, "200", "2016/05/01");
- InputFormatTestUtil.simulateUpdates(partitionDir, "100", 4, "300", false);
+ InputFormatTestUtil.simulateUpdates(partitionDir, baseFileExtension, "100", 4, "300", false);
createCommitFile(basePath, "300", "2016/05/01");
- InputFormatTestUtil.simulateUpdates(partitionDir, "100", 3, "400", false);
+ InputFormatTestUtil.simulateUpdates(partitionDir, baseFileExtension, "100", 3, "400", false);
createCommitFile(basePath, "400", "2016/05/01");
- InputFormatTestUtil.simulateUpdates(partitionDir, "100", 2, "500", false);
+ InputFormatTestUtil.simulateUpdates(partitionDir, baseFileExtension, "100", 2, "500", false);
createCommitFile(basePath, "500", "2016/05/01");
- InputFormatTestUtil.simulateUpdates(partitionDir, "100", 1, "600", false);
+ InputFormatTestUtil.simulateUpdates(partitionDir, baseFileExtension, "100", 1, "600", false);
createCommitFile(basePath, "600", "2016/05/01");
InputFormatTestUtil.setupIncremental(jobConf, "100", 1);
@@ -364,14 +367,14 @@ public class TestHoodieParquetInputFormat {
@Test
public void testIncrementalWithPendingCompaction() throws IOException {
// initial commit
- File partitionDir = InputFormatTestUtil.prepareTable(basePath, 10, "100");
+ File partitionDir = InputFormatTestUtil.prepareTable(basePath, baseFileFormat, 10, "100");
createCommitFile(basePath, "100", "2016/05/01");
// simulate compaction requested at 300
File compactionFile = createCompactionFile(basePath, "300");
// write inserts into new bucket
- InputFormatTestUtil.simulateInserts(partitionDir, "fileId2", 10, "400");
+ InputFormatTestUtil.simulateInserts(partitionDir, baseFileExtension, "fileId2", 10, "400");
createCommitFile(basePath, "400", "2016/05/01");
// Add the paths
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
index 05669bb..0201fac 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
@@ -20,8 +20,10 @@ package org.apache.hudi.hadoop.testutils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
@@ -59,25 +61,29 @@ public class InputFormatTestUtil {
private static String TEST_WRITE_TOKEN = "1-0-1";
- public static File prepareTable(java.nio.file.Path basePath, int numberOfFiles, String commitNumber)
+ public static File prepareTable(java.nio.file.Path basePath, HoodieFileFormat baseFileFormat, int numberOfFiles,
+ String commitNumber)
throws IOException {
- HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString());
+ HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), HoodieTableType.COPY_ON_WRITE,
+ baseFileFormat);
java.nio.file.Path partitionPath = basePath.resolve(Paths.get("2016", "05", "01"));
Files.createDirectories(partitionPath);
- return simulateInserts(partitionPath.toFile(), "fileId1", numberOfFiles, commitNumber);
+ return simulateInserts(partitionPath.toFile(), baseFileFormat.getFileExtension(), "fileId1", numberOfFiles,
+ commitNumber);
}
- public static File simulateInserts(File partitionPath, String fileId, int numberOfFiles, String commitNumber)
+ public static File simulateInserts(File partitionPath, String baseFileExtension, String fileId, int numberOfFiles,
+ String commitNumber)
throws IOException {
for (int i = 0; i < numberOfFiles; i++) {
Files.createFile(partitionPath.toPath()
- .resolve(FSUtils.makeDataFileName(commitNumber, TEST_WRITE_TOKEN, fileId + i)));
+ .resolve(FSUtils.makeDataFileName(commitNumber, TEST_WRITE_TOKEN, fileId + i, baseFileExtension)));
}
return partitionPath;
}
- public static void simulateUpdates(File directory, final String originalCommit, int numberOfFilesUpdated,
- String newCommit, boolean randomize) throws IOException {
+ public static void simulateUpdates(File directory, String baseFileExtension, final String originalCommit,
+ int numberOfFilesUpdated, String newCommit, boolean randomize) throws IOException {
List<File> dataFiles = Arrays.asList(Objects.requireNonNull(directory.listFiles((dir, name) -> {
String commitTs = FSUtils.getCommitTime(name);
return originalCommit.equals(commitTs);
@@ -88,7 +94,8 @@ public class InputFormatTestUtil {
List<File> toUpdateList = dataFiles.subList(0, Math.min(numberOfFilesUpdated, dataFiles.size()));
for (File file : toUpdateList) {
String fileId = FSUtils.getFileId(file.getName());
- Files.createFile(directory.toPath().resolve(FSUtils.makeDataFileName(newCommit, TEST_WRITE_TOKEN, fileId)));
+ Files.createFile(directory.toPath().resolve(FSUtils.makeDataFileName(newCommit, TEST_WRITE_TOKEN, fileId,
+ baseFileExtension)));
}
}
diff --git a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java b/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
index 37f4463..00a35aa 100644
--- a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
+++ b/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
@@ -35,6 +35,9 @@ public class HiveSyncConfig implements Serializable {
@Parameter(names = {"--table"}, description = "name of the target table in Hive", required = true)
public String tableName;
+ @Parameter(names = {"--base-file-format"}, description = "Format of the base files (PARQUET (or) HFILE)")
+ public String baseFileFormat = "PARQUET";
+
@Parameter(names = {"--user"}, description = "Hive username", required = true)
public String hiveUser;
diff --git a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
index c3849d7..877ba47 100644
--- a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
+++ b/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
@@ -19,10 +19,10 @@
package org.apache.hudi.hive;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.InvalidTableException;
-import org.apache.hudi.hadoop.HoodieParquetInputFormat;
-import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.hive.HoodieHiveClient.PartitionEvent;
import org.apache.hudi.hive.HoodieHiveClient.PartitionEvent.PartitionEventType;
import org.apache.hudi.hive.util.HiveSchemaUtil;
@@ -32,8 +32,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
-import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.schema.MessageType;
@@ -146,21 +144,24 @@ public class HiveSyncTool {
// Check and sync schema
if (!tableExists) {
LOG.info("Hive table " + tableName + " is not found. Creating it");
- if (!useRealTimeInputFormat) {
- String inputFormatClassName = cfg.usePreApacheInputFormat ? com.uber.hoodie.hadoop.HoodieInputFormat.class.getName()
- : HoodieParquetInputFormat.class.getName();
- hoodieHiveClient.createTable(tableName, schema, inputFormatClassName, MapredParquetOutputFormat.class.getName(),
- ParquetHiveSerDe.class.getName());
- } else {
- // Custom serde will not work with ALTER TABLE REPLACE COLUMNS
- // https://github.com/apache/hive/blob/release-1.1.0/ql/src/java/org/apache/hadoop/hive
- // /ql/exec/DDLTask.java#L3488
- String inputFormatClassName =
- cfg.usePreApacheInputFormat ? com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat.class.getName()
- : HoodieParquetRealtimeInputFormat.class.getName();
- hoodieHiveClient.createTable(tableName, schema, inputFormatClassName, MapredParquetOutputFormat.class.getName(),
- ParquetHiveSerDe.class.getName());
+ HoodieFileFormat baseFileFormat = HoodieFileFormat.valueOf(cfg.baseFileFormat.toUpperCase());
+ String inputFormatClassName = HoodieInputFormatUtils.getInputFormatClassName(baseFileFormat, useRealTimeInputFormat,
+ new Configuration());
+
+ if (baseFileFormat.equals(HoodieFileFormat.PARQUET) && cfg.usePreApacheInputFormat) {
+ // Parquet input format had an InputFormat class visible under the old naming scheme.
+ inputFormatClassName = useRealTimeInputFormat
+ ? com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat.class.getName()
+ : com.uber.hoodie.hadoop.HoodieInputFormat.class.getName();
}
+
+ String outputFormatClassName = HoodieInputFormatUtils.getOutputFormatClassName(baseFileFormat);
+ String serDeFormatClassName = HoodieInputFormatUtils.getSerDeClassName(baseFileFormat);
+
+ // Custom serde will not work with ALTER TABLE REPLACE COLUMNS
+ // https://github.com/apache/hive/blob/release-1.1.0/ql/src/java/org/apache/hadoop/hive
+ // /ql/exec/DDLTask.java#L3488
+ hoodieHiveClient.createTable(tableName, schema, inputFormatClassName, outputFormatClassName, serDeFormatClassName);
} else {
// Check if the table schema has evolved
Map<String, String> tableSchema = hoodieHiveClient.getTableSchema(tableName);
diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java
index 3af302f..f069a25 100644
--- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java
+++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java
@@ -18,6 +18,7 @@
package org.apache.hudi.integ;
+import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.collection.Pair;
@@ -67,6 +68,8 @@ public class ITTestHoodieDemo extends ITTestBase {
private static final String HIVE_INCREMENTAL_MOR_RO_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-incremental-mor-ro.commands";
private static final String HIVE_INCREMENTAL_MOR_RT_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-incremental-mor-rt.commands";
+ private static HoodieFileFormat baseFileFormat;
+
private static String HIVE_SYNC_CMD_FMT =
" --enable-hive-sync --hoodie-conf hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000 "
+ " --hoodie-conf hoodie.datasource.hive_sync.username=hive "
@@ -76,7 +79,9 @@ public class ITTestHoodieDemo extends ITTestBase {
+ " --hoodie-conf hoodie.datasource.hive_sync.table=%s";
@Test
- public void testDemo() throws Exception {
+ public void testParquetDemo() throws Exception {
+ baseFileFormat = HoodieFileFormat.PARQUET;
+
setupDemo();
// batch 1
@@ -122,6 +127,7 @@ public class ITTestHoodieDemo extends ITTestBase {
List<String> cmds = CollectionUtils.createImmutableList(
"spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer " + HUDI_UTILITIES_BUNDLE
+ " --table-type COPY_ON_WRITE "
+ + " --base-file-format " + baseFileFormat.toString()
+ " --source-class org.apache.hudi.utilities.sources.JsonDFSSource --source-ordering-field ts "
+ " --target-base-path " + COW_BASE_PATH + " --target-table " + COW_TABLE_NAME
+ " --props /var/demo/config/dfs-source.properties"
@@ -130,12 +136,14 @@ public class ITTestHoodieDemo extends ITTestBase {
+ " --database default"
+ " --table " + COW_TABLE_NAME
+ " --base-path " + COW_BASE_PATH
+ + " --base-file-format " + baseFileFormat.toString()
+ " --user hive"
+ " --pass hive"
+ " --jdbc-url jdbc:hive2://hiveserver:10000"
+ " --partitioned-by dt",
("spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer " + HUDI_UTILITIES_BUNDLE
+ " --table-type MERGE_ON_READ "
+ + " --base-file-format " + baseFileFormat.toString()
+ " --source-class org.apache.hudi.utilities.sources.JsonDFSSource --source-ordering-field ts "
+ " --target-base-path " + MOR_BASE_PATH + " --target-table " + MOR_TABLE_NAME
+ " --props /var/demo/config/dfs-source.properties"
diff --git a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
index e45bd65..0a915c1 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -270,7 +270,7 @@ public class DataSourceUtils {
return dropDuplicates(jssc, incomingHoodieRecords, writeConfig);
}
- public static HiveSyncConfig buildHiveSyncConfig(TypedProperties props, String basePath) {
+ public static HiveSyncConfig buildHiveSyncConfig(TypedProperties props, String basePath, String baseFileFormat) {
checkRequiredProperties(props, Collections.singletonList(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY()));
HiveSyncConfig hiveSyncConfig = new HiveSyncConfig();
hiveSyncConfig.basePath = basePath;
@@ -280,6 +280,7 @@ public class DataSourceUtils {
hiveSyncConfig.databaseName = props.getString(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY(),
DataSourceWriteOptions.DEFAULT_HIVE_DATABASE_OPT_VAL());
hiveSyncConfig.tableName = props.getString(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY());
+ hiveSyncConfig.baseFileFormat = baseFileFormat;
hiveSyncConfig.hiveUser =
props.getString(DataSourceWriteOptions.HIVE_USER_OPT_KEY(), DataSourceWriteOptions.DEFAULT_HIVE_USER_OPT_VAL());
hiveSyncConfig.hivePass =
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 3d1172f..5195f05 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -257,6 +257,7 @@ object DataSourceWriteOptions {
val HIVE_SYNC_ENABLED_OPT_KEY = "hoodie.datasource.hive_sync.enable"
val HIVE_DATABASE_OPT_KEY = "hoodie.datasource.hive_sync.database"
val HIVE_TABLE_OPT_KEY = "hoodie.datasource.hive_sync.table"
+ val HIVE_BASE_FILE_FORMAT_OPT_KEY = "hoodie.datasource.hive_sync.base_file_format"
val HIVE_USER_OPT_KEY = "hoodie.datasource.hive_sync.username"
val HIVE_PASS_OPT_KEY = "hoodie.datasource.hive_sync.password"
val HIVE_URL_OPT_KEY = "hoodie.datasource.hive_sync.jdbcurl"
@@ -270,6 +271,7 @@ object DataSourceWriteOptions {
val DEFAULT_HIVE_SYNC_ENABLED_OPT_VAL = "false"
val DEFAULT_HIVE_DATABASE_OPT_VAL = "default"
val DEFAULT_HIVE_TABLE_OPT_VAL = "unknown"
+ val DEFAULT_HIVE_BASE_FILE_FORMAT_OPT_VAL = "PARQUET"
val DEFAULT_HIVE_USER_OPT_VAL = "hive"
val DEFAULT_HIVE_PASS_OPT_VAL = "hive"
val DEFAULT_HIVE_URL_OPT_VAL = "jdbc:hive2://localhost:10000"
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 88cef8f..5774c89 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -212,6 +212,7 @@ private[hudi] object HoodieSparkSqlWriter {
HIVE_SYNC_ENABLED_OPT_KEY -> DEFAULT_HIVE_SYNC_ENABLED_OPT_VAL,
HIVE_DATABASE_OPT_KEY -> DEFAULT_HIVE_DATABASE_OPT_VAL,
HIVE_TABLE_OPT_KEY -> DEFAULT_HIVE_TABLE_OPT_VAL,
+ HIVE_BASE_FILE_FORMAT_OPT_KEY -> DEFAULT_HIVE_BASE_FILE_FORMAT_OPT_VAL,
HIVE_USER_OPT_KEY -> DEFAULT_HIVE_USER_OPT_VAL,
HIVE_PASS_OPT_KEY -> DEFAULT_HIVE_PASS_OPT_VAL,
HIVE_URL_OPT_KEY -> DEFAULT_HIVE_URL_OPT_VAL,
@@ -239,6 +240,7 @@ private[hudi] object HoodieSparkSqlWriter {
private def buildSyncConfig(basePath: Path, parameters: Map[String, String]): HiveSyncConfig = {
val hiveSyncConfig: HiveSyncConfig = new HiveSyncConfig()
hiveSyncConfig.basePath = basePath.toString
+ hiveSyncConfig.baseFileFormat = parameters(HIVE_BASE_FILE_FORMAT_OPT_KEY);
hiveSyncConfig.usePreApacheInputFormat =
parameters.get(HIVE_USE_PRE_APACHE_INPUT_FORMAT_OPT_KEY).exists(r => r.toBoolean)
hiveSyncConfig.databaseName = parameters(HIVE_DATABASE_OPT_KEY)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 068e592..0d3e90c 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -205,7 +205,7 @@ public class DeltaSync implements Serializable {
} else {
this.commitTimelineOpt = Option.empty();
HoodieTableMetaClient.initTableType(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath,
- cfg.tableType, cfg.targetTableName, "archived", cfg.payloadClassName);
+ cfg.tableType, cfg.targetTableName, "archived", cfg.payloadClassName, cfg.baseFileFormat);
}
}
@@ -274,7 +274,7 @@ public class DeltaSync implements Serializable {
}
} else {
HoodieTableMetaClient.initTableType(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath,
- cfg.tableType, cfg.targetTableName, "archived", cfg.payloadClassName);
+ cfg.tableType, cfg.targetTableName, "archived", cfg.payloadClassName, cfg.baseFileFormat);
}
if (!resumeCheckpointStr.isPresent() && cfg.checkpoint != null) {
@@ -474,7 +474,7 @@ public class DeltaSync implements Serializable {
*/
private void syncHive() {
if (cfg.enableHiveSync) {
- HiveSyncConfig hiveSyncConfig = DataSourceUtils.buildHiveSyncConfig(props, cfg.targetBasePath);
+ HiveSyncConfig hiveSyncConfig = DataSourceUtils.buildHiveSyncConfig(props, cfg.targetBasePath, cfg.baseFileFormat);
LOG.info("Syncing target hoodie table with hive table(" + hiveSyncConfig.tableName + "). Hive metastore URL :"
+ hiveSyncConfig.jdbcUrl + ", basePath :" + cfg.targetBasePath);
new HiveSyncTool(hiveSyncConfig, new HiveConf(conf, HiveConf.class), fs).syncHoodieTable();
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index a569c4f..a3d81fa 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -177,6 +177,9 @@ public class HoodieDeltaStreamer implements Serializable {
@Parameter(names = {"--table-type"}, description = "Type of table. COPY_ON_WRITE (or) MERGE_ON_READ", required = true)
public String tableType;
+ @Parameter(names = {"--base-file-format"}, description = "File format for the base files. PARQUET (or) HFILE", required = false)
+ public String baseFileFormat;
+
@Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for "
+ "hoodie client, schema provider, key generator and data source. For hoodie client props, sane defaults are "
+ "used, but recommend use to provide basic things like metrics endpoints, hive configs etc. For sources, refer"
@@ -379,8 +382,20 @@ public class HoodieDeltaStreamer implements Serializable {
// This will guarantee there is no surprise with table type
ValidationUtils.checkArgument(tableType.equals(HoodieTableType.valueOf(cfg.tableType)),
"Hoodie table is of type " + tableType + " but passed in CLI argument is " + cfg.tableType);
+
+ // Load base file format
+ // This will guarantee there is no surprise with base file type
+ String baseFileFormat = meta.getTableConfig().getBaseFileFormat().toString();
+ ValidationUtils.checkArgument(baseFileFormat.equals(cfg.baseFileFormat) || cfg.baseFileFormat == null,
+ "Hoodie table's base file format is of type " + baseFileFormat + " but passed in CLI argument is "
+ + cfg.baseFileFormat);
+ cfg.baseFileFormat = meta.getTableConfig().getBaseFileFormat().toString();
+ this.cfg.baseFileFormat = cfg.baseFileFormat;
} else {
tableType = HoodieTableType.valueOf(cfg.tableType);
+ if (cfg.baseFileFormat == null) {
+ cfg.baseFileFormat = "PARQUET"; // default for backward compatibility
+ }
}
ValidationUtils.checkArgument(!cfg.filterDupes || cfg.operation != Operation.UPSERT,