Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/01/09 04:40:09 UTC

[GitHub] [hudi] danny0405 commented on a change in pull request #4446: [HUDI-2917] rollback insert data appended to log file when using Hbase Index

danny0405 commented on a change in pull request #4446:
URL: https://github.com/apache/hudi/pull/4446#discussion_r780733934



##########
File path: hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
##########
@@ -90,27 +90,29 @@ public BaseJavaCommitActionExecutor(HoodieEngineContext context,
   public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inputRecords) {
     HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>();
 
-    WorkloadProfile profile = null;
+    WorkloadProfile inputProfile = null;
     if (isWorkloadProfileNeeded()) {
-      profile = new WorkloadProfile(buildProfile(inputRecords));
-      LOG.info("Workload profile :" + profile);
+      inputProfile = new WorkloadProfile(buildProfile(inputRecords));
+      LOG.info("Input workload profile :" + inputProfile);
+    }
+
+    final Partitioner partitioner = getPartitioner(inputProfile);
+    try {
+      WorkloadProfile executionProfile = partitioner.getExecutionWorkloadProfile();
+      LOG.info("Execution workload profile :" + inputProfile);
+      saveWorkloadProfileMetadataToInflight(executionProfile, instantTime);

Review comment:
       And why must we use the execution profile here? I know the original profile also only works for the bloom filter index, but we should fix the profile building instead of fetching it from the partitioner, if we have a way to distinguish between `INSERT`s and `UPDATE`s before the write.

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
##########
@@ -182,14 +182,28 @@ public abstract void preCompact(
         .withOperationField(config.allowOperationMetadataField())
         .withPartition(operation.getPartitionPath())
         .build();
-    if (!scanner.iterator().hasNext()) {
-      scanner.close();
-      return new ArrayList<>();
-    }
 
     Option<HoodieBaseFile> oldDataFileOpt =
         operation.getBaseFile(metaClient.getBasePath(), operation.getPartitionPath());
 
+    // Considering following scenario: if all log blocks in this fileSlice is rollback, it returns an empty scanner.
+    // But in this case, we need to give it a base file. Otherwise, it will lose base file in following fileSlice.
+    if (!scanner.iterator().hasNext()) {
+      if (!oldDataFileOpt.isPresent()) {
+        scanner.close();
+        return new ArrayList<>();
+      } else {
+        // TODO: we may directly rename original parquet file if there is not evolution/devolution of schema

Review comment:
       If the file slice only has parquet files, why do we still trigger compaction?

##########
File path: hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
##########
@@ -90,27 +90,29 @@ public BaseJavaCommitActionExecutor(HoodieEngineContext context,
   public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inputRecords) {
     HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>();
 
-    WorkloadProfile profile = null;
+    WorkloadProfile inputProfile = null;
     if (isWorkloadProfileNeeded()) {
-      profile = new WorkloadProfile(buildProfile(inputRecords));
-      LOG.info("Workload profile :" + profile);
+      inputProfile = new WorkloadProfile(buildProfile(inputRecords));
+      LOG.info("Input workload profile :" + inputProfile);
+    }
+
+    final Partitioner partitioner = getPartitioner(inputProfile);
+    try {
+      WorkloadProfile executionProfile = partitioner.getExecutionWorkloadProfile();
+      LOG.info("Execution workload profile :" + inputProfile);
+      saveWorkloadProfileMetadataToInflight(executionProfile, instantTime);

Review comment:
       Did you mean `executionProfile`?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/Partitioner.java
##########
@@ -18,10 +18,14 @@
 
 package org.apache.hudi.table.action.commit;
 
+import org.apache.hudi.table.WorkloadProfile;
+
 import java.io.Serializable;
 
 public interface Partitioner extends Serializable {
   int getNumPartitions();
 
   int getPartition(Object key);
+
+  WorkloadProfile getExecutionWorkloadProfile();
 }

Review comment:
       Why does a `Partitioner` return the profile? Let's not put this method on the interface.

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
##########
@@ -97,15 +98,25 @@ void saveWorkloadProfileMetadataToInflight(WorkloadProfile profile, String insta
         insertStat.setFileId("");
         insertStat.setPrevCommit(HoodieWriteStat.NULL_COMMIT);
         metadata.addWriteStat(path, insertStat);
-
-        partitionStat.getUpdateLocationToCount().forEach((key, value) -> {
-          HoodieWriteStat writeStat = new HoodieWriteStat();
-          writeStat.setFileId(key);
-          // TODO : Write baseCommitTime is possible here ?
-          writeStat.setPrevCommit(value.getKey());
-          writeStat.setNumUpdateWrites(value.getValue());
-          metadata.addWriteStat(path, writeStat);
-        });
+        Map<String, Pair<String, Long>> updateLocationMap = partitionStat.getUpdateLocationToCount();

Review comment:
       Can we just write the code block twice instead of unioning the streams together? The code looks like a mess.
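
       A minimal sketch of what "writing the block twice" could look like, as opposed to merging two maps into one stream first. Everything here is a stand-in for illustration: `Stat` is a hypothetical reduction of `HoodieWriteStat`, `Map.Entry<String, Long>` stands in for `Pair<String, Long>`, and the second map (`insertLocationToCount`) is an assumed name, since the hunk above only shows `getUpdateLocationToCount()`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class WriteStatSketch {

  /** Hypothetical stand-in for HoodieWriteStat, reduced to three fields. */
  public static final class Stat {
    public final String fileId;
    public final String prevCommit;
    public final long numWrites;

    public Stat(String fileId, String prevCommit, long numWrites) {
      this.fileId = fileId;
      this.prevCommit = prevCommit;
      this.numWrites = numWrites;
    }
  }

  /**
   * Builds one stat per file id from two maps of
   * file id -> (previous commit, record count).
   * The second map is an assumption made for this sketch.
   */
  public static List<Stat> buildStats(
      Map<String, Map.Entry<String, Long>> updateLocationToCount,
      Map<String, Map.Entry<String, Long>> insertLocationToCount) {
    List<Stat> stats = new ArrayList<>();

    // First block: one stat per entry of the update map.
    updateLocationToCount.forEach((fileId, commitAndCount) ->
        stats.add(new Stat(fileId, commitAndCount.getKey(), commitAndCount.getValue())));

    // Second block: the same statements repeated for the other map,
    // rather than concatenating both entry sets into a single stream.
    insertLocationToCount.forEach((fileId, commitAndCount) ->
        stats.add(new Stat(fileId, commitAndCount.getKey(), commitAndCount.getValue())));

    return stats;
  }

  public static void main(String[] args) {
    Map<String, Map.Entry<String, Long>> updates = new LinkedHashMap<>();
    updates.put("file-1", Map.entry("001", 3L));

    Map<String, Map.Entry<String, Long>> inserts = new LinkedHashMap<>();
    inserts.put("file-2", Map.entry("001", 5L));

    for (Stat stat : buildStats(updates, inserts)) {
      System.out.println(stat.fileId + " " + stat.prevCommit + " " + stat.numWrites);
    }
  }
}
```

       The duplication costs a few lines, but each loop stays readable on its own and the two cases can diverge later without untangling a merged stream.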



