Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/10/04 01:17:14 UTC

[GitHub] [hudi] nsivabalan commented on a change in pull request #3741: [HUDI-2501] Add HoodieData abstraction and refactor compaction actions in hudi-client module

nsivabalan commented on a change in pull request #3741:
URL: https://github.com/apache/hudi/pull/3741#discussion_r720976899



##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java
##########
@@ -19,65 +19,65 @@
 package org.apache.hudi.table.action.compact;
 
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.client.AbstractHoodieWriteClient;
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.client.utils.SparkMemoryUtils;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieWriteStat;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieCompactionException;
+import org.apache.hudi.table.HoodieCopyOnWriteTableOperation;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.BaseActionExecutor;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 
-import org.apache.spark.api.java.JavaRDD;
-
 import java.io.IOException;
 import java.util.List;
 
 @SuppressWarnings("checkstyle:LineLength")
-public class SparkRunCompactionActionExecutor<T extends HoodieRecordPayload> extends
-    BaseActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, HoodieWriteMetadata<JavaRDD<WriteStatus>>> {
+public class RunCompactionActionExecutor<T extends HoodieRecordPayload> extends
+    BaseActionExecutor<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>, HoodieWriteMetadata<HoodieData<WriteStatus>>> {
+
+  private final AbstractHoodieWriteClient writeClient;
+  private final HoodieCompactor compactor;
+  private final HoodieCopyOnWriteTableOperation copyOnWriteTableOperation;
 
-  public SparkRunCompactionActionExecutor(HoodieSparkEngineContext context,
-                                          HoodieWriteConfig config,
-                                          HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
-                                          String instantTime) {
+  public RunCompactionActionExecutor(HoodieEngineContext context,
+                                     HoodieWriteConfig config,
+                                     HoodieTable table,
+                                     String instantTime,
+                                     AbstractHoodieWriteClient writeClient,
+                                     HoodieCompactor compactor,
+                                     HoodieCopyOnWriteTableOperation copyOnWriteTableOperation) {
     super(context, config, table, instantTime);
+    this.writeClient = writeClient;
+    this.compactor = compactor;
+    this.copyOnWriteTableOperation = copyOnWriteTableOperation;
   }
 
   @Override
-  public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
-    HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(instantTime);
-    HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
-    if (!pendingCompactionTimeline.containsInstant(instant)) {
-      throw new IllegalStateException(
-          "No Compaction request available at " + instantTime + " to run compaction");
-    }
+  public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
+    compactor.checkCompactionTimeline(table, instantTime, writeClient);
 
-    HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = new HoodieWriteMetadata<>();
+    HoodieWriteMetadata<HoodieData<WriteStatus>> compactionMetadata = new HoodieWriteMetadata<>();
     try {
-      HoodieActiveTimeline timeline = table.getActiveTimeline();
+      // generate compaction plan
+      // should support configurable commit metadata
       HoodieCompactionPlan compactionPlan =
           CompactionUtils.getCompactionPlan(table.getMetaClient(), instantTime);
-      // Mark instant as compaction inflight
-      timeline.transitionCompactionRequestedToInflight(instant);
-      table.getMetaClient().reloadActiveTimeline();
 
-      HoodieSparkMergeOnReadTableCompactor compactor = new HoodieSparkMergeOnReadTableCompactor();
-      JavaRDD<WriteStatus> statuses = compactor.compact(context, compactionPlan, table, config, instantTime);
+      HoodieData<WriteStatus> statuses = compactor.compact(
+          context, compactionPlan, table, config, instantTime, copyOnWriteTableOperation);
 
-      statuses.persist(SparkMemoryUtils.getWriteStatusStorageLevel(config.getProps()));
+      statuses.persist(config.getProps());

Review comment:
       Can you point me to this code in the Flink code base prior to this patch?
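
A minimal sketch of the engine-agnostic shape that `statuses.persist(config.getProps())` implies: the `persist(Properties)` signature is read off the hunk above, but the class layout below is illustrative, not the patch's actual code:

```java
import java.io.Serializable;
import java.util.Properties;

// Illustrative sketch: HoodieData hides whether records live in a Spark RDD
// or a plain Java collection, so engine-specific calls such as RDD.persist()
// stay out of the shared action executors.
public abstract class HoodieData<T> implements Serializable {

  // Cache the underlying data if the engine supports it. A Spark-backed
  // implementation would map the configured storage level onto RDD.persist();
  // a list-backed (e.g. Flink) implementation can treat this as a no-op.
  public abstract void persist(Properties props);

  // Number of elements; may trigger engine-side computation.
  public abstract long count();
}
```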

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java
##########
@@ -38,11 +44,36 @@
  * @param <K> Type of keys
  * @param <O> Type of outputs
  */
-public abstract class AbstractCompactHelpers<T extends HoodieRecordPayload, I, K, O> {
-  public abstract HoodieCommitMetadata createCompactionMetadata(HoodieTable<T, I, K, O> table,
-                                                                String compactionInstantTime,
-                                                                O writeStatuses,
-                                                                String schema) throws IOException;
+public class CompactHelpers<T extends HoodieRecordPayload, I, K, O> {
+
+  protected CompactHelpers() {
+  }
+
+  private static class CompactHelperHolder {

Review comment:
       Why do we need this new internal class? Can't we just declare a static final instance variable? This is the classic lazy singleton instantiation idiom, IIUC.
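
For reference, a minimal sketch of the two alternatives being compared; the holder's field and accessor names below are illustrative, since the hunk is truncated:

```java
// Initialization-on-demand holder idiom (what the hunk appears to use): the
// JVM loads CompactHelperHolder only on the first getInstance() call, and
// class initialization is thread-safe per the JLS.
public class CompactHelpers {

  protected CompactHelpers() {
  }

  private static class CompactHelperHolder {
    private static final CompactHelpers COMPACT_HELPERS = new CompactHelpers();
  }

  public static CompactHelpers getInstance() {
    return CompactHelperHolder.COMPACT_HELPERS;
  }

  // The eager alternative the comment suggests: since the constructor does no
  // real work, a plain field is equivalent in practice and simpler to read.
  //   private static final CompactHelpers INSTANCE = new CompactHelpers();
  //   public static CompactHelpers getInstance() { return INSTANCE; }
}
```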

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
##########
@@ -18,39 +18,258 @@
 
 package org.apache.hudi.table.action.compact;
 
+import org.apache.hudi.avro.model.HoodieCompactionOperation;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.client.AbstractHoodieWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieAccumulator;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.CompactionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.io.IOUtils;
+import org.apache.hudi.table.HoodieCopyOnWriteTableOperation;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Set;
+import java.util.stream.StreamSupport;
+
+import static java.util.stream.Collectors.toList;
 
 /**
  * A HoodieCompactor runs compaction on a hoodie table.
  */
-public interface HoodieCompactor<T extends HoodieRecordPayload, I, K, O> extends Serializable {
+public abstract class HoodieCompactor<T extends HoodieRecordPayload, I, K, O> implements Serializable {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieCompactor.class);
+
+  public abstract Schema getReaderSchema(HoodieWriteConfig config);
+
+  public abstract void updateReaderSchema(HoodieWriteConfig config, HoodieTableMetaClient metaClient);
+
+  public abstract void checkCompactionTimeline(

Review comment:
       Maybe we can name this `validatePendingCompaction` and move the first two lines out of the method:
   ```
   HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(instantTime);
   HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
   validatePendingCompaction(pendingCompactionTimeline, instant);
   ```
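
A sketch of the extracted check, with the body lifted from the pre-patch `SparkRunCompactionActionExecutor#execute` quoted at the top of this thread (the method name and the enclosing class follow the suggestion above; they are not in the patch):

```java
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;

// Hypothetical home for the extracted check; per the suggestion it would
// live on HoodieCompactor itself.
public abstract class CompactionValidationSketch {

  // Fails fast when the given instant has no pending compaction request,
  // mirroring the inline check from the pre-patch Spark executor.
  public static void validatePendingCompaction(HoodieTimeline pendingCompactionTimeline,
                                               HoodieInstant instant) {
    if (!pendingCompactionTimeline.containsInstant(instant)) {
      throw new IllegalStateException(
          "No Compaction request available at " + instant.getTimestamp() + " to run compaction");
    }
  }
}
```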

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
##########
@@ -18,39 +18,258 @@
 
 package org.apache.hudi.table.action.compact;
 
+import org.apache.hudi.avro.model.HoodieCompactionOperation;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.client.AbstractHoodieWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieAccumulator;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.CompactionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.io.IOUtils;
+import org.apache.hudi.table.HoodieCopyOnWriteTableOperation;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Set;
+import java.util.stream.StreamSupport;
+
+import static java.util.stream.Collectors.toList;
 
 /**
  * A HoodieCompactor runs compaction on a hoodie table.
  */
-public interface HoodieCompactor<T extends HoodieRecordPayload, I, K, O> extends Serializable {
+public abstract class HoodieCompactor<T extends HoodieRecordPayload, I, K, O> implements Serializable {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieCompactor.class);
+
+  public abstract Schema getReaderSchema(HoodieWriteConfig config);

Review comment:
       Can we add Javadocs for these methods? Since they will have to be implemented by every engine, it would be good to document what is expected of each one.
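
For example, the requested Javadocs could take this shape; the contract descriptions below are illustrative guesses, not text from the patch:

```java
import org.apache.avro.Schema;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.config.HoodieWriteConfig;

// Hypothetical excerpt showing documented abstract methods on HoodieCompactor.
public abstract class HoodieCompactorDocsSketch {

  /**
   * Returns the Avro schema compaction should use when reading base and log
   * files; engines may need to append Hudi metadata fields here.
   *
   * @param config write config carrying the user's table schema
   * @return the reader schema for this engine
   */
  public abstract Schema getReaderSchema(HoodieWriteConfig config);

  /**
   * Reconciles the reader schema in the write config with the table's latest
   * commit metadata, so compaction reads with an up-to-date schema.
   *
   * @param config     write config to update in place
   * @param metaClient meta client of the table being compacted
   */
  public abstract void updateReaderSchema(HoodieWriteConfig config, HoodieTableMetaClient metaClient);
}
```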

##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java
##########
@@ -75,164 +42,27 @@
  * <p>Note: the compaction logic is invoked through the flink pipeline.
  */
 @SuppressWarnings("checkstyle:LineLength")
-public class HoodieFlinkMergeOnReadTableCompactor<T extends HoodieRecordPayload> implements HoodieCompactor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
-
-  private static final Logger LOG = LogManager.getLogger(HoodieFlinkMergeOnReadTableCompactor.class);
-
-  // Accumulator to keep track of total log files for a table
-  private AtomicLong totalLogFiles;
-  // Accumulator to keep track of total log file slices for a table
-  private AtomicLong totalFileSlices;
+public class HoodieFlinkMergeOnReadTableCompactor<T extends HoodieRecordPayload>

Review comment:
       Class looks pretty slim. Thanks for the refactoring :)



