Posted to commits@hudi.apache.org by vb...@apache.org on 2020/04/13 06:11:06 UTC
[incubator-hudi] branch master updated: [HUDI-770] Organize
upsert/insert API implementation under a single package (#1495)
This is an automated email from the ASF dual-hosted git repository.
vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 17bf930 [HUDI-770] Organize upsert/insert API implementation under a single package (#1495)
17bf930 is described below
commit 17bf9303423821483f17785f8536b7bfed45e42e
Author: Balaji Varadarajan <va...@uber.com>
AuthorDate: Sun Apr 12 23:11:00 2020 -0700
[HUDI-770] Organize upsert/insert API implementation under a single package (#1495)
---
.../hudi/client/AbstractHoodieWriteClient.java | 20 +-
.../org/apache/hudi/client/HoodieWriteClient.java | 319 ++++---------------
.../apache/hudi/table/HoodieCopyOnWriteTable.java | 349 +++------------------
.../apache/hudi/table/HoodieMergeOnReadTable.java | 180 +++--------
.../java/org/apache/hudi/table/HoodieTable.java | 92 ++++--
.../hudi/table/action/BaseActionExecutor.java | 5 +-
.../action/commit/BaseCommitActionExecutor.java | 291 +++++++++++++++++
.../BucketInfo.java} | 40 ++-
.../BucketType.java} | 26 +-
.../commit/BulkInsertCommitActionExecutor.java | 60 ++++
.../hudi/table/action/commit/BulkInsertHelper.java | 84 +++++
.../BulkInsertPreppedCommitActionExecutor.java | 61 ++++
.../table/action/commit/CommitActionExecutor.java | 176 +++++++++++
.../DeleteCommitActionExecutor.java} | 32 +-
.../hudi/table/action/commit/DeleteHelper.java | 96 ++++++
.../table/action/commit/HoodieWriteMetadata.java | 104 ++++++
.../InsertBucket.java} | 42 ++-
.../InsertCommitActionExecutor.java} | 34 +-
.../InsertPreppedCommitActionExecutor.java} | 32 +-
.../SmallFile.java} | 41 ++-
.../UpsertCommitActionExecutor.java} | 34 +-
.../table/action/commit/UpsertPartitioner.java | 316 +++++++++++++++++++
.../UpsertPreppedCommitActionExecutor.java} | 32 +-
.../hudi/table/action/commit/WriteHelper.java | 105 +++++++
.../BulkInsertDeltaCommitActionExecutor.java | 62 ++++
...BulkInsertPreppedDeltaCommitActionExecutor.java | 63 ++++
.../DeleteDeltaCommitActionExecutor.java} | 34 +-
.../deltacommit/DeltaCommitActionExecutor.java | 94 ++++++
.../InsertDeltaCommitActionExecutor.java | 49 +++
.../InsertPreppedDeltaCommitActionExecutor.java} | 32 +-
.../UpsertDeltaCommitActionExecutor.java | 49 +++
.../deltacommit/UpsertDeltaCommitPartitioner.java | 142 +++++++++
.../UpsertPreppedDeltaCommitActionExecutor.java} | 32 +-
.../apache/hudi/client/TestHoodieClientBase.java | 3 +-
.../TestHoodieClientOnCopyOnWriteStorage.java | 48 +--
.../apache/hudi/table/TestMergeOnReadTable.java | 8 +-
.../commit/TestCopyOnWriteActionExecutor.java} | 123 ++------
.../table/action/commit/TestUpsertPartitioner.java | 148 +++++++++
.../hudi/common/model/WriteOperationType.java | 4 +
39 files changed, 2395 insertions(+), 1067 deletions(-)
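At a glance, this refactor replaces HoodieWriteClient's inline partitioner and helper plumbing with one action executor per operation under the new org.apache.hudi.table.action.commit package (plus a parallel action.deltacommit package for MergeOnRead). Condensed from the hunks that follow (a sketch, not a verbatim copy), every public write API now reduces to the same steps:

    // Sketch of the client-side pattern this commit introduces.
    public JavaRDD<WriteStatus> upsert(JavaRDD<HoodieRecord<T>> records, final String instantTime) {
      HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.UPSERT);
      setOperationType(WriteOperationType.UPSERT);
      // delegate the heavy lifting to the table, which picks an action executor
      HoodieWriteMetadata result = table.upsert(jsc, instantTime, records);
      // shared metrics emission and auto-commit handling
      return postWrite(result, instantTime, table);
    }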
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
index ea5ed9f..862a60f 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
@@ -166,6 +166,18 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
postCommit(metadata, instantTime, extraMetadata);
+ emitCommitMetrics(instantTime, metadata, actionType);
+
+ LOG.info("Committed " + instantTime);
+ } catch (IOException e) {
+ throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime,
+ e);
+ }
+ return true;
+ }
+
+ void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String actionType) {
+ try {
if (writeContext != null) {
long durationInMs = metrics.getDurationInMs(writeContext.stop());
@@ -173,15 +185,10 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
metadata, actionType);
writeContext = null;
}
- LOG.info("Committed " + instantTime);
- } catch (IOException e) {
- throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime,
- e);
} catch (ParseException e) {
throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime
+ "Instant time is not of valid format", e);
}
- return true;
}
/**
@@ -189,10 +196,9 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
* @param metadata Commit Metadata corresponding to committed instant
* @param instantTime Instant Time
* @param extraMetadata Additional Metadata passed by user
- * @throws IOException in case of error
*/
protected abstract void postCommit(HoodieCommitMetadata metadata, String instantTime,
- Option<Map<String, String>> extraMetadata) throws IOException;
+ Option<Map<String, String>> extraMetadata);
/**
* Finalize Write operation.
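The two hunks above split metric emission out of commit() into a reusable emitCommitMetrics(), so the new postWrite() path in HoodieWriteClient can call it too. A condensed reconstruction of the resulting shape (bodies elided; saveToTimeline is a hypothetical stand-in for the timeline write):

    boolean commit(String instantTime, HoodieCommitMetadata metadata, String actionType) {
      try {
        saveToTimeline(metadata, instantTime);             // hypothetical stand-in
        postCommit(metadata, instantTime, Option.empty());
        emitCommitMetrics(instantTime, metadata, actionType);
        return true;
      } catch (IOException e) {
        throw new HoodieCommitException("Failed to complete commit at time " + instantTime, e);
      }
    }

    void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String actionType) {
      // stops the write timer and reports duration plus commit metadata;
      // a ParseException (malformed instant) is rethrown as HoodieCommitException
    }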
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
index e69eef2..6e866f9 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
@@ -26,7 +26,6 @@ import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.client.utils.SparkConfigUtils;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
@@ -50,28 +49,22 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieCompactionException;
import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.exception.HoodieSavepointException;
-import org.apache.hudi.exception.HoodieUpsertException;
-import org.apache.hudi.execution.BulkInsertMapFunction;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.metrics.HoodieMetrics;
import org.apache.hudi.table.HoodieCommitArchiveLog;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
-import org.apache.hudi.table.WorkloadProfile;
-import org.apache.hudi.table.WorkloadStat;
import com.codahale.metrics.Timer;
+import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.storage.StorageLevel;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@@ -81,7 +74,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-import java.util.stream.IntStream;
import scala.Tuple2;
@@ -176,22 +168,11 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
public JavaRDD<WriteStatus> upsert(JavaRDD<HoodieRecord<T>> records, final String instantTime) {
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.UPSERT);
setOperationType(WriteOperationType.UPSERT);
- try {
- // De-dupe/merge if needed
- JavaRDD<HoodieRecord<T>> dedupedRecords =
- combineOnCondition(config.shouldCombineBeforeUpsert(), records, config.getUpsertShuffleParallelism());
-
- Timer.Context indexTimer = metrics.getIndexCtx();
- // perform index loop up to get existing location of records
- JavaRDD<HoodieRecord<T>> taggedRecords = getIndex().tagLocation(dedupedRecords, jsc, table);
- metrics.updateIndexMetrics(LOOKUP_STR, metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop()));
- return upsertRecordsInternal(taggedRecords, instantTime, table, true);
- } catch (Throwable e) {
- if (e instanceof HoodieUpsertException) {
- throw (HoodieUpsertException) e;
- }
- throw new HoodieUpsertException("Failed to upsert for commit time " + instantTime, e);
+ HoodieWriteMetadata result = table.upsert(jsc, instantTime, records);
+ if (result.getIndexLookupDuration().isPresent()) {
+ metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
}
+ return postWrite(result, instantTime, table);
}
/**
@@ -206,14 +187,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
public JavaRDD<WriteStatus> upsertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, final String instantTime) {
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.UPSERT_PREPPED);
setOperationType(WriteOperationType.UPSERT_PREPPED);
- try {
- return upsertRecordsInternal(preppedRecords, instantTime, table, true);
- } catch (Throwable e) {
- if (e instanceof HoodieUpsertException) {
- throw (HoodieUpsertException) e;
- }
- throw new HoodieUpsertException("Failed to upsert prepared records for commit time " + instantTime, e);
- }
+ HoodieWriteMetadata result = table.upsertPrepped(jsc, instantTime, preppedRecords);
+ return postWrite(result, instantTime, table);
}
/**
@@ -229,18 +204,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
public JavaRDD<WriteStatus> insert(JavaRDD<HoodieRecord<T>> records, final String instantTime) {
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.INSERT);
setOperationType(WriteOperationType.INSERT);
- try {
- // De-dupe/merge if needed
- JavaRDD<HoodieRecord<T>> dedupedRecords =
- combineOnCondition(config.shouldCombineBeforeInsert(), records, config.getInsertShuffleParallelism());
-
- return upsertRecordsInternal(dedupedRecords, instantTime, table, false);
- } catch (Throwable e) {
- if (e instanceof HoodieInsertException) {
- throw e;
- }
- throw new HoodieInsertException("Failed to insert for commit time " + instantTime, e);
- }
+ HoodieWriteMetadata result = table.insert(jsc, instantTime, records);
+ return postWrite(result, instantTime, table);
}
/**
@@ -257,14 +222,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
public JavaRDD<WriteStatus> insertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, final String instantTime) {
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.INSERT_PREPPED);
setOperationType(WriteOperationType.INSERT_PREPPED);
- try {
- return upsertRecordsInternal(preppedRecords, instantTime, table, false);
- } catch (Throwable e) {
- if (e instanceof HoodieInsertException) {
- throw e;
- }
- throw new HoodieInsertException("Failed to insert prepared records for commit time " + instantTime, e);
- }
+ HoodieWriteMetadata result = table.insertPrepped(jsc, instantTime, preppedRecords);
+ return postWrite(result, instantTime, table);
}
/**
@@ -301,18 +260,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.BULK_INSERT);
setOperationType(WriteOperationType.BULK_INSERT);
- try {
- // De-dupe/merge if needed
- JavaRDD<HoodieRecord<T>> dedupedRecords =
- combineOnCondition(config.shouldCombineBeforeInsert(), records, config.getInsertShuffleParallelism());
-
- return bulkInsertInternal(dedupedRecords, instantTime, table, bulkInsertPartitioner);
- } catch (Throwable e) {
- if (e instanceof HoodieInsertException) {
- throw e;
- }
- throw new HoodieInsertException("Failed to bulk insert for commit time " + instantTime, e);
- }
+ HoodieWriteMetadata result = table.bulkInsert(jsc, instantTime, records, bulkInsertPartitioner);
+ return postWrite(result, instantTime, table);
}
/**
@@ -335,14 +284,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.BULK_INSERT_PREPPED);
setOperationType(WriteOperationType.BULK_INSERT_PREPPED);
- try {
- return bulkInsertInternal(preppedRecords, instantTime, table, bulkInsertPartitioner);
- } catch (Throwable e) {
- if (e instanceof HoodieInsertException) {
- throw e;
- }
- throw new HoodieInsertException("Failed to bulk insert prepared records for commit time " + instantTime, e);
- }
+ HoodieWriteMetadata result = table.bulkInsertPrepped(jsc, instantTime, preppedRecords, bulkInsertPartitioner);
+ return postWrite(result, instantTime, table);
}
/**
@@ -356,170 +299,59 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
public JavaRDD<WriteStatus> delete(JavaRDD<HoodieKey> keys, final String instantTime) {
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.DELETE);
setOperationType(WriteOperationType.DELETE);
- try {
- // De-dupe/merge if needed
- JavaRDD<HoodieKey> dedupedKeys =
- config.shouldCombineBeforeDelete() ? deduplicateKeys(keys) : keys;
-
- JavaRDD<HoodieRecord<T>> dedupedRecords =
- dedupedKeys.map(key -> new HoodieRecord(key, new EmptyHoodieRecordPayload()));
- Timer.Context indexTimer = metrics.getIndexCtx();
- // perform index loop up to get existing location of records
- JavaRDD<HoodieRecord<T>> taggedRecords = getIndex().tagLocation(dedupedRecords, jsc, table);
- // filter out non existant keys/records
- JavaRDD<HoodieRecord<T>> taggedValidRecords = taggedRecords.filter(HoodieRecord::isCurrentLocationKnown);
- if (!taggedValidRecords.isEmpty()) {
- metrics.updateIndexMetrics(LOOKUP_STR, metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop()));
- return upsertRecordsInternal(taggedValidRecords, instantTime, table, true);
- } else {
- // if entire set of keys are non existent
- saveWorkloadProfileMetadataToInflight(new WorkloadProfile(jsc.emptyRDD()), table, instantTime);
- JavaRDD<WriteStatus> writeStatusRDD = jsc.emptyRDD();
- commitOnAutoCommit(instantTime, writeStatusRDD, table.getMetaClient().getCommitActionType());
- return writeStatusRDD;
- }
- } catch (Throwable e) {
- if (e instanceof HoodieUpsertException) {
- throw (HoodieUpsertException) e;
- }
- throw new HoodieUpsertException("Failed to delete for commit time " + instantTime, e);
- }
- }
-
- private JavaRDD<WriteStatus> bulkInsertInternal(JavaRDD<HoodieRecord<T>> dedupedRecords, String instantTime,
- HoodieTable<T> table, Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
- final JavaRDD<HoodieRecord<T>> repartitionedRecords;
- final int parallelism = config.getBulkInsertShuffleParallelism();
- if (bulkInsertPartitioner.isPresent()) {
- repartitionedRecords = bulkInsertPartitioner.get().repartitionRecords(dedupedRecords, parallelism);
- } else {
- // Now, sort the records and line them up nicely for loading.
- repartitionedRecords = dedupedRecords.sortBy(record -> {
- // Let's use "partitionPath + key" as the sort key. Spark, will ensure
- // the records split evenly across RDD partitions, such that small partitions fit
- // into 1 RDD partition, while big ones spread evenly across multiple RDD partitions
- return String.format("%s+%s", record.getPartitionPath(), record.getRecordKey());
- }, true, parallelism);
- }
-
- // generate new file ID prefixes for each output partition
- final List<String> fileIDPrefixes =
- IntStream.range(0, parallelism).mapToObj(i -> FSUtils.createNewFileIdPfx()).collect(Collectors.toList());
-
- table.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(State.REQUESTED,
- table.getMetaClient().getCommitActionType(), instantTime), Option.empty());
-
- JavaRDD<WriteStatus> writeStatusRDD = repartitionedRecords
- .mapPartitionsWithIndex(new BulkInsertMapFunction<T>(instantTime, config, table, fileIDPrefixes), true)
- .flatMap(List::iterator);
-
- return updateIndexAndCommitIfNeeded(writeStatusRDD, table, instantTime);
- }
-
- private JavaRDD<HoodieRecord<T>> combineOnCondition(boolean condition, JavaRDD<HoodieRecord<T>> records,
- int parallelism) {
- return condition ? deduplicateRecords(records, parallelism) : records;
+ HoodieWriteMetadata result = table.delete(jsc, instantTime, keys);
+ return postWrite(result, instantTime, table);
}
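The inline delete logic removed above (key de-duplication, index tagging, filtering of unknown keys) is re-homed in the new DeleteHelper listed in the diffstat. A sketch of the equivalent flow, with method and field names assumed for illustration rather than copied from the helper:

    // Assumed condensation of the relocated delete path.
    JavaRDD<HoodieKey> dedupedKeys =
        config.shouldCombineBeforeDelete() ? deduplicateKeys(keys) : keys;
    JavaRDD<HoodieRecord<T>> deleteRecords =
        dedupedKeys.map(key -> new HoodieRecord<>(key, new EmptyHoodieRecordPayload()));
    JavaRDD<HoodieRecord<T>> tagged = index.tagLocation(deleteRecords, jsc, table);
    // only keys that actually exist in the table need delete markers written
    JavaRDD<HoodieRecord<T>> known = tagged.filter(HoodieRecord::isCurrentLocationKnown);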
/**
- * Save the workload profile in an intermediate file (here re-using commit files) This is useful when performing
- * rollback for MOR tables. Only updates are recorded in the workload profile metadata since updates to log blocks
- * are unknown across batches Inserts (which are new parquet files) are rolled back based on commit time. // TODO :
- * Create a new WorkloadProfile metadata file instead of using HoodieCommitMetadata
+ * Common method containing steps to be performed after write (upsert/insert, etc.) operations, including auto-commit.
+ * @param result Commit Action Result
+ * @param instantTime Instant Time
+ * @param hoodieTable Hoodie Table
+ * @return Write Status
*/
- private void saveWorkloadProfileMetadataToInflight(WorkloadProfile profile, HoodieTable<T> table, String instantTime)
- throws HoodieCommitException {
- try {
- HoodieCommitMetadata metadata = new HoodieCommitMetadata();
- profile.getPartitionPaths().forEach(path -> {
- WorkloadStat partitionStat = profile.getWorkloadStat(path.toString());
- partitionStat.getUpdateLocationToCount().forEach((key, value) -> {
- HoodieWriteStat writeStat = new HoodieWriteStat();
- writeStat.setFileId(key);
- // TODO : Write baseCommitTime is possible here ?
- writeStat.setPrevCommit(value.getKey());
- writeStat.setNumUpdateWrites(value.getValue());
- metadata.addWriteStat(path.toString(), writeStat);
- });
- });
- metadata.setOperationType(getOperationType());
-
- HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
- String commitActionType = table.getMetaClient().getCommitActionType();
- HoodieInstant requested = new HoodieInstant(State.REQUESTED, commitActionType, instantTime);
- activeTimeline.transitionRequestedToInflight(requested,
- Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
- } catch (IOException io) {
- throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", io);
+ private JavaRDD<WriteStatus> postWrite(HoodieWriteMetadata result, String instantTime, HoodieTable<T> hoodieTable) {
+ if (result.getIndexUpdateDuration().isPresent()) {
+ metrics.updateIndexMetrics(getOperationType().name(), result.getIndexUpdateDuration().get().toMillis());
}
- }
-
- private JavaRDD<WriteStatus> upsertRecordsInternal(JavaRDD<HoodieRecord<T>> preppedRecords, String instantTime,
- HoodieTable<T> hoodieTable, final boolean isUpsert) {
-
- // Cache the tagged records, so we don't end up computing both
- // TODO: Consistent contract in HoodieWriteClient regarding preppedRecord storage level handling
- if (preppedRecords.getStorageLevel() == StorageLevel.NONE()) {
- preppedRecords.persist(StorageLevel.MEMORY_AND_DISK_SER());
- } else {
- LOG.info("RDD PreppedRecords was persisted at: " + preppedRecords.getStorageLevel());
- }
-
- WorkloadProfile profile = null;
- if (hoodieTable.isWorkloadProfileNeeded()) {
- profile = new WorkloadProfile(preppedRecords);
- LOG.info("Workload profile :" + profile);
- saveWorkloadProfileMetadataToInflight(profile, hoodieTable, instantTime);
- }
-
- // partition using the insert partitioner
- final Partitioner partitioner = getPartitioner(hoodieTable, isUpsert, profile);
- JavaRDD<HoodieRecord<T>> partitionedRecords = partition(preppedRecords, partitioner);
- JavaRDD<WriteStatus> writeStatusRDD = partitionedRecords.mapPartitionsWithIndex((partition, recordItr) -> {
- if (isUpsert) {
- return hoodieTable.handleUpsertPartition(instantTime, partition, recordItr, partitioner);
- } else {
- return hoodieTable.handleInsertPartition(instantTime, partition, recordItr, partitioner);
+ if (result.isCommitted()) {
+ // Perform post commit operations.
+ if (result.getFinalizeDuration().isPresent()) {
+ metrics.updateFinalizeWriteMetrics(result.getFinalizeDuration().get().toMillis(),
+ result.getWriteStats().get().size());
}
- }, true).flatMap(List::iterator);
- return updateIndexAndCommitIfNeeded(writeStatusRDD, hoodieTable, instantTime);
- }
+ postCommit(result.getCommitMetadata().get(), instantTime, Option.empty());
- private Partitioner getPartitioner(HoodieTable table, boolean isUpsert, WorkloadProfile profile) {
- if (isUpsert) {
- return table.getUpsertPartitioner(profile, jsc);
- } else {
- return table.getInsertPartitioner(profile, jsc);
+ emitCommitMetrics(instantTime, result.getCommitMetadata().get(),
+ hoodieTable.getMetaClient().getCommitActionType());
}
- }
-
- private JavaRDD<HoodieRecord<T>> partition(JavaRDD<HoodieRecord<T>> dedupedRecords, Partitioner partitioner) {
- return dedupedRecords.mapToPair(
- record -> new Tuple2<>(new Tuple2<>(record.getKey(), Option.ofNullable(record.getCurrentLocation())), record))
- .partitionBy(partitioner).map(Tuple2::_2);
+ return result.getWriteStatuses();
}
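postWrite() is driven entirely by the new HoodieWriteMetadata carrier (table/action/commit/HoodieWriteMetadata.java in the diffstat). Inferred from its usage above, the relevant surface looks roughly like the following; the committed class may carry more state:

    import java.time.Duration;
    import java.util.List;

    // Hypothetical view of HoodieWriteMetadata, inferred from postWrite's calls.
    interface WriteMetadataView {
      JavaRDD<WriteStatus> getWriteStatuses();           // raw write results
      boolean isCommitted();                             // true once auto-commit ran
      Option<Duration> getIndexLookupDuration();         // index tag timing
      Option<Duration> getIndexUpdateDuration();         // index update timing
      Option<Duration> getFinalizeDuration();            // finalizeWrite timing
      Option<List<HoodieWriteStat>> getWriteStats();     // per-file stats
      Option<HoodieCommitMetadata> getCommitMetadata();  // set when committed
    }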
@Override
protected void postCommit(HoodieCommitMetadata metadata, String instantTime,
- Option<Map<String, String>> extraMetadata) throws IOException {
-
- // Do an inline compaction if enabled
- if (config.isInlineCompaction()) {
- metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "true");
- forceCompact(extraMetadata);
- } else {
- metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "false");
- }
- // We cannot have unbounded commit files. Archive commits if we have to archive
- HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(config, createMetaClient(true));
- archiveLog.archiveIfRequired(jsc);
- if (config.isAutoClean()) {
- // Call clean to cleanup if there is anything to cleanup after the commit,
- LOG.info("Auto cleaning is enabled. Running cleaner now");
- clean(instantTime);
- } else {
- LOG.info("Auto cleaning is not enabled. Not running cleaner now");
+ Option<Map<String, String>> extraMetadata) {
+ try {
+ // Do an inline compaction if enabled
+ if (config.isInlineCompaction()) {
+ metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "true");
+ forceCompact(extraMetadata);
+ } else {
+ metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "false");
+ }
+ // We cannot have unbounded commit files. Archive commits if we have to archive
+ HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(config, createMetaClient(true));
+ archiveLog.archiveIfRequired(jsc);
+ if (config.isAutoClean()) {
+ // Call clean to cleanup if there is anything to cleanup after the commit,
+ LOG.info("Auto cleaning is enabled. Running cleaner now");
+ clean(instantTime);
+ } else {
+ LOG.info("Auto cleaning is not enabled. Not running cleaner now");
+ }
+ } catch (IOException ioe) {
+ throw new HoodieIOException(ioe.getMessage(), ioe);
}
}
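With the IOException now wrapped in HoodieIOException, postCommit() drops its checked exception and callers need no try/catch. Its behavior stays config-driven; a minimal sketch of the knobs it consults, with builder method names assumed from this era's config API:

    // Sketch: flags consulted by postCommit (builder names assumed).
    HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
            .withInlineCompaction(true)   // config.isInlineCompaction() -> forceCompact()
            .withAutoClean(true)          // config.isAutoClean() -> clean(instantTime)
            .build())
        .build();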
@@ -978,47 +810,6 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
}
/**
- * Deduplicate Hoodie records, using the given deduplication function.
- *
- * @param records hoodieRecords to deduplicate
- * @param parallelism parallelism or partitions to be used while reducing/deduplicating
- * @return RDD of HoodieRecord already be deduplicated
- */
- JavaRDD<HoodieRecord<T>> deduplicateRecords(JavaRDD<HoodieRecord<T>> records, int parallelism) {
- boolean isIndexingGlobal = getIndex().isGlobal();
- return records.mapToPair(record -> {
- HoodieKey hoodieKey = record.getKey();
- // If index used is global, then records are expected to differ in their partitionPath
- Object key = isIndexingGlobal ? hoodieKey.getRecordKey() : hoodieKey;
- return new Tuple2<>(key, record);
- }).reduceByKey((rec1, rec2) -> {
- @SuppressWarnings("unchecked")
- T reducedData = (T) rec1.getData().preCombine(rec2.getData());
- // we cannot allow the user to change the key or partitionPath, since that will affect
- // everything
- // so pick it from one of the records.
- return new HoodieRecord<T>(rec1.getKey(), reducedData);
- }, parallelism).map(Tuple2::_2);
- }
-
- /**
- * Deduplicate Hoodie records, using the given deduplication function.
- *
- * @param keys RDD of HoodieKey to deduplicate
- * @return RDD of HoodieKey already be deduplicated
- */
- JavaRDD<HoodieKey> deduplicateKeys(JavaRDD<HoodieKey> keys) {
- boolean isIndexingGlobal = getIndex().isGlobal();
- if (isIndexingGlobal) {
- return keys.keyBy(HoodieKey::getRecordKey)
- .reduceByKey((key1, key2) -> key1)
- .values();
- } else {
- return keys.distinct();
- }
- }
-
- /**
* Cleanup all pending commits.
*/
private void rollbackPendingCommits() {
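deduplicateRecords and deduplicateKeys leave the client; per the diffstat they move to table/action/commit/WriteHelper.java and DeleteHelper.java. A sketch of the relocated record de-dup, preserving the semantics of the method removed above (the static signature is an assumption):

    public static <T extends HoodieRecordPayload> JavaRDD<HoodieRecord<T>> deduplicateRecords(
        JavaRDD<HoodieRecord<T>> records, HoodieIndex<T> index, int parallelism) {
      boolean isIndexingGlobal = index.isGlobal();
      return records
          // global index: records may differ in partitionPath, so key on recordKey only
          .mapToPair(rec -> new Tuple2<>(
              isIndexingGlobal ? rec.getKey().getRecordKey() : (Object) rec.getKey(), rec))
          .reduceByKey((rec1, rec2) -> {
            @SuppressWarnings("unchecked")
            T reduced = (T) rec1.getData().preCombine(rec2.getData());
            // key/partitionPath must not change, so take them from one record
            return new HoodieRecord<T>(rec1.getKey(), reduced);
          }, parallelism)
          .map(Tuple2::_2);
    }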
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
index f01cdaa..03c014f 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
@@ -32,26 +32,30 @@ import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.model.HoodieRollingStatMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.util.NumericUtils;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.exception.HoodieUpsertException;
-import org.apache.hudi.execution.CopyOnWriteLazyInsertIterable;
import org.apache.hudi.execution.SparkBoundedInMemoryExecutor;
import org.apache.hudi.io.HoodieCreateHandle;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.table.action.clean.CleanActionExecutor;
+import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
+import org.apache.hudi.table.action.commit.BulkInsertCommitActionExecutor;
+import org.apache.hudi.table.action.commit.BulkInsertPreppedCommitActionExecutor;
+import org.apache.hudi.table.action.commit.DeleteCommitActionExecutor;
+import org.apache.hudi.table.action.commit.InsertCommitActionExecutor;
+import org.apache.hudi.table.action.commit.InsertPreppedCommitActionExecutor;
+import org.apache.hudi.table.action.commit.UpsertCommitActionExecutor;
+import org.apache.hudi.table.action.commit.UpsertPreppedCommitActionExecutor;
import org.apache.hudi.table.rollback.RollbackHelper;
import org.apache.hudi.table.rollback.RollbackRequest;
import org.apache.log4j.LogManager;
@@ -59,21 +63,16 @@ import org.apache.log4j.Logger;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.hadoop.ParquetReader;
-import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.PairFunction;
-import scala.Tuple2;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.stream.Collectors;
/**
@@ -94,21 +93,44 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
}
@Override
- public Partitioner getUpsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc) {
- if (profile == null) {
- throw new HoodieUpsertException("Need workload profile to construct the upsert partitioner.");
- }
- return new UpsertPartitioner(profile, jsc);
+ public HoodieWriteMetadata upsert(JavaSparkContext jsc, String instantTime, JavaRDD<HoodieRecord<T>> records) {
+ return new UpsertCommitActionExecutor<>(jsc, config, this, instantTime, records).execute();
+ }
+
+ @Override
+ public HoodieWriteMetadata insert(JavaSparkContext jsc, String instantTime, JavaRDD<HoodieRecord<T>> records) {
+ return new InsertCommitActionExecutor<>(jsc, config, this, instantTime, records).execute();
+ }
+
+ @Override
+ public HoodieWriteMetadata bulkInsert(JavaSparkContext jsc, String instantTime, JavaRDD<HoodieRecord<T>> records,
+ Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
+ return new BulkInsertCommitActionExecutor<>(jsc, config,
+ this, instantTime, records, bulkInsertPartitioner).execute();
+ }
+
+ @Override
+ public HoodieWriteMetadata delete(JavaSparkContext jsc, String instantTime, JavaRDD<HoodieKey> keys) {
+ return new DeleteCommitActionExecutor<>(jsc, config, this, instantTime, keys).execute();
+ }
+
+ @Override
+ public HoodieWriteMetadata upsertPrepped(JavaSparkContext jsc, String instantTime,
+ JavaRDD<HoodieRecord<T>> preppedRecords) {
+ return new UpsertPreppedCommitActionExecutor<>(jsc, config, this, instantTime, preppedRecords).execute();
}
@Override
- public Partitioner getInsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc) {
- return getUpsertPartitioner(profile, jsc);
+ public HoodieWriteMetadata insertPrepped(JavaSparkContext jsc, String instantTime,
+ JavaRDD<HoodieRecord<T>> preppedRecords) {
+ return new InsertPreppedCommitActionExecutor<>(jsc, config, this, instantTime, preppedRecords).execute();
}
@Override
- public boolean isWorkloadProfileNeeded() {
- return true;
+ public HoodieWriteMetadata bulkInsertPrepped(JavaSparkContext jsc, String instantTime,
+ JavaRDD<HoodieRecord<T>> preppedRecords, Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
+ return new BulkInsertPreppedCommitActionExecutor<>(jsc, config,
+ this, instantTime, preppedRecords, bulkInsertPartitioner).execute();
}
@Override
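Each of the executors referenced above follows one template, rooted in the new BaseActionExecutor and BaseCommitActionExecutor from the diffstat. A plausible minimal base shape (a reconstruction, not the committed source; fields and generics may differ):

    public abstract class BaseActionExecutor<R> implements Serializable {
      protected final transient JavaSparkContext jsc;
      protected final HoodieWriteConfig config;
      protected final HoodieTable<?> table;
      protected final String instantTime;

      protected BaseActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config,
                                   HoodieTable<?> table, String instantTime) {
        this.jsc = jsc;
        this.config = config;
        this.table = table;
        this.instantTime = instantTime;
      }

      public abstract R execute();  // commit actions return HoodieWriteMetadata
    }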
@@ -123,19 +145,6 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
}
public Iterator<List<WriteStatus>> handleUpdate(String instantTime, String partitionPath, String fileId,
- Iterator<HoodieRecord<T>> recordItr)
- throws IOException {
- // This is needed since sometimes some buckets are never picked in getPartition() and end up with 0 records
- if (!recordItr.hasNext()) {
- LOG.info("Empty partition with fileId => " + fileId);
- return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator();
- }
- // these are updates
- HoodieMergeHandle upsertHandle = getUpdateHandle(instantTime, partitionPath, fileId, recordItr);
- return handleUpdateInternal(upsertHandle, instantTime, fileId);
- }
-
- public Iterator<List<WriteStatus>> handleUpdate(String instantTime, String partitionPath, String fileId,
Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile oldDataFile) throws IOException {
// these are updates
HoodieMergeHandle upsertHandle = getUpdateHandle(instantTime, partitionPath, fileId, keyToNewRecords, oldDataFile);
@@ -173,26 +182,12 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
return Collections.singletonList(Collections.singletonList(upsertHandle.getWriteStatus())).iterator();
}
- protected HoodieMergeHandle getUpdateHandle(String instantTime, String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
- return new HoodieMergeHandle<>(config, instantTime, this, recordItr, partitionPath, fileId, sparkTaskContextSupplier);
- }
-
protected HoodieMergeHandle getUpdateHandle(String instantTime, String partitionPath, String fileId,
Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile dataFileToBeMerged) {
return new HoodieMergeHandle<>(config, instantTime, this, keyToNewRecords,
partitionPath, fileId, dataFileToBeMerged, sparkTaskContextSupplier);
}
- public Iterator<List<WriteStatus>> handleInsert(String instantTime, String idPfx, Iterator<HoodieRecord<T>> recordItr)
- throws Exception {
- // This is needed since sometimes some buckets are never picked in getPartition() and end up with 0 records
- if (!recordItr.hasNext()) {
- LOG.info("Empty partition");
- return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator();
- }
- return new CopyOnWriteLazyInsertIterable<>(recordItr, config, instantTime, this, idPfx, sparkTaskContextSupplier);
- }
-
public Iterator<List<WriteStatus>> handleInsert(String instantTime, String partitionPath, String fileId,
Iterator<HoodieRecord<T>> recordItr) {
HoodieCreateHandle createHandle =
@@ -201,34 +196,6 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
return Collections.singletonList(Collections.singletonList(createHandle.close())).iterator();
}
- @SuppressWarnings("unchecked")
- @Override
- public Iterator<List<WriteStatus>> handleUpsertPartition(String instantTime, Integer partition, Iterator recordItr,
- Partitioner partitioner) {
- UpsertPartitioner upsertPartitioner = (UpsertPartitioner) partitioner;
- BucketInfo binfo = upsertPartitioner.getBucketInfo(partition);
- BucketType btype = binfo.bucketType;
- try {
- if (btype.equals(BucketType.INSERT)) {
- return handleInsert(instantTime, binfo.fileIdPrefix, recordItr);
- } else if (btype.equals(BucketType.UPDATE)) {
- return handleUpdate(instantTime, binfo.partitionPath, binfo.fileIdPrefix, recordItr);
- } else {
- throw new HoodieUpsertException("Unknown bucketType " + btype + " for partition :" + partition);
- }
- } catch (Throwable t) {
- String msg = "Error upserting bucketType " + btype + " for partition :" + partition;
- LOG.error(msg, t);
- throw new HoodieUpsertException(msg, t);
- }
- }
-
- @Override
- public Iterator<List<WriteStatus>> handleInsertPartition(String instantTime, Integer partition, Iterator recordItr,
- Partitioner partitioner) {
- return handleUpsertPartition(instantTime, partition, recordItr, partitioner);
- }
-
@Override
public HoodieCleanMetadata clean(JavaSparkContext jsc, String cleanInstantTime) {
return new CleanActionExecutor(jsc, config, this, cleanInstantTime).execute();
@@ -390,242 +357,6 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
}
/**
- * Packs incoming records to be upserted, into buckets (1 bucket = 1 RDD partition).
- */
- class UpsertPartitioner extends Partitioner {
-
- /**
- * List of all small files to be corrected.
- */
- List<SmallFile> smallFiles = new ArrayList<>();
- /**
- * Total number of RDD partitions, is determined by total buckets we want to pack the incoming workload into.
- */
- private int totalBuckets = 0;
- /**
- * Stat for the current workload. Helps in determining total inserts, upserts etc.
- */
- private WorkloadStat globalStat;
- /**
- * Helps decide which bucket an incoming update should go to.
- */
- private HashMap<String, Integer> updateLocationToBucket;
- /**
- * Helps us pack inserts into 1 or more buckets depending on number of incoming records.
- */
- private HashMap<String, List<InsertBucket>> partitionPathToInsertBuckets;
- /**
- * Remembers what type each bucket is for later.
- */
- private HashMap<Integer, BucketInfo> bucketInfoMap;
-
- /**
- * Rolling stats for files.
- */
- protected HoodieRollingStatMetadata rollingStatMetadata;
-
- UpsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc) {
- updateLocationToBucket = new HashMap<>();
- partitionPathToInsertBuckets = new HashMap<>();
- bucketInfoMap = new HashMap<>();
- globalStat = profile.getGlobalStat();
- rollingStatMetadata = getRollingStats();
- assignUpdates(profile);
- assignInserts(profile, jsc);
-
- LOG.info("Total Buckets :" + totalBuckets + ", buckets info => " + bucketInfoMap + ", \n"
- + "Partition to insert buckets => " + partitionPathToInsertBuckets + ", \n"
- + "UpdateLocations mapped to buckets =>" + updateLocationToBucket);
- }
-
- private void assignUpdates(WorkloadProfile profile) {
- // each update location gets a partition
- Set<Map.Entry<String, WorkloadStat>> partitionStatEntries = profile.getPartitionPathStatMap().entrySet();
- for (Map.Entry<String, WorkloadStat> partitionStat : partitionStatEntries) {
- for (Map.Entry<String, Pair<String, Long>> updateLocEntry :
- partitionStat.getValue().getUpdateLocationToCount().entrySet()) {
- addUpdateBucket(partitionStat.getKey(), updateLocEntry.getKey());
- }
- }
- }
-
- private int addUpdateBucket(String partitionPath, String fileIdHint) {
- int bucket = totalBuckets;
- updateLocationToBucket.put(fileIdHint, bucket);
- BucketInfo bucketInfo = new BucketInfo();
- bucketInfo.bucketType = BucketType.UPDATE;
- bucketInfo.fileIdPrefix = fileIdHint;
- bucketInfo.partitionPath = partitionPath;
- bucketInfoMap.put(totalBuckets, bucketInfo);
- totalBuckets++;
- return bucket;
- }
-
- private void assignInserts(WorkloadProfile profile, JavaSparkContext jsc) {
- // for new inserts, compute buckets depending on how many records we have for each partition
- Set<String> partitionPaths = profile.getPartitionPaths();
- long averageRecordSize =
- averageBytesPerRecord(metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(),
- config.getCopyOnWriteRecordSizeEstimate());
- LOG.info("AvgRecordSize => " + averageRecordSize);
-
- Map<String, List<SmallFile>> partitionSmallFilesMap =
- getSmallFilesForPartitions(new ArrayList<String>(partitionPaths), jsc);
-
- for (String partitionPath : partitionPaths) {
- WorkloadStat pStat = profile.getWorkloadStat(partitionPath);
- if (pStat.getNumInserts() > 0) {
-
- List<SmallFile> smallFiles = partitionSmallFilesMap.get(partitionPath);
- this.smallFiles.addAll(smallFiles);
-
- LOG.info("For partitionPath : " + partitionPath + " Small Files => " + smallFiles);
-
- long totalUnassignedInserts = pStat.getNumInserts();
- List<Integer> bucketNumbers = new ArrayList<>();
- List<Long> recordsPerBucket = new ArrayList<>();
-
- // first try packing this into one of the smallFiles
- for (SmallFile smallFile : smallFiles) {
- long recordsToAppend = Math.min((config.getParquetMaxFileSize() - smallFile.sizeBytes) / averageRecordSize,
- totalUnassignedInserts);
- if (recordsToAppend > 0 && totalUnassignedInserts > 0) {
- // create a new bucket or re-use an existing bucket
- int bucket;
- if (updateLocationToBucket.containsKey(smallFile.location.getFileId())) {
- bucket = updateLocationToBucket.get(smallFile.location.getFileId());
- LOG.info("Assigning " + recordsToAppend + " inserts to existing update bucket " + bucket);
- } else {
- bucket = addUpdateBucket(partitionPath, smallFile.location.getFileId());
- LOG.info("Assigning " + recordsToAppend + " inserts to new update bucket " + bucket);
- }
- bucketNumbers.add(bucket);
- recordsPerBucket.add(recordsToAppend);
- totalUnassignedInserts -= recordsToAppend;
- }
- }
-
- // if we have anything more, create new insert buckets, like normal
- if (totalUnassignedInserts > 0) {
- long insertRecordsPerBucket = config.getCopyOnWriteInsertSplitSize();
- if (config.shouldAutoTuneInsertSplits()) {
- insertRecordsPerBucket = config.getParquetMaxFileSize() / averageRecordSize;
- }
-
- int insertBuckets = (int) Math.ceil((1.0 * totalUnassignedInserts) / insertRecordsPerBucket);
- LOG.info("After small file assignment: unassignedInserts => " + totalUnassignedInserts
- + ", totalInsertBuckets => " + insertBuckets + ", recordsPerBucket => " + insertRecordsPerBucket);
- for (int b = 0; b < insertBuckets; b++) {
- bucketNumbers.add(totalBuckets);
- recordsPerBucket.add(totalUnassignedInserts / insertBuckets);
- BucketInfo bucketInfo = new BucketInfo();
- bucketInfo.bucketType = BucketType.INSERT;
- bucketInfo.partitionPath = partitionPath;
- bucketInfo.fileIdPrefix = FSUtils.createNewFileIdPfx();
- bucketInfoMap.put(totalBuckets, bucketInfo);
- totalBuckets++;
- }
- }
-
- // Go over all such buckets, and assign weights as per amount of incoming inserts.
- List<InsertBucket> insertBuckets = new ArrayList<>();
- for (int i = 0; i < bucketNumbers.size(); i++) {
- InsertBucket bkt = new InsertBucket();
- bkt.bucketNumber = bucketNumbers.get(i);
- bkt.weight = (1.0 * recordsPerBucket.get(i)) / pStat.getNumInserts();
- insertBuckets.add(bkt);
- }
- LOG.info("Total insert buckets for partition path " + partitionPath + " => " + insertBuckets);
- partitionPathToInsertBuckets.put(partitionPath, insertBuckets);
- }
- }
- }
-
- private Map<String, List<SmallFile>> getSmallFilesForPartitions(List<String> partitionPaths, JavaSparkContext jsc) {
-
- Map<String, List<SmallFile>> partitionSmallFilesMap = new HashMap<>();
- if (partitionPaths != null && partitionPaths.size() > 0) {
- JavaRDD<String> partitionPathRdds = jsc.parallelize(partitionPaths, partitionPaths.size());
- partitionSmallFilesMap = partitionPathRdds.mapToPair((PairFunction<String, String, List<SmallFile>>)
- partitionPath -> new Tuple2<>(partitionPath, getSmallFiles(partitionPath))).collectAsMap();
- }
-
- return partitionSmallFilesMap;
- }
-
- /**
- * Returns a list of small files in the given partition path.
- */
- protected List<SmallFile> getSmallFiles(String partitionPath) {
-
- // smallFiles only for partitionPath
- List<SmallFile> smallFileLocations = new ArrayList<>();
-
- HoodieTimeline commitTimeline = getCompletedCommitsTimeline();
-
- if (!commitTimeline.empty()) { // if we have some commits
- HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
- List<HoodieBaseFile> allFiles = getBaseFileOnlyView()
- .getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).collect(Collectors.toList());
-
- for (HoodieBaseFile file : allFiles) {
- if (file.getFileSize() < config.getParquetSmallFileLimit()) {
- String filename = file.getFileName();
- SmallFile sf = new SmallFile();
- sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
- sf.sizeBytes = file.getFileSize();
- smallFileLocations.add(sf);
- }
- }
- }
-
- return smallFileLocations;
- }
-
- public BucketInfo getBucketInfo(int bucketNumber) {
- return bucketInfoMap.get(bucketNumber);
- }
-
- public List<InsertBucket> getInsertBuckets(String partitionPath) {
- return partitionPathToInsertBuckets.get(partitionPath);
- }
-
- @Override
- public int numPartitions() {
- return totalBuckets;
- }
-
- @Override
- public int getPartition(Object key) {
- Tuple2<HoodieKey, Option<HoodieRecordLocation>> keyLocation =
- (Tuple2<HoodieKey, Option<HoodieRecordLocation>>) key;
- if (keyLocation._2().isPresent()) {
- HoodieRecordLocation location = keyLocation._2().get();
- return updateLocationToBucket.get(location.getFileId());
- } else {
- List<InsertBucket> targetBuckets = partitionPathToInsertBuckets.get(keyLocation._1().getPartitionPath());
- // pick the target bucket to use based on the weights.
- double totalWeight = 0.0;
- final long totalInserts = Math.max(1, globalStat.getNumInserts());
- final long hashOfKey = NumericUtils.getMessageDigestHash("MD5", keyLocation._1().getRecordKey());
- final double r = 1.0 * Math.floorMod(hashOfKey, totalInserts) / totalInserts;
- for (InsertBucket insertBucket : targetBuckets) {
- totalWeight += insertBucket.weight;
- if (r <= totalWeight) {
- return insertBucket.bucketNumber;
- }
- }
- // return first one, by default
- return targetBuckets.get(0).bucketNumber;
- }
- }
- }
-
- protected HoodieRollingStatMetadata getRollingStats() {
- return null;
- }
-
- /**
* Obtains the average record size based on records written during previous commits. Used for estimating how many
* records pack into one file.
*/
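The UpsertPartitioner inner class deleted above is not dropped; the diffstat moves it to table/action/commit/UpsertPartitioner.java. Its insert routing deserves a worked example: getPartition() hashes the record key into r in [0, 1) and walks the partition's insert buckets by cumulative weight. With buckets #7 (weight 0.50, a small-file bucket) and #8/#9 (weight 0.25 each, new files):

    // Worked example of the weighted routing in getPartition() above.
    double r = 0.62;                          // normalized hash of the record key
    double[] weights = {0.50, 0.25, 0.25};
    int[] buckets = {7, 8, 9};
    double cumulative = 0.0;
    int chosen = buckets[0];                  // default: first bucket
    for (int i = 0; i < weights.length; i++) {
      cumulative += weights[i];               // 0.50, then 0.75, then 1.00
      if (r <= cumulative) { chosen = buckets[i]; break; }
    }
    // r = 0.62 <= 0.75, so the record lands in bucket #8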
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
index d7783cd..c496df2 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
@@ -24,9 +24,8 @@ import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -39,24 +38,26 @@ import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCompactionException;
import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.HoodieUpsertException;
-import org.apache.hudi.execution.MergeOnReadLazyInsertIterable;
-import org.apache.hudi.io.HoodieAppendHandle;
+import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
+import org.apache.hudi.table.action.deltacommit.BulkInsertDeltaCommitActionExecutor;
+import org.apache.hudi.table.action.deltacommit.BulkInsertPreppedDeltaCommitActionExecutor;
+import org.apache.hudi.table.action.deltacommit.DeleteDeltaCommitActionExecutor;
+import org.apache.hudi.table.action.deltacommit.InsertDeltaCommitActionExecutor;
+import org.apache.hudi.table.action.deltacommit.InsertPreppedDeltaCommitActionExecutor;
+import org.apache.hudi.table.action.deltacommit.UpsertDeltaCommitActionExecutor;
+import org.apache.hudi.table.action.deltacommit.UpsertPreppedDeltaCommitActionExecutor;
import org.apache.hudi.table.compact.HoodieMergeOnReadTableCompactor;
import org.apache.hudi.table.rollback.RollbackHelper;
import org.apache.hudi.table.rollback.RollbackRequest;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -82,49 +83,49 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
private static final Logger LOG = LogManager.getLogger(HoodieMergeOnReadTable.class);
- // UpsertPartitioner for MergeOnRead table type
- private MergeOnReadUpsertPartitioner mergeOnReadUpsertPartitioner;
-
HoodieMergeOnReadTable(HoodieWriteConfig config, JavaSparkContext jsc, HoodieTableMetaClient metaClient) {
super(config, jsc, metaClient);
}
@Override
- public Partitioner getUpsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc) {
- if (profile == null) {
- throw new HoodieUpsertException("Need workload profile to construct the upsert partitioner.");
- }
- mergeOnReadUpsertPartitioner = new MergeOnReadUpsertPartitioner(profile, jsc);
- return mergeOnReadUpsertPartitioner;
+ public HoodieWriteMetadata upsert(JavaSparkContext jsc, String instantTime, JavaRDD<HoodieRecord<T>> records) {
+ return new UpsertDeltaCommitActionExecutor<>(jsc, config, this, instantTime, records).execute();
}
@Override
- public Iterator<List<WriteStatus>> handleUpdate(String instantTime, String partitionPath,
- String fileId, Iterator<HoodieRecord<T>> recordItr)
- throws IOException {
- LOG.info("Merging updates for commit " + instantTime + " for file " + fileId);
-
- if (!index.canIndexLogFiles() && mergeOnReadUpsertPartitioner.getSmallFileIds().contains(fileId)) {
- LOG.info("Small file corrections for updates for commit " + instantTime + " for file " + fileId);
- return super.handleUpdate(instantTime, partitionPath, fileId, recordItr);
- } else {
- HoodieAppendHandle<T> appendHandle = new HoodieAppendHandle<>(config, instantTime, this,
- partitionPath, fileId, recordItr, sparkTaskContextSupplier);
- appendHandle.doAppend();
- appendHandle.close();
- return Collections.singletonList(Collections.singletonList(appendHandle.getWriteStatus())).iterator();
- }
+ public HoodieWriteMetadata insert(JavaSparkContext jsc, String instantTime, JavaRDD<HoodieRecord<T>> records) {
+ return new InsertDeltaCommitActionExecutor<>(jsc, config, this, instantTime, records).execute();
}
@Override
- public Iterator<List<WriteStatus>> handleInsert(String instantTime, String idPfx, Iterator<HoodieRecord<T>> recordItr)
- throws Exception {
- // If canIndexLogFiles, write inserts to log files else write inserts to parquet files
- if (index.canIndexLogFiles()) {
- return new MergeOnReadLazyInsertIterable<>(recordItr, config, instantTime, this, idPfx, sparkTaskContextSupplier);
- } else {
- return super.handleInsert(instantTime, idPfx, recordItr);
- }
+ public HoodieWriteMetadata bulkInsert(JavaSparkContext jsc, String instantTime, JavaRDD<HoodieRecord<T>> records,
+ Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
+ return new BulkInsertDeltaCommitActionExecutor<>(jsc, config,
+ this, instantTime, records, bulkInsertPartitioner).execute();
+ }
+
+ @Override
+ public HoodieWriteMetadata delete(JavaSparkContext jsc, String instantTime, JavaRDD<HoodieKey> keys) {
+ return new DeleteDeltaCommitActionExecutor<>(jsc, config, this, instantTime, keys).execute();
+ }
+
+ @Override
+ public HoodieWriteMetadata upsertPrepped(JavaSparkContext jsc, String instantTime,
+ JavaRDD<HoodieRecord<T>> preppedRecords) {
+ return new UpsertPreppedDeltaCommitActionExecutor<>(jsc, config, this, instantTime, preppedRecords).execute();
+ }
+
+ @Override
+ public HoodieWriteMetadata insertPrepped(JavaSparkContext jsc, String instantTime,
+ JavaRDD<HoodieRecord<T>> preppedRecords) {
+ return new InsertPreppedDeltaCommitActionExecutor<>(jsc, config, this, instantTime, preppedRecords).execute();
+ }
+
+ @Override
+ public HoodieWriteMetadata bulkInsertPrepped(JavaSparkContext jsc, String instantTime,
+ JavaRDD<HoodieRecord<T>> preppedRecords, Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
+ return new BulkInsertPreppedDeltaCommitActionExecutor<>(jsc, config,
+ this, instantTime, preppedRecords, bulkInsertPartitioner).execute();
}
@Override
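HoodieMergeOnReadTable now mirrors HoodieCopyOnWriteTable method for method, differing only in the executor package it delegates to (action.deltacommit versus action.commit). Client code dispatches polymorphically; a usage sketch, with table construction elided:

    // Same client code drives both table types after this refactor.
    HoodieWriteMetadata result = table.upsert(jsc, instantTime, records);
    // table is a HoodieCopyOnWriteTable  -> UpsertCommitActionExecutor
    // table is a HoodieMergeOnReadTable  -> UpsertDeltaCommitActionExecutor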
@@ -320,105 +321,6 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
super.finalizeWrite(jsc, instantTs, stats);
}
- /**
- * UpsertPartitioner for MergeOnRead table type, this allows auto correction of small parquet files to larger ones
- * without the need for an index in the logFile.
- */
- class MergeOnReadUpsertPartitioner extends HoodieCopyOnWriteTable.UpsertPartitioner {
-
- MergeOnReadUpsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc) {
- super(profile, jsc);
- }
-
- @Override
- protected List<SmallFile> getSmallFiles(String partitionPath) {
-
- // smallFiles only for partitionPath
- List<SmallFile> smallFileLocations = new ArrayList<>();
-
- // Init here since this class (and member variables) might not have been initialized
- HoodieTimeline commitTimeline = getCompletedCommitsTimeline();
-
- // Find out all eligible small file slices
- if (!commitTimeline.empty()) {
- HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
- // find smallest file in partition and append to it
- List<FileSlice> allSmallFileSlices = new ArrayList<>();
- // If we cannot index log files, then we choose the smallest parquet file in the partition and add inserts to
- // it. Doing this overtime for a partition, we ensure that we handle small file issues
- if (!index.canIndexLogFiles()) {
- // TODO : choose last N small files since there can be multiple small files written to a single partition
- // by different spark partitions in a single batch
- Option<FileSlice> smallFileSlice = Option.fromJavaOptional(getSliceView()
- .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), false)
- .filter(fileSlice -> fileSlice.getLogFiles().count() < 1 && fileSlice.getBaseFile().get().getFileSize() < config.getParquetSmallFileLimit())
- .min((FileSlice left, FileSlice right) -> left.getBaseFile().get().getFileSize() < right.getBaseFile().get().getFileSize() ? -1 : 1));
- if (smallFileSlice.isPresent()) {
- allSmallFileSlices.add(smallFileSlice.get());
- }
- } else {
- // If we can index log files, we can add more inserts to log files for fileIds including those under
- // pending compaction.
- List<FileSlice> allFileSlices =
- getSliceView().getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), true)
- .collect(Collectors.toList());
- for (FileSlice fileSlice : allFileSlices) {
- if (isSmallFile(fileSlice)) {
- allSmallFileSlices.add(fileSlice);
- }
- }
- }
- // Create SmallFiles from the eligible file slices
- for (FileSlice smallFileSlice : allSmallFileSlices) {
- SmallFile sf = new SmallFile();
- if (smallFileSlice.getBaseFile().isPresent()) {
- // TODO : Move logic of file name, file id, base commit time handling inside file slice
- String filename = smallFileSlice.getBaseFile().get().getFileName();
- sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
- sf.sizeBytes = getTotalFileSize(smallFileSlice);
- smallFileLocations.add(sf);
- } else {
- HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get();
- sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
- FSUtils.getFileIdFromLogPath(logFile.getPath()));
- sf.sizeBytes = getTotalFileSize(smallFileSlice);
- smallFileLocations.add(sf);
- }
- }
- }
- return smallFileLocations;
- }
-
- public List<String> getSmallFileIds() {
- return (List<String>) smallFiles.stream().map(smallFile -> ((SmallFile) smallFile).location.getFileId())
- .collect(Collectors.toList());
- }
-
- private long getTotalFileSize(FileSlice fileSlice) {
- if (!fileSlice.getBaseFile().isPresent()) {
- return convertLogFilesSizeToExpectedParquetSize(fileSlice.getLogFiles().collect(Collectors.toList()));
- } else {
- return fileSlice.getBaseFile().get().getFileSize()
- + convertLogFilesSizeToExpectedParquetSize(fileSlice.getLogFiles().collect(Collectors.toList()));
- }
- }
-
- private boolean isSmallFile(FileSlice fileSlice) {
- long totalSize = getTotalFileSize(fileSlice);
- return totalSize < config.getParquetMaxFileSize();
- }
-
- // TODO (NA) : Make this static part of utility
- public long convertLogFilesSizeToExpectedParquetSize(List<HoodieLogFile> hoodieLogFiles) {
- long totalSizeOfLogFiles = hoodieLogFiles.stream().map(HoodieLogFile::getFileSize)
- .filter(size -> size > 0).reduce(Long::sum).orElse(0L);
- // Here we assume that if there is no base parquet file, all log files contain only inserts.
- // We can then just get the parquet equivalent size of these log files, compare that with
- // {@link config.getParquetMaxFileSize()} and decide if there is scope to insert more rows
- return (long) (totalSizeOfLogFiles * config.getLogFileToParquetCompressionRatio());
- }
- }
-
private List<RollbackRequest> generateAppendRollbackBlocksAction(String partitionPath, HoodieInstant rollbackInstant,
HoodieCommitMetadata commitMetadata) {
ValidationUtils.checkArgument(rollbackInstant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION));
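The removed convertLogFilesSizeToExpectedParquetSize() logic (carried into the new deltacommit/UpsertDeltaCommitPartitioner.java, per the diffstat) estimates how much parquet a file slice's log files would compact into. A quick worked example, with an assumed compression ratio:

    // Worked example of the log-to-parquet size estimate removed above.
    long totalLogBytes = 3L * 40 * 1024 * 1024;   // three 40 MB log files
    double ratio = 0.35;    // assumed value of config.getLogFileToParquetCompressionRatio()
    long expectedParquetBytes = (long) (totalLogBytes * ratio);
    // about 42 MB; compared against config.getParquetMaxFileSize() to decide
    // whether the slice can still absorb inserts as a "small file"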
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
index 50ec45f..2bf66d3 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -32,6 +32,7 @@ import org.apache.hudi.common.fs.ConsistencyGuard;
import org.apache.hudi.common.fs.ConsistencyGuard.FileVisibility;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.FailSafeConsistencyGuard;
+import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
@@ -54,16 +55,15 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieSavepointException;
import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
@@ -127,19 +127,83 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
}
/**
- * Provides a partitioner to perform the upsert operation, based on the workload profile.
+ * Upsert a batch of new records into the Hoodie table at the supplied instantTime.
+ * @param jsc Java Spark Context jsc
+ * @param instantTime Instant Time for the action
+ * @param records JavaRDD of hoodieRecords to upsert
+ * @return HoodieWriteMetadata
*/
- public abstract Partitioner getUpsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc);
+ public abstract HoodieWriteMetadata upsert(JavaSparkContext jsc, String instantTime,
+ JavaRDD<HoodieRecord<T>> records);
/**
- * Provides a partitioner to perform the insert operation, based on the workload profile.
+ * Insert a batch of new records into the Hoodie table at the supplied instantTime.
+ * @param jsc Java Spark Context jsc
+ * @param instantTime Instant Time for the action
+ * @param records JavaRDD of hoodieRecords to insert
+ * @return HoodieWriteMetadata
*/
- public abstract Partitioner getInsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc);
+ public abstract HoodieWriteMetadata insert(JavaSparkContext jsc, String instantTime,
+ JavaRDD<HoodieRecord<T>> records);
/**
- * Return whether this HoodieTable implementation can benefit from workload profiling.
+ * Bulk insert a batch of new records into the Hoodie table at the supplied instantTime.
+ * @param jsc Java Spark Context jsc
+ * @param instantTime Instant Time for the action
+ * @param records JavaRDD of hoodieRecords to bulk insert
+ * @param bulkInsertPartitioner User Defined Partitioner
+ * @return HoodieWriteMetadata
*/
- public abstract boolean isWorkloadProfileNeeded();
+ public abstract HoodieWriteMetadata bulkInsert(JavaSparkContext jsc, String instantTime,
+ JavaRDD<HoodieRecord<T>> records, Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner);
+
+ /**
+ * Deletes a list of {@link HoodieKey}s from the Hoodie table, at the supplied instantTime. The {@link HoodieKey}s
+ * will be de-duped, and non-existent keys will be filtered out, before deleting.
+ *
+ * @param jsc Java Spark Context jsc
+ * @param instantTime Instant Time for the action
+ * @param keys {@link List} of {@link HoodieKey}s to be deleted
+ * @return HoodieWriteMetadata
+ */
+ public abstract HoodieWriteMetadata delete(JavaSparkContext jsc, String instantTime, JavaRDD<HoodieKey> keys);
+
+ /**
+ * Upserts the given prepared records into the Hoodie table, at the supplied instantTime.
+ * <p>
+ * This implementation requires that the input records are already tagged, and de-duped if needed.
+ * @param jsc Java Spark Context jsc
+ * @param instantTime Instant Time for the action
+ * @param preppedRecords JavaRDD of hoodieRecords to upsert
+ * @return HoodieWriteMetadata
+ */
+ public abstract HoodieWriteMetadata upsertPrepped(JavaSparkContext jsc, String instantTime,
+ JavaRDD<HoodieRecord<T>> preppedRecords);
+
+ /**
+ * Inserts the given prepared records into the Hoodie table, at the supplied instantTime.
+ * <p>
+ * This implementation requires that the input records are already tagged, and de-duped if needed.
+ * @param jsc Java Spark Context jsc
+ * @param instantTime Instant Time for the action
+ * @param preppedRecords JavaRDD of hoodieRecords to insert
+ * @return HoodieWriteMetadata
+ */
+ public abstract HoodieWriteMetadata insertPrepped(JavaSparkContext jsc, String instantTime,
+ JavaRDD<HoodieRecord<T>> preppedRecords);
+
+ /**
+ * Bulk Insert the given prepared records into the Hoodie table, at the supplied instantTime.
+ * <p>
+ * This implementation requires that the input records are already tagged, and de-duped if needed.
+ * @param jsc Java Spark Context jsc
+ * @param instantTime Instant Time for the action
+ * @param preppedRecords JavaRDD of hoodieRecords to bulk insert
+ * @param bulkInsertPartitioner User Defined Partitioner
+ * @return HoodieWriteMetadata
+ */
+ public abstract HoodieWriteMetadata bulkInsertPrepped(JavaSparkContext jsc, String instantTime,
+ JavaRDD<HoodieRecord<T>> preppedRecords, Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner);
public HoodieWriteConfig getConfig() {
return config;
@@ -260,18 +324,6 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
}
/**
- * Perform the ultimate IO for a given upserted (RDD) partition.
- */
- public abstract Iterator<List<WriteStatus>> handleUpsertPartition(String instantTime, Integer partition,
- Iterator<HoodieRecord<T>> recordIterator, Partitioner partitioner);
-
- /**
- * Perform the ultimate IO for a given inserted (RDD) partition.
- */
- public abstract Iterator<List<WriteStatus>> handleInsertPartition(String instantTime, Integer partition,
- Iterator<HoodieRecord<T>> recordIterator, Partitioner partitioner);
-
- /**
* Schedule compaction for the instant time.
*
* @param jsc Spark Context
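
Taken together, these abstract methods make HoodieTable the single write facade. A minimal usage sketch, assuming a prebuilt HoodieWriteConfig, an active JavaSparkContext and a JavaRDD of records (all names here are hypothetical; HoodieTable.create is the same factory BaseCommitActionExecutor uses when committing):

    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.hudi.table.HoodieTable;
    import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    // Sketch only: all inputs come from the caller.
    static HoodieWriteMetadata upsertOnce(JavaSparkContext jsc, HoodieWriteConfig config,
        String instantTime, JavaRDD<HoodieRecord> records) {
      HoodieTable table = HoodieTable.create(config, jsc);
      return table.upsert(jsc, instantTime, records);
    }
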
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
index e6ac2e9..5f81db7 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
@@ -18,13 +18,14 @@
package org.apache.hudi.table.action;
+import java.io.Serializable;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.spark.api.java.JavaSparkContext;
-public abstract class BaseActionExecutor<R> {
+public abstract class BaseActionExecutor<R> implements Serializable {
- protected final JavaSparkContext jsc;
+ protected final transient JavaSparkContext jsc;
protected final HoodieWriteConfig config;
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
new file mode 100644
index 0000000..e72c801
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.SparkTaskContextSupplier;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.utils.SparkConfigUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieInstant.State;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieCommitException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieUpsertException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.WorkloadProfile;
+import org.apache.hudi.table.WorkloadStat;
+import org.apache.hudi.table.action.BaseActionExecutor;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.storage.StorageLevel;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import scala.Tuple2;
+
+public abstract class BaseCommitActionExecutor<T extends HoodieRecordPayload<T>>
+ extends BaseActionExecutor<HoodieWriteMetadata> {
+
+ private static final Logger LOG = LogManager.getLogger(BaseCommitActionExecutor.class);
+
+ private final WriteOperationType operationType;
+ protected final SparkTaskContextSupplier sparkTaskContextSupplier = new SparkTaskContextSupplier();
+
+ public BaseCommitActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config,
+ HoodieTable table, String instantTime, WriteOperationType operationType) {
+ this(jsc, config, table, instantTime, operationType, null);
+ }
+
+ public BaseCommitActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config,
+ HoodieTable table, String instantTime, WriteOperationType operationType,
+ JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
+ super(jsc, config, table, instantTime);
+ this.operationType = operationType;
+ }
+
+ public HoodieWriteMetadata execute(JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
+ HoodieWriteMetadata result = new HoodieWriteMetadata();
+ // Cache the tagged records, so we don't end up computing the tagged RDD twice
+ // TODO: Consistent contract in HoodieWriteClient regarding preppedRecord storage level handling
+ if (inputRecordsRDD.getStorageLevel() == StorageLevel.NONE()) {
+ inputRecordsRDD.persist(StorageLevel.MEMORY_AND_DISK_SER());
+ } else {
+ LOG.info("RDD PreppedRecords was persisted at: " + inputRecordsRDD.getStorageLevel());
+ }
+
+ WorkloadProfile profile = null;
+ if (isWorkloadProfileNeeded()) {
+ profile = new WorkloadProfile(inputRecordsRDD);
+ LOG.info("Workload profile :" + profile);
+ saveWorkloadProfileMetadataToInflight(profile, instantTime);
+ }
+
+ // partition using the insert partitioner
+ final Partitioner partitioner = getPartitioner(profile);
+ JavaRDD<HoodieRecord<T>> partitionedRecords = partition(inputRecordsRDD, partitioner);
+ JavaRDD<WriteStatus> writeStatusRDD = partitionedRecords.mapPartitionsWithIndex((partition, recordItr) -> {
+ if (WriteOperationType.isChangingRecords(operationType)) {
+ return handleUpsertPartition(instantTime, partition, recordItr, partitioner);
+ } else {
+ return handleInsertPartition(instantTime, partition, recordItr, partitioner);
+ }
+ }, true).flatMap(List::iterator);
+
+ updateIndexAndCommitIfNeeded(writeStatusRDD, result);
+ return result;
+ }
+
+ /**
+ * Save the workload profile in an intermediate file (here re-using commit files). This is useful when performing
+ * rollback for MOR tables. Only updates are recorded in the workload profile metadata, since updates to log blocks
+ * are unknown across batches; inserts (which are new parquet files) are rolled back based on commit time.
+ * TODO : Create a new WorkloadProfile metadata file instead of using HoodieCommitMetadata.
+ */
+ void saveWorkloadProfileMetadataToInflight(WorkloadProfile profile, String instantTime)
+ throws HoodieCommitException {
+ try {
+ HoodieCommitMetadata metadata = new HoodieCommitMetadata();
+ profile.getPartitionPaths().forEach(path -> {
+ WorkloadStat partitionStat = profile.getWorkloadStat(path.toString());
+ partitionStat.getUpdateLocationToCount().forEach((key, value) -> {
+ HoodieWriteStat writeStat = new HoodieWriteStat();
+ writeStat.setFileId(key);
+ // TODO : Is writing baseCommitTime possible here?
+ writeStat.setPrevCommit(value.getKey());
+ writeStat.setNumUpdateWrites(value.getValue());
+ metadata.addWriteStat(path.toString(), writeStat);
+ });
+ });
+ metadata.setOperationType(operationType);
+
+ HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
+ String commitActionType = table.getMetaClient().getCommitActionType();
+ HoodieInstant requested = new HoodieInstant(State.REQUESTED, commitActionType, instantTime);
+ activeTimeline.transitionRequestedToInflight(requested,
+ Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+ } catch (IOException io) {
+ throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", io);
+ }
+ }
+
+ private Partitioner getPartitioner(WorkloadProfile profile) {
+ if (WriteOperationType.isChangingRecords(operationType)) {
+ return getUpsertPartitioner(profile);
+ } else {
+ return getInsertPartitioner(profile);
+ }
+ }
+
+ private JavaRDD<HoodieRecord<T>> partition(JavaRDD<HoodieRecord<T>> dedupedRecords, Partitioner partitioner) {
+ return dedupedRecords.mapToPair(
+ record -> new Tuple2<>(new Tuple2<>(record.getKey(), Option.ofNullable(record.getCurrentLocation())), record))
+ .partitionBy(partitioner).map(Tuple2::_2);
+ }
+
+ protected void updateIndexAndCommitIfNeeded(JavaRDD<WriteStatus> writeStatusRDD, HoodieWriteMetadata result) {
+ // cache writeStatusRDD before updating index, so that all actions before this are not triggered again for future
+ // RDD actions that are performed after updating the index.
+ writeStatusRDD = writeStatusRDD.persist(SparkConfigUtils.getWriteStatusStorageLevel(config.getProps()));
+ Instant indexStartTime = Instant.now();
+ // Update the index back
+ JavaRDD<WriteStatus> statuses = ((HoodieTable<T>)table).getIndex().updateLocation(writeStatusRDD, jsc,
+ (HoodieTable<T>)table);
+ result.setIndexUpdateDuration(Duration.between(indexStartTime, Instant.now()));
+ result.setWriteStatuses(statuses);
+
+ // Trigger the insert and collect statuses
+ commitOnAutoCommit(result);
+ }
+
+ protected void commitOnAutoCommit(HoodieWriteMetadata result) {
+ if (config.shouldAutoCommit()) {
+ LOG.info("Auto commit enabled: Committing " + instantTime);
+ commit(Option.empty(), result);
+ } else {
+ LOG.info("Auto commit disabled for " + instantTime);
+ }
+ }
+
+ private void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata result) {
+ String actionType = table.getMetaClient().getCommitActionType();
+ LOG.info("Committing " + instantTime + ", action Type " + actionType);
+ // Create a Hoodie table which encapsulated the commits and files visible
+ HoodieTable<T> table = HoodieTable.create(config, jsc);
+
+ HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
+ HoodieCommitMetadata metadata = new HoodieCommitMetadata();
+
+ result.setCommitted(true);
+ List<HoodieWriteStat> stats = result.getWriteStatuses().map(WriteStatus::getStat).collect();
+ result.setWriteStats(stats);
+
+ updateMetadataAndRollingStats(metadata, stats);
+
+ // Finalize write
+ finalizeWrite(instantTime, stats, result);
+
+ // add in extra metadata
+ if (extraMetadata.isPresent()) {
+ extraMetadata.get().forEach(metadata::addMetadata);
+ }
+ metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, config.getSchema());
+ metadata.setOperationType(operationType);
+
+ try {
+ activeTimeline.saveAsComplete(new HoodieInstant(true, actionType, instantTime),
+ Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+
+ LOG.info("Committed " + instantTime);
+ } catch (IOException e) {
+ throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime,
+ e);
+ }
+ result.setCommitMetadata(Option.of(metadata));
+ }
+
+ /**
+ * Finalize Write operation.
+ * @param instantTime Instant Time
+ * @param stats Hoodie Write Stat
+ */
+ protected void finalizeWrite(String instantTime, List<HoodieWriteStat> stats, HoodieWriteMetadata result) {
+ try {
+ Instant start = Instant.now();
+ table.finalizeWrite(jsc, instantTime, stats);
+ result.setFinalizeDuration(Duration.between(start, Instant.now()));
+ } catch (HoodieIOException ioe) {
+ throw new HoodieCommitException("Failed to complete commit " + instantTime + " due to finalize errors.", ioe);
+ }
+ }
+
+ private void updateMetadataAndRollingStats(HoodieCommitMetadata metadata, List<HoodieWriteStat> writeStats) {
+ // 1. Look up the previous compaction/commit and get the HoodieCommitMetadata from there.
+ // 2. Now, first read the existing rolling stats and merge with the result of current metadata.
+
+ // Need to do this on every commit (delta or commit) to support COW and MOR.
+ for (HoodieWriteStat stat : writeStats) {
+ String partitionPath = stat.getPartitionPath();
+ // TODO: why is stat.getPartitionPath() null at times here?
+ metadata.addWriteStat(partitionPath, stat);
+ }
+ }
+
+ protected boolean isWorkloadProfileNeeded() {
+ return true;
+ }
+
+ @SuppressWarnings("unchecked")
+ protected Iterator<List<WriteStatus>> handleUpsertPartition(String instantTime, Integer partition, Iterator recordItr,
+ Partitioner partitioner) {
+ UpsertPartitioner upsertPartitioner = (UpsertPartitioner) partitioner;
+ BucketInfo binfo = upsertPartitioner.getBucketInfo(partition);
+ BucketType btype = binfo.bucketType;
+ try {
+ if (btype.equals(BucketType.INSERT)) {
+ return handleInsert(binfo.fileIdPrefix, recordItr);
+ } else if (btype.equals(BucketType.UPDATE)) {
+ return handleUpdate(binfo.partitionPath, binfo.fileIdPrefix, recordItr);
+ } else {
+ throw new HoodieUpsertException("Unknown bucketType " + btype + " for partition :" + partition);
+ }
+ } catch (Throwable t) {
+ String msg = "Error upserting bucketType " + btype + " for partition :" + partition;
+ LOG.error(msg, t);
+ throw new HoodieUpsertException(msg, t);
+ }
+ }
+
+ protected Iterator<List<WriteStatus>> handleInsertPartition(String instantTime, Integer partition, Iterator recordItr,
+ Partitioner partitioner) {
+ return handleUpsertPartition(instantTime, partition, recordItr, partitioner);
+ }
+
+ /**
+ * Provides a partitioner to perform the upsert operation, based on the workload profile.
+ */
+ protected abstract Partitioner getUpsertPartitioner(WorkloadProfile profile);
+
+ /**
+ * Provides a partitioner to perform the insert operation, based on the workload profile.
+ */
+ protected abstract Partitioner getInsertPartitioner(WorkloadProfile profile);
+
+ protected abstract Iterator<List<WriteStatus>> handleInsert(String idPfx,
+ Iterator<HoodieRecord<T>> recordItr) throws Exception;
+
+ protected abstract Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String fileId,
+ Iterator<HoodieRecord<T>> recordItr) throws IOException;
+}
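
BaseCommitActionExecutor is a template method: it owns caching, workload profiling, partitioning, index update and the commit itself, while subclasses supply the partitioners and the per-bucket insert/update I/O. A hypothetical skeleton of the contract a concrete executor must fill in (the real copy-on-write implementation is CommitActionExecutor, added later in this commit):

    import java.io.IOException;
    import java.util.Iterator;
    import java.util.List;
    import org.apache.hudi.client.WriteStatus;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.model.HoodieRecordPayload;
    import org.apache.hudi.common.model.WriteOperationType;
    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.hudi.table.HoodieTable;
    import org.apache.hudi.table.WorkloadProfile;
    import org.apache.hudi.table.action.commit.BaseCommitActionExecutor;
    import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
    import org.apache.spark.Partitioner;
    import org.apache.spark.api.java.JavaSparkContext;

    // Hypothetical skeleton, not part of this commit.
    public class SketchCommitActionExecutor<T extends HoodieRecordPayload<T>>
        extends BaseCommitActionExecutor<T> {

      public SketchCommitActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config,
          HoodieTable table, String instantTime) {
        super(jsc, config, table, instantTime, WriteOperationType.UPSERT);
      }

      @Override
      public HoodieWriteMetadata execute() {
        // Typically dedupe/tag the input RDD here, then call super.execute(taggedRecords).
        throw new UnsupportedOperationException("sketch");
      }

      @Override
      protected Partitioner getUpsertPartitioner(WorkloadProfile profile) {
        throw new UnsupportedOperationException("sketch");
      }

      @Override
      protected Partitioner getInsertPartitioner(WorkloadProfile profile) {
        return getUpsertPartitioner(profile);
      }

      @Override
      protected Iterator<List<WriteStatus>> handleInsert(String idPfx, Iterator<HoodieRecord<T>> recordItr) {
        throw new UnsupportedOperationException("sketch"); // write new file groups here
      }

      @Override
      protected Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String fileId,
          Iterator<HoodieRecord<T>> recordItr) throws IOException {
        throw new UnsupportedOperationException("sketch"); // merge into an existing file group here
      }
    }
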
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BucketInfo.java
similarity index 55%
copy from hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
copy to hudi-client/src/main/java/org/apache/hudi/table/action/commit/BucketInfo.java
index e6ac2e9..1d98ad4 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BucketInfo.java
@@ -16,28 +16,26 @@
* limitations under the License.
*/
-package org.apache.hudi.table.action;
+package org.apache.hudi.table.action.commit;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.HoodieTable;
-import org.apache.spark.api.java.JavaSparkContext;
+import java.io.Serializable;
-public abstract class BaseActionExecutor<R> {
-
- protected final JavaSparkContext jsc;
-
- protected final HoodieWriteConfig config;
-
- protected final HoodieTable<?> table;
-
- protected final String instantTime;
-
- public BaseActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable<?> table, String instantTime) {
- this.jsc = jsc;
- this.config = config;
- this.table = table;
- this.instantTime = instantTime;
+/**
+ * Helper class for a bucket's type (INSERT or UPDATE) and its file location.
+ */
+public class BucketInfo implements Serializable {
+
+ BucketType bucketType;
+ String fileIdPrefix;
+ String partitionPath;
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder("BucketInfo {");
+ sb.append("bucketType=").append(bucketType).append(", ");
+ sb.append("fileIdPrefix=").append(fileIdPrefix).append(", ");
+ sb.append("partitionPath=").append(partitionPath);
+ sb.append('}');
+ return sb.toString();
}
-
- public abstract R execute();
}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BucketType.java
similarity index 55%
copy from hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
copy to hudi-client/src/main/java/org/apache/hudi/table/action/commit/BucketType.java
index e6ac2e9..70ee473 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BucketType.java
@@ -16,28 +16,8 @@
* limitations under the License.
*/
-package org.apache.hudi.table.action;
+package org.apache.hudi.table.action.commit;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.HoodieTable;
-import org.apache.spark.api.java.JavaSparkContext;
-
-public abstract class BaseActionExecutor<R> {
-
- protected final JavaSparkContext jsc;
-
- protected final HoodieWriteConfig config;
-
- protected final HoodieTable<?> table;
-
- protected final String instantTime;
-
- public BaseActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable<?> table, String instantTime) {
- this.jsc = jsc;
- this.config = config;
- this.table = table;
- this.instantTime = instantTime;
- }
-
- public abstract R execute();
+public enum BucketType {
+ UPDATE, INSERT
}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertCommitActionExecutor.java
new file mode 100644
index 0000000..e0182da
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertCommitActionExecutor.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieInsertException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+public class BulkInsertCommitActionExecutor<T extends HoodieRecordPayload<T>>
+ extends CommitActionExecutor<T> {
+
+ private final JavaRDD<HoodieRecord<T>> inputRecordsRDD;
+ private final Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner;
+
+ public BulkInsertCommitActionExecutor(JavaSparkContext jsc,
+ HoodieWriteConfig config, HoodieTable table,
+ String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD,
+ Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
+ super(jsc, config, table, instantTime, WriteOperationType.BULK_INSERT);
+ this.inputRecordsRDD = inputRecordsRDD;
+ this.bulkInsertPartitioner = bulkInsertPartitioner;
+ }
+
+ @Override
+ public HoodieWriteMetadata execute() {
+ try {
+ return BulkInsertHelper.bulkInsert(inputRecordsRDD, instantTime, (HoodieTable<T>) table, config,
+ this, true, bulkInsertPartitioner);
+ } catch (Throwable e) {
+ if (e instanceof HoodieInsertException) {
+ throw e;
+ }
+ throw new HoodieInsertException("Failed to bulk insert for commit time " + instantTime, e);
+ }
+ }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertHelper.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertHelper.java
new file mode 100644
index 0000000..fbc8dbb
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertHelper.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieInstant.State;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.execution.BulkInsertMapFunction;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
+
+import org.apache.spark.api.java.JavaRDD;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class BulkInsertHelper<T extends HoodieRecordPayload<T>> {
+
+ public static <T extends HoodieRecordPayload<T>> HoodieWriteMetadata bulkInsert(
+ JavaRDD<HoodieRecord<T>> inputRecords, String instantTime,
+ HoodieTable<T> table, HoodieWriteConfig config,
+ CommitActionExecutor<T> executor, boolean performDedupe,
+ Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
+ HoodieWriteMetadata result = new HoodieWriteMetadata();
+
+ // De-dupe/merge if needed
+ JavaRDD<HoodieRecord<T>> dedupedRecords = inputRecords;
+
+ if (performDedupe) {
+ dedupedRecords = WriteHelper.combineOnCondition(config.shouldCombineBeforeInsert(), inputRecords,
+ config.getInsertShuffleParallelism(), ((HoodieTable<T>)table));
+ }
+
+ final JavaRDD<HoodieRecord<T>> repartitionedRecords;
+ final int parallelism = config.getBulkInsertShuffleParallelism();
+ if (bulkInsertPartitioner.isPresent()) {
+ repartitionedRecords = bulkInsertPartitioner.get().repartitionRecords(dedupedRecords, parallelism);
+ } else {
+ // Now, sort the records and line them up nicely for loading.
+ repartitionedRecords = dedupedRecords.sortBy(record -> {
+ // Let's use "partitionPath + key" as the sort key. Spark will ensure
+ // the records are split evenly across RDD partitions, such that small partitions fit
+ // into one RDD partition, while big ones spread evenly across multiple RDD partitions
+ return String.format("%s+%s", record.getPartitionPath(), record.getRecordKey());
+ }, true, parallelism);
+ }
+
+ // generate new file ID prefixes for each output partition
+ final List<String> fileIDPrefixes =
+ IntStream.range(0, parallelism).mapToObj(i -> FSUtils.createNewFileIdPfx()).collect(Collectors.toList());
+
+ table.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(State.REQUESTED,
+ table.getMetaClient().getCommitActionType(), instantTime), Option.empty());
+
+ JavaRDD<WriteStatus> writeStatusRDD = repartitionedRecords
+ .mapPartitionsWithIndex(new BulkInsertMapFunction<T>(instantTime, config, table, fileIDPrefixes), true)
+ .flatMap(List::iterator);
+
+ executor.updateIndexAndCommitIfNeeded(writeStatusRDD, result);
+ return result;
+ }
+}
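
When a UserDefinedBulkInsertPartitioner is supplied, BulkInsertHelper delegates the record layout to it instead of the default partitionPath+recordKey sort. A hypothetical implementation, assuming only the single repartitionRecords hook invoked above:

    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.model.HoodieRecordPayload;
    import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
    import org.apache.spark.api.java.JavaRDD;

    // Hypothetical: sort only by partition path, trading per-key ordering for a cheaper shuffle.
    public class PartitionPathSortPartitioner<T extends HoodieRecordPayload>
        implements UserDefinedBulkInsertPartitioner<T> {

      @Override
      public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records,
          int outputSparkPartitions) {
        return records.sortBy(HoodieRecord::getPartitionPath, true, outputSparkPartitions);
      }
    }
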
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertPreppedCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertPreppedCommitActionExecutor.java
new file mode 100644
index 0000000..0b8e75f
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertPreppedCommitActionExecutor.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieInsertException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+public class BulkInsertPreppedCommitActionExecutor<T extends HoodieRecordPayload<T>>
+ extends CommitActionExecutor<T> {
+
+ private final JavaRDD<HoodieRecord<T>> preppedInputRecordRdd;
+ private final Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner;
+
+ public BulkInsertPreppedCommitActionExecutor(JavaSparkContext jsc,
+ HoodieWriteConfig config, HoodieTable table,
+ String instantTime, JavaRDD<HoodieRecord<T>> preppedInputRecordRdd,
+ Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
+ super(jsc, config, table, instantTime, WriteOperationType.BULK_INSERT);
+ this.preppedInputRecordRdd = preppedInputRecordRdd;
+ this.bulkInsertPartitioner = bulkInsertPartitioner;
+ }
+
+ @Override
+ public HoodieWriteMetadata execute() {
+ try {
+ return BulkInsertHelper.bulkInsert(preppedInputRecordRdd, instantTime, (HoodieTable<T>) table, config,
+ this, false, bulkInsertPartitioner);
+ } catch (Throwable e) {
+ if (e instanceof HoodieInsertException) {
+ throw e;
+ }
+ throw new HoodieInsertException("Failed to bulk insert for commit time " + instantTime, e);
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/CommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/CommitActionExecutor.java
new file mode 100644
index 0000000..b958837
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/CommitActionExecutor.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.utils.ParquetReaderIterator;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
+import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieUpsertException;
+import org.apache.hudi.execution.CopyOnWriteLazyInsertIterable;
+import org.apache.hudi.execution.SparkBoundedInMemoryExecutor;
+import org.apache.hudi.io.HoodieMergeHandle;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.WorkloadProfile;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public abstract class CommitActionExecutor<T extends HoodieRecordPayload<T>>
+ extends BaseCommitActionExecutor<T> {
+
+ private static final Logger LOG = LogManager.getLogger(CommitActionExecutor.class);
+
+ public CommitActionExecutor(JavaSparkContext jsc,
+ HoodieWriteConfig config, HoodieTable table,
+ String instantTime, WriteOperationType operationType) {
+ super(jsc, config, table, instantTime, operationType);
+ }
+
+ @Override
+ public Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String fileId,
+ Iterator<HoodieRecord<T>> recordItr)
+ throws IOException {
+ // This is needed since sometimes some buckets are never picked in getPartition() and end up with 0 records
+ if (!recordItr.hasNext()) {
+ LOG.info("Empty partition with fileId => " + fileId);
+ return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator();
+ }
+ // these are updates
+ HoodieMergeHandle upsertHandle = getUpdateHandle(partitionPath, fileId, recordItr);
+ return handleUpdateInternal(upsertHandle, fileId);
+ }
+
+ public Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String fileId,
+ Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile oldDataFile) throws IOException {
+ // these are updates
+ HoodieMergeHandle upsertHandle = getUpdateHandle(partitionPath, fileId, keyToNewRecords, oldDataFile);
+ return handleUpdateInternal(upsertHandle, fileId);
+ }
+
+ protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle upsertHandle, String fileId)
+ throws IOException {
+ if (upsertHandle.getOldFilePath() == null) {
+ throw new HoodieUpsertException(
+ "Error in finding the old file path at commit " + instantTime + " for fileId: " + fileId);
+ } else {
+ AvroReadSupport.setAvroReadSchema(table.getHadoopConf(), upsertHandle.getWriterSchema());
+ BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
+ try (ParquetReader<IndexedRecord> reader =
+ AvroParquetReader.<IndexedRecord>builder(upsertHandle.getOldFilePath()).withConf(table.getHadoopConf()).build()) {
+ wrapper = new SparkBoundedInMemoryExecutor(config, new ParquetReaderIterator(reader),
+ new UpdateHandler(upsertHandle), x -> x);
+ wrapper.execute();
+ } catch (Exception e) {
+ throw new HoodieException(e);
+ } finally {
+ upsertHandle.close();
+ if (null != wrapper) {
+ wrapper.shutdownNow();
+ }
+ }
+ }
+
+ // TODO(vc): This needs to be revisited
+ if (upsertHandle.getWriteStatus().getPartitionPath() == null) {
+ LOG.info("Upsert Handle has partition path as null " + upsertHandle.getOldFilePath() + ", "
+ + upsertHandle.getWriteStatus());
+ }
+ return Collections.singletonList(Collections.singletonList(upsertHandle.getWriteStatus())).iterator();
+ }
+
+ protected HoodieMergeHandle getUpdateHandle(String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
+ return new HoodieMergeHandle<>(config, instantTime, (HoodieTable<T>)table, recordItr, partitionPath, fileId, sparkTaskContextSupplier);
+ }
+
+ protected HoodieMergeHandle getUpdateHandle(String partitionPath, String fileId,
+ Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile dataFileToBeMerged) {
+ return new HoodieMergeHandle<>(config, instantTime, (HoodieTable<T>)table, keyToNewRecords,
+ partitionPath, fileId, dataFileToBeMerged, sparkTaskContextSupplier);
+ }
+
+ @Override
+ public Iterator<List<WriteStatus>> handleInsert(String idPfx, Iterator<HoodieRecord<T>> recordItr)
+ throws Exception {
+ // This is needed since sometimes some buckets are never picked in getPartition() and end up with 0 records
+ if (!recordItr.hasNext()) {
+ LOG.info("Empty partition");
+ return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator();
+ }
+ return new CopyOnWriteLazyInsertIterable<>(recordItr, config, instantTime, (HoodieTable<T>)table, idPfx,
+ sparkTaskContextSupplier);
+ }
+
+ @Override
+ public Partitioner getUpsertPartitioner(WorkloadProfile profile) {
+ if (profile == null) {
+ throw new HoodieUpsertException("Need workload profile to construct the upsert partitioner.");
+ }
+ return new UpsertPartitioner(profile, jsc, table, config);
+ }
+
+ @Override
+ public Partitioner getInsertPartitioner(WorkloadProfile profile) {
+ return getUpsertPartitioner(profile);
+ }
+
+ /**
+ * Consumer that dequeues records from the queue and sends them to the merge handle.
+ */
+ private static class UpdateHandler extends BoundedInMemoryQueueConsumer<GenericRecord, Void> {
+
+ private final HoodieMergeHandle upsertHandle;
+
+ private UpdateHandler(HoodieMergeHandle upsertHandle) {
+ this.upsertHandle = upsertHandle;
+ }
+
+ @Override
+ protected void consumeOneRecord(GenericRecord record) {
+ upsertHandle.write(record);
+ }
+
+ @Override
+ protected void finish() {}
+
+ @Override
+ protected Void getResult() {
+ return null;
+ }
+ }
+}
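
UpdateHandler above is the consuming half of a bounded in-memory producer/consumer pipeline: SparkBoundedInMemoryExecutor feeds it parquet records read through ParquetReaderIterator. Any consumer fills in the same three hooks; an illustrative one that merely counts records instead of writing them:

    import org.apache.avro.generic.GenericRecord;
    import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;

    // Illustrative only, not part of this commit.
    class CountingHandler extends BoundedInMemoryQueueConsumer<GenericRecord, Long> {

      private long count = 0;

      @Override
      protected void consumeOneRecord(GenericRecord record) {
        count++;
      }

      @Override
      protected void finish() {}

      @Override
      protected Long getResult() {
        return count;
      }
    }
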
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/DeleteCommitActionExecutor.java
similarity index 54%
copy from hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
copy to hudi-client/src/main/java/org/apache/hudi/table/action/commit/DeleteCommitActionExecutor.java
index e6ac2e9..ba25a97 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/DeleteCommitActionExecutor.java
@@ -16,28 +16,30 @@
* limitations under the License.
*/
-package org.apache.hudi.table.action;
+package org.apache.hudi.table.action.commit;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
-import org.apache.spark.api.java.JavaSparkContext;
-
-public abstract class BaseActionExecutor<R> {
-
- protected final JavaSparkContext jsc;
- protected final HoodieWriteConfig config;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
- protected final HoodieTable<?> table;
+public class DeleteCommitActionExecutor<T extends HoodieRecordPayload<T>>
+ extends CommitActionExecutor<T> {
- protected final String instantTime;
+ private final JavaRDD<HoodieKey> keys;
- public BaseActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable<?> table, String instantTime) {
- this.jsc = jsc;
- this.config = config;
- this.table = table;
- this.instantTime = instantTime;
+ public DeleteCommitActionExecutor(JavaSparkContext jsc,
+ HoodieWriteConfig config, HoodieTable table,
+ String instantTime, JavaRDD<HoodieKey> keys) {
+ super(jsc, config, table, instantTime, WriteOperationType.DELETE);
+ this.keys = keys;
}
- public abstract R execute();
+ public HoodieWriteMetadata execute() {
+ return DeleteHelper.execute(instantTime, keys, jsc, config, (HoodieTable<T>)table, this);
+ }
}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/DeleteHelper.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/DeleteHelper.java
new file mode 100644
index 0000000..7ee891f
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/DeleteHelper.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieUpsertException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.WorkloadProfile;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.time.Duration;
+import java.time.Instant;
+
+/**
+ * Helper class to perform deletes of keys on a hoodie table.
+ * @param <T> type of the record payload
+ */
+public class DeleteHelper<T extends HoodieRecordPayload<T>> {
+
+ /**
+ * Deduplicate {@link HoodieKey}s, keyed on the record key when the index is global, on the full key otherwise.
+ *
+ * @param keys RDD of HoodieKey to deduplicate
+ * @return deduplicated RDD of HoodieKey
+ */
+ private static <T extends HoodieRecordPayload<T>> JavaRDD<HoodieKey> deduplicateKeys(JavaRDD<HoodieKey> keys,
+ HoodieTable<T> table) {
+ boolean isIndexingGlobal = table.getIndex().isGlobal();
+ if (isIndexingGlobal) {
+ return keys.keyBy(HoodieKey::getRecordKey)
+ .reduceByKey((key1, key2) -> key1)
+ .values();
+ } else {
+ return keys.distinct();
+ }
+ }
+
+ public static <T extends HoodieRecordPayload<T>> HoodieWriteMetadata execute(String instantTime,
+ JavaRDD<HoodieKey> keys, JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable<T> table,
+ CommitActionExecutor<T> deleteExecutor) {
+ try {
+ HoodieWriteMetadata result = null;
+ // De-dupe/merge if needed
+ JavaRDD<HoodieKey> dedupedKeys = config.shouldCombineBeforeDelete() ? deduplicateKeys(keys, table) : keys;
+
+ JavaRDD<HoodieRecord<T>> dedupedRecords =
+ dedupedKeys.map(key -> new HoodieRecord(key, new EmptyHoodieRecordPayload()));
+ Instant beginTag = Instant.now();
+ // perform index lookup to get the existing location of the records
+ JavaRDD<HoodieRecord<T>> taggedRecords =
+ ((HoodieTable<T>)table).getIndex().tagLocation(dedupedRecords, jsc, (HoodieTable<T>)table);
+ Duration tagLocationDuration = Duration.between(beginTag, Instant.now());
+
+ // filter out non-existent keys/records
+ JavaRDD<HoodieRecord<T>> taggedValidRecords = taggedRecords.filter(HoodieRecord::isCurrentLocationKnown);
+ if (!taggedValidRecords.isEmpty()) {
+ result = deleteExecutor.execute(taggedValidRecords);
+ result.setIndexLookupDuration(tagLocationDuration);
+ } else {
+ // if the entire set of keys is non-existent
+ deleteExecutor.saveWorkloadProfileMetadataToInflight(new WorkloadProfile(jsc.emptyRDD()), instantTime);
+ result = new HoodieWriteMetadata();
+ result.setWriteStatuses(jsc.emptyRDD());
+ deleteExecutor.commitOnAutoCommit(result);
+ }
+ return result;
+ } catch (Throwable e) {
+ if (e instanceof HoodieUpsertException) {
+ throw (HoodieUpsertException) e;
+ }
+ throw new HoodieUpsertException("Failed to delete for commit time " + instantTime, e);
+ }
+ }
+}
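
So a delete flows as: dedupe the keys, wrap them as EmptyHoodieRecordPayload records, tag them against the index, and run only the located records through the commit executor. A usage sketch with illustrative key values (table, jsc and instantTime are assumed inputs):

    import java.util.Arrays;
    import org.apache.hudi.common.model.HoodieKey;
    import org.apache.hudi.table.HoodieTable;
    import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    // Sketch only: key values are illustrative (recordKey, partitionPath).
    static HoodieWriteMetadata deleteSome(JavaSparkContext jsc, HoodieTable table, String instantTime) {
      JavaRDD<HoodieKey> keys = jsc.parallelize(Arrays.asList(
          new HoodieKey("uuid-0001", "2020/04/12"),
          new HoodieKey("uuid-0002", "2020/04/12")));
      return table.delete(jsc, instantTime, keys);
    }
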
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteMetadata.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteMetadata.java
new file mode 100644
index 0000000..64fd4df
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteMetadata.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import java.util.List;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.util.Option;
+
+import org.apache.spark.api.java.JavaRDD;
+
+import java.time.Duration;
+
+/**
+ * Contains metadata, write-statuses and latency times corresponding to a commit/delta-commit action.
+ */
+public class HoodieWriteMetadata {
+
+ private JavaRDD<WriteStatus> writeStatuses;
+ private Option<Duration> indexLookupDuration = Option.empty();
+
+ // Will be set when auto-commit happens
+ private boolean isCommitted;
+ private Option<HoodieCommitMetadata> commitMetadata = Option.empty();
+ private Option<List<HoodieWriteStat>> writeStats = Option.empty();
+ private Option<Duration> indexUpdateDuration = Option.empty();
+ private Option<Duration> finalizeDuration = Option.empty();
+
+ public HoodieWriteMetadata() {
+ }
+
+ public JavaRDD<WriteStatus> getWriteStatuses() {
+ return writeStatuses;
+ }
+
+ public Option<HoodieCommitMetadata> getCommitMetadata() {
+ return commitMetadata;
+ }
+
+ public void setWriteStatuses(JavaRDD<WriteStatus> writeStatuses) {
+ this.writeStatuses = writeStatuses;
+ }
+
+ public void setCommitMetadata(Option<HoodieCommitMetadata> commitMetadata) {
+ this.commitMetadata = commitMetadata;
+ }
+
+ public Option<Duration> getFinalizeDuration() {
+ return finalizeDuration;
+ }
+
+ public void setFinalizeDuration(Duration finalizeDuration) {
+ this.finalizeDuration = Option.ofNullable(finalizeDuration);
+ }
+
+ public Option<Duration> getIndexUpdateDuration() {
+ return indexUpdateDuration;
+ }
+
+ public void setIndexUpdateDuration(Duration indexUpdateDuration) {
+ this.indexUpdateDuration = Option.ofNullable(indexUpdateDuration);
+ }
+
+ public boolean isCommitted() {
+ return isCommitted;
+ }
+
+ public void setCommitted(boolean committed) {
+ isCommitted = committed;
+ }
+
+ public Option<List<HoodieWriteStat>> getWriteStats() {
+ return writeStats;
+ }
+
+ public void setWriteStats(List<HoodieWriteStat> writeStats) {
+ this.writeStats = Option.of(writeStats);
+ }
+
+ public Option<Duration> getIndexLookupDuration() {
+ return indexLookupDuration;
+ }
+
+ public void setIndexLookupDuration(Duration indexLookupDuration) {
+ this.indexLookupDuration = Option.ofNullable(indexLookupDuration);
+ }
+}
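
A consumer-side sketch of what this carrier exposes once a write finishes (the result variable is assumed to come from a prior executor run; WriteStatus.hasErrors() is the per-status error flag):

    import org.apache.hudi.client.WriteStatus;
    import org.apache.hudi.table.action.commit.HoodieWriteMetadata;

    // Sketch only.
    static void summarize(HoodieWriteMetadata result) {
      long errors = result.getWriteStatuses().filter(WriteStatus::hasErrors).count();
      if (result.getIndexUpdateDuration().isPresent()) {
        System.out.println("index update took " + result.getIndexUpdateDuration().get().toMillis() + " ms");
      }
      System.out.println("committed=" + result.isCommitted() + ", errors=" + errors);
    }
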
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertBucket.java
similarity index 55%
copy from hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
copy to hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertBucket.java
index e6ac2e9..d5dcec8 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertBucket.java
@@ -16,28 +16,26 @@
* limitations under the License.
*/
-package org.apache.hudi.table.action;
+package org.apache.hudi.table.action.commit;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.HoodieTable;
-import org.apache.spark.api.java.JavaSparkContext;
+import java.io.Serializable;
-public abstract class BaseActionExecutor<R> {
-
- protected final JavaSparkContext jsc;
-
- protected final HoodieWriteConfig config;
-
- protected final HoodieTable<?> table;
-
- protected final String instantTime;
-
- public BaseActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable<?> table, String instantTime) {
- this.jsc = jsc;
- this.config = config;
- this.table = table;
- this.instantTime = instantTime;
+/**
+ * Helper class for an insert bucket along with the weight in [0.0, 1.0] that defines the fraction of incoming
+ * inserts that should be allocated to the bucket.
+ */
+public class InsertBucket implements Serializable {
+
+ int bucketNumber;
+ // fraction of total inserts, that should go into this bucket
+ double weight;
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder("InsertBucket {");
+ sb.append("bucketNumber=").append(bucketNumber).append(", ");
+ sb.append("weight=").append(weight);
+ sb.append('}');
+ return sb.toString();
}
-
- public abstract R execute();
-}
+}
\ No newline at end of file
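
Since the weights over a partition's insert buckets sum to 1.0, routing an insert amounts to walking the cumulative distribution. An illustrative helper (the real routing lives in UpsertPartitioner; this assumes same-package access to InsertBucket's fields):

    import java.util.List;

    // Illustrative only: r is drawn uniformly from [0.0, 1.0).
    static int pickBucket(List<InsertBucket> buckets, double r) {
      double cumulative = 0;
      for (InsertBucket b : buckets) {
        cumulative += b.weight;
        if (r < cumulative) {
          return b.bucketNumber;
        }
      }
      return buckets.get(buckets.size() - 1).bucketNumber; // guard against floating-point rounding
    }
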
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertCommitActionExecutor.java
similarity index 50%
copy from hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
copy to hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertCommitActionExecutor.java
index e6ac2e9..d08dab2 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertCommitActionExecutor.java
@@ -16,28 +16,32 @@
* limitations under the License.
*/
-package org.apache.hudi.table.action;
+package org.apache.hudi.table.action.commit;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
-import org.apache.spark.api.java.JavaSparkContext;
-
-public abstract class BaseActionExecutor<R> {
-
- protected final JavaSparkContext jsc;
- protected final HoodieWriteConfig config;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
- protected final HoodieTable<?> table;
+public class InsertCommitActionExecutor<T extends HoodieRecordPayload<T>>
+ extends CommitActionExecutor<T> {
- protected final String instantTime;
+ private final JavaRDD<HoodieRecord<T>> inputRecordsRDD;
- public BaseActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable<?> table, String instantTime) {
- this.jsc = jsc;
- this.config = config;
- this.table = table;
- this.instantTime = instantTime;
+ public InsertCommitActionExecutor(JavaSparkContext jsc,
+ HoodieWriteConfig config, HoodieTable table,
+ String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
+ super(jsc, config, table, instantTime, WriteOperationType.INSERT);
+ this.inputRecordsRDD = inputRecordsRDD;
}
- public abstract R execute();
+ @Override
+ public HoodieWriteMetadata execute() {
+ return WriteHelper.write(instantTime, inputRecordsRDD, jsc, (HoodieTable<T>) table,
+ config.shouldCombineBeforeInsert(), config.getInsertShuffleParallelism(), this, false);
+ }
}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertPreppedCommitActionExecutor.java
similarity index 54%
copy from hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
copy to hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertPreppedCommitActionExecutor.java
index e6ac2e9..acc9902 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertPreppedCommitActionExecutor.java
@@ -16,28 +16,30 @@
* limitations under the License.
*/
-package org.apache.hudi.table.action;
+package org.apache.hudi.table.action.commit;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
-import org.apache.spark.api.java.JavaSparkContext;
-
-public abstract class BaseActionExecutor<R> {
-
- protected final JavaSparkContext jsc;
- protected final HoodieWriteConfig config;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
- protected final HoodieTable<?> table;
+public class InsertPreppedCommitActionExecutor<T extends HoodieRecordPayload<T>>
+ extends CommitActionExecutor<T> {
- protected final String instantTime;
+ private final JavaRDD<HoodieRecord<T>> preppedRecords;
- public BaseActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable<?> table, String instantTime) {
- this.jsc = jsc;
- this.config = config;
- this.table = table;
- this.instantTime = instantTime;
+ public InsertPreppedCommitActionExecutor(JavaSparkContext jsc,
+ HoodieWriteConfig config, HoodieTable table,
+ String instantTime, JavaRDD<HoodieRecord<T>> preppedRecords) {
+ super(jsc, config, table, instantTime, WriteOperationType.INSERT_PREPPED);
+ this.preppedRecords = preppedRecords;
}
- public abstract R execute();
+ public HoodieWriteMetadata execute() {
+ return super.execute(preppedRecords);
+ }
}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/SmallFile.java
similarity index 55%
copy from hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
copy to hudi-client/src/main/java/org/apache/hudi/table/action/commit/SmallFile.java
index e6ac2e9..ccea6af 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/SmallFile.java
@@ -16,28 +16,25 @@
* limitations under the License.
*/
-package org.apache.hudi.table.action;
+package org.apache.hudi.table.action.commit;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.HoodieTable;
-import org.apache.spark.api.java.JavaSparkContext;
+import java.io.Serializable;
+import org.apache.hudi.common.model.HoodieRecordLocation;
-public abstract class BaseActionExecutor<R> {
-
- protected final JavaSparkContext jsc;
-
- protected final HoodieWriteConfig config;
-
- protected final HoodieTable<?> table;
-
- protected final String instantTime;
-
- public BaseActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable<?> table, String instantTime) {
- this.jsc = jsc;
- this.config = config;
- this.table = table;
- this.instantTime = instantTime;
+/**
+ * Helper class for a small file's location and its actual size on disk.
+ */
+public class SmallFile implements Serializable {
+
+ public HoodieRecordLocation location;
+ public long sizeBytes;
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder("SmallFile {");
+ sb.append("location=").append(location).append(", ");
+ sb.append("sizeBytes=").append(sizeBytes);
+ sb.append('}');
+ return sb.toString();
}
-
- public abstract R execute();
-}
+}
\ No newline at end of file
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertCommitActionExecutor.java
similarity index 50%
copy from hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
copy to hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertCommitActionExecutor.java
index e6ac2e9..efdcae1 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertCommitActionExecutor.java
@@ -16,28 +16,32 @@
* limitations under the License.
*/
-package org.apache.hudi.table.action;
+package org.apache.hudi.table.action.commit;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
-import org.apache.spark.api.java.JavaSparkContext;
-
-public abstract class BaseActionExecutor<R> {
-
- protected final JavaSparkContext jsc;
- protected final HoodieWriteConfig config;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
- protected final HoodieTable<?> table;
+public class UpsertCommitActionExecutor<T extends HoodieRecordPayload<T>>
+ extends CommitActionExecutor<T> {
- protected final String instantTime;
+ private JavaRDD<HoodieRecord<T>> inputRecordsRDD;
- public BaseActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable<?> table, String instantTime) {
- this.jsc = jsc;
- this.config = config;
- this.table = table;
- this.instantTime = instantTime;
+ public UpsertCommitActionExecutor(JavaSparkContext jsc,
+ HoodieWriteConfig config, HoodieTable table,
+ String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
+ super(jsc, config, table, instantTime, WriteOperationType.UPSERT);
+ this.inputRecordsRDD = inputRecordsRDD;
}
- public abstract R execute();
+ @Override
+ public HoodieWriteMetadata execute() {
+ return WriteHelper.write(instantTime, inputRecordsRDD, jsc, (HoodieTable<T>)table,
+ config.shouldCombineBeforeUpsert(), config.getUpsertShuffleParallelism(), this, true);
+ }
}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
new file mode 100644
index 0000000..745388c
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.NumericUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.WorkloadProfile;
+import org.apache.hudi.table.WorkloadStat;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.PairFunction;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import scala.Tuple2;
+
+/**
+ * Packs incoming records to be upserted into buckets (1 bucket = 1 RDD partition).
+ */
+public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends Partitioner {
+
+ private static final Logger LOG = LogManager.getLogger(UpsertPartitioner.class);
+
+ /**
+ * List of all small files to be corrected.
+ */
+ protected List<SmallFile> smallFiles = new ArrayList<>();
+ /**
+ * Total number of RDD partitions; determined by the total number of buckets we want to pack the incoming workload into.
+ */
+ private int totalBuckets = 0;
+ /**
+ * Stat for the current workload. Helps in determining total inserts, upserts, etc.
+ */
+ private WorkloadStat globalStat;
+ /**
+ * Helps decide which bucket an incoming update should go to.
+ */
+ private HashMap<String, Integer> updateLocationToBucket;
+ /**
+ * Helps us pack inserts into 1 or more buckets, depending on the number of incoming records.
+ */
+ private HashMap<String, List<InsertBucket>> partitionPathToInsertBuckets;
+ /**
+ * Remembers the type of each bucket, for later lookup.
+ */
+ private HashMap<Integer, BucketInfo> bucketInfoMap;
+
+ protected final HoodieTable<T> table;
+
+ protected final HoodieWriteConfig config;
+
+ public UpsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc, HoodieTable<T> table,
+ HoodieWriteConfig config) {
+ updateLocationToBucket = new HashMap<>();
+ partitionPathToInsertBuckets = new HashMap<>();
+ bucketInfoMap = new HashMap<>();
+ globalStat = profile.getGlobalStat();
+ this.table = table;
+ this.config = config;
+ assignUpdates(profile);
+ assignInserts(profile, jsc);
+
+ LOG.info("Total Buckets :" + totalBuckets + ", buckets info => " + bucketInfoMap + ", \n"
+ + "Partition to insert buckets => " + partitionPathToInsertBuckets + ", \n"
+ + "UpdateLocations mapped to buckets =>" + updateLocationToBucket);
+ }
+
+ private void assignUpdates(WorkloadProfile profile) {
+ // each update location gets a partition
+ Set<Entry<String, WorkloadStat>> partitionStatEntries = profile.getPartitionPathStatMap().entrySet();
+ for (Map.Entry<String, WorkloadStat> partitionStat : partitionStatEntries) {
+ for (Map.Entry<String, Pair<String, Long>> updateLocEntry :
+ partitionStat.getValue().getUpdateLocationToCount().entrySet()) {
+ addUpdateBucket(partitionStat.getKey(), updateLocEntry.getKey());
+ }
+ }
+ }
+
+ private int addUpdateBucket(String partitionPath, String fileIdHint) {
+ int bucket = totalBuckets;
+ updateLocationToBucket.put(fileIdHint, bucket);
+ BucketInfo bucketInfo = new BucketInfo();
+ bucketInfo.bucketType = BucketType.UPDATE;
+ bucketInfo.fileIdPrefix = fileIdHint;
+ bucketInfo.partitionPath = partitionPath;
+ bucketInfoMap.put(totalBuckets, bucketInfo);
+ totalBuckets++;
+ return bucket;
+ }
+
+ private void assignInserts(WorkloadProfile profile, JavaSparkContext jsc) {
+ // for new inserts, compute buckets depending on how many records we have for each partition
+ Set<String> partitionPaths = profile.getPartitionPaths();
+ long averageRecordSize =
+ averageBytesPerRecord(table.getMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants(),
+ config.getCopyOnWriteRecordSizeEstimate());
+ LOG.info("AvgRecordSize => " + averageRecordSize);
+
+ Map<String, List<SmallFile>> partitionSmallFilesMap =
+ getSmallFilesForPartitions(new ArrayList<String>(partitionPaths), jsc);
+
+ for (String partitionPath : partitionPaths) {
+ WorkloadStat pStat = profile.getWorkloadStat(partitionPath);
+ if (pStat.getNumInserts() > 0) {
+
+ List<SmallFile> smallFiles = partitionSmallFilesMap.get(partitionPath);
+ this.smallFiles.addAll(smallFiles);
+
+ LOG.info("For partitionPath : " + partitionPath + " Small Files => " + smallFiles);
+
+ long totalUnassignedInserts = pStat.getNumInserts();
+ List<Integer> bucketNumbers = new ArrayList<>();
+ List<Long> recordsPerBucket = new ArrayList<>();
+
+ // first try packing this into one of the smallFiles
+ for (SmallFile smallFile : smallFiles) {
+ long recordsToAppend = Math.min((config.getParquetMaxFileSize() - smallFile.sizeBytes) / averageRecordSize,
+ totalUnassignedInserts);
+ if (recordsToAppend > 0 && totalUnassignedInserts > 0) {
+ // create a new bucket or re-use an existing bucket
+ int bucket;
+ if (updateLocationToBucket.containsKey(smallFile.location.getFileId())) {
+ bucket = updateLocationToBucket.get(smallFile.location.getFileId());
+ LOG.info("Assigning " + recordsToAppend + " inserts to existing update bucket " + bucket);
+ } else {
+ bucket = addUpdateBucket(partitionPath, smallFile.location.getFileId());
+ LOG.info("Assigning " + recordsToAppend + " inserts to new update bucket " + bucket);
+ }
+ bucketNumbers.add(bucket);
+ recordsPerBucket.add(recordsToAppend);
+ totalUnassignedInserts -= recordsToAppend;
+ }
+ }
+
+ // if we have anything more, create new insert buckets, like normal
+ if (totalUnassignedInserts > 0) {
+ long insertRecordsPerBucket = config.getCopyOnWriteInsertSplitSize();
+ if (config.shouldAutoTuneInsertSplits()) {
+ insertRecordsPerBucket = config.getParquetMaxFileSize() / averageRecordSize;
+ }
+
+ int insertBuckets = (int) Math.ceil((1.0 * totalUnassignedInserts) / insertRecordsPerBucket);
+ LOG.info("After small file assignment: unassignedInserts => " + totalUnassignedInserts
+ + ", totalInsertBuckets => " + insertBuckets + ", recordsPerBucket => " + insertRecordsPerBucket);
+ for (int b = 0; b < insertBuckets; b++) {
+ bucketNumbers.add(totalBuckets);
+ recordsPerBucket.add(totalUnassignedInserts / insertBuckets);
+ BucketInfo bucketInfo = new BucketInfo();
+ bucketInfo.bucketType = BucketType.INSERT;
+ bucketInfo.partitionPath = partitionPath;
+ bucketInfo.fileIdPrefix = FSUtils.createNewFileIdPfx();
+ bucketInfoMap.put(totalBuckets, bucketInfo);
+ totalBuckets++;
+ }
+ }
+
+ // Go over all such buckets and assign weights as per the number of incoming inserts.
+ List<InsertBucket> insertBuckets = new ArrayList<>();
+ for (int i = 0; i < bucketNumbers.size(); i++) {
+ InsertBucket bkt = new InsertBucket();
+ bkt.bucketNumber = bucketNumbers.get(i);
+ bkt.weight = (1.0 * recordsPerBucket.get(i)) / pStat.getNumInserts();
+ insertBuckets.add(bkt);
+ }
+ LOG.info("Total insert buckets for partition path " + partitionPath + " => " + insertBuckets);
+ partitionPathToInsertBuckets.put(partitionPath, insertBuckets);
+ }
+ }
+ }
+
+ private Map<String, List<SmallFile>> getSmallFilesForPartitions(List<String> partitionPaths, JavaSparkContext jsc) {
+
+ Map<String, List<SmallFile>> partitionSmallFilesMap = new HashMap<>();
+ if (partitionPaths != null && partitionPaths.size() > 0) {
+ JavaRDD<String> partitionPathRdds = jsc.parallelize(partitionPaths, partitionPaths.size());
+ partitionSmallFilesMap = partitionPathRdds.mapToPair((PairFunction<String, String, List<SmallFile>>)
+ partitionPath -> new Tuple2<>(partitionPath, getSmallFiles(partitionPath))).collectAsMap();
+ }
+
+ return partitionSmallFilesMap;
+ }
+
+ /**
+ * Returns a list of small files in the given partition path.
+ */
+ protected List<SmallFile> getSmallFiles(String partitionPath) {
+
+ // collect small files only for this partitionPath
+ List<SmallFile> smallFileLocations = new ArrayList<>();
+
+ HoodieTimeline commitTimeline = table.getMetaClient().getCommitsTimeline().filterCompletedInstants();
+
+ if (!commitTimeline.empty()) { // if we have some commits
+ HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
+ List<HoodieBaseFile> allFiles = table.getBaseFileOnlyView()
+ .getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).collect(Collectors.toList());
+
+ for (HoodieBaseFile file : allFiles) {
+ if (file.getFileSize() < config.getParquetSmallFileLimit()) {
+ String filename = file.getFileName();
+ SmallFile sf = new SmallFile();
+ sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
+ sf.sizeBytes = file.getFileSize();
+ smallFileLocations.add(sf);
+ }
+ }
+ }
+
+ return smallFileLocations;
+ }
+
+ public BucketInfo getBucketInfo(int bucketNumber) {
+ return bucketInfoMap.get(bucketNumber);
+ }
+
+ public List<InsertBucket> getInsertBuckets(String partitionPath) {
+ return partitionPathToInsertBuckets.get(partitionPath);
+ }
+
+ @Override
+ public int numPartitions() {
+ return totalBuckets;
+ }
+
+ @Override
+ public int getPartition(Object key) {
+ Tuple2<HoodieKey, Option<HoodieRecordLocation>> keyLocation =
+ (Tuple2<HoodieKey, Option<HoodieRecordLocation>>) key;
+ if (keyLocation._2().isPresent()) {
+ HoodieRecordLocation location = keyLocation._2().get();
+ return updateLocationToBucket.get(location.getFileId());
+ } else {
+ List<InsertBucket> targetBuckets = partitionPathToInsertBuckets.get(keyLocation._1().getPartitionPath());
+ // pick the target bucket to use based on the weights.
+ double totalWeight = 0.0;
+ final long totalInserts = Math.max(1, globalStat.getNumInserts());
+ final long hashOfKey = NumericUtils.getMessageDigestHash("MD5", keyLocation._1().getRecordKey());
+ final double r = 1.0 * Math.floorMod(hashOfKey, totalInserts) / totalInserts;
+ for (InsertBucket insertBucket : targetBuckets) {
+ totalWeight += insertBucket.weight;
+ if (r <= totalWeight) {
+ return insertBucket.bucketNumber;
+ }
+ }
+ // return first one, by default
+ return targetBuckets.get(0).bucketNumber;
+ }
+ }
+
+ /**
+ * Obtains the average record size based on records written during previous commits. Used to estimate how many
+ * records fit into one file.
+ */
+ protected static long averageBytesPerRecord(HoodieTimeline commitTimeline, int defaultRecordSizeEstimate) {
+ long avgSize = defaultRecordSizeEstimate;
+ try {
+ if (!commitTimeline.empty()) {
+ // Go over the reverse ordered commits to get a more recent estimate of average record size.
+ Iterator<HoodieInstant> instants = commitTimeline.getReverseOrderedInstants().iterator();
+ while (instants.hasNext()) {
+ HoodieInstant instant = instants.next();
+ HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+ .fromBytes(commitTimeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
+ long totalBytesWritten = commitMetadata.fetchTotalBytesWritten();
+ long totalRecordsWritten = commitMetadata.fetchTotalRecordsWritten();
+ if (totalBytesWritten > 0 && totalRecordsWritten > 0) {
+ avgSize = (long) Math.ceil((1.0 * totalBytesWritten) / totalRecordsWritten);
+ break;
+ }
+ }
+ }
+ } catch (Throwable t) {
+ // make this fail-safe.
+ LOG.error("Error trying to compute average bytes/record ", t);
+ }
+ return avgSize;
+ }
+}
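To make the weighted selection in getPartition() above easier to follow, a self-contained
sketch (hypothetical class and method names; only the arithmetic mirrors the partitioner):

    import java.util.List;

    // Sketch of weighted insert-bucket selection. Each double[] holds
    // {bucketNumber, weight}, with weights per partition path summing to ~1.0.
    class WeightedBucketPickSketch {
      static int pick(long keyHash, long totalInserts, List<double[]> buckets) {
        long n = Math.max(1, totalInserts);
        // Map the key hash onto [0, 1), as the partitioner does via floorMod.
        double r = 1.0 * Math.floorMod(keyHash, n) / n;
        double cumulativeWeight = 0.0;
        for (double[] b : buckets) {
          cumulativeWeight += b[1];
          if (r <= cumulativeWeight) {
            return (int) b[0];
          }
        }
        return (int) buckets.get(0)[0]; // fall back to the first bucket
      }
    }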
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPreppedCommitActionExecutor.java
similarity index 54%
copy from hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
copy to hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPreppedCommitActionExecutor.java
index e6ac2e9..5999104 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPreppedCommitActionExecutor.java
@@ -16,28 +16,30 @@
* limitations under the License.
*/
-package org.apache.hudi.table.action;
+package org.apache.hudi.table.action.commit;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
-import org.apache.spark.api.java.JavaSparkContext;
-
-public abstract class BaseActionExecutor<R> {
-
- protected final JavaSparkContext jsc;
- protected final HoodieWriteConfig config;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
- protected final HoodieTable<?> table;
+public class UpsertPreppedCommitActionExecutor<T extends HoodieRecordPayload<T>>
+ extends CommitActionExecutor<T> {
- protected final String instantTime;
+ private final JavaRDD<HoodieRecord<T>> preppedRecords;
- public BaseActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable<?> table, String instantTime) {
- this.jsc = jsc;
- this.config = config;
- this.table = table;
- this.instantTime = instantTime;
+ public UpsertPreppedCommitActionExecutor(JavaSparkContext jsc,
+ HoodieWriteConfig config, HoodieTable table,
+ String instantTime, JavaRDD<HoodieRecord<T>> preppedRecords) {
+ super(jsc, config, table, instantTime, WriteOperationType.UPSERT_PREPPED);
+ this.preppedRecords = preppedRecords;
}
- public abstract R execute();
+ public HoodieWriteMetadata execute() {
+ return super.execute(preppedRecords);
+ }
}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/WriteHelper.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/WriteHelper.java
new file mode 100644
index 0000000..7faee1c
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/WriteHelper.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.exception.HoodieUpsertException;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.time.Duration;
+import java.time.Instant;
+import scala.Tuple2;
+
+public class WriteHelper<T extends HoodieRecordPayload<T>> {
+
+ public static <T extends HoodieRecordPayload<T>> HoodieWriteMetadata write(String instantTime,
+ JavaRDD<HoodieRecord<T>> inputRecordsRDD, JavaSparkContext jsc,
+ HoodieTable<T> table, boolean shouldCombine,
+ int shuffleParallelism, CommitActionExecutor<T> executor, boolean performTagging) {
+ try {
+ // De-dupe/merge if needed
+ JavaRDD<HoodieRecord<T>> dedupedRecords =
+ combineOnCondition(shouldCombine, inputRecordsRDD, shuffleParallelism, table);
+
+ Instant lookupBegin = Instant.now();
+ JavaRDD<HoodieRecord<T>> taggedRecords = dedupedRecords;
+ if (performTagging) {
+ // perform index lookup to get the existing location of records
+ taggedRecords = tag(dedupedRecords, jsc, table);
+ }
+ Duration indexLookupDuration = Duration.between(lookupBegin, Instant.now());
+
+ HoodieWriteMetadata result = executor.execute(taggedRecords);
+ result.setIndexLookupDuration(indexLookupDuration);
+ return result;
+ } catch (Throwable e) {
+ if (e instanceof HoodieUpsertException) {
+ throw (HoodieUpsertException) e;
+ }
+ throw new HoodieUpsertException("Failed to upsert for commit time " + instantTime, e);
+ }
+ }
+
+ private static <T extends HoodieRecordPayload<T>> JavaRDD<HoodieRecord<T>> tag(
+ JavaRDD<HoodieRecord<T>> dedupedRecords, JavaSparkContext jsc, HoodieTable<T> table) {
+ // perform index lookup to get the existing location of records
+ return table.getIndex().tagLocation(dedupedRecords, jsc, table);
+ }
+
+ public static <T extends HoodieRecordPayload<T>> JavaRDD<HoodieRecord<T>> combineOnCondition(
+ boolean condition, JavaRDD<HoodieRecord<T>> records, int parallelism, HoodieTable<T> table) {
+ return condition ? deduplicateRecords(records, table, parallelism) : records;
+ }
+
+ /**
+ * Deduplicate Hoodie records by key, merging duplicates via the payload's preCombine.
+ *
+ * @param records HoodieRecords to deduplicate
+ * @param parallelism parallelism or partitions to be used while reducing/deduplicating
+ * @return RDD of deduplicated HoodieRecords
+ */
+ public static <T extends HoodieRecordPayload<T>> JavaRDD<HoodieRecord<T>> deduplicateRecords(
+ JavaRDD<HoodieRecord<T>> records, HoodieTable<T> table, int parallelism) {
+ return deduplicateRecords(records, table.getIndex(), parallelism);
+ }
+
+ public static <T extends HoodieRecordPayload<T>> JavaRDD<HoodieRecord<T>> deduplicateRecords(
+ JavaRDD<HoodieRecord<T>> records, HoodieIndex<T> index, int parallelism) {
+ boolean isIndexingGlobal = index.isGlobal();
+ return records.mapToPair(record -> {
+ HoodieKey hoodieKey = record.getKey();
+ // If the index is global, dedupe on the record key alone; duplicates may differ in partitionPath
+ Object key = isIndexingGlobal ? hoodieKey.getRecordKey() : hoodieKey;
+ return new Tuple2<>(key, record);
+ }).reduceByKey((rec1, rec2) -> {
+ @SuppressWarnings("unchecked")
+ T reducedData = (T) rec1.getData().preCombine(rec2.getData());
+ // The key and partitionPath must not change during preCombine (that would affect
+ // downstream bucketing), so pick them from one of the records.
+ return new HoodieRecord<T>(rec1.getKey(), reducedData);
+ }, parallelism).map(Tuple2::_2);
+ }
+}
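To make the dedup key rule above concrete, a tiny hedged sketch (hypothetical helper, plain
Java): with a global index, two records that share a record key collapse even across partition
paths; with a non-global index the partition path is part of the identity:

    // Sketch of the key choice inside deduplicateRecords; not a Hudi API.
    // (The real code keys on the HoodieKey object itself in the non-global case;
    // the string concat below is just an illustration.)
    static Object dedupKey(String recordKey, String partitionPath, boolean isIndexingGlobal) {
      // Global index: record keys are unique table-wide, so key on recordKey alone.
      return isIndexingGlobal ? recordKey : recordKey + "|" + partitionPath;
    }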
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/BulkInsertDeltaCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/BulkInsertDeltaCommitActionExecutor.java
new file mode 100644
index 0000000..95779a7
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/BulkInsertDeltaCommitActionExecutor.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.deltacommit;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieInsertException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
+
+import org.apache.hudi.table.action.commit.BulkInsertHelper;
+import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+public class BulkInsertDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
+ extends DeltaCommitActionExecutor<T> {
+
+ private final JavaRDD<HoodieRecord<T>> inputRecordsRDD;
+ private final Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner;
+
+ public BulkInsertDeltaCommitActionExecutor(JavaSparkContext jsc,
+ HoodieWriteConfig config, HoodieTable table,
+ String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD,
+ Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
+ super(jsc, config, table, instantTime, WriteOperationType.BULK_INSERT);
+ this.inputRecordsRDD = inputRecordsRDD;
+ this.bulkInsertPartitioner = bulkInsertPartitioner;
+ }
+
+ @Override
+ public HoodieWriteMetadata execute() {
+ try {
+ return BulkInsertHelper.bulkInsert(inputRecordsRDD, instantTime, (HoodieTable<T>) table, config,
+ this, true, bulkInsertPartitioner);
+ } catch (Throwable e) {
+ if (e instanceof HoodieInsertException) {
+ throw e;
+ }
+ throw new HoodieInsertException("Failed to bulk insert for commit time " + instantTime, e);
+ }
+ }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/BulkInsertPreppedDeltaCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/BulkInsertPreppedDeltaCommitActionExecutor.java
new file mode 100644
index 0000000..7b6e146
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/BulkInsertPreppedDeltaCommitActionExecutor.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.deltacommit;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieInsertException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
+
+import org.apache.hudi.table.action.commit.BulkInsertHelper;
+import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+public class BulkInsertPreppedDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
+ extends DeltaCommitActionExecutor<T> {
+
+ private final JavaRDD<HoodieRecord<T>> preppedInputRecordRdd;
+ private final Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner;
+
+ public BulkInsertPreppedDeltaCommitActionExecutor(JavaSparkContext jsc,
+ HoodieWriteConfig config, HoodieTable table,
+ String instantTime, JavaRDD<HoodieRecord<T>> preppedInputRecordRdd,
+ Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
+ super(jsc, config, table, instantTime, WriteOperationType.BULK_INSERT);
+ this.preppedInputRecordRdd = preppedInputRecordRdd;
+ this.bulkInsertPartitioner = bulkInsertPartitioner;
+ }
+
+ @Override
+ public HoodieWriteMetadata execute() {
+ try {
+ return BulkInsertHelper.bulkInsert(preppedInputRecordRdd, instantTime, (HoodieTable<T>) table, config,
+ this, false, bulkInsertPartitioner);
+ } catch (Throwable e) {
+ if (e instanceof HoodieInsertException) {
+ throw e;
+ }
+ throw new HoodieInsertException("Failed to bulk insert for commit time " + instantTime, e);
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/DeleteDeltaCommitActionExecutor.java
similarity index 50%
copy from hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
copy to hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/DeleteDeltaCommitActionExecutor.java
index e6ac2e9..1575406 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/DeleteDeltaCommitActionExecutor.java
@@ -16,28 +16,32 @@
* limitations under the License.
*/
-package org.apache.hudi.table.action;
+package org.apache.hudi.table.action.deltacommit;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
-import org.apache.spark.api.java.JavaSparkContext;
-
-public abstract class BaseActionExecutor<R> {
-
- protected final JavaSparkContext jsc;
- protected final HoodieWriteConfig config;
+import org.apache.hudi.table.action.commit.DeleteHelper;
+import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
- protected final HoodieTable<?> table;
+public class DeleteDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
+ extends DeltaCommitActionExecutor<T> {
- protected final String instantTime;
+ private final JavaRDD<HoodieKey> keys;
- public BaseActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable<?> table, String instantTime) {
- this.jsc = jsc;
- this.config = config;
- this.table = table;
- this.instantTime = instantTime;
+ public DeleteDeltaCommitActionExecutor(JavaSparkContext jsc,
+ HoodieWriteConfig config, HoodieTable table,
+ String instantTime, JavaRDD<HoodieKey> keys) {
+ super(jsc, config, table, instantTime, WriteOperationType.DELETE);
+ this.keys = keys;
}
- public abstract R execute();
+ public HoodieWriteMetadata execute() {
+ return DeleteHelper.execute(instantTime, keys, jsc, config, (HoodieTable<T>)table, this);
+ }
}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/DeltaCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/DeltaCommitActionExecutor.java
new file mode 100644
index 0000000..775580e
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/DeltaCommitActionExecutor.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.deltacommit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieUpsertException;
+import org.apache.hudi.execution.MergeOnReadLazyInsertIterable;
+import org.apache.hudi.io.HoodieAppendHandle;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.WorkloadProfile;
+
+import org.apache.hudi.table.action.commit.CommitActionExecutor;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+public abstract class DeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
+ extends CommitActionExecutor<T> {
+ private static final Logger LOG = LogManager.getLogger(DeltaCommitActionExecutor.class);
+
+ // UpsertPartitioner for MergeOnRead table type
+ private UpsertDeltaCommitPartitioner mergeOnReadUpsertPartitioner;
+
+ public DeltaCommitActionExecutor(JavaSparkContext jsc,
+ HoodieWriteConfig config, HoodieTable table,
+ String instantTime, WriteOperationType operationType) {
+ super(jsc, config, table, instantTime, operationType);
+ }
+
+ @Override
+ public Partitioner getUpsertPartitioner(WorkloadProfile profile) {
+ if (profile == null) {
+ throw new HoodieUpsertException("Need workload profile to construct the upsert partitioner.");
+ }
+ mergeOnReadUpsertPartitioner = new UpsertDeltaCommitPartitioner(profile, jsc, table, config);
+ return mergeOnReadUpsertPartitioner;
+ }
+
+ @Override
+ public Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String fileId,
+ Iterator<HoodieRecord<T>> recordItr) throws IOException {
+ LOG.info("Merging updates for commit " + instantTime + " for file " + fileId);
+
+ if (!table.getIndex().canIndexLogFiles() && mergeOnReadUpsertPartitioner.getSmallFileIds().contains(fileId)) {
+ LOG.info("Small file corrections for updates for commit " + instantTime + " for file " + fileId);
+ return super.handleUpdate(partitionPath, fileId, recordItr);
+ } else {
+ HoodieAppendHandle<T> appendHandle = new HoodieAppendHandle<>(config, instantTime, (HoodieTable<T>)table,
+ partitionPath, fileId, recordItr, sparkTaskContextSupplier);
+ appendHandle.doAppend();
+ appendHandle.close();
+ return Collections.singletonList(Collections.singletonList(appendHandle.getWriteStatus())).iterator();
+ }
+ }
+
+ @Override
+ public Iterator<List<WriteStatus>> handleInsert(String idPfx, Iterator<HoodieRecord<T>> recordItr)
+ throws Exception {
+ // If the index can index log files, write inserts to log files; else write inserts to parquet files
+ if (table.getIndex().canIndexLogFiles()) {
+ return new MergeOnReadLazyInsertIterable<>(recordItr, config, instantTime, (HoodieTable<T>)table, idPfx,
+ sparkTaskContextSupplier);
+ } else {
+ return super.handleInsert(idPfx, recordItr);
+ }
+ }
+
+}
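A compact, hedged restatement of the routing in handleUpdate()/handleInsert() above
(hypothetical helper; the strings stand for the code paths taken):

    // Sketch of MOR write routing; not a Hudi API, just the decision table.
    static String route(boolean isUpdate, boolean canIndexLogFiles, boolean isTrackedSmallFile) {
      if (isUpdate) {
        // Small-file correction: merge into parquet when logs cannot be indexed.
        return (!canIndexLogFiles && isTrackedSmallFile) ? "MERGE_PARQUET" : "APPEND_LOG";
      }
      // Inserts go to log files only when the index can locate records in them.
      return canIndexLogFiles ? "APPEND_LOG" : "WRITE_PARQUET";
    }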
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/InsertDeltaCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/InsertDeltaCommitActionExecutor.java
new file mode 100644
index 0000000..f76f7fc
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/InsertDeltaCommitActionExecutor.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.deltacommit;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
+import org.apache.hudi.table.action.commit.WriteHelper;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+public class InsertDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
+ extends DeltaCommitActionExecutor<T> {
+
+ private final JavaRDD<HoodieRecord<T>> inputRecordsRDD;
+
+ public InsertDeltaCommitActionExecutor(JavaSparkContext jsc,
+ HoodieWriteConfig config, HoodieTable table,
+ String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
+ super(jsc, config, table, instantTime, WriteOperationType.INSERT);
+ this.inputRecordsRDD = inputRecordsRDD;
+ }
+
+ @Override
+ public HoodieWriteMetadata execute() {
+ return WriteHelper.write(instantTime, inputRecordsRDD, jsc, (HoodieTable<T>) table,
+ config.shouldCombineBeforeInsert(), config.getInsertShuffleParallelism(), this, false);
+ }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/InsertPreppedDeltaCommitActionExecutor.java
similarity index 51%
copy from hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
copy to hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/InsertPreppedDeltaCommitActionExecutor.java
index e6ac2e9..55031ea 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/InsertPreppedDeltaCommitActionExecutor.java
@@ -16,28 +16,30 @@
* limitations under the License.
*/
-package org.apache.hudi.table.action;
+package org.apache.hudi.table.action.deltacommit;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
+import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
-public abstract class BaseActionExecutor<R> {
+public class InsertPreppedDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
+ extends DeltaCommitActionExecutor<T> {
- protected final JavaSparkContext jsc;
+ private final JavaRDD<HoodieRecord<T>> preppedRecords;
- protected final HoodieWriteConfig config;
-
- protected final HoodieTable<?> table;
-
- protected final String instantTime;
-
- public BaseActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable<?> table, String instantTime) {
- this.jsc = jsc;
- this.config = config;
- this.table = table;
- this.instantTime = instantTime;
+ public InsertPreppedDeltaCommitActionExecutor(JavaSparkContext jsc,
+ HoodieWriteConfig config, HoodieTable table,
+ String instantTime, JavaRDD<HoodieRecord<T>> preppedRecords) {
+ super(jsc, config, table, instantTime, WriteOperationType.INSERT_PREPPED);
+ this.preppedRecords = preppedRecords;
}
- public abstract R execute();
+ public HoodieWriteMetadata execute() {
+ return super.execute(preppedRecords);
+ }
}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/UpsertDeltaCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/UpsertDeltaCommitActionExecutor.java
new file mode 100644
index 0000000..1fdf433
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/UpsertDeltaCommitActionExecutor.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.deltacommit;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
+import org.apache.hudi.table.action.commit.WriteHelper;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+public class UpsertDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
+ extends DeltaCommitActionExecutor<T> {
+
+ private JavaRDD<HoodieRecord<T>> inputRecordsRDD;
+
+ public UpsertDeltaCommitActionExecutor(JavaSparkContext jsc,
+ HoodieWriteConfig config, HoodieTable table,
+ String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
+ super(jsc, config, table, instantTime, WriteOperationType.UPSERT);
+ this.inputRecordsRDD = inputRecordsRDD;
+ }
+
+ @Override
+ public HoodieWriteMetadata execute() {
+ return WriteHelper.write(instantTime, inputRecordsRDD, jsc, (HoodieTable<T>) table,
+ config.shouldCombineBeforeUpsert(), config.getUpsertShuffleParallelism(), this, true);
+ }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/UpsertDeltaCommitPartitioner.java b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/UpsertDeltaCommitPartitioner.java
new file mode 100644
index 0000000..f5a4370
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/UpsertDeltaCommitPartitioner.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.deltacommit;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.WorkloadProfile;
+
+import org.apache.hudi.table.action.commit.SmallFile;
+import org.apache.hudi.table.action.commit.UpsertPartitioner;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * UpsertPartitioner for the MergeOnRead table type. Allows auto-correction of small parquet files into larger ones,
+ * without the need for an index in the log file.
+ */
+public class UpsertDeltaCommitPartitioner<T extends HoodieRecordPayload<T>> extends UpsertPartitioner<T> {
+
+ UpsertDeltaCommitPartitioner(WorkloadProfile profile, JavaSparkContext jsc, HoodieTable<T> table,
+ HoodieWriteConfig config) {
+ super(profile, jsc, table, config);
+ }
+
+ @Override
+ protected List<SmallFile> getSmallFiles(String partitionPath) {
+
+ // collect small files only for this partitionPath
+ List<SmallFile> smallFileLocations = new ArrayList<>();
+
+ // Init here since this class (and member variables) might not have been initialized
+ HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
+
+ // Find out all eligible small file slices
+ if (!commitTimeline.empty()) {
+ HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
+ // find smallest file in partition and append to it
+ List<FileSlice> allSmallFileSlices = new ArrayList<>();
+ // If we cannot index log files, then we choose the smallest parquet file in the partition and add inserts to
+ // it. Doing this over time for a partition, we ensure that small file issues are handled.
+ if (!table.getIndex().canIndexLogFiles()) {
+ // TODO : choose last N small files since there can be multiple small files written to a single partition
+ // by different spark partitions in a single batch
+ Option<FileSlice> smallFileSlice = Option.fromJavaOptional(table.getSliceView()
+ .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), false)
+ .filter(
+ fileSlice -> fileSlice.getLogFiles().count() < 1 && fileSlice.getBaseFile().get().getFileSize() < config
+ .getParquetSmallFileLimit())
+ .min((FileSlice left, FileSlice right) ->
+ left.getBaseFile().get().getFileSize() < right.getBaseFile().get().getFileSize() ? -1 : 1));
+ if (smallFileSlice.isPresent()) {
+ allSmallFileSlices.add(smallFileSlice.get());
+ }
+ } else {
+ // If we can index log files, we can add more inserts to log files for fileIds including those under
+ // pending compaction.
+ List<FileSlice> allFileSlices =
+ table.getSliceView().getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), true)
+ .collect(Collectors.toList());
+ for (FileSlice fileSlice : allFileSlices) {
+ if (isSmallFile(fileSlice)) {
+ allSmallFileSlices.add(fileSlice);
+ }
+ }
+ }
+ // Create SmallFiles from the eligible file slices
+ for (FileSlice smallFileSlice : allSmallFileSlices) {
+ SmallFile sf = new SmallFile();
+ if (smallFileSlice.getBaseFile().isPresent()) {
+ // TODO : Move logic of file name, file id, base commit time handling inside file slice
+ String filename = smallFileSlice.getBaseFile().get().getFileName();
+ sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
+ sf.sizeBytes = getTotalFileSize(smallFileSlice);
+ smallFileLocations.add(sf);
+ } else {
+ HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get();
+ sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
+ FSUtils.getFileIdFromLogPath(logFile.getPath()));
+ sf.sizeBytes = getTotalFileSize(smallFileSlice);
+ smallFileLocations.add(sf);
+ }
+ }
+ }
+ return smallFileLocations;
+ }
+
+ public List<String> getSmallFileIds() {
+ return (List<String>) smallFiles.stream().map(smallFile -> ((SmallFile) smallFile).location.getFileId())
+ .collect(Collectors.toList());
+ }
+
+ private long getTotalFileSize(FileSlice fileSlice) {
+ if (!fileSlice.getBaseFile().isPresent()) {
+ return convertLogFilesSizeToExpectedParquetSize(fileSlice.getLogFiles().collect(Collectors.toList()));
+ } else {
+ return fileSlice.getBaseFile().get().getFileSize()
+ + convertLogFilesSizeToExpectedParquetSize(fileSlice.getLogFiles().collect(Collectors.toList()));
+ }
+ }
+
+ private boolean isSmallFile(FileSlice fileSlice) {
+ long totalSize = getTotalFileSize(fileSlice);
+ return totalSize < config.getParquetMaxFileSize();
+ }
+
+ // TODO (NA) : Make this a static utility method
+ public long convertLogFilesSizeToExpectedParquetSize(List<HoodieLogFile> hoodieLogFiles) {
+ long totalSizeOfLogFiles = hoodieLogFiles.stream().map(HoodieLogFile::getFileSize)
+ .filter(size -> size > 0).reduce(Long::sum).orElse(0L);
+ // Here we assume that if there is no base parquet file, all log files contain only inserts.
+ // We can then just get the parquet equivalent size of these log files, compare that with
+ // {@link config.getParquetMaxFileSize()} and decide if there is scope to insert more rows
+ return (long) (totalSizeOfLogFiles * config.getLogFileToParquetCompressionRatio());
+ }
+}
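A worked example of the size estimate above, under an assumed ratio (the real value comes from
config.getLogFileToParquetCompressionRatio()): 100 MB of log files at a ratio of 0.35 count as
35 MB of expected parquet when deciding whether a file slice is still small. A hedged sketch:

    // Sketch of the log-to-parquet size estimate; the ratio is an assumption here.
    static long expectedParquetSize(long[] logFileSizes, double logToParquetRatio) {
      long totalLogBytes = 0L;
      for (long size : logFileSizes) {
        if (size > 0) { // skip unknown/invalid sizes, as the partitioner does
          totalLogBytes += size;
        }
      }
      return (long) (totalLogBytes * logToParquetRatio);
    }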
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/UpsertPreppedDeltaCommitActionExecutor.java
similarity index 51%
copy from hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
copy to hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/UpsertPreppedDeltaCommitActionExecutor.java
index e6ac2e9..413d2e2 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/UpsertPreppedDeltaCommitActionExecutor.java
@@ -16,28 +16,30 @@
* limitations under the License.
*/
-package org.apache.hudi.table.action;
+package org.apache.hudi.table.action.deltacommit;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
+import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
-public abstract class BaseActionExecutor<R> {
+public class UpsertPreppedDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
+ extends DeltaCommitActionExecutor<T> {
- protected final JavaSparkContext jsc;
+ private final JavaRDD<HoodieRecord<T>> preppedRecords;
- protected final HoodieWriteConfig config;
-
- protected final HoodieTable<?> table;
-
- protected final String instantTime;
-
- public BaseActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable<?> table, String instantTime) {
- this.jsc = jsc;
- this.config = config;
- this.table = table;
- this.instantTime = instantTime;
+ public UpsertPreppedDeltaCommitActionExecutor(JavaSparkContext jsc,
+ HoodieWriteConfig config, HoodieTable table,
+ String instantTime, JavaRDD<HoodieRecord<T>> preppedRecords) {
+ super(jsc, config, table, instantTime, WriteOperationType.UPSERT_PREPPED);
+ this.preppedRecords = preppedRecords;
}
- public abstract R execute();
+ public HoodieWriteMetadata execute() {
+ return super.execute(preppedRecords);
+ }
}
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java
index c5b52fa..ee76ed3 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.HoodieCleanStat;
import org.apache.hudi.common.HoodieClientTestHarness;
import org.apache.hudi.common.HoodieClientTestUtils;
import org.apache.hudi.common.HoodieTestDataGenerator;
+import org.apache.hudi.common.TestRawTripPayload;
import org.apache.hudi.common.TestRawTripPayload.MetadataMergeWriteStatus;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
@@ -222,7 +223,7 @@ public class TestHoodieClientBase extends HoodieClientTestHarness {
*
* @param records List of Hoodie records
*/
- void assertNodupesWithinPartition(List<HoodieRecord> records) {
+ void assertNodupesWithinPartition(List<HoodieRecord<TestRawTripPayload>> records) {
Map<String, Set<String>> partitionToKeys = new HashMap<>();
for (HoodieRecord r : records) {
String key = r.getRecordKey();
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
index 49311f7..7382b7e 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
@@ -18,8 +18,10 @@
package org.apache.hudi.client;
+import java.util.HashSet;
import org.apache.hudi.common.HoodieClientTestUtils;
import org.apache.hudi.common.HoodieTestDataGenerator;
+import org.apache.hudi.common.TestRawTripPayload;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
@@ -50,6 +52,7 @@ import org.apache.hudi.table.HoodieTable;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
+import org.apache.hudi.table.action.commit.WriteHelper;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
@@ -195,7 +198,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
String recordKey = UUID.randomUUID().toString();
HoodieKey keyOne = new HoodieKey(recordKey, "2018-01-01");
- HoodieRecord recordOne =
+ HoodieRecord<TestRawTripPayload> recordOne =
new HoodieRecord(keyOne, HoodieTestDataGenerator.generateRandomValue(keyOne, newCommitTime));
HoodieKey keyTwo = new HoodieKey(recordKey, "2018-02-01");
@@ -206,42 +209,51 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
HoodieRecord recordThree =
new HoodieRecord(keyTwo, HoodieTestDataGenerator.generateRandomValue(keyTwo, newCommitTime));
- JavaRDD<HoodieRecord> records = jsc.parallelize(Arrays.asList(recordOne, recordTwo, recordThree), 1);
+ JavaRDD<HoodieRecord<TestRawTripPayload>> records =
+ jsc.parallelize(Arrays.asList(recordOne, recordTwo, recordThree), 1);
- // dedup should be done based on recordKey only
- HoodieWriteClient clientWithDummyGlobalIndex = getWriteClientWithDummyIndex(true);
- List<HoodieRecord> dedupedRecs = clientWithDummyGlobalIndex.deduplicateRecords(records, 1).collect();
+ // Global dedup should be done based on recordKey only
+ HoodieIndex index = mock(HoodieIndex.class);
+ when(index.isGlobal()).thenReturn(true);
+ List<HoodieRecord<TestRawTripPayload>> dedupedRecs = WriteHelper.deduplicateRecords(records, index, 1).collect();
assertEquals(1, dedupedRecs.size());
assertNodupesWithinPartition(dedupedRecs);
- // dedup should be done based on both recordKey and partitionPath
- HoodieWriteClient clientWithDummyNonGlobalIndex = getWriteClientWithDummyIndex(false);
- dedupedRecs = clientWithDummyNonGlobalIndex.deduplicateRecords(records, 1).collect();
+ // non-Global dedup should be done based on both recordKey and partitionPath
+ index = mock(HoodieIndex.class);
+ when(index.isGlobal()).thenReturn(false);
+ dedupedRecs = WriteHelper.deduplicateRecords(records, index, 1).collect();
assertEquals(2, dedupedRecs.size());
assertNodupesWithinPartition(dedupedRecs);
// Perform write-action and check
+ JavaRDD<HoodieRecord> recordList = jsc.parallelize(Arrays.asList(recordOne, recordTwo, recordThree), 1);
try (HoodieWriteClient client = getHoodieWriteClient(getConfigBuilder().combineInput(true, true).build(), false);) {
client.startCommitWithTime(newCommitTime);
- List<WriteStatus> statuses = writeFn.apply(client, records, newCommitTime).collect();
+ List<WriteStatus> statuses = writeFn.apply(client, recordList, newCommitTime).collect();
assertNoWriteErrors(statuses);
assertEquals(2, statuses.size());
- assertNodupesWithinPartition(statuses.stream().map(WriteStatus::getWrittenRecords).flatMap(Collection::stream)
+ assertNodupesInPartition(statuses.stream().map(WriteStatus::getWrittenRecords).flatMap(Collection::stream)
.collect(Collectors.toList()));
}
}
/**
- * Build a test Hoodie WriteClient with dummy index to configure isGlobal flag.
+ * Assert that there is no duplicate key at the partition level.
*
- * @param isGlobal Flag to control HoodieIndex.isGlobal
- * @return Hoodie Write Client
- * @throws Exception in case of error
+ * @param records List of Hoodie records
*/
- private HoodieWriteClient getWriteClientWithDummyIndex(final boolean isGlobal) {
- HoodieIndex index = mock(HoodieIndex.class);
- when(index.isGlobal()).thenReturn(isGlobal);
- return getHoodieWriteClient(getConfigBuilder().build(), false, index);
+ void assertNodupesInPartition(List<HoodieRecord> records) {
+ Map<String, Set<String>> partitionToKeys = new HashMap<>();
+ for (HoodieRecord r : records) {
+ String key = r.getRecordKey();
+ String partitionPath = r.getPartitionPath();
+ if (!partitionToKeys.containsKey(partitionPath)) {
+ partitionToKeys.put(partitionPath, new HashSet<>());
+ }
+ assertFalse("key " + key + " is duplicate within partition " + partitionPath, partitionToKeys.get(partitionPath).contains(key));
+ partitionToKeys.get(partitionPath).add(key);
+ }
}
/**
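A minimal, Spark-free sketch of the dedup semantics the test above asserts: with a global index the dedup key is the record key alone; with a non-global index it is (partitionPath, recordKey). All identifiers below are illustrative, not part of the commit; note that the real WriteHelper.deduplicateRecords combines the payloads of colliding records rather than simply keeping the last one, as this sketch does.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    class DedupSketch {
      static final class Rec {
        final String key;
        final String partition;
        Rec(String key, String partition) { this.key = key; this.partition = partition; }
      }

      // globalIndex=true : dedup on record key only
      // globalIndex=false: dedup on (partitionPath, recordKey)
      static List<Rec> dedup(List<Rec> input, boolean globalIndex) {
        Map<String, Rec> byKey = new LinkedHashMap<>();
        for (Rec r : input) {
          String k = globalIndex ? r.key : r.partition + "|" + r.key;
          byKey.put(k, r); // keeps the last record; Hudi combines payloads instead
        }
        return new ArrayList<>(byKey.values());
      }

      public static void main(String[] args) {
        List<Rec> recs = Arrays.asList(
            new Rec("id-1", "2018-01-01"),
            new Rec("id-1", "2018-02-01"),
            new Rec("id-1", "2018-02-01"));
        System.out.println(dedup(recs, true).size());  // 1, as in the global-index assert
        System.out.println(dedup(recs, false).size()); // 2, as in the non-global assert
      }
    }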
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
index 8a726c7..fb66ddb 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
@@ -61,6 +61,8 @@ import org.apache.hudi.index.HoodieIndex.IndexType;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hudi.table.action.deltacommit.DeltaCommitActionExecutor;
+import org.apache.hudi.table.action.deltacommit.DeleteDeltaCommitActionExecutor;
import org.apache.spark.api.java.JavaRDD;
import org.junit.After;
import org.junit.Assert;
@@ -1346,9 +1348,11 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
JavaRDD<HoodieRecord> deleteRDD = jsc.parallelize(fewRecordsForDelete, 1);
// initialize partitioner
- hoodieTable.getUpsertPartitioner(new WorkloadProfile(deleteRDD), jsc);
+ DeltaCommitActionExecutor actionExecutor = new DeleteDeltaCommitActionExecutor(jsc, cfg, hoodieTable,
+ newDeleteTime, deleteRDD);
+ actionExecutor.getUpsertPartitioner(new WorkloadProfile(deleteRDD));
final List<List<WriteStatus>> deleteStatus = jsc.parallelize(Arrays.asList(1)).map(x -> {
- return hoodieTable.handleUpdate(newDeleteTime, partitionPath, fileId, fewRecordsForDelete.iterator());
+ return actionExecutor.handleUpdate(partitionPath, fileId, fewRecordsForDelete.iterator());
}).map(x -> (List<WriteStatus>) HoodieClientTestUtils.collectStatuses(x)).collect();
// Verify there are errors because records are from multiple partitions (but handleUpdate is invoked for
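For readers migrating code, a compile-style sketch of the call-shape change this test illustrates: the old table-level entry points took the instant time on every call, while the new executors are constructed with it. Generic type parameters and return values are elided, and the helper below is hypothetical; the names otherwise mirror the test.

    import java.util.Iterator;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.hudi.table.HoodieTable;
    import org.apache.hudi.table.WorkloadProfile;
    import org.apache.hudi.table.action.deltacommit.DeleteDeltaCommitActionExecutor;
    import org.apache.hudi.table.action.deltacommit.DeltaCommitActionExecutor;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    class DeltaCommitMigrationSketch {
      // Before: hoodieTable.getUpsertPartitioner(new WorkloadProfile(deleteRDD), jsc);
      //         hoodieTable.handleUpdate(newDeleteTime, partitionPath, fileId, recordItr);
      static void deleteViaExecutor(JavaSparkContext jsc, HoodieWriteConfig cfg,
          HoodieTable hoodieTable, String newDeleteTime, JavaRDD<HoodieRecord> deleteRDD,
          String partitionPath, String fileId, Iterator<HoodieRecord> recordItr) throws Exception {
        // After: one executor per write operation, constructed with the instant time and input
        DeltaCommitActionExecutor executor = new DeleteDeltaCommitActionExecutor(
            jsc, cfg, hoodieTable, newDeleteTime, deleteRDD);
        executor.getUpsertPartitioner(new WorkloadProfile(deleteRDD));
        executor.handleUpdate(partitionPath, fileId, recordItr);
      }
    }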
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
similarity index 77%
rename from hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
rename to hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
index 7fd1be5..169ad31 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
@@ -16,34 +16,31 @@
* limitations under the License.
*/
-package org.apache.hudi.table;
+package org.apache.hudi.table.action.commit;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.HoodieClientTestHarness;
import org.apache.hudi.common.HoodieClientTestUtils;
-import org.apache.hudi.common.HoodieTestDataGenerator;
import org.apache.hudi.common.TestRawTripPayload;
import org.apache.hudi.common.TestRawTripPayload.MetadataMergeWriteStatus;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.FileIOUtils;
-import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.hadoop.HoodieHiveUtil;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.io.HoodieCreateHandle;
-import org.apache.hudi.table.HoodieCopyOnWriteTable.UpsertPartitioner;
+import org.apache.hudi.table.HoodieCopyOnWriteTable;
+import org.apache.hudi.table.HoodieTable;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileStatus;
@@ -68,20 +65,18 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
-import scala.Tuple2;
-
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-public class TestCopyOnWriteTable extends HoodieClientTestHarness {
+public class TestCopyOnWriteActionExecutor extends HoodieClientTestHarness {
- private static final Logger LOG = LogManager.getLogger(TestCopyOnWriteTable.class);
+ private static final Logger LOG = LogManager.getLogger(TestCopyOnWriteActionExecutor.class);
@Before
public void setUp() throws Exception {
- initSparkContexts("TestCopyOnWriteTable");
+ initSparkContexts("TestCopyOnWriteActionExecutor");
initPath();
initMetaClient();
initTestDataGenerator();
@@ -179,7 +174,8 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
GenericRecord newRecord;
int index = 0;
for (GenericRecord record : fileRecords) {
- assertTrue(record.get("_row_key").toString().equals(records.get(index).getRecordKey()));
+ System.out.println("Got :" + record.get("_row_key").toString() + ", Exp :" + records.get(index).getRecordKey());
+ assertEquals(records.get(index).getRecordKey(), record.get("_row_key").toString());
index++;
}
@@ -300,8 +296,10 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
records.add(new HoodieRecord(new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()), rowChange3));
// Insert new records
+ CommitActionExecutor actionExecutor = new InsertCommitActionExecutor(jsc, config, table,
+ firstCommitTime, jsc.parallelize(records));
List<WriteStatus> writeStatuses = jsc.parallelize(Arrays.asList(1)).map(x -> {
- return table.handleInsert(firstCommitTime, FSUtils.createNewFileIdPfx(), records.iterator());
+ return actionExecutor.handleInsert(FSUtils.createNewFileIdPfx(), records.iterator());
}).flatMap(x -> HoodieClientTestUtils.collectStatuses(x).iterator()).collect();
Map<String, String> allWriteStatusMergedMetadataMap =
@@ -326,8 +324,10 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
// Insert new records
final List<HoodieRecord> recs2 = records;
+ CommitActionExecutor actionExecutor = new InsertPreppedCommitActionExecutor(jsc, config, table,
+ instantTime, jsc.parallelize(recs2));
List<WriteStatus> returnedStatuses = jsc.parallelize(Arrays.asList(1)).map(x -> {
- return table.handleInsert(instantTime, FSUtils.createNewFileIdPfx(), recs2.iterator());
+ return actionExecutor.handleInsert(FSUtils.createNewFileIdPfx(), recs2.iterator());
}).flatMap(x -> HoodieClientTestUtils.collectStatuses(x).iterator()).collect();
// TODO: check the actual files and make sure 11 records, total were written.
@@ -347,9 +347,10 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
// Insert new records
final List<HoodieRecord> recs3 = records;
-
+ CommitActionExecutor newActionExecutor = new UpsertPreppedCommitActionExecutor(jsc, config, table,
+ instantTime, jsc.parallelize(recs3));
returnedStatuses = jsc.parallelize(Arrays.asList(1)).map(x -> {
- return table.handleInsert(instantTime, FSUtils.createNewFileIdPfx(), recs3.iterator());
+ return newActionExecutor.handleInsert(FSUtils.createNewFileIdPfx(), recs3.iterator());
}).flatMap(x -> HoodieClientTestUtils.collectStatuses(x).iterator()).collect();
assertEquals(3, returnedStatuses.size());
@@ -361,7 +362,6 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
assertEquals("2016/02/02", returnedStatuses.get(2).getPartitionPath());
assertEquals(1, returnedStatuses.get(2).getTotalRecords());
-
}
@Test
@@ -382,8 +382,10 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
}
// Insert new records
+ CommitActionExecutor actionExecutor = new UpsertCommitActionExecutor(jsc, config, table,
+ instantTime, jsc.parallelize(records));
jsc.parallelize(Arrays.asList(1))
- .map(i -> table.handleInsert(instantTime, FSUtils.createNewFileIdPfx(), records.iterator()))
+ .map(i -> actionExecutor.handleInsert(FSUtils.createNewFileIdPfx(), records.iterator()))
.map(x -> HoodieClientTestUtils.collectStatuses(x)).collect();
// Check the updated file
@@ -397,83 +399,6 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
assertEquals("If the number of records are more than 1150, then there should be a new file", 3, counts);
}
- private UpsertPartitioner getUpsertPartitioner(int smallFileSize, int numInserts, int numUpdates, int fileSize,
- String testPartitionPath, boolean autoSplitInserts) throws Exception {
- HoodieWriteConfig config = makeHoodieClientConfigBuilder()
- .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(smallFileSize)
- .insertSplitSize(100).autoTuneInsertSplits(autoSplitInserts).build())
- .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1000 * 1024).build()).build();
-
- HoodieClientTestUtils.fakeCommitFile(basePath, "001");
- HoodieClientTestUtils.fakeDataFile(basePath, testPartitionPath, "001", "file1", fileSize);
- metaClient = HoodieTableMetaClient.reload(metaClient);
- HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, jsc);
-
- HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[] {testPartitionPath});
- List<HoodieRecord> insertRecords = dataGenerator.generateInserts("001", numInserts);
- List<HoodieRecord> updateRecords = dataGenerator.generateUpdates("001", numUpdates);
- for (HoodieRecord updateRec : updateRecords) {
- updateRec.unseal();
- updateRec.setCurrentLocation(new HoodieRecordLocation("001", "file1"));
- updateRec.seal();
- }
- List<HoodieRecord> records = new ArrayList<>();
- records.addAll(insertRecords);
- records.addAll(updateRecords);
- WorkloadProfile profile = new WorkloadProfile(jsc.parallelize(records));
- HoodieCopyOnWriteTable.UpsertPartitioner partitioner =
- (HoodieCopyOnWriteTable.UpsertPartitioner) table.getUpsertPartitioner(profile, jsc);
- assertEquals("Update record should have gone to the 1 update partition", 0, partitioner.getPartition(
- new Tuple2<>(updateRecords.get(0).getKey(), Option.ofNullable(updateRecords.get(0).getCurrentLocation()))));
- return partitioner;
- }
-
- @Test
- public void testUpsertPartitioner() throws Exception {
- final String testPartitionPath = "2016/09/26";
- // Inserts + Updates... Check all updates go together & inserts subsplit
- UpsertPartitioner partitioner = getUpsertPartitioner(0, 200, 100, 1024, testPartitionPath, false);
- List<HoodieCopyOnWriteTable.InsertBucket> insertBuckets = partitioner.getInsertBuckets(testPartitionPath);
- assertEquals("Total of 2 insert buckets", 2, insertBuckets.size());
- }
-
- @Test
- public void testUpsertPartitionerWithSmallInsertHandling() throws Exception {
- final String testPartitionPath = "2016/09/26";
- // Inserts + Updates .. Check updates go together & inserts subsplit, after expanding
- // smallest file
- UpsertPartitioner partitioner = getUpsertPartitioner(1000 * 1024, 400, 100, 800 * 1024, testPartitionPath, false);
- List<HoodieCopyOnWriteTable.InsertBucket> insertBuckets = partitioner.getInsertBuckets(testPartitionPath);
-
- assertEquals("Should have 3 partitions", 3, partitioner.numPartitions());
- assertEquals("Bucket 0 is UPDATE", HoodieCopyOnWriteTable.BucketType.UPDATE,
- partitioner.getBucketInfo(0).bucketType);
- assertEquals("Bucket 1 is INSERT", HoodieCopyOnWriteTable.BucketType.INSERT,
- partitioner.getBucketInfo(1).bucketType);
- assertEquals("Bucket 2 is INSERT", HoodieCopyOnWriteTable.BucketType.INSERT,
- partitioner.getBucketInfo(2).bucketType);
- assertEquals("Total of 3 insert buckets", 3, insertBuckets.size());
- assertEquals("First insert bucket must be same as update bucket", 0, insertBuckets.get(0).bucketNumber);
- assertEquals("First insert bucket should have weight 0.5", 0.5, insertBuckets.get(0).weight, 0.01);
-
- // Now with insert split size auto tuned
- partitioner = getUpsertPartitioner(1000 * 1024, 2400, 100, 800 * 1024, testPartitionPath, true);
- insertBuckets = partitioner.getInsertBuckets(testPartitionPath);
-
- assertEquals("Should have 4 partitions", 4, partitioner.numPartitions());
- assertEquals("Bucket 0 is UPDATE", HoodieCopyOnWriteTable.BucketType.UPDATE,
- partitioner.getBucketInfo(0).bucketType);
- assertEquals("Bucket 1 is INSERT", HoodieCopyOnWriteTable.BucketType.INSERT,
- partitioner.getBucketInfo(1).bucketType);
- assertEquals("Bucket 2 is INSERT", HoodieCopyOnWriteTable.BucketType.INSERT,
- partitioner.getBucketInfo(2).bucketType);
- assertEquals("Bucket 3 is INSERT", HoodieCopyOnWriteTable.BucketType.INSERT,
- partitioner.getBucketInfo(3).bucketType);
- assertEquals("Total of 4 insert buckets", 4, insertBuckets.size());
- assertEquals("First insert bucket must be same as update bucket", 0, insertBuckets.get(0).bucketNumber);
- assertEquals("First insert bucket should have weight 0.5", 200.0 / 2400, insertBuckets.get(0).weight, 0.01);
- }
-
@Test
public void testInsertUpsertWithHoodieAvroPayload() throws Exception {
HoodieWriteConfig config = makeHoodieClientConfigBuilder()
@@ -483,8 +408,10 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
String instantTime = "000";
// Perform inserts of 100 records to test CreateHandle and BufferedExecutor
final List<HoodieRecord> inserts = dataGen.generateInsertsWithHoodieAvroPayload(instantTime, 100);
+ CommitActionExecutor actionExecutor = new InsertCommitActionExecutor(jsc, config, table,
+ instantTime, jsc.parallelize(inserts));
final List<List<WriteStatus>> ws = jsc.parallelize(Arrays.asList(1)).map(x -> {
- return table.handleInsert(instantTime, UUID.randomUUID().toString(), inserts.iterator());
+ return actionExecutor.handleInsert(UUID.randomUUID().toString(), inserts.iterator());
}).map(x -> (List<WriteStatus>) HoodieClientTestUtils.collectStatuses(x)).collect();
WriteStatus writeStatus = ws.get(0).get(0);
@@ -494,8 +421,10 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
String partitionPath = updates.get(0).getPartitionPath();
long numRecordsInPartition = updates.stream().filter(u -> u.getPartitionPath().equals(partitionPath)).count();
+ CommitActionExecutor newActionExecutor = new UpsertCommitActionExecutor(jsc, config, table,
+ instantTime, jsc.parallelize(updates));
final List<List<WriteStatus>> updateStatus = jsc.parallelize(Arrays.asList(1)).map(x -> {
- return table.handleUpdate(instantTime, partitionPath, fileId, updates.iterator());
+ return newActionExecutor.handleUpdate(partitionPath, fileId, updates.iterator());
}).map(x -> (List<WriteStatus>) HoodieClientTestUtils.collectStatuses(x)).collect();
assertEquals(updates.size() - numRecordsInPartition, updateStatus.get(0).get(0).getTotalErrorRecords());
}
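The copy-on-write tests above all follow the same shape; here is a condensed sketch, with generics and return types elided and every identifier taken from the tests (the wrapper method itself is hypothetical):

    import java.util.List;
    import org.apache.hudi.common.fs.FSUtils;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.hudi.table.HoodieTable;
    import org.apache.hudi.table.action.commit.CommitActionExecutor;
    import org.apache.hudi.table.action.commit.InsertCommitActionExecutor;
    import org.apache.hudi.table.action.commit.UpsertCommitActionExecutor;
    import org.apache.spark.api.java.JavaSparkContext;

    class CommitExecutorSketch {
      static void insertThenUpdate(JavaSparkContext jsc, HoodieWriteConfig config,
          HoodieTable table, String instantTime, List<HoodieRecord> records,
          String partitionPath, String fileId) throws Exception {
        // handleInsert now takes only a fileId prefix plus the records ...
        CommitActionExecutor insertExec = new InsertCommitActionExecutor(
            jsc, config, table, instantTime, jsc.parallelize(records));
        insertExec.handleInsert(FSUtils.createNewFileIdPfx(), records.iterator());
        // ... and handleUpdate a target partition/file; the instant time lives in the executor
        CommitActionExecutor upsertExec = new UpsertCommitActionExecutor(
            jsc, config, table, instantTime, jsc.parallelize(records));
        upsertExec.handleUpdate(partitionPath, fileId, records.iterator());
      }
    }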
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
new file mode 100644
index 0000000..818d765
--- /dev/null
+++ b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hudi.common.HoodieClientTestHarness;
+import org.apache.hudi.common.HoodieClientTestUtils;
+import org.apache.hudi.common.HoodieTestDataGenerator;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieStorageConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieCopyOnWriteTable;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.WorkloadProfile;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import scala.Tuple2;
+
+public class TestUpsertPartitioner extends HoodieClientTestHarness {
+
+ private static final Logger LOG = LogManager.getLogger(TestUpsertPartitioner.class);
+
+ @Before
+ public void setUp() throws Exception {
+ initSparkContexts("TestUpsertPartitioner");
+ initPath();
+ initMetaClient();
+ initTestDataGenerator();
+ initFileSystem();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ cleanupSparkContexts();
+ cleanupMetaClient();
+ cleanupFileSystem();
+ cleanupTestDataGenerator();
+ }
+
+ private UpsertPartitioner getUpsertPartitioner(int smallFileSize, int numInserts, int numUpdates, int fileSize,
+ String testPartitionPath, boolean autoSplitInserts) throws Exception {
+ HoodieWriteConfig config = makeHoodieClientConfigBuilder()
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(smallFileSize)
+ .insertSplitSize(100).autoTuneInsertSplits(autoSplitInserts).build())
+ .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1000 * 1024).build()).build();
+
+ HoodieClientTestUtils.fakeCommitFile(basePath, "001");
+ HoodieClientTestUtils.fakeDataFile(basePath, testPartitionPath, "001", "file1", fileSize);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, jsc);
+
+ HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[] {testPartitionPath});
+ List<HoodieRecord> insertRecords = dataGenerator.generateInserts("001", numInserts);
+ List<HoodieRecord> updateRecords = dataGenerator.generateUpdates("001", numUpdates);
+ for (HoodieRecord updateRec : updateRecords) {
+ updateRec.unseal();
+ updateRec.setCurrentLocation(new HoodieRecordLocation("001", "file1"));
+ updateRec.seal();
+ }
+ List<HoodieRecord> records = new ArrayList<>();
+ records.addAll(insertRecords);
+ records.addAll(updateRecords);
+ WorkloadProfile profile = new WorkloadProfile(jsc.parallelize(records));
+ UpsertPartitioner partitioner = new UpsertPartitioner(profile, jsc, table, config);
+ assertEquals("Update record should have gone to the 1 update partition", 0, partitioner.getPartition(
+ new Tuple2<>(updateRecords.get(0).getKey(), Option.ofNullable(updateRecords.get(0).getCurrentLocation()))));
+ return partitioner;
+ }
+
+ @Test
+ public void testUpsertPartitioner() throws Exception {
+ final String testPartitionPath = "2016/09/26";
+ // Inserts + Updates... Check all updates go together & inserts subsplit
+ UpsertPartitioner partitioner = getUpsertPartitioner(0, 200, 100, 1024, testPartitionPath, false);
+ List<InsertBucket> insertBuckets = partitioner.getInsertBuckets(testPartitionPath);
+ assertEquals("Total of 2 insert buckets", 2, insertBuckets.size());
+ }
+
+ @Test
+ public void testUpsertPartitionerWithSmallInsertHandling() throws Exception {
+ final String testPartitionPath = "2016/09/26";
+ // Inserts + Updates .. Check updates go together & inserts subsplit, after expanding
+ // smallest file
+ UpsertPartitioner partitioner = getUpsertPartitioner(1000 * 1024, 400, 100, 800 * 1024, testPartitionPath, false);
+ List<InsertBucket> insertBuckets = partitioner.getInsertBuckets(testPartitionPath);
+
+ assertEquals("Should have 3 partitions", 3, partitioner.numPartitions());
+ assertEquals("Bucket 0 is UPDATE", BucketType.UPDATE,
+ partitioner.getBucketInfo(0).bucketType);
+ assertEquals("Bucket 1 is INSERT", BucketType.INSERT,
+ partitioner.getBucketInfo(1).bucketType);
+ assertEquals("Bucket 2 is INSERT", BucketType.INSERT,
+ partitioner.getBucketInfo(2).bucketType);
+ assertEquals("Total of 3 insert buckets", 3, insertBuckets.size());
+ assertEquals("First insert bucket must be same as update bucket", 0, insertBuckets.get(0).bucketNumber);
+ assertEquals("First insert bucket should have weight 0.5", 0.5, insertBuckets.get(0).weight, 0.01);
+
+ // Now with insert split size auto tuned
+ partitioner = getUpsertPartitioner(1000 * 1024, 2400, 100, 800 * 1024, testPartitionPath, true);
+ insertBuckets = partitioner.getInsertBuckets(testPartitionPath);
+
+ assertEquals("Should have 4 partitions", 4, partitioner.numPartitions());
+ assertEquals("Bucket 0 is UPDATE", BucketType.UPDATE,
+ partitioner.getBucketInfo(0).bucketType);
+ assertEquals("Bucket 1 is INSERT", BucketType.INSERT,
+ partitioner.getBucketInfo(1).bucketType);
+ assertEquals("Bucket 2 is INSERT", BucketType.INSERT,
+ partitioner.getBucketInfo(2).bucketType);
+ assertEquals("Bucket 3 is INSERT", BucketType.INSERT,
+ partitioner.getBucketInfo(3).bucketType);
+ assertEquals("Total of 4 insert buckets", 4, insertBuckets.size());
+ assertEquals("First insert bucket must be same as update bucket", 0, insertBuckets.get(0).bucketNumber);
+ assertEquals("First insert bucket should have weight 0.5", 200.0 / 2400, insertBuckets.get(0).weight, 0.01);
+ }
+
+ private HoodieWriteConfig.Builder makeHoodieClientConfigBuilder() throws Exception {
+ // Prepare the AvroParquetIO
+ String schemaStr = FileIOUtils.readAsUTFString(getClass().getResourceAsStream("/exampleSchema.txt"));
+ return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr);
+ }
+}
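The expected bucket counts and weights in the two tests above are consistent with a simple capacity model, assuming the partitioner estimates roughly 1 KB per record (an assumption, chosen because it makes the numbers line up): the 800 KB faked file has 200 KB of headroom under the 1000 KB small-file threshold, so insert bucket 0, which doubles as the update bucket, absorbs about 200 inserts first.

    class UpsertPartitionerMath {
      public static void main(String[] args) {
        long smallFileLimit = 1000 * 1024; // compactionSmallFileSize in the test config
        long existingFile = 800 * 1024;    // size of the faked data file "file1"
        long estRecordSize = 1024;         // assumed estimate; not taken from the commit
        long headroom = (smallFileLimit - existingFile) / estRecordSize; // 200 records

        // Case 1: 400 inserts, fixed insertSplitSize = 100:
        //   bucket 0 takes 200 -> weight 200/400 = 0.5; remaining 200/100 gives
        //   2 new insert buckets -> 3 partitions, 3 insert buckets
        System.out.println("weight(400 inserts)  = " + (double) headroom / 400);
        // Case 2: 2400 inserts, auto-tuned split (~1000 records per 1000 KB file):
        //   bucket 0 takes 200 -> weight 200/2400; remaining 2200 -> ceil(2200/1000)
        //   = 3 new insert buckets -> 4 partitions, 4 insert buckets
        System.out.println("weight(2400 inserts) = " + (double) headroom / 2400);
      }
    }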
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
index 868af20..23ba7f7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
@@ -69,4 +69,8 @@ public enum WriteOperationType {
throw new HoodieException("Invalid value of Type.");
}
}
+
+ public static boolean isChangingRecords(WriteOperationType operationType) {
+ return operationType == UPSERT || operationType == UPSERT_PREPPED || operationType == DELETE;
+ }
}
\ No newline at end of file
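Finally, a short usage sketch for the new WriteOperationType helper. The enum and the isChangingRecords method are from the commit; the loop and class around them are illustrative only. A caller can branch on it to skip update-oriented bookkeeping for operations that only add new records:

    import org.apache.hudi.common.model.WriteOperationType;

    class OpCheck {
      public static void main(String[] args) {
        // prints true only for UPSERT, UPSERT_PREPPED and DELETE, per the new helper
        for (WriteOperationType op : WriteOperationType.values()) {
          System.out.println(op + " changes existing records: "
              + WriteOperationType.isChangingRecords(op));
        }
      }
    }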