You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vi...@apache.org on 2020/10/26 04:22:19 UTC
[hudi] branch master updated: [MINOR] Private the NoArgsConstructor
of SparkMergeHelper and code clean (#2194)
This is an automated email from the ASF dual-hosted git repository.
vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new e206ddd [MINOR] Private the NoArgsConstructor of SparkMergeHelper and code clean (#2194)
e206ddd is described below
commit e206ddd431131da26cf1bb00f70fe64ad0450059
Author: wangxianghu <wx...@126.com>
AuthorDate: Mon Oct 26 12:22:11 2020 +0800
[MINOR] Private the NoArgsConstructor of SparkMergeHelper and code clean (#2194)
---
.../hudi/table/action/commit/BaseSparkCommitActionExecutor.java | 9 +--------
.../action/commit/SparkInsertOverwriteCommitActionExecutor.java | 7 ++-----
.../org/apache/hudi/table/action/commit/SparkMergeHelper.java | 3 +++
3 files changed, 6 insertions(+), 13 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
index 36cca8c..ad62db9 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
@@ -199,6 +199,7 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
commitOnAutoCommit(result);
}
+ @Override
protected String getCommitActionType() {
return table.getMetaClient().getCommitActionType();
}
@@ -276,14 +277,6 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
return handleUpdateInternal(upsertHandle, fileId);
}
- public Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String fileId,
- Map<String, HoodieRecord<T>> keyToNewRecords,
- HoodieBaseFile oldDataFile) throws IOException {
- // these are updates
- HoodieMergeHandle upsertHandle = getUpdateHandle(partitionPath, fileId, keyToNewRecords, oldDataFile);
- return handleUpdateInternal(upsertHandle, fileId);
- }
-
protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle upsertHandle, String fileId)
throws IOException {
if (upsertHandle.getOldFilePath() == null) {
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
index 627e75e..2771a22 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
@@ -28,8 +28,6 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;
@@ -41,8 +39,6 @@ import java.util.stream.Collectors;
public class SparkInsertOverwriteCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends BaseSparkCommitActionExecutor<T> {
- private static final Logger LOG = LogManager.getLogger(SparkInsertOverwriteCommitActionExecutor.class);
-
private final JavaRDD<HoodieRecord<T>> inputRecordsRDD;
public SparkInsertOverwriteCommitActionExecutor(HoodieEngineContext context,
@@ -53,7 +49,7 @@ public class SparkInsertOverwriteCommitActionExecutor<T extends HoodieRecordPayl
}
@Override
- public HoodieWriteMetadata execute() {
+ public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
return SparkWriteHelper.newInstance().write(instantTime, inputRecordsRDD, context, table,
config.shouldCombineBeforeInsert(), config.getInsertShuffleParallelism(), this, false);
}
@@ -68,6 +64,7 @@ public class SparkInsertOverwriteCommitActionExecutor<T extends HoodieRecordPayl
return HoodieTimeline.REPLACE_COMMIT_ACTION;
}
+ @Override
protected Map<String, List<String>> getPartitionToReplacedFileIds(JavaRDD<WriteStatus> writeStatuses) {
return writeStatuses.map(status -> status.getStat().getPartitionPath()).distinct().mapToPair(partitionPath ->
new Tuple2<>(partitionPath, getAllExistingFileIds(partitionPath))).collectAsMap();
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMergeHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMergeHelper.java
index 2d130e3..08d60b9 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMergeHelper.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMergeHelper.java
@@ -46,6 +46,9 @@ import java.util.Iterator;
public class SparkMergeHelper<T extends HoodieRecordPayload> extends AbstractMergeHelper<T, JavaRDD<HoodieRecord<T>>,
JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
+ private SparkMergeHelper() {
+ }
+
private static class MergeHelperHolder {
private static final SparkMergeHelper SPARK_MERGE_HELPER = new SparkMergeHelper();
}