Posted to commits@hudi.apache.org by vi...@apache.org on 2020/06/09 13:10:23 UTC
[hudi] branch master updated: [HUDI-822] decouple Hudi related logics from HoodieInputFormat (#1592)
This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 37838ce [HUDI-822] decouple Hudi related logics from HoodieInputFormat (#1592)
37838ce is described below
commit 37838cea6094ddc66191df42e8b2c84f132d1623
Author: Gary Li <ya...@gmail.com>
AuthorDate: Tue Jun 9 06:10:16 2020 -0700
[HUDI-822] decouple Hudi related logics from HoodieInputFormat (#1592)
- Refactoring business logic out of InputFormat into Utils helpers.
---
.../hudi/table/TestHoodieMergeOnReadTable.java | 12 +-
.../commit/TestCopyOnWriteActionExecutor.java | 10 +-
.../hudi/hadoop/HoodieParquetInputFormat.java | 236 ++--------------
.../hudi/hadoop/HoodieROTablePathFilter.java | 3 +-
.../org/apache/hudi/hadoop/InputPathHandler.java | 4 +-
.../hudi/hadoop/config/HoodieRealtimeConfig.java | 42 +++
.../realtime/AbstractRealtimeRecordReader.java | 271 +------------------
.../HoodieCombineRealtimeRecordReader.java | 3 +-
.../realtime/HoodieParquetRealtimeInputFormat.java | 106 +-------
.../realtime/RealtimeCompactedRecordReader.java | 35 ++-
.../realtime/RealtimeUnmergedRecordReader.java | 8 +-
.../HoodieHiveUtils.java} | 6 +-
.../HoodieInputFormatUtils.java} | 300 ++++++++++-----------
.../utils/HoodieRealtimeInputFormatUtils.java | 154 +++++++++++
.../HoodieRealtimeRecordReaderUtils.java} | 232 +++-------------
.../hudi/hadoop/TestHoodieParquetInputFormat.java | 27 +-
.../realtime/TestHoodieRealtimeRecordReader.java | 3 +-
.../hudi/hadoop/testutils/InputFormatTestUtil.java | 10 +-
18 files changed, 493 insertions(+), 969 deletions(-)
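For readers skimming the diff: the shape of the change is a straightforward delegation pattern. Logic that previously lived as private/protected methods on HoodieParquetInputFormat and AbstractRealtimeRecordReader now lives as static helpers in the new utils classes, while the input format keeps a thin, overridable wrapper. A minimal, simplified sketch of that pattern (illustrative only; bodies are elided and not the exact code in the hunks below):

    import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;

    // New home of the shared logic: a static helper in org.apache.hudi.hadoop.utils
    public final class HoodieInputFormatUtils {
      public static HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline timeline) {
        // same pending-compaction filtering as before, now reusable by any input format
        return timeline; // body elided for illustration
      }
    }

    // HoodieParquetInputFormat keeps only a one-line wrapper, so subclasses can still override it
    protected HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline timeline) {
      return HoodieInputFormatUtils.filterInstantsTimeline(timeline);
    }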
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
index 57c0d8d..53de65e 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
@@ -46,7 +46,7 @@ import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.hadoop.HoodieHiveUtil;
+import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
import org.apache.hudi.index.HoodieIndex;
@@ -1463,19 +1463,19 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
private void setupIncremental(JobConf jobConf, String startCommit, int numberOfCommitsToPull, boolean stopAtCompaction) {
String modePropertyName =
- String.format(HoodieHiveUtil.HOODIE_CONSUME_MODE_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
- jobConf.set(modePropertyName, HoodieHiveUtil.INCREMENTAL_SCAN_MODE);
+ String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
+ jobConf.set(modePropertyName, HoodieHiveUtils.INCREMENTAL_SCAN_MODE);
String startCommitTimestampName =
- String.format(HoodieHiveUtil.HOODIE_START_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
+ String.format(HoodieHiveUtils.HOODIE_START_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
jobConf.set(startCommitTimestampName, startCommit);
String maxCommitPulls =
- String.format(HoodieHiveUtil.HOODIE_MAX_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
+ String.format(HoodieHiveUtils.HOODIE_MAX_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
jobConf.setInt(maxCommitPulls, numberOfCommitsToPull);
String stopAtCompactionPropName =
- String.format(HoodieHiveUtil.HOODIE_STOP_AT_COMPACTION_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
+ String.format(HoodieHiveUtils.HOODIE_STOP_AT_COMPACTION_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
jobConf.setBoolean(stopAtCompactionPropName, stopAtCompaction);
}
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
index 63a04ec..c95a917 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
@@ -32,7 +32,7 @@ import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.hadoop.HoodieHiveUtil;
+import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.io.HoodieCreateHandle;
import org.apache.hudi.table.HoodieCopyOnWriteTable;
@@ -225,15 +225,15 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase {
private void setupIncremental(JobConf jobConf, String startCommit, int numberOfCommitsToPull) {
String modePropertyName =
- String.format(HoodieHiveUtil.HOODIE_CONSUME_MODE_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
- jobConf.set(modePropertyName, HoodieHiveUtil.INCREMENTAL_SCAN_MODE);
+ String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
+ jobConf.set(modePropertyName, HoodieHiveUtils.INCREMENTAL_SCAN_MODE);
String startCommitTimestampName =
- String.format(HoodieHiveUtil.HOODIE_START_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
+ String.format(HoodieHiveUtils.HOODIE_START_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
jobConf.set(startCommitTimestampName, startCommit);
String maxCommitPulls =
- String.format(HoodieHiveUtil.HOODIE_MAX_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
+ String.format(HoodieHiveUtils.HOODIE_MAX_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
jobConf.setInt(maxCommitPulls, numberOfCommitsToPull);
}
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
index 5a10f3c..4db928c 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
@@ -18,10 +18,17 @@
package org.apache.hudi.hadoop;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
+
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
import org.apache.hadoop.io.ArrayWritable;
@@ -31,31 +38,14 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.HoodiePartitionMetadata;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
-import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.exception.HoodieIOException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
/**
* HoodieInputFormat which understands the Hoodie File Structure and filters files based on the Hoodie Mode. If paths
@@ -69,10 +59,14 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
protected Configuration conf;
+ protected HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline timeline) {
+ return HoodieInputFormatUtils.filterInstantsTimeline(timeline);
+ }
+
@Override
public FileStatus[] listStatus(JobConf job) throws IOException {
// Segregate inputPaths[] to incremental, snapshot and non hoodie paths
- List<String> incrementalTables = HoodieHiveUtil.getIncrementalTableNames(Job.getInstance(job));
+ List<String> incrementalTables = HoodieHiveUtils.getIncrementalTableNames(Job.getInstance(job));
InputPathHandler inputPathHandler = new InputPathHandler(conf, getInputPaths(job), incrementalTables);
List<FileStatus> returns = new ArrayList<>();
@@ -107,10 +101,10 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
setInputPaths(job, snapshotPaths.toArray(new Path[snapshotPaths.size()]));
FileStatus[] fileStatuses = super.listStatus(job);
Map<HoodieTableMetaClient, List<FileStatus>> groupedFileStatus =
- groupFileStatusForSnapshotPaths(fileStatuses, tableMetaClientMap.values());
+ HoodieInputFormatUtils.groupFileStatusForSnapshotPaths(fileStatuses, tableMetaClientMap.values());
LOG.info("Found a total of " + groupedFileStatus.size() + " groups");
for (Map.Entry<HoodieTableMetaClient, List<FileStatus>> entry : groupedFileStatus.entrySet()) {
- List<FileStatus> result = filterFileStatusForSnapshotMode(entry.getKey(), entry.getValue());
+ List<FileStatus> result = HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job, entry.getKey(), entry.getValue());
if (result != null) {
returns.addAll(result);
}
@@ -120,36 +114,6 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
}
/**
- * Filter any specific instants that we do not want to process.
- * example timeline:
- *
- * t0 -> create bucket1.parquet
- * t1 -> create and append updates bucket1.log
- * t2 -> request compaction
- * t3 -> create bucket2.parquet
- *
- * if compaction at t2 takes a long time, incremental readers on RO tables can move to t3 and would skip updates in t1
- *
- * To workaround this problem, we want to stop returning data belonging to commits > t2.
- * After compaction is complete, incremental reader would see updates in t2, t3, so on.
- */
- protected HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline timeline) {
- HoodieDefaultTimeline commitsAndCompactionTimeline = timeline.getCommitsAndCompactionTimeline();
- Option<HoodieInstant> pendingCompactionInstant = commitsAndCompactionTimeline.filterPendingCompactionTimeline().firstInstant();
- if (pendingCompactionInstant.isPresent()) {
- HoodieDefaultTimeline instantsTimeline = commitsAndCompactionTimeline.findInstantsBefore(pendingCompactionInstant.get().getTimestamp());
- int numCommitsFilteredByCompaction = commitsAndCompactionTimeline.getCommitsTimeline().countInstants()
- - instantsTimeline.getCommitsTimeline().countInstants();
- LOG.info("Earliest pending compaction instant is: " + pendingCompactionInstant.get().getTimestamp()
- + " skipping " + numCommitsFilteredByCompaction + " commits");
-
- return instantsTimeline;
- } else {
- return timeline;
- }
- }
-
- /**
* Achieves listStatus functionality for an incrementally queried table. Instead of listing all
* partitions and then filtering based on the commits of interest, this logic first extracts the
* partitions touched by the desired commits and then lists only those partitions.
@@ -158,157 +122,22 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
JobConf job, HoodieTableMetaClient tableMetaClient, List<Path> inputPaths) throws IOException {
String tableName = tableMetaClient.getTableConfig().getTableName();
Job jobContext = Job.getInstance(job);
- HoodieDefaultTimeline baseTimeline;
- if (HoodieHiveUtil.stopAtCompaction(jobContext, tableName)) {
- baseTimeline = filterInstantsTimeline(tableMetaClient.getActiveTimeline());
- } else {
- baseTimeline = tableMetaClient.getActiveTimeline();
- }
-
- HoodieTimeline timeline = baseTimeline.getCommitsTimeline().filterCompletedInstants();
- String lastIncrementalTs = HoodieHiveUtil.readStartCommitTime(jobContext, tableName);
- // Total number of commits to return in this batch. Set this to -1 to get all the commits.
- Integer maxCommits = HoodieHiveUtil.readMaxCommits(jobContext, tableName);
- LOG.info("Last Incremental timestamp was set as " + lastIncrementalTs);
- List<HoodieInstant> commitsToCheck = timeline.findInstantsAfter(lastIncrementalTs, maxCommits)
- .getInstants().collect(Collectors.toList());
- // Extract partitions touched by the commitsToCheck
- Set<String> partitionsToList = new HashSet<>();
- for (HoodieInstant commit : commitsToCheck) {
- HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get(),
- HoodieCommitMetadata.class);
- partitionsToList.addAll(commitMetadata.getPartitionToWriteStats().keySet());
- }
- if (partitionsToList.isEmpty()) {
+ Option<HoodieTimeline> timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
+ if (!timeline.isPresent()) {
return null;
}
- String incrementalInputPaths = partitionsToList.stream()
- .map(s -> StringUtils.isNullOrEmpty(s) ? tableMetaClient.getBasePath() : tableMetaClient.getBasePath() + Path.SEPARATOR + s)
- .filter(s -> {
- /*
- * Ensure to return only results from the original input path that has incremental changes
- * This check is needed for the following corner case - When the caller invokes
- * HoodieInputFormat.listStatus multiple times (with small batches of Hive partitions each
- * time. Ex. Hive fetch task calls listStatus for every partition once) we do not want to
- * accidentally return all incremental changes for the entire table in every listStatus()
- * call. This will create redundant splits. Instead we only want to return the incremental
- * changes (if so any) in that batch of input paths.
- *
- * NOTE on Hive queries that are executed using Fetch task:
- * Since Fetch tasks invoke InputFormat.listStatus() per partition, Hoodie metadata can be
- * listed in every such listStatus() call. In order to avoid this, it might be useful to
- * disable fetch tasks using the hive session property for incremental queries:
- * `set hive.fetch.task.conversion=none;`
- * This would ensure Map Reduce execution is chosen for a Hive query, which combines
- * partitions (comma separated) and calls InputFormat.listStatus() only once with all
- * those partitions.
- */
- for (Path path : inputPaths) {
- if (path.toString().contains(s)) {
- return true;
- }
- }
- return false;
- })
- .collect(Collectors.joining(","));
- if (StringUtils.isNullOrEmpty(incrementalInputPaths)) {
+ Option<List<HoodieInstant>> commitsToCheck = HoodieInputFormatUtils.getCommitsForIncrementalQuery(jobContext, tableName, timeline.get());
+ if (!commitsToCheck.isPresent()) {
return null;
}
+ Option<String> incrementalInputPaths = HoodieInputFormatUtils.getAffectedPartitions(commitsToCheck.get(), tableMetaClient, timeline.get(), inputPaths);
// Mutate the JobConf to set the input paths to only partitions touched by incremental pull.
- setInputPaths(job, incrementalInputPaths);
- FileStatus[] fileStatuses = super.listStatus(job);
- BaseFileOnlyView roView = new HoodieTableFileSystemView(tableMetaClient, timeline, fileStatuses);
- List<String> commitsList = commitsToCheck.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
- List<HoodieBaseFile> filteredFiles = roView.getLatestBaseFilesInRange(commitsList).collect(Collectors.toList());
- List<FileStatus> returns = new ArrayList<>();
- for (HoodieBaseFile filteredFile : filteredFiles) {
- LOG.debug("Processing incremental hoodie file - " + filteredFile.getPath());
- filteredFile = checkFileStatus(filteredFile);
- returns.add(filteredFile.getFileStatus());
- }
- LOG.info("Total paths to process after hoodie incremental filter " + filteredFiles.size());
- return returns;
- }
-
- /**
- * Takes in a list of filesStatus and a list of table metadatas. Groups the files status list
- * based on given table metadata.
- * @param fileStatuses
- * @param metaClientList
- * @return
- * @throws IOException
- */
- private Map<HoodieTableMetaClient, List<FileStatus>> groupFileStatusForSnapshotPaths(
- FileStatus[] fileStatuses, Collection<HoodieTableMetaClient> metaClientList) {
- // This assumes the paths for different tables are grouped together
- Map<HoodieTableMetaClient, List<FileStatus>> grouped = new HashMap<>();
- HoodieTableMetaClient metadata = null;
- for (FileStatus status : fileStatuses) {
- Path inputPath = status.getPath();
- if (!inputPath.getName().endsWith(".parquet")) {
- //FIXME(vc): skip non parquet files for now. This wont be needed once log file name start
- // with "."
- continue;
- }
- if ((metadata == null) || (!inputPath.toString().contains(metadata.getBasePath()))) {
- for (HoodieTableMetaClient metaClient : metaClientList) {
- if (inputPath.toString().contains(metaClient.getBasePath())) {
- metadata = metaClient;
- if (!grouped.containsKey(metadata)) {
- grouped.put(metadata, new ArrayList<>());
- }
- break;
- }
- }
- }
- grouped.get(metadata).add(status);
- }
- return grouped;
- }
-
- /**
- * Filters data files for a snapshot queried table.
- */
- private List<FileStatus> filterFileStatusForSnapshotMode(
- HoodieTableMetaClient metadata, List<FileStatus> fileStatuses) {
- FileStatus[] statuses = fileStatuses.toArray(new FileStatus[0]);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Hoodie Metadata initialized with completed commit Ts as :" + metadata);
- }
- // Get all commits, delta commits, compactions, as all of them produce a base parquet file today
- HoodieTimeline timeline = metadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
- BaseFileOnlyView roView = new HoodieTableFileSystemView(metadata, timeline, statuses);
- // filter files on the latest commit found
- List<HoodieBaseFile> filteredFiles = roView.getLatestBaseFiles().collect(Collectors.toList());
- LOG.info("Total paths to process after hoodie filter " + filteredFiles.size());
- List<FileStatus> returns = new ArrayList<>();
- for (HoodieBaseFile filteredFile : filteredFiles) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Processing latest hoodie file - " + filteredFile.getPath());
- }
- filteredFile = checkFileStatus(filteredFile);
- returns.add(filteredFile.getFileStatus());
- }
- return returns;
- }
-
- /**
- * Checks the file status for a race condition which can set the file size to 0. 1. HiveInputFormat does
- * super.listStatus() and gets back a FileStatus[] 2. Then it creates the HoodieTableMetaClient for the paths listed.
- * 3. Generation of splits looks at FileStatus size to create splits, which skips this file
- */
- private HoodieBaseFile checkFileStatus(HoodieBaseFile dataFile) {
- Path dataPath = dataFile.getFileStatus().getPath();
- try {
- if (dataFile.getFileSize() == 0) {
- FileSystem fs = dataPath.getFileSystem(conf);
- LOG.info("Refreshing file status " + dataFile.getPath());
- return new HoodieBaseFile(fs.getFileStatus(dataPath));
- }
- return dataFile;
- } catch (IOException e) {
- throw new HoodieIOException("Could not get FileStatus on path " + dataPath);
+ if (!incrementalInputPaths.isPresent()) {
+ return null;
}
+ setInputPaths(job, incrementalInputPaths.get());
+ FileStatus[] fileStatuses = super.listStatus(job);
+ return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext, tableMetaClient, timeline.get(), fileStatuses, commitsToCheck.get());
}
public void setConf(Configuration conf) {
@@ -338,19 +167,4 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
return super.getRecordReader(split, job, reporter);
}
- /**
- * Read the table metadata from a data path. This assumes certain hierarchy of files which should be changed once a
- * better way is figured out to pass in the hoodie meta directory
- */
- protected static HoodieTableMetaClient getTableMetaClient(FileSystem fs, Path dataPath) throws IOException {
- int levels = HoodieHiveUtil.DEFAULT_LEVELS_TO_BASEPATH;
- if (HoodiePartitionMetadata.hasPartitionMetadata(fs, dataPath)) {
- HoodiePartitionMetadata metadata = new HoodiePartitionMetadata(fs, dataPath);
- metadata.readFromFS();
- levels = metadata.getPartitionDepth();
- }
- Path baseDir = HoodieHiveUtil.getNthParent(dataPath, levels);
- LOG.info("Reading hoodie metadata from path " + baseDir.toString());
- return new HoodieTableMetaClient(fs.getConf(), baseDir.toString());
- }
}
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
index 4058875..d27d6ad 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.TableNotFoundException;
+import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -140,7 +141,7 @@ public class HoodieROTablePathFilter implements PathFilter, Serializable {
if (HoodiePartitionMetadata.hasPartitionMetadata(fs, folder)) {
HoodiePartitionMetadata metadata = new HoodiePartitionMetadata(fs, folder);
metadata.readFromFS();
- baseDir = HoodieHiveUtil.getNthParent(folder, metadata.getPartitionDepth());
+ baseDir = HoodieHiveUtils.getNthParent(folder, metadata.getPartitionDepth());
} else {
baseDir = safeGetParentsParent(folder);
}
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java
index e4a57aa..1ad3812 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java
@@ -33,7 +33,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import static org.apache.hudi.hadoop.HoodieParquetInputFormat.getTableMetaClient;
+import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getTableMetaClientForBasePath;
/**
* InputPathHandler takes in a set of input paths and incremental tables list. Then, classifies the
@@ -97,7 +97,7 @@ public class InputPathHandler {
// This path is for a table that we dont know about yet.
HoodieTableMetaClient metaClient;
try {
- metaClient = getTableMetaClient(inputPath.getFileSystem(conf), inputPath);
+ metaClient = getTableMetaClientForBasePath(inputPath.getFileSystem(conf), inputPath);
String tableName = metaClient.getTableConfig().getTableName();
tableMetaClientMap.put(tableName, metaClient);
tagAsIncrementalOrSnapshot(inputPath, tableName, metaClient, incrementalTables);
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/config/HoodieRealtimeConfig.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/config/HoodieRealtimeConfig.java
new file mode 100644
index 0000000..95945f3
--- /dev/null
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/config/HoodieRealtimeConfig.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop.config;
+
+/**
+ * Class to hold props related to Hoodie RealtimeInputFormat and RealtimeRecordReader.
+ */
+public final class HoodieRealtimeConfig {
+
+ // Fraction of mapper/reducer task memory used for compaction of log files
+ public static final String COMPACTION_MEMORY_FRACTION_PROP = "compaction.memory.fraction";
+ public static final String DEFAULT_COMPACTION_MEMORY_FRACTION = "0.75";
+ // used to choose a trade off between IO vs Memory when performing compaction process
+ // Depending on outputfile size and memory provided, choose true to avoid OOM for large file
+ // size + small memory
+ public static final String COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP = "compaction.lazy.block.read.enabled";
+ public static final String DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED = "true";
+
+ // Property to set the max memory for dfs inputstream buffer size
+ public static final String MAX_DFS_STREAM_BUFFER_SIZE_PROP = "hoodie.memory.dfs.buffer.max.size";
+ // Setting this to lower value of 1 MB since no control over how many RecordReaders will be started in a mapper
+ public static final int DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE = 1024 * 1024; // 1 MB
+ // Property to set file path prefix for spillable file
+ public static final String SPILLABLE_MAP_BASE_PATH_PROP = "hoodie.memory.spillable.map.path";
+ // Default file path prefix for spillable file
+ public static final String DEFAULT_SPILLABLE_MAP_BASE_PATH = "/tmp/";
+}
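These constants are plain JobConf properties: the realtime record readers read them with jobConf.get(...) and fall back to the defaults, as the getMaxCompactionMemoryInBytes and log-scanner hunks further down show. A simplified sketch of that usage (a hypothetical helper mirroring the logic in AbstractRealtimeRecordReader):

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;

    // Resolve the memory budget for merging/compacting log files from the task's JobConf.
    static long maxCompactionMemoryInBytes(JobConf jobConf) {
      double fraction = Double.parseDouble(jobConf.get(
          HoodieRealtimeConfig.COMPACTION_MEMORY_FRACTION_PROP,
          HoodieRealtimeConfig.DEFAULT_COMPACTION_MEMORY_FRACTION));
      // getMemoryForMapTask() returns MB; convert the configured fraction of it to bytes
      return (long) Math.ceil(fraction * jobConf.getMemoryForMapTask() * 1024 * 1024L);
    }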
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
index 5f675bb..b7da749 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
@@ -18,74 +18,34 @@
package org.apache.hudi.hadoop.realtime;
-import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.LogReaderUtils;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
-import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
-import org.apache.avro.generic.GenericArray;
-import org.apache.avro.generic.GenericFixed;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
-import org.apache.hadoop.hive.serde2.io.DoubleWritable;
-import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
-import org.apache.hadoop.io.ArrayWritable;
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.avro.AvroSchemaConverter;
-import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.schema.MessageType;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
import java.util.stream.Collectors;
/**
* Record Reader implementation to merge fresh avro data with base parquet data, to support real time queries.
*/
public abstract class AbstractRealtimeRecordReader {
-
- // Fraction of mapper/reducer task memory used for compaction of log files
- public static final String COMPACTION_MEMORY_FRACTION_PROP = "compaction.memory.fraction";
- public static final String DEFAULT_COMPACTION_MEMORY_FRACTION = "0.75";
- // used to choose a trade off between IO vs Memory when performing compaction process
- // Depending on outputfile size and memory provided, choose true to avoid OOM for large file
- // size + small memory
- public static final String COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP = "compaction.lazy.block.read.enabled";
- public static final String DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED = "true";
-
- // Property to set the max memory for dfs inputstream buffer size
- public static final String MAX_DFS_STREAM_BUFFER_SIZE_PROP = "hoodie.memory.dfs.buffer.max.size";
- // Setting this to lower value of 1 MB since no control over how many RecordReaders will be started in a mapper
- public static final int DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE = 1024 * 1024; // 1 MB
- // Property to set file path prefix for spillable file
- public static final String SPILLABLE_MAP_BASE_PATH_PROP = "hoodie.memory.spillable.map.path";
- // Default file path prefix for spillable file
- public static final String DEFAULT_SPILLABLE_MAP_BASE_PATH = "/tmp/";
-
private static final Logger LOG = LogManager.getLogger(AbstractRealtimeRecordReader.class);
protected final HoodieRealtimeFileSplit split;
@@ -106,7 +66,7 @@ public abstract class AbstractRealtimeRecordReader {
try {
this.usesCustomPayload = usesCustomPayload();
LOG.info("usesCustomPayload ==> " + this.usesCustomPayload);
- baseFileSchema = readSchema(jobConf, split.getPath());
+ baseFileSchema = HoodieRealtimeRecordReaderUtils.readSchema(jobConf, split.getPath());
init();
} catch (IOException e) {
throw new HoodieIOException("Could not create HoodieRealtimeRecordReader on path " + this.split.getPath(), e);
@@ -120,220 +80,6 @@ public abstract class AbstractRealtimeRecordReader {
}
/**
- * Reads the schema from the parquet file. This is different from ParquetUtils as it uses the twitter parquet to
- * support hive 1.1.0
- */
- private static MessageType readSchema(Configuration conf, Path parquetFilePath) {
- try {
- return ParquetFileReader.readFooter(conf, parquetFilePath).getFileMetaData().getSchema();
- } catch (IOException e) {
- throw new HoodieIOException("Failed to read footer for parquet " + parquetFilePath, e);
- }
- }
-
- /**
- * Prints a JSON representation of the ArrayWritable for easier debuggability.
- */
- protected static String arrayWritableToString(ArrayWritable writable) {
- if (writable == null) {
- return "null";
- }
- StringBuilder builder = new StringBuilder();
- Writable[] values = writable.get();
- builder.append("\"values_" + Math.random() + "_" + values.length + "\": {");
- int i = 0;
- for (Writable w : values) {
- if (w instanceof ArrayWritable) {
- builder.append(arrayWritableToString((ArrayWritable) w)).append(",");
- } else {
- builder.append("\"value" + i + "\":\"" + w + "\"").append(",");
- if (w == null) {
- builder.append("\"type" + i + "\":\"unknown\"").append(",");
- } else {
- builder.append("\"type" + i + "\":\"" + w.getClass().getSimpleName() + "\"").append(",");
- }
- }
- i++;
- }
- builder.deleteCharAt(builder.length() - 1);
- builder.append("}");
- return builder.toString();
- }
-
- /**
- * Given a comma separated list of field names and positions at which they appear on Hive, return
- * an ordered list of field names, that can be passed onto storage.
- */
- private static List<String> orderFields(String fieldNameCsv, String fieldOrderCsv, List<String> partitioningFields) {
- // Need to convert the following to Set first since Hive does not handle duplicate field names correctly but
- // handles duplicate fields orders correctly.
- // Fields Orders -> {@link https://github
- // .com/apache/hive/blob/f37c5de6c32b9395d1b34fa3c02ed06d1bfbf6eb/serde/src/java
- // /org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java#L188}
- // Field Names -> {@link https://github.com/apache/hive/blob/f37c5de6c32b9395d1b34fa3c02ed06d1bfbf6eb/serde/src/java
- // /org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java#L229}
- String[] fieldOrdersWithDups = fieldOrderCsv.split(",");
- Set<String> fieldOrdersSet = new LinkedHashSet<>(Arrays.asList(fieldOrdersWithDups));
- String[] fieldOrders = fieldOrdersSet.toArray(new String[0]);
- List<String> fieldNames = Arrays.stream(fieldNameCsv.split(","))
- .filter(fn -> !partitioningFields.contains(fn)).collect(Collectors.toList());
- Set<String> fieldNamesSet = new LinkedHashSet<>(fieldNames);
- // Hive does not provide ids for partitioning fields, so check for lengths excluding that.
- if (fieldNamesSet.size() != fieldOrders.length) {
- throw new HoodieException(String
- .format("Error ordering fields for storage read. #fieldNames: %d, #fieldPositions: %d",
- fieldNames.size(), fieldOrders.length));
- }
- TreeMap<Integer, String> orderedFieldMap = new TreeMap<>();
- String[] fieldNamesArray = fieldNamesSet.toArray(new String[0]);
- for (int ox = 0; ox < fieldOrders.length; ox++) {
- orderedFieldMap.put(Integer.parseInt(fieldOrders[ox]), fieldNamesArray[ox]);
- }
- return new ArrayList<>(orderedFieldMap.values());
- }
-
- /**
- * Generate a reader schema off the provided writeSchema, to just project out the provided columns.
- */
- public static Schema generateProjectionSchema(Schema writeSchema, Map<String, Field> schemaFieldsMap,
- List<String> fieldNames) {
- /**
- * Avro & Presto field names seems to be case sensitive (support fields differing only in case) whereas
- * Hive/Impala/SparkSQL(default) are case-insensitive. Spark allows this to be configurable using
- * spark.sql.caseSensitive=true
- *
- * For a RT table setup with no delta-files (for a latest file-slice) -> we translate parquet schema to Avro Here
- * the field-name case is dependent on parquet schema. Hive (1.x/2.x/CDH) translate column projections to
- * lower-cases
- *
- */
- List<Schema.Field> projectedFields = new ArrayList<>();
- for (String fn : fieldNames) {
- Schema.Field field = schemaFieldsMap.get(fn.toLowerCase());
- if (field == null) {
- throw new HoodieException("Field " + fn + " not found in log schema. Query cannot proceed! "
- + "Derived Schema Fields: " + new ArrayList<>(schemaFieldsMap.keySet()));
- } else {
- projectedFields.add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal()));
- }
- }
-
- Schema projectedSchema = Schema.createRecord(writeSchema.getName(), writeSchema.getDoc(),
- writeSchema.getNamespace(), writeSchema.isError());
- projectedSchema.setFields(projectedFields);
- return projectedSchema;
- }
-
- public static Map<String, Field> getNameToFieldMap(Schema schema) {
- return schema.getFields().stream().map(r -> Pair.of(r.name().toLowerCase(), r))
- .collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
- }
-
- /**
- * Convert the projected read from delta record into an array writable.
- */
- public static Writable avroToArrayWritable(Object value, Schema schema) {
-
- if (value == null) {
- return null;
- }
-
- switch (schema.getType()) {
- case STRING:
- return new Text(value.toString());
- case BYTES:
- return new BytesWritable((byte[]) value);
- case INT:
- return new IntWritable((Integer) value);
- case LONG:
- return new LongWritable((Long) value);
- case FLOAT:
- return new FloatWritable((Float) value);
- case DOUBLE:
- return new DoubleWritable((Double) value);
- case BOOLEAN:
- return new BooleanWritable((Boolean) value);
- case NULL:
- return null;
- case RECORD:
- GenericRecord record = (GenericRecord) value;
- Writable[] recordValues = new Writable[schema.getFields().size()];
- int recordValueIndex = 0;
- for (Schema.Field field : schema.getFields()) {
- recordValues[recordValueIndex++] = avroToArrayWritable(record.get(field.name()), field.schema());
- }
- return new ArrayWritable(Writable.class, recordValues);
- case ENUM:
- return new Text(value.toString());
- case ARRAY:
- GenericArray arrayValue = (GenericArray) value;
- Writable[] arrayValues = new Writable[arrayValue.size()];
- int arrayValueIndex = 0;
- for (Object obj : arrayValue) {
- arrayValues[arrayValueIndex++] = avroToArrayWritable(obj, schema.getElementType());
- }
- // Hive 1.x will fail here, it requires values2 to be wrapped into another ArrayWritable
- return new ArrayWritable(Writable.class, arrayValues);
- case MAP:
- Map mapValue = (Map) value;
- Writable[] mapValues = new Writable[mapValue.size()];
- int mapValueIndex = 0;
- for (Object entry : mapValue.entrySet()) {
- Map.Entry mapEntry = (Map.Entry) entry;
- Writable[] nestedMapValues = new Writable[2];
- nestedMapValues[0] = new Text(mapEntry.getKey().toString());
- nestedMapValues[1] = avroToArrayWritable(mapEntry.getValue(), schema.getValueType());
- mapValues[mapValueIndex++] = new ArrayWritable(Writable.class, nestedMapValues);
- }
- // Hive 1.x will fail here, it requires values3 to be wrapped into another ArrayWritable
- return new ArrayWritable(Writable.class, mapValues);
- case UNION:
- List<Schema> types = schema.getTypes();
- if (types.size() != 2) {
- throw new IllegalArgumentException("Only support union with 2 fields");
- }
- Schema s1 = types.get(0);
- Schema s2 = types.get(1);
- if (s1.getType() == Schema.Type.NULL) {
- return avroToArrayWritable(value, s2);
- } else if (s2.getType() == Schema.Type.NULL) {
- return avroToArrayWritable(value, s1);
- } else {
- throw new IllegalArgumentException("Only support union with null");
- }
- case FIXED:
- if (schema.getLogicalType() != null && schema.getLogicalType().getName().equals("decimal")) {
- LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) LogicalTypes.fromSchema(schema);
- HiveDecimalWritable writable = new HiveDecimalWritable(((GenericFixed) value).bytes(),
- decimal.getScale());
- return HiveDecimalWritable.enforcePrecisionScale(writable,
- decimal.getPrecision(),
- decimal.getScale());
- }
- return new BytesWritable(((GenericFixed) value).bytes());
- default:
- return null;
- }
- }
-
- /**
- * Hive implementation of ParquetRecordReader results in partition columns not present in the original parquet file to
- * also be part of the projected schema. Hive expects the record reader implementation to return the row in its
- * entirety (with un-projected column having null values). As we use writerSchema for this, make sure writer schema
- * also includes partition columns
- *
- * @param schema Schema to be changed
- */
- private static Schema addPartitionFields(Schema schema, List<String> partitioningFields) {
- final Set<String> firstLevelFieldNames =
- schema.getFields().stream().map(Field::name).map(String::toLowerCase).collect(Collectors.toSet());
- List<String> fieldsToAdd = partitioningFields.stream().map(String::toLowerCase)
- .filter(x -> !firstLevelFieldNames.contains(x)).collect(Collectors.toList());
-
- return HoodieAvroUtils.appendNullSchemaFields(schema, fieldsToAdd);
- }
-
- /**
* Goes through the log files in reverse order and finds the schema from the last available data block. If not, falls
* back to the schema from the latest parquet file. Finally, sets the partition column and projection fields into the
* job conf.
@@ -353,16 +99,16 @@ public abstract class AbstractRealtimeRecordReader {
List<String> partitioningFields =
partitionFields.length() > 0 ? Arrays.stream(partitionFields.split("/")).collect(Collectors.toList())
: new ArrayList<>();
- writerSchema = addPartitionFields(writerSchema, partitioningFields);
- List<String> projectionFields = orderFields(jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR),
+ writerSchema = HoodieRealtimeRecordReaderUtils.addPartitionFields(writerSchema, partitioningFields);
+ List<String> projectionFields = HoodieRealtimeRecordReaderUtils.orderFields(jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR),
jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR), partitioningFields);
- Map<String, Field> schemaFieldsMap = getNameToFieldMap(writerSchema);
+ Map<String, Field> schemaFieldsMap = HoodieRealtimeRecordReaderUtils.getNameToFieldMap(writerSchema);
hiveSchema = constructHiveOrderedSchema(writerSchema, schemaFieldsMap);
// TODO(vc): In the future, the reader schema should be updated based on log files & be able
// to null out fields not present before
- readerSchema = generateProjectionSchema(writerSchema, schemaFieldsMap, projectionFields);
+ readerSchema = HoodieRealtimeRecordReaderUtils.generateProjectionSchema(writerSchema, schemaFieldsMap, projectionFields);
LOG.info(String.format("About to read compacted logs %s for base split %s, projecting cols %s",
split.getDeltaLogPaths(), split.getPath(), projectionFields));
}
@@ -409,7 +155,8 @@ public abstract class AbstractRealtimeRecordReader {
public long getMaxCompactionMemoryInBytes() {
// jobConf.getMemoryForMapTask() returns in MB
return (long) Math
- .ceil(Double.parseDouble(jobConf.get(COMPACTION_MEMORY_FRACTION_PROP, DEFAULT_COMPACTION_MEMORY_FRACTION))
+ .ceil(Double.parseDouble(jobConf.get(HoodieRealtimeConfig.COMPACTION_MEMORY_FRACTION_PROP,
+ HoodieRealtimeConfig.DEFAULT_COMPACTION_MEMORY_FRACTION))
* jobConf.getMemoryForMapTask() * 1024 * 1024L);
}
}
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieCombineRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieCombineRealtimeRecordReader.java
index bdf11ed..7a0bd37 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieCombineRealtimeRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieCombineRealtimeRecordReader.java
@@ -20,6 +20,7 @@ package org.apache.hudi.hadoop.realtime;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.hadoop.hive.HoodieCombineRealtimeFileSplit;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
@@ -65,7 +66,7 @@ public class HoodieCombineRealtimeRecordReader implements RecordReader<NullWrita
public boolean next(NullWritable key, ArrayWritable value) throws IOException {
if (this.currentRecordReader.next(key, value)) {
LOG.info("Reading from record reader");
- LOG.info(AbstractRealtimeRecordReader.arrayWritableToString(value));
+ LOG.info(HoodieRealtimeRecordReaderUtils.arrayWritableToString(value));
return true;
} else if (recordReaders.size() > 0) {
this.currentRecordReader.close();
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
index ae3fb5c..1124791 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
@@ -18,27 +18,17 @@
package org.apache.hudi.hadoop.realtime;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.FileSlice;
-import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
-import org.apache.hudi.common.util.CollectionUtils;
-import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.UseFileSplitsFromInputFormat;
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
@@ -52,13 +42,7 @@ import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
-import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
@@ -70,11 +54,6 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
private static final Logger LOG = LogManager.getLogger(HoodieParquetRealtimeInputFormat.class);
- // These positions have to be deterministic across all tables
- public static final int HOODIE_COMMIT_TIME_COL_POS = 0;
- public static final int HOODIE_RECORD_KEY_COL_POS = 2;
- public static final int HOODIE_PARTITION_PATH_COL_POS = 3;
- public static final String HOODIE_READ_COLUMNS_PROP = "hoodie.read.columns.set";
// To make Hive on Spark queries work with RT tables. Our theory is that due to
// {@link org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher}
// not handling empty list correctly, the ParquetRecordReaderWrapper ends up adding the same column ids multiple
@@ -85,74 +64,7 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
Stream<FileSplit> fileSplits = Arrays.stream(super.getSplits(job, numSplits)).map(is -> (FileSplit) is);
- // obtain all unique parent folders for splits
- Map<Path, List<FileSplit>> partitionsToParquetSplits =
- fileSplits.collect(Collectors.groupingBy(split -> split.getPath().getParent()));
- // TODO(vc): Should we handle also non-hoodie splits here?
- Map<String, HoodieTableMetaClient> metaClientMap = new HashMap<>();
- Map<Path, HoodieTableMetaClient> partitionsToMetaClient =
- partitionsToParquetSplits.keySet().stream().collect(Collectors.toMap(Function.identity(), p -> {
- // find if we have a metaclient already for this partition.
- Option<String> matchingBasePath = Option.fromJavaOptional(
- metaClientMap.keySet().stream().filter(basePath -> p.toString().startsWith(basePath)).findFirst());
- if (matchingBasePath.isPresent()) {
- return metaClientMap.get(matchingBasePath.get());
- }
-
- try {
- HoodieTableMetaClient metaClient = getTableMetaClient(p.getFileSystem(conf), p);
- metaClientMap.put(metaClient.getBasePath(), metaClient);
- return metaClient;
- } catch (IOException e) {
- throw new HoodieIOException("Error creating hoodie meta client against : " + p, e);
- }
- }));
-
- // for all unique split parents, obtain all delta files based on delta commit timeline,
- // grouped on file id
- List<HoodieRealtimeFileSplit> rtSplits = new ArrayList<>();
- partitionsToParquetSplits.keySet().forEach(partitionPath -> {
- // for each partition path obtain the data & log file groupings, then map back to inputsplits
- HoodieTableMetaClient metaClient = partitionsToMetaClient.get(partitionPath);
- HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline());
- String relPartitionPath = FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), partitionPath);
-
- try {
- // Both commit and delta-commits are included - pick the latest completed one
- Option<HoodieInstant> latestCompletedInstant =
- metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
-
- Stream<FileSlice> latestFileSlices = latestCompletedInstant
- .map(instant -> fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, instant.getTimestamp()))
- .orElse(Stream.empty());
-
- // subgroup splits again by file id & match with log files.
- Map<String, List<FileSplit>> groupedInputSplits = partitionsToParquetSplits.get(partitionPath).stream()
- .collect(Collectors.groupingBy(split -> FSUtils.getFileId(split.getPath().getName())));
- latestFileSlices.forEach(fileSlice -> {
- List<FileSplit> dataFileSplits = groupedInputSplits.get(fileSlice.getFileId());
- dataFileSplits.forEach(split -> {
- try {
- List<String> logFilePaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator())
- .map(logFile -> logFile.getPath().toString()).collect(Collectors.toList());
- // Get the maxCommit from the last delta or compaction or commit - when
- // bootstrapped from COW table
- String maxCommitTime = metaClient
- .getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION,
- HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION))
- .filterCompletedInstants().lastInstant().get().getTimestamp();
- rtSplits.add(new HoodieRealtimeFileSplit(split, metaClient.getBasePath(), logFilePaths, maxCommitTime));
- } catch (IOException e) {
- throw new HoodieIOException("Error creating hoodie real time split ", e);
- }
- });
- });
- } catch (Exception e) {
- throw new HoodieException("Error obtaining data file/log file grouping: " + partitionPath, e);
- }
- });
- LOG.info("Returning a total splits of " + rtSplits.size());
- return rtSplits.toArray(new InputSplit[0]);
+ return HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits);
}
@Override
@@ -199,9 +111,9 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
private static void addRequiredProjectionFields(Configuration configuration) {
// Need this to do merge records in HoodieRealtimeRecordReader
- addProjectionField(configuration, HoodieRecord.RECORD_KEY_METADATA_FIELD, HOODIE_RECORD_KEY_COL_POS);
- addProjectionField(configuration, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HOODIE_COMMIT_TIME_COL_POS);
- addProjectionField(configuration, HoodieRecord.PARTITION_PATH_METADATA_FIELD, HOODIE_PARTITION_PATH_COL_POS);
+ addProjectionField(configuration, HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS);
+ addProjectionField(configuration, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieInputFormatUtils.HOODIE_COMMIT_TIME_COL_POS);
+ addProjectionField(configuration, HoodieRecord.PARTITION_PATH_METADATA_FIELD, HoodieInputFormatUtils.HOODIE_PARTITION_PATH_COL_POS);
}
/**
@@ -228,12 +140,12 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
// risk of experiencing race conditions. Hence, we synchronize on the JobConf object here. There is negligible
// latency incurred here due to the synchronization since get record reader is called once per spilt before the
// actual heavy lifting of reading the parquet files happen.
- if (jobConf.get(HOODIE_READ_COLUMNS_PROP) == null) {
+ if (jobConf.get(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP) == null) {
synchronized (jobConf) {
LOG.info(
"Before adding Hoodie columns, Projections :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
+ ", Ids :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
- if (jobConf.get(HOODIE_READ_COLUMNS_PROP) == null) {
+ if (jobConf.get(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP) == null) {
// Hive (across all versions) fails for queries like select count(`_hoodie_commit_time`) from table;
// In this case, the projection fields gets removed. Looking at HiveInputFormat implementation, in some cases
// hoodie additional projection columns are reset after calling setConf and only natural projections
@@ -245,7 +157,7 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
addRequiredProjectionFields(jobConf);
this.conf = jobConf;
- this.conf.set(HOODIE_READ_COLUMNS_PROP, "true");
+ this.conf.set(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP, "true");
}
}
}
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
index 73e0b50..02bb5eb 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
@@ -24,6 +24,9 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.io.ArrayWritable;
@@ -60,13 +63,17 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
// NOTE: HoodieCompactedLogRecordScanner will not return records for an in-flight commit
// but can return records for completed commits > the commit we are trying to read (if using
// readCommit() API)
- return new HoodieMergedLogRecordScanner(FSUtils.getFs(split.getPath().toString(), jobConf), split.getBasePath(),
- split.getDeltaLogPaths(), usesCustomPayload ? getWriterSchema() : getReaderSchema(), split.getMaxCommitTime(),
+ return new HoodieMergedLogRecordScanner(
+ FSUtils.getFs(split.getPath().toString(), jobConf),
+ split.getBasePath(),
+ split.getDeltaLogPaths(),
+ usesCustomPayload ? getWriterSchema() : getReaderSchema(),
+ split.getMaxCommitTime(),
getMaxCompactionMemoryInBytes(),
- Boolean
- .parseBoolean(jobConf.get(COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)),
- false, jobConf.getInt(MAX_DFS_STREAM_BUFFER_SIZE_PROP, DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE),
- jobConf.get(SPILLABLE_MAP_BASE_PATH_PROP, DEFAULT_SPILLABLE_MAP_BASE_PATH));
+ Boolean.parseBoolean(jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)),
+ false,
+ jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE),
+ jobConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP, HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH));
}
@Override
@@ -81,7 +88,7 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
// TODO(VC): Right now, we assume all records in log, have a matching base record. (which
// would be true until we have a way to index logs too)
// return from delta records map if we have some match.
- String key = arrayWritable.get()[HoodieParquetRealtimeInputFormat.HOODIE_RECORD_KEY_COL_POS].toString();
+ String key = arrayWritable.get()[HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS].toString();
if (deltaRecordMap.containsKey(key)) {
// TODO(NA): Invoke preCombine here by converting arrayWritable to Avro. This is required since the
// deltaRecord may not be a full record and needs values of columns from the parquet
@@ -105,11 +112,11 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
// we assume, a later safe record in the log, is newer than what we have in the map &
// replace it. Since we want to return an arrayWritable which is the same length as the elements in the latest
// schema, we use writerSchema to create the arrayWritable from the latest generic record
- ArrayWritable aWritable = (ArrayWritable) avroToArrayWritable(recordToReturn, getHiveSchema());
+ ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(recordToReturn, getHiveSchema());
Writable[] replaceValue = aWritable.get();
if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("key %s, base values: %s, log values: %s", key, arrayWritableToString(arrayWritable),
- arrayWritableToString(aWritable)));
+ LOG.debug(String.format("key %s, base values: %s, log values: %s", key, HoodieRealtimeRecordReaderUtils.arrayWritableToString(arrayWritable),
+ HoodieRealtimeRecordReaderUtils.arrayWritableToString(aWritable)));
}
Writable[] originalValue = arrayWritable.get();
try {
@@ -117,10 +124,10 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
arrayWritable.set(originalValue);
} catch (RuntimeException re) {
LOG.error("Got exception when doing array copy", re);
- LOG.error("Base record :" + arrayWritableToString(arrayWritable));
- LOG.error("Log record :" + arrayWritableToString(aWritable));
- String errMsg = "Base-record :" + arrayWritableToString(arrayWritable)
- + " ,Log-record :" + arrayWritableToString(aWritable) + " ,Error :" + re.getMessage();
+ LOG.error("Base record :" + HoodieRealtimeRecordReaderUtils.arrayWritableToString(arrayWritable));
+ LOG.error("Log record :" + HoodieRealtimeRecordReaderUtils.arrayWritableToString(aWritable));
+ String errMsg = "Base-record :" + HoodieRealtimeRecordReaderUtils.arrayWritableToString(arrayWritable)
+ + " ,Log-record :" + HoodieRealtimeRecordReaderUtils.arrayWritableToString(aWritable) + " ,Error :" + re.getMessage();
throw new RuntimeException(errMsg, re);
}
}
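The reader now sources its tunables from HoodieRealtimeConfig instead of constants on AbstractRealtimeRecordReader. A minimal sketch of overriding those properties on a query's JobConf, assuming the caller already owns the JobConf; the property keys are the ones referenced in the hunk above, the override values are purely illustrative:

  import org.apache.hadoop.mapred.JobConf;
  import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;

  public class RealtimeReaderTuningSketch {
    public static JobConf tuneForRealtimeRead(JobConf jobConf) {
      // Keep log blocks lazily read to trade IO for memory on large file slices.
      jobConf.set(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, "true");
      // Raise the DFS input stream buffer; the reader otherwise falls back to the 1 MB default.
      jobConf.setInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, 4 * 1024 * 1024);
      // Spill merged log records to this (illustrative) path when the in-memory map overflows.
      jobConf.set(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP, "/tmp/hudi-spill/");
      return jobConf;
    }
  }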
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
index 4c773d4..8bc1cfb 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
@@ -37,6 +37,8 @@ import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;
import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer;
import org.apache.hudi.hadoop.RecordReaderValueIterator;
import org.apache.hudi.hadoop.SafeParquetRecordReaderWrapper;
+import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
class RealtimeUnmergedRecordReader extends AbstractRealtimeRecordReader
implements RecordReader<NullWritable, ArrayWritable> {
@@ -76,11 +78,11 @@ class RealtimeUnmergedRecordReader extends AbstractRealtimeRecordReader
this.iterator = this.executor.getQueue().iterator();
this.logRecordScanner = new HoodieUnMergedLogRecordScanner(FSUtils.getFs(split.getPath().toString(), jobConf),
split.getBasePath(), split.getDeltaLogPaths(), getReaderSchema(), split.getMaxCommitTime(),
- Boolean.parseBoolean(jobConf.get(COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)),
- false, jobConf.getInt(MAX_DFS_STREAM_BUFFER_SIZE_PROP, DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE), record -> {
+ Boolean.parseBoolean(jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)),
+ false, jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE), record -> {
// convert Hoodie log record to Hadoop AvroWritable and buffer
GenericRecord rec = (GenericRecord) record.getData().getInsertValue(getReaderSchema()).get();
- ArrayWritable aWritable = (ArrayWritable) avroToArrayWritable(rec, getHiveSchema());
+ ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(rec, getHiveSchema());
this.executor.getQueue().insertRecord(aWritable);
});
// Start reading and buffering
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveUtil.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java
similarity index 98%
rename from hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveUtil.java
rename to hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java
index 0537cfa..02fb9d0 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveUtil.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hudi.hadoop;
+package org.apache.hudi.hadoop.utils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
@@ -31,9 +31,9 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
-public class HoodieHiveUtil {
+public class HoodieHiveUtils {
- public static final Logger LOG = LogManager.getLogger(HoodieHiveUtil.class);
+ public static final Logger LOG = LogManager.getLogger(HoodieHiveUtils.class);
public static final String HOODIE_CONSUME_MODE_PATTERN = "hoodie.%s.consume.mode";
public static final String HOODIE_START_COMMIT_PATTERN = "hoodie.%s.consume.start.timestamp";
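HoodieHiveUtils keeps the per-table consume-mode settings it carried under the old name. A minimal sketch of flagging a table for incremental consumption and reading the flag back, assuming a hypothetical table name and start timestamp; the pattern constants and getIncrementalTableNames are those shown in this file:

  import java.io.IOException;
  import java.util.List;
  import org.apache.hadoop.mapred.JobConf;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hudi.hadoop.utils.HoodieHiveUtils;

  public class IncrementalModeSketch {
    public static void main(String[] args) throws IOException {
      JobConf conf = new JobConf();
      String table = "db1.raw_trips"; // placeholder table name
      // Switch the table to incremental scan mode and bound the pull.
      conf.set(String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, table),
          HoodieHiveUtils.INCREMENTAL_SCAN_MODE);
      conf.set(String.format(HoodieHiveUtils.HOODIE_START_COMMIT_PATTERN, table), "20200609000000");
      conf.setInt(String.format(HoodieHiveUtils.HOODIE_MAX_COMMIT_PATTERN, table), 3);
      // The input format later recovers the incremental tables from the same conf.
      List<String> incrementalTables = HoodieHiveUtils.getIncrementalTableNames(Job.getInstance(conf));
      System.out.println(incrementalTables);
    }
  }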
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
similarity index 58%
copy from hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
copy to hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
index 5a10f3c..d10b664 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
@@ -16,21 +16,8 @@
* limitations under the License.
*/
-package org.apache.hudi.hadoop;
+package org.apache.hudi.hadoop.utils;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
-import org.apache.hadoop.io.ArrayWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapreduce.Job;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
@@ -39,85 +26,40 @@ import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
-import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
+import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.Function;
import java.util.stream.Collectors;
-/**
- * HoodieInputFormat which understands the Hoodie File Structure and filters files based on the Hoodie Mode. If paths
- * that does not correspond to a hoodie table then they are passed in as is (as what FileInputFormat.listStatus()
- * would do). The JobConf could have paths from multipe Hoodie/Non-Hoodie tables
- */
-@UseFileSplitsFromInputFormat
-public class HoodieParquetInputFormat extends MapredParquetInputFormat implements Configurable {
-
- private static final Logger LOG = LogManager.getLogger(HoodieParquetInputFormat.class);
+public class HoodieInputFormatUtils {
- protected Configuration conf;
-
- @Override
- public FileStatus[] listStatus(JobConf job) throws IOException {
- // Segregate inputPaths[] to incremental, snapshot and non hoodie paths
- List<String> incrementalTables = HoodieHiveUtil.getIncrementalTableNames(Job.getInstance(job));
- InputPathHandler inputPathHandler = new InputPathHandler(conf, getInputPaths(job), incrementalTables);
- List<FileStatus> returns = new ArrayList<>();
+ // These positions have to be deterministic across all tables
+ public static final int HOODIE_COMMIT_TIME_COL_POS = 0;
+ public static final int HOODIE_RECORD_KEY_COL_POS = 2;
+ public static final int HOODIE_PARTITION_PATH_COL_POS = 3;
+ public static final String HOODIE_READ_COLUMNS_PROP = "hoodie.read.columns.set";
- Map<String, HoodieTableMetaClient> tableMetaClientMap = inputPathHandler.getTableMetaClientMap();
- // process incremental pulls first
- for (String table : incrementalTables) {
- HoodieTableMetaClient metaClient = tableMetaClientMap.get(table);
- if (metaClient == null) {
- /* This can happen when the INCREMENTAL mode is set for a table but there were no InputPaths
- * in the jobConf
- */
- continue;
- }
- List<Path> inputPaths = inputPathHandler.getGroupedIncrementalPaths().get(metaClient);
- List<FileStatus> result = listStatusForIncrementalMode(job, metaClient, inputPaths);
- if (result != null) {
- returns.addAll(result);
- }
- }
-
- // process non hoodie Paths next.
- List<Path> nonHoodiePaths = inputPathHandler.getNonHoodieInputPaths();
- if (nonHoodiePaths.size() > 0) {
- setInputPaths(job, nonHoodiePaths.toArray(new Path[nonHoodiePaths.size()]));
- FileStatus[] fileStatuses = super.listStatus(job);
- returns.addAll(Arrays.asList(fileStatuses));
- }
-
- // process snapshot queries next.
- List<Path> snapshotPaths = inputPathHandler.getSnapshotPaths();
- if (snapshotPaths.size() > 0) {
- setInputPaths(job, snapshotPaths.toArray(new Path[snapshotPaths.size()]));
- FileStatus[] fileStatuses = super.listStatus(job);
- Map<HoodieTableMetaClient, List<FileStatus>> groupedFileStatus =
- groupFileStatusForSnapshotPaths(fileStatuses, tableMetaClientMap.values());
- LOG.info("Found a total of " + groupedFileStatus.size() + " groups");
- for (Map.Entry<HoodieTableMetaClient, List<FileStatus>> entry : groupedFileStatus.entrySet()) {
- List<FileStatus> result = filterFileStatusForSnapshotMode(entry.getKey(), entry.getValue());
- if (result != null) {
- returns.addAll(result);
- }
- }
- }
- return returns.toArray(new FileStatus[returns.size()]);
- }
+ private static final Logger LOG = LogManager.getLogger(HoodieInputFormatUtils.class);
/**
* Filter any specific instants that we do not want to process.
@@ -132,16 +74,20 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
*
* To workaround this problem, we want to stop returning data belonging to commits > t2.
* After compaction is complete, incremental reader would see updates in t2, t3, so on.
+ * @param timeline
+ * @return
*/
- protected HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline timeline) {
+ public static HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline timeline) {
HoodieDefaultTimeline commitsAndCompactionTimeline = timeline.getCommitsAndCompactionTimeline();
- Option<HoodieInstant> pendingCompactionInstant = commitsAndCompactionTimeline.filterPendingCompactionTimeline().firstInstant();
+ Option<HoodieInstant> pendingCompactionInstant = commitsAndCompactionTimeline
+ .filterPendingCompactionTimeline().firstInstant();
if (pendingCompactionInstant.isPresent()) {
- HoodieDefaultTimeline instantsTimeline = commitsAndCompactionTimeline.findInstantsBefore(pendingCompactionInstant.get().getTimestamp());
+ HoodieDefaultTimeline instantsTimeline = commitsAndCompactionTimeline
+ .findInstantsBefore(pendingCompactionInstant.get().getTimestamp());
int numCommitsFilteredByCompaction = commitsAndCompactionTimeline.getCommitsTimeline().countInstants()
- instantsTimeline.getCommitsTimeline().countInstants();
LOG.info("Earliest pending compaction instant is: " + pendingCompactionInstant.get().getTimestamp()
- + " skipping " + numCommitsFilteredByCompaction + " commits");
+ + " skipping " + numCommitsFilteredByCompaction + " commits");
return instantsTimeline;
} else {
@@ -150,29 +96,18 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
}
/**
- * Achieves listStatus functionality for an incrementally queried table. Instead of listing all
- * partitions and then filtering based on the commits of interest, this logic first extracts the
- * partitions touched by the desired commits and then lists only those partitions.
+ * Extract partitions touched by the commitsToCheck.
+ * @param commitsToCheck
+ * @param tableMetaClient
+ * @param timeline
+ * @param inputPaths
+ * @return
+ * @throws IOException
*/
- private List<FileStatus> listStatusForIncrementalMode(
- JobConf job, HoodieTableMetaClient tableMetaClient, List<Path> inputPaths) throws IOException {
- String tableName = tableMetaClient.getTableConfig().getTableName();
- Job jobContext = Job.getInstance(job);
- HoodieDefaultTimeline baseTimeline;
- if (HoodieHiveUtil.stopAtCompaction(jobContext, tableName)) {
- baseTimeline = filterInstantsTimeline(tableMetaClient.getActiveTimeline());
- } else {
- baseTimeline = tableMetaClient.getActiveTimeline();
- }
-
- HoodieTimeline timeline = baseTimeline.getCommitsTimeline().filterCompletedInstants();
- String lastIncrementalTs = HoodieHiveUtil.readStartCommitTime(jobContext, tableName);
- // Total number of commits to return in this batch. Set this to -1 to get all the commits.
- Integer maxCommits = HoodieHiveUtil.readMaxCommits(jobContext, tableName);
- LOG.info("Last Incremental timestamp was set as " + lastIncrementalTs);
- List<HoodieInstant> commitsToCheck = timeline.findInstantsAfter(lastIncrementalTs, maxCommits)
- .getInstants().collect(Collectors.toList());
- // Extract partitions touched by the commitsToCheck
+ public static Option<String> getAffectedPartitions(List<HoodieInstant> commitsToCheck,
+ HoodieTableMetaClient tableMetaClient,
+ HoodieTimeline timeline,
+ List<Path> inputPaths) throws IOException {
Set<String> partitionsToList = new HashSet<>();
for (HoodieInstant commit : commitsToCheck) {
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get(),
@@ -180,7 +115,7 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
partitionsToList.addAll(commitMetadata.getPartitionToWriteStats().keySet());
}
if (partitionsToList.isEmpty()) {
- return null;
+ return Option.empty();
}
String incrementalInputPaths = partitionsToList.stream()
.map(s -> StringUtils.isNullOrEmpty(s) ? tableMetaClient.getBasePath() : tableMetaClient.getBasePath() + Path.SEPARATOR + s)
@@ -211,19 +146,105 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
return false;
})
.collect(Collectors.joining(","));
- if (StringUtils.isNullOrEmpty(incrementalInputPaths)) {
- return null;
+ return Option.of(incrementalInputPaths);
+ }
+
+ /**
+ * Extract HoodieTimeline based on HoodieTableMetaClient.
+ * @param job
+ * @param tableMetaClient
+ * @return
+ */
+ public static Option<HoodieTimeline> getFilteredCommitsTimeline(Job job, HoodieTableMetaClient tableMetaClient) {
+ String tableName = tableMetaClient.getTableConfig().getTableName();
+ HoodieDefaultTimeline baseTimeline;
+ if (HoodieHiveUtils.stopAtCompaction(job, tableName)) {
+ baseTimeline = filterInstantsTimeline(tableMetaClient.getActiveTimeline());
+ } else {
+ baseTimeline = tableMetaClient.getActiveTimeline();
+ }
+ return Option.of(baseTimeline.getCommitsTimeline().filterCompletedInstants());
+ }
+
+ /**
+ * Get commits for incremental query from Hive map reduce configuration.
+ * @param job
+ * @param tableName
+ * @param timeline
+ * @return
+ */
+ public static Option<List<HoodieInstant>> getCommitsForIncrementalQuery(Job job, String tableName, HoodieTimeline timeline) {
+ String lastIncrementalTs = HoodieHiveUtils.readStartCommitTime(job, tableName);
+ // Total number of commits to return in this batch. Set this to -1 to get all the commits.
+ Integer maxCommits = HoodieHiveUtils.readMaxCommits(job, tableName);
+ LOG.info("Last Incremental timestamp was set as " + lastIncrementalTs);
+ return Option.of(timeline.findInstantsAfter(lastIncrementalTs, maxCommits)
+ .getInstants().collect(Collectors.toList()));
+ }
+
+ /**
+ * Extract HoodieTableMetaClient by base path.
+ * @param conf
+ * @param partitions
+ * @return
+ */
+ public static Map<Path, HoodieTableMetaClient> getTableMetaClientByBasePath(Configuration conf, Set<Path> partitions) {
+ Map<String, HoodieTableMetaClient> metaClientMap = new HashMap<>();
+ return partitions.stream().collect(Collectors.toMap(Function.identity(), p -> {
+ // Get meta client if this path is the base path.
+ Option<String> matchingBasePath = Option.fromJavaOptional(
+ metaClientMap.keySet().stream().filter(basePath -> p.toString().startsWith(basePath)).findFirst());
+ if (matchingBasePath.isPresent()) {
+ return metaClientMap.get(matchingBasePath.get());
+ }
+
+ try {
+ HoodieTableMetaClient metaClient = getTableMetaClientForBasePath(p.getFileSystem(conf), p);
+ metaClientMap.put(metaClient.getBasePath(), metaClient);
+ return metaClient;
+ } catch (IOException e) {
+ throw new HoodieIOException("Error creating hoodie meta client against : " + p, e);
+ }
+ }));
+ }
+
+ /**
+ * Extract HoodieTableMetaClient from a partition path(not base path).
+ * @param fs
+ * @param dataPath
+ * @return
+ * @throws IOException
+ */
+ public static HoodieTableMetaClient getTableMetaClientForBasePath(FileSystem fs, Path dataPath) throws IOException {
+ int levels = HoodieHiveUtils.DEFAULT_LEVELS_TO_BASEPATH;
+ if (HoodiePartitionMetadata.hasPartitionMetadata(fs, dataPath)) {
+ HoodiePartitionMetadata metadata = new HoodiePartitionMetadata(fs, dataPath);
+ metadata.readFromFS();
+ levels = metadata.getPartitionDepth();
}
- // Mutate the JobConf to set the input paths to only partitions touched by incremental pull.
- setInputPaths(job, incrementalInputPaths);
- FileStatus[] fileStatuses = super.listStatus(job);
- BaseFileOnlyView roView = new HoodieTableFileSystemView(tableMetaClient, timeline, fileStatuses);
+ Path baseDir = HoodieHiveUtils.getNthParent(dataPath, levels);
+ LOG.info("Reading hoodie metadata from path " + baseDir.toString());
+ return new HoodieTableMetaClient(fs.getConf(), baseDir.toString());
+ }
+
+ /**
+ * Filter a list of FileStatus based on commitsToCheck for incremental view.
+ * @param job
+ * @param tableMetaClient
+ * @param timeline
+ * @param fileStatuses
+ * @param commitsToCheck
+ * @return
+ */
+ public static List<FileStatus> filterIncrementalFileStatus(Job job, HoodieTableMetaClient tableMetaClient,
+ HoodieTimeline timeline, FileStatus[] fileStatuses, List<HoodieInstant> commitsToCheck) {
+ TableFileSystemView.BaseFileOnlyView roView = new HoodieTableFileSystemView(tableMetaClient, timeline, fileStatuses);
List<String> commitsList = commitsToCheck.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
List<HoodieBaseFile> filteredFiles = roView.getLatestBaseFilesInRange(commitsList).collect(Collectors.toList());
List<FileStatus> returns = new ArrayList<>();
for (HoodieBaseFile filteredFile : filteredFiles) {
LOG.debug("Processing incremental hoodie file - " + filteredFile.getPath());
- filteredFile = checkFileStatus(filteredFile);
+ filteredFile = refreshFileStatus(job.getConfiguration(), filteredFile);
returns.add(filteredFile.getFileStatus());
}
LOG.info("Total paths to process after hoodie incremental filter " + filteredFiles.size());
@@ -238,7 +259,7 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
* @return
* @throws IOException
*/
- private Map<HoodieTableMetaClient, List<FileStatus>> groupFileStatusForSnapshotPaths(
+ public static Map<HoodieTableMetaClient, List<FileStatus>> groupFileStatusForSnapshotPaths(
FileStatus[] fileStatuses, Collection<HoodieTableMetaClient> metaClientList) {
// This assumes the paths for different tables are grouped together
Map<HoodieTableMetaClient, List<FileStatus>> grouped = new HashMap<>();
@@ -268,16 +289,20 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
/**
* Filters data files for a snapshot queried table.
+ * @param job
+ * @param metadata
+ * @param fileStatuses
+ * @return
*/
- private List<FileStatus> filterFileStatusForSnapshotMode(
- HoodieTableMetaClient metadata, List<FileStatus> fileStatuses) {
+ public static List<FileStatus> filterFileStatusForSnapshotMode(
+ JobConf job, HoodieTableMetaClient metadata, List<FileStatus> fileStatuses) {
FileStatus[] statuses = fileStatuses.toArray(new FileStatus[0]);
if (LOG.isDebugEnabled()) {
LOG.debug("Hoodie Metadata initialized with completed commit Ts as :" + metadata);
}
// Get all commits, delta commits, compactions, as all of them produce a base parquet file today
HoodieTimeline timeline = metadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
- BaseFileOnlyView roView = new HoodieTableFileSystemView(metadata, timeline, statuses);
+ TableFileSystemView.BaseFileOnlyView roView = new HoodieTableFileSystemView(metadata, timeline, statuses);
// filter files on the latest commit found
List<HoodieBaseFile> filteredFiles = roView.getLatestBaseFiles().collect(Collectors.toList());
LOG.info("Total paths to process after hoodie filter " + filteredFiles.size());
@@ -286,7 +311,7 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
if (LOG.isDebugEnabled()) {
LOG.debug("Processing latest hoodie file - " + filteredFile.getPath());
}
- filteredFile = checkFileStatus(filteredFile);
+ filteredFile = refreshFileStatus(job, filteredFile);
returns.add(filteredFile.getFileStatus());
}
return returns;
@@ -296,8 +321,11 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
* Checks the file status for a race condition which can set the file size to 0. 1. HiveInputFormat does
* super.listStatus() and gets back a FileStatus[] 2. Then it creates the HoodieTableMetaClient for the paths listed.
* 3. Generation of splits looks at FileStatus size to create splits, which skips this file
+ * @param conf
+ * @param dataFile
+ * @return
*/
- private HoodieBaseFile checkFileStatus(HoodieBaseFile dataFile) {
+ private static HoodieBaseFile refreshFileStatus(Configuration conf, HoodieBaseFile dataFile) {
Path dataPath = dataFile.getFileStatus().getPath();
try {
if (dataFile.getFileSize() == 0) {
@@ -311,46 +339,4 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
}
}
- public void setConf(Configuration conf) {
- this.conf = conf;
- }
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
- @Override
- public RecordReader<NullWritable, ArrayWritable> getRecordReader(final InputSplit split, final JobConf job,
- final Reporter reporter) throws IOException {
- // TODO enable automatic predicate pushdown after fixing issues
- // FileSplit fileSplit = (FileSplit) split;
- // HoodieTableMetadata metadata = getTableMetadata(fileSplit.getPath().getParent());
- // String tableName = metadata.getTableName();
- // String mode = HoodieHiveUtil.readMode(job, tableName);
-
- // if (HoodieHiveUtil.INCREMENTAL_SCAN_MODE.equals(mode)) {
- // FilterPredicate predicate = constructHoodiePredicate(job, tableName, split);
- // LOG.info("Setting parquet predicate push down as " + predicate);
- // ParquetInputFormat.setFilterPredicate(job, predicate);
- // clearOutExistingPredicate(job);
- // }
- return super.getRecordReader(split, job, reporter);
- }
-
- /**
- * Read the table metadata from a data path. This assumes certain hierarchy of files which should be changed once a
- * better way is figured out to pass in the hoodie meta directory
- */
- protected static HoodieTableMetaClient getTableMetaClient(FileSystem fs, Path dataPath) throws IOException {
- int levels = HoodieHiveUtil.DEFAULT_LEVELS_TO_BASEPATH;
- if (HoodiePartitionMetadata.hasPartitionMetadata(fs, dataPath)) {
- HoodiePartitionMetadata metadata = new HoodiePartitionMetadata(fs, dataPath);
- metadata.readFromFS();
- levels = metadata.getPartitionDepth();
- }
- Path baseDir = HoodieHiveUtil.getNthParent(dataPath, levels);
- LOG.info("Reading hoodie metadata from path " + baseDir.toString());
- return new HoodieTableMetaClient(fs.getConf(), baseDir.toString());
- }
}
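With the listing logic pulled out of HoodieParquetInputFormat, an input format can compose the static helpers directly. A rough sketch of the incremental path, assuming the caller already has the table's HoodieTableMetaClient, the job's input paths, and a FileStatus[] listed from the affected partitions (in the real flow the statuses come from re-listing only those partitions):

  import java.io.IOException;
  import java.util.Collections;
  import java.util.List;
  import org.apache.hadoop.fs.FileStatus;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.mapred.JobConf;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hudi.common.table.HoodieTableMetaClient;
  import org.apache.hudi.common.table.timeline.HoodieInstant;
  import org.apache.hudi.common.table.timeline.HoodieTimeline;
  import org.apache.hudi.common.util.Option;
  import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;

  public class IncrementalListingSketch {
    // Roughly the sequence the refactored listStatusForIncrementalMode now follows.
    public static List<FileStatus> listIncremental(JobConf job, HoodieTableMetaClient metaClient,
        List<Path> inputPaths, FileStatus[] fileStatuses) throws IOException {
      Job jobContext = Job.getInstance(job);
      String tableName = metaClient.getTableConfig().getTableName();
      // 1. Completed commits, optionally trimmed at the earliest pending compaction.
      HoodieTimeline timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, metaClient).get();
      // 2. Commits requested by the incremental query (start timestamp + max commits).
      List<HoodieInstant> commitsToCheck =
          HoodieInputFormatUtils.getCommitsForIncrementalQuery(jobContext, tableName, timeline).get();
      // 3. Partitions written by those commits, restricted to the job's input paths.
      Option<String> affectedPartitions =
          HoodieInputFormatUtils.getAffectedPartitions(commitsToCheck, metaClient, timeline, inputPaths);
      if (!affectedPartitions.isPresent()) {
        return Collections.emptyList();
      }
      // In the real flow this comma-separated path list is set back on the job before listing.
      // 4. Keep only base files produced within the commit range.
      return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext, metaClient, timeline,
          fileStatuses, commitsToCheck);
    }
  }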
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
new file mode 100644
index 0000000..7ae4ea0
--- /dev/null
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop.utils;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
+
+ private static final Logger LOG = LogManager.getLogger(HoodieRealtimeInputFormatUtils.class);
+
+ public static InputSplit[] getRealtimeSplits(Configuration conf, Stream<FileSplit> fileSplits) throws IOException {
+ Map<Path, List<FileSplit>> partitionsToParquetSplits =
+ fileSplits.collect(Collectors.groupingBy(split -> split.getPath().getParent()));
+ // TODO(vc): Should we handle also non-hoodie splits here?
+ Map<Path, HoodieTableMetaClient> partitionsToMetaClient = getTableMetaClientByBasePath(conf, partitionsToParquetSplits.keySet());
+
+ // for all unique split parents, obtain all delta files based on delta commit timeline,
+ // grouped on file id
+ List<HoodieRealtimeFileSplit> rtSplits = new ArrayList<>();
+ partitionsToParquetSplits.keySet().forEach(partitionPath -> {
+ // for each partition path obtain the data & log file groupings, then map back to inputsplits
+ HoodieTableMetaClient metaClient = partitionsToMetaClient.get(partitionPath);
+ HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline());
+ String relPartitionPath = FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), partitionPath);
+
+ try {
+ // Both commit and delta-commits are included - pick the latest completed one
+ Option<HoodieInstant> latestCompletedInstant =
+ metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
+
+ Stream<FileSlice> latestFileSlices = latestCompletedInstant
+ .map(instant -> fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, instant.getTimestamp()))
+ .orElse(Stream.empty());
+
+ // subgroup splits again by file id & match with log files.
+ Map<String, List<FileSplit>> groupedInputSplits = partitionsToParquetSplits.get(partitionPath).stream()
+ .collect(Collectors.groupingBy(split -> FSUtils.getFileId(split.getPath().getName())));
+ latestFileSlices.forEach(fileSlice -> {
+ List<FileSplit> dataFileSplits = groupedInputSplits.get(fileSlice.getFileId());
+ dataFileSplits.forEach(split -> {
+ try {
+ List<String> logFilePaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator())
+ .map(logFile -> logFile.getPath().toString()).collect(Collectors.toList());
+ // Get the maxCommit from the last delta or compaction or commit - when
+ // bootstrapped from COW table
+ String maxCommitTime = metaClient
+ .getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION,
+ HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION))
+ .filterCompletedInstants().lastInstant().get().getTimestamp();
+ rtSplits.add(new HoodieRealtimeFileSplit(split, metaClient.getBasePath(), logFilePaths, maxCommitTime));
+ } catch (IOException e) {
+ throw new HoodieIOException("Error creating hoodie real time split ", e);
+ }
+ });
+ });
+ } catch (Exception e) {
+ throw new HoodieException("Error obtaining data file/log file grouping: " + partitionPath, e);
+ }
+ });
+ LOG.info("Returning a total splits of " + rtSplits.size());
+ return rtSplits.toArray(new InputSplit[0]);
+ }
+
+ // Return parquet file with a list of log files in the same file group.
+ public static Map<String, List<String>> groupLogsByBaseFile(Configuration conf, Stream<FileStatus> fileStatuses) {
+ Map<Path, List<FileStatus>> partitionsToParquetSplits =
+ fileStatuses.collect(Collectors.groupingBy(file -> file.getPath().getParent()));
+ // TODO(vc): Should we handle also non-hoodie splits here?
+ Map<Path, HoodieTableMetaClient> partitionsToMetaClient = getTableMetaClientByBasePath(conf, partitionsToParquetSplits.keySet());
+
+ // for all unique split parents, obtain all delta files based on delta commit timeline,
+ // grouped on file id
+ Map<String, List<String>> resultMap = new HashMap<>();
+ partitionsToParquetSplits.keySet().forEach(partitionPath -> {
+ // for each partition path obtain the data & log file groupings, then map back to inputsplits
+ HoodieTableMetaClient metaClient = partitionsToMetaClient.get(partitionPath);
+ HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline());
+ String relPartitionPath = FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), partitionPath);
+
+ try {
+ // Both commit and delta-commits are included - pick the latest completed one
+ Option<HoodieInstant> latestCompletedInstant =
+ metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
+
+ Stream<FileSlice> latestFileSlices = latestCompletedInstant
+ .map(instant -> fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, instant.getTimestamp()))
+ .orElse(Stream.empty());
+
+ // subgroup splits again by file id & match with log files.
+ Map<String, List<FileStatus>> groupedInputSplits = partitionsToParquetSplits.get(partitionPath).stream()
+ .collect(Collectors.groupingBy(file -> FSUtils.getFileId(file.getPath().getName())));
+ latestFileSlices.forEach(fileSlice -> {
+ List<FileStatus> dataFileSplits = groupedInputSplits.get(fileSlice.getFileId());
+ dataFileSplits.forEach(split -> {
+ try {
+ List<String> logFilePaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator())
+ .map(logFile -> logFile.getPath().toString()).collect(Collectors.toList());
+ resultMap.put(split.getPath().toString(), logFilePaths);
+ } catch (Exception e) {
+ throw new HoodieException("Error creating hoodie real time split ", e);
+ }
+ });
+ });
+ } catch (Exception e) {
+ throw new HoodieException("Error obtaining data file/log file grouping: " + partitionPath, e);
+ }
+ });
+ return resultMap;
+ }
+
+}
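A minimal sketch of feeding already-computed parquet splits into the new helper; getRealtimeSplits is the method added above, while the surrounding conversion from InputSplit[] is assumed boilerplate on the caller's side:

  import java.io.IOException;
  import java.util.Arrays;
  import java.util.stream.Stream;
  import org.apache.hadoop.mapred.FileSplit;
  import org.apache.hadoop.mapred.InputSplit;
  import org.apache.hadoop.mapred.JobConf;
  import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;

  public class RealtimeSplitSketch {
    // Wrap already-computed parquet FileSplits into HoodieRealtimeFileSplits.
    public static InputSplit[] toRealtimeSplits(JobConf jobConf, InputSplit[] parquetSplits) throws IOException {
      Stream<FileSplit> fileSplits = Arrays.stream(parquetSplits).map(split -> (FileSplit) split);
      // Groups splits by partition, builds each table's file system view and attaches the
      // matching log files plus the max commit time to every base file split.
      return HoodieRealtimeInputFormatUtils.getRealtimeSplits(jobConf, fileSplits);
    }
  }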
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
similarity index 58%
copy from hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
copy to hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
index 5f675bb..6af3770 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
@@ -16,26 +16,20 @@
* limitations under the License.
*/
-package org.apache.hudi.hadoop.realtime;
+package org.apache.hudi.hadoop.utils;
import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.common.model.HoodieAvroPayload;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.log.LogReaderUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
import org.apache.avro.generic.GenericArray;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
-import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.io.ArrayWritable;
@@ -46,10 +40,6 @@ import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.schema.MessageType;
@@ -63,67 +53,13 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
-/**
- * Record Reader implementation to merge fresh avro data with base parquet data, to support real time queries.
- */
-public abstract class AbstractRealtimeRecordReader {
-
- // Fraction of mapper/reducer task memory used for compaction of log files
- public static final String COMPACTION_MEMORY_FRACTION_PROP = "compaction.memory.fraction";
- public static final String DEFAULT_COMPACTION_MEMORY_FRACTION = "0.75";
- // used to choose a trade off between IO vs Memory when performing compaction process
- // Depending on outputfile size and memory provided, choose true to avoid OOM for large file
- // size + small memory
- public static final String COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP = "compaction.lazy.block.read.enabled";
- public static final String DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED = "true";
-
- // Property to set the max memory for dfs inputstream buffer size
- public static final String MAX_DFS_STREAM_BUFFER_SIZE_PROP = "hoodie.memory.dfs.buffer.max.size";
- // Setting this to lower value of 1 MB since no control over how many RecordReaders will be started in a mapper
- public static final int DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE = 1024 * 1024; // 1 MB
- // Property to set file path prefix for spillable file
- public static final String SPILLABLE_MAP_BASE_PATH_PROP = "hoodie.memory.spillable.map.path";
- // Default file path prefix for spillable file
- public static final String DEFAULT_SPILLABLE_MAP_BASE_PATH = "/tmp/";
-
- private static final Logger LOG = LogManager.getLogger(AbstractRealtimeRecordReader.class);
-
- protected final HoodieRealtimeFileSplit split;
- protected final JobConf jobConf;
- private final MessageType baseFileSchema;
- protected final boolean usesCustomPayload;
- // Schema handles
- private Schema readerSchema;
- private Schema writerSchema;
- private Schema hiveSchema;
-
- public AbstractRealtimeRecordReader(HoodieRealtimeFileSplit split, JobConf job) {
- this.split = split;
- this.jobConf = job;
- LOG.info("cfg ==> " + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR));
- LOG.info("columnIds ==> " + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
- LOG.info("partitioningColumns ==> " + job.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, ""));
- try {
- this.usesCustomPayload = usesCustomPayload();
- LOG.info("usesCustomPayload ==> " + this.usesCustomPayload);
- baseFileSchema = readSchema(jobConf, split.getPath());
- init();
- } catch (IOException e) {
- throw new HoodieIOException("Could not create HoodieRealtimeRecordReader on path " + this.split.getPath(), e);
- }
- }
-
- private boolean usesCustomPayload() {
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jobConf, split.getBasePath());
- return !(metaClient.getTableConfig().getPayloadClass().contains(HoodieAvroPayload.class.getName())
- || metaClient.getTableConfig().getPayloadClass().contains("org.apache.hudi.OverwriteWithLatestAvroPayload"));
- }
+public class HoodieRealtimeRecordReaderUtils {
/**
* Reads the schema from the parquet file. This is different from ParquetUtils as it uses the twitter parquet to
* support hive 1.1.0
*/
- private static MessageType readSchema(Configuration conf, Path parquetFilePath) {
+ public static MessageType readSchema(Configuration conf, Path parquetFilePath) {
try {
return ParquetFileReader.readFooter(conf, parquetFilePath).getFileMetaData().getSchema();
} catch (IOException e) {
@@ -134,7 +70,7 @@ public abstract class AbstractRealtimeRecordReader {
/**
* Prints a JSON representation of the ArrayWritable for easier debuggability.
*/
- protected static String arrayWritableToString(ArrayWritable writable) {
+ public static String arrayWritableToString(ArrayWritable writable) {
if (writable == null) {
return "null";
}
@@ -161,42 +97,10 @@ public abstract class AbstractRealtimeRecordReader {
}
/**
- * Given a comma separated list of field names and positions at which they appear on Hive, return
- * an ordered list of field names, that can be passed onto storage.
- */
- private static List<String> orderFields(String fieldNameCsv, String fieldOrderCsv, List<String> partitioningFields) {
- // Need to convert the following to Set first since Hive does not handle duplicate field names correctly but
- // handles duplicate fields orders correctly.
- // Fields Orders -> {@link https://github
- // .com/apache/hive/blob/f37c5de6c32b9395d1b34fa3c02ed06d1bfbf6eb/serde/src/java
- // /org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java#L188}
- // Field Names -> {@link https://github.com/apache/hive/blob/f37c5de6c32b9395d1b34fa3c02ed06d1bfbf6eb/serde/src/java
- // /org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java#L229}
- String[] fieldOrdersWithDups = fieldOrderCsv.split(",");
- Set<String> fieldOrdersSet = new LinkedHashSet<>(Arrays.asList(fieldOrdersWithDups));
- String[] fieldOrders = fieldOrdersSet.toArray(new String[0]);
- List<String> fieldNames = Arrays.stream(fieldNameCsv.split(","))
- .filter(fn -> !partitioningFields.contains(fn)).collect(Collectors.toList());
- Set<String> fieldNamesSet = new LinkedHashSet<>(fieldNames);
- // Hive does not provide ids for partitioning fields, so check for lengths excluding that.
- if (fieldNamesSet.size() != fieldOrders.length) {
- throw new HoodieException(String
- .format("Error ordering fields for storage read. #fieldNames: %d, #fieldPositions: %d",
- fieldNames.size(), fieldOrders.length));
- }
- TreeMap<Integer, String> orderedFieldMap = new TreeMap<>();
- String[] fieldNamesArray = fieldNamesSet.toArray(new String[0]);
- for (int ox = 0; ox < fieldOrders.length; ox++) {
- orderedFieldMap.put(Integer.parseInt(fieldOrders[ox]), fieldNamesArray[ox]);
- }
- return new ArrayList<>(orderedFieldMap.values());
- }
-
- /**
* Generate a reader schema off the provided writeSchema, to just project out the provided columns.
*/
- public static Schema generateProjectionSchema(Schema writeSchema, Map<String, Field> schemaFieldsMap,
- List<String> fieldNames) {
+ public static Schema generateProjectionSchema(Schema writeSchema, Map<String, Schema.Field> schemaFieldsMap,
+ List<String> fieldNames) {
/**
* Avro & Presto field names seems to be case sensitive (support fields differing only in case) whereas
* Hive/Impala/SparkSQL(default) are case-insensitive. Spark allows this to be configurable using
@@ -224,7 +128,7 @@ public abstract class AbstractRealtimeRecordReader {
return projectedSchema;
}
- public static Map<String, Field> getNameToFieldMap(Schema schema) {
+ public static Map<String, Schema.Field> getNameToFieldMap(Schema schema) {
return schema.getFields().stream().map(r -> Pair.of(r.name().toLowerCase(), r))
.collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
}
@@ -305,10 +209,10 @@ public abstract class AbstractRealtimeRecordReader {
if (schema.getLogicalType() != null && schema.getLogicalType().getName().equals("decimal")) {
LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) LogicalTypes.fromSchema(schema);
HiveDecimalWritable writable = new HiveDecimalWritable(((GenericFixed) value).bytes(),
- decimal.getScale());
+ decimal.getScale());
return HiveDecimalWritable.enforcePrecisionScale(writable,
- decimal.getPrecision(),
- decimal.getScale());
+ decimal.getPrecision(),
+ decimal.getScale());
}
return new BytesWritable(((GenericFixed) value).bytes());
default:
@@ -317,6 +221,38 @@ public abstract class AbstractRealtimeRecordReader {
}
/**
+ * Given a comma separated list of field names and positions at which they appear on Hive, return
+ * an ordered list of field names, that can be passed onto storage.
+ */
+ public static List<String> orderFields(String fieldNameCsv, String fieldOrderCsv, List<String> partitioningFields) {
+ // Need to convert the following to Set first since Hive does not handle duplicate field names correctly but
+ // handles duplicate fields orders correctly.
+ // Fields Orders -> {@link https://github
+ // .com/apache/hive/blob/f37c5de6c32b9395d1b34fa3c02ed06d1bfbf6eb/serde/src/java
+ // /org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java#L188}
+ // Field Names -> {@link https://github.com/apache/hive/blob/f37c5de6c32b9395d1b34fa3c02ed06d1bfbf6eb/serde/src/java
+ // /org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java#L229}
+ String[] fieldOrdersWithDups = fieldOrderCsv.split(",");
+ Set<String> fieldOrdersSet = new LinkedHashSet<>(Arrays.asList(fieldOrdersWithDups));
+ String[] fieldOrders = fieldOrdersSet.toArray(new String[0]);
+ List<String> fieldNames = Arrays.stream(fieldNameCsv.split(","))
+ .filter(fn -> !partitioningFields.contains(fn)).collect(Collectors.toList());
+ Set<String> fieldNamesSet = new LinkedHashSet<>(fieldNames);
+ // Hive does not provide ids for partitioning fields, so check for lengths excluding that.
+ if (fieldNamesSet.size() != fieldOrders.length) {
+ throw new HoodieException(String
+ .format("Error ordering fields for storage read. #fieldNames: %d, #fieldPositions: %d",
+ fieldNames.size(), fieldOrders.length));
+ }
+ TreeMap<Integer, String> orderedFieldMap = new TreeMap<>();
+ String[] fieldNamesArray = fieldNamesSet.toArray(new String[0]);
+ for (int ox = 0; ox < fieldOrders.length; ox++) {
+ orderedFieldMap.put(Integer.parseInt(fieldOrders[ox]), fieldNamesArray[ox]);
+ }
+ return new ArrayList<>(orderedFieldMap.values());
+ }
+
+ /**
* Hive implementation of ParquetRecordReader results in partition columns not present in the original parquet file to
* also be part of the projected schema. Hive expects the record reader implementation to return the row in its
* entirety (with un-projected column having null values). As we use writerSchema for this, make sure writer schema
@@ -324,92 +260,12 @@ public abstract class AbstractRealtimeRecordReader {
*
* @param schema Schema to be changed
*/
- private static Schema addPartitionFields(Schema schema, List<String> partitioningFields) {
+ public static Schema addPartitionFields(Schema schema, List<String> partitioningFields) {
final Set<String> firstLevelFieldNames =
- schema.getFields().stream().map(Field::name).map(String::toLowerCase).collect(Collectors.toSet());
+ schema.getFields().stream().map(Schema.Field::name).map(String::toLowerCase).collect(Collectors.toSet());
List<String> fieldsToAdd = partitioningFields.stream().map(String::toLowerCase)
.filter(x -> !firstLevelFieldNames.contains(x)).collect(Collectors.toList());
return HoodieAvroUtils.appendNullSchemaFields(schema, fieldsToAdd);
}
-
- /**
- * Goes through the log files in reverse order and finds the schema from the last available data block. If not, falls
- * back to the schema from the latest parquet file. Finally, sets the partition column and projection fields into the
- * job conf.
- */
- private void init() throws IOException {
- Schema schemaFromLogFile =
- LogReaderUtils.readLatestSchemaFromLogFiles(split.getBasePath(), split.getDeltaLogPaths(), jobConf);
- if (schemaFromLogFile == null) {
- writerSchema = new AvroSchemaConverter().convert(baseFileSchema);
- LOG.debug("Writer Schema From Parquet => " + writerSchema.getFields());
- } else {
- writerSchema = schemaFromLogFile;
- LOG.debug("Writer Schema From Log => " + writerSchema.getFields());
- }
- // Add partitioning fields to writer schema for resulting row to contain null values for these fields
- String partitionFields = jobConf.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "");
- List<String> partitioningFields =
- partitionFields.length() > 0 ? Arrays.stream(partitionFields.split("/")).collect(Collectors.toList())
- : new ArrayList<>();
- writerSchema = addPartitionFields(writerSchema, partitioningFields);
- List<String> projectionFields = orderFields(jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR),
- jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR), partitioningFields);
-
- Map<String, Field> schemaFieldsMap = getNameToFieldMap(writerSchema);
- hiveSchema = constructHiveOrderedSchema(writerSchema, schemaFieldsMap);
- // TODO(vc): In the future, the reader schema should be updated based on log files & be able
- // to null out fields not present before
-
- readerSchema = generateProjectionSchema(writerSchema, schemaFieldsMap, projectionFields);
- LOG.info(String.format("About to read compacted logs %s for base split %s, projecting cols %s",
- split.getDeltaLogPaths(), split.getPath(), projectionFields));
- }
-
- private Schema constructHiveOrderedSchema(Schema writerSchema, Map<String, Field> schemaFieldsMap) {
- // Get all column names of hive table
- String hiveColumnString = jobConf.get(hive_metastoreConstants.META_TABLE_COLUMNS);
- LOG.info("Hive Columns : " + hiveColumnString);
- String[] hiveColumns = hiveColumnString.split(",");
- LOG.info("Hive Columns : " + hiveColumnString);
- List<Field> hiveSchemaFields = new ArrayList<>();
-
- for (String columnName : hiveColumns) {
- Field field = schemaFieldsMap.get(columnName.toLowerCase());
-
- if (field != null) {
- hiveSchemaFields.add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal()));
- } else {
- // Hive has some extra virtual columns like BLOCK__OFFSET__INSIDE__FILE which do not exist in table schema.
- // They will get skipped as they won't be found in the original schema.
- LOG.debug("Skipping Hive Column => " + columnName);
- }
- }
-
- Schema hiveSchema = Schema.createRecord(writerSchema.getName(), writerSchema.getDoc(), writerSchema.getNamespace(),
- writerSchema.isError());
- hiveSchema.setFields(hiveSchemaFields);
- LOG.info("HIVE Schema is :" + hiveSchema.toString(true));
- return hiveSchema;
- }
-
- public Schema getReaderSchema() {
- return readerSchema;
- }
-
- public Schema getWriterSchema() {
- return writerSchema;
- }
-
- public Schema getHiveSchema() {
- return hiveSchema;
- }
-
- public long getMaxCompactionMemoryInBytes() {
- // jobConf.getMemoryForMapTask() returns in MB
- return (long) Math
- .ceil(Double.parseDouble(jobConf.get(COMPACTION_MEMORY_FRACTION_PROP, DEFAULT_COMPACTION_MEMORY_FRACTION))
- * jobConf.getMemoryForMapTask() * 1024 * 1024L);
- }
}
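A minimal sketch of the schema plumbing that now lives in HoodieRealtimeRecordReaderUtils, assuming the writer schema and a merged Avro record are supplied by the caller; the projected column names are made up for illustration, and the record reader itself passes its Hive-ordered schema to avroToArrayWritable rather than the projection built here:

  import java.util.Arrays;
  import java.util.List;
  import java.util.Map;
  import org.apache.avro.Schema;
  import org.apache.avro.generic.GenericRecord;
  import org.apache.hadoop.io.ArrayWritable;
  import org.apache.hadoop.io.Writable;
  import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;

  public class ProjectionSketch {
    public static ArrayWritable project(Schema writerSchema, GenericRecord mergedRecord) {
      Map<String, Schema.Field> fieldMap = HoodieRealtimeRecordReaderUtils.getNameToFieldMap(writerSchema);
      // Project only the columns the query asked for (hard-coded here for illustration).
      List<String> projectedCols = Arrays.asList("_hoodie_record_key", "rider", "fare");
      Schema readerSchema =
          HoodieRealtimeRecordReaderUtils.generateProjectionSchema(writerSchema, fieldMap, projectedCols);
      // Convert the merged Avro record into the Writable layout Hive expects.
      Writable writable = HoodieRealtimeRecordReaderUtils.avroToArrayWritable(mergedRecord, readerSchema);
      return (ArrayWritable) writable;
    }
  }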
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
index ad38d33..aa9d828 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.hadoop.testutils.InputFormatTestUtil;
+import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileStatus;
@@ -103,7 +104,7 @@ public class TestHoodieParquetInputFormat {
timeline.setInstants(instants);
// Verify getCommitsTimelineBeforePendingCompaction does not return instants after first compaction instant
- HoodieTimeline filteredTimeline = new HoodieParquetInputFormat().filterInstantsTimeline(timeline);
+ HoodieTimeline filteredTimeline = inputFormat.filterInstantsTimeline(timeline);
assertTrue(filteredTimeline.containsInstant(t1));
assertTrue(filteredTimeline.containsInstant(t2));
assertFalse(filteredTimeline.containsInstant(t3));
@@ -116,7 +117,7 @@ public class TestHoodieParquetInputFormat {
instants.remove(t3);
timeline = new HoodieActiveTimeline(metaClient);
timeline.setInstants(instants);
- filteredTimeline = new HoodieParquetInputFormat().filterInstantsTimeline(timeline);
+ filteredTimeline = inputFormat.filterInstantsTimeline(timeline);
// verify all remaining instants are returned.
assertTrue(filteredTimeline.containsInstant(t1));
@@ -130,7 +131,7 @@ public class TestHoodieParquetInputFormat {
instants.remove(t5);
timeline = new HoodieActiveTimeline(metaClient);
timeline.setInstants(instants);
- filteredTimeline = new HoodieParquetInputFormat().filterInstantsTimeline(timeline);
+ filteredTimeline = inputFormat.filterInstantsTimeline(timeline);
// verify all remaining instants are returned.
assertTrue(filteredTimeline.containsInstant(t1));
@@ -267,7 +268,7 @@ public class TestHoodieParquetInputFormat {
ensureFilesInCommit("Pulling 3 commits from 100, should get us the 1 files from 300 commit", files, "300", 1);
ensureFilesInCommit("Pulling 3 commits from 100, should get us the 1 files from 200 commit", files, "200", 1);
- InputFormatTestUtil.setupIncremental(jobConf, "100", HoodieHiveUtil.MAX_COMMIT_ALL);
+ InputFormatTestUtil.setupIncremental(jobConf, "100", HoodieHiveUtils.MAX_COMMIT_ALL);
files = inputFormat.listStatus(jobConf);
assertEquals(5, files.length,
@@ -312,15 +313,15 @@ public class TestHoodieParquetInputFormat {
public void testGetIncrementalTableNames() throws IOException {
String[] expectedincrTables = {"db1.raw_trips", "db2.model_trips", "db3.model_trips"};
JobConf conf = new JobConf();
- String incrementalMode1 = String.format(HoodieHiveUtil.HOODIE_CONSUME_MODE_PATTERN, expectedincrTables[0]);
- conf.set(incrementalMode1, HoodieHiveUtil.INCREMENTAL_SCAN_MODE);
- String incrementalMode2 = String.format(HoodieHiveUtil.HOODIE_CONSUME_MODE_PATTERN, expectedincrTables[1]);
- conf.set(incrementalMode2,HoodieHiveUtil.INCREMENTAL_SCAN_MODE);
- String incrementalMode3 = String.format(HoodieHiveUtil.HOODIE_CONSUME_MODE_PATTERN, "db3.model_trips");
- conf.set(incrementalMode3, HoodieHiveUtil.INCREMENTAL_SCAN_MODE.toLowerCase());
- String defaultmode = String.format(HoodieHiveUtil.HOODIE_CONSUME_MODE_PATTERN, "db3.first_trips");
- conf.set(defaultmode, HoodieHiveUtil.DEFAULT_SCAN_MODE);
- List<String> actualincrTables = HoodieHiveUtil.getIncrementalTableNames(Job.getInstance(conf));
+ String incrementalMode1 = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, expectedincrTables[0]);
+ conf.set(incrementalMode1, HoodieHiveUtils.INCREMENTAL_SCAN_MODE);
+ String incrementalMode2 = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, expectedincrTables[1]);
+ conf.set(incrementalMode2,HoodieHiveUtils.INCREMENTAL_SCAN_MODE);
+ String incrementalMode3 = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, "db3.model_trips");
+ conf.set(incrementalMode3, HoodieHiveUtils.INCREMENTAL_SCAN_MODE.toLowerCase());
+ String defaultmode = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, "db3.first_trips");
+ conf.set(defaultmode, HoodieHiveUtils.DEFAULT_SCAN_MODE);
+ List<String> actualincrTables = HoodieHiveUtils.getIncrementalTableNames(Job.getInstance(conf));
for (String expectedincrTable : expectedincrTables) {
assertTrue(actualincrTables.contains(expectedincrTable));
}
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
index 3e63fef..5a735ce 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
@@ -31,6 +31,7 @@ import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.testutils.InputFormatTestUtil;
+import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
@@ -80,7 +81,7 @@ public class TestHoodieRealtimeRecordReader {
@BeforeEach
public void setUp() {
jobConf = new JobConf();
- jobConf.set(AbstractRealtimeRecordReader.MAX_DFS_STREAM_BUFFER_SIZE_PROP, String.valueOf(1024 * 1024));
+ jobConf.set(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, String.valueOf(1024 * 1024));
hadoopConf = HoodieTestUtils.getDefaultHadoopConf();
fs = FSUtils.getFs(basePath.toString(), hadoopConf);
}
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
index 0c6ed7b..05669bb 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
@@ -28,7 +28,7 @@ import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.SchemaTestUtil;
-import org.apache.hudi.hadoop.HoodieHiveUtil;
+import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
@@ -104,15 +104,15 @@ public class InputFormatTestUtil {
public static void setupIncremental(JobConf jobConf, String startCommit, int numberOfCommitsToPull) {
String modePropertyName =
- String.format(HoodieHiveUtil.HOODIE_CONSUME_MODE_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
- jobConf.set(modePropertyName, HoodieHiveUtil.INCREMENTAL_SCAN_MODE);
+ String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
+ jobConf.set(modePropertyName, HoodieHiveUtils.INCREMENTAL_SCAN_MODE);
String startCommitTimestampName =
- String.format(HoodieHiveUtil.HOODIE_START_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
+ String.format(HoodieHiveUtils.HOODIE_START_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
jobConf.set(startCommitTimestampName, startCommit);
String maxCommitPulls =
- String.format(HoodieHiveUtil.HOODIE_MAX_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
+ String.format(HoodieHiveUtils.HOODIE_MAX_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
jobConf.setInt(maxCommitPulls, numberOfCommitsToPull);
}