You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/02/25 06:27:29 UTC

[GitHub] [hudi] yanghua commented on a change in pull request #2593: [HUDI-1632] Supports merge on read write mode for Flink writer

yanghua commented on a change in pull request #2593:
URL: https://github.com/apache/hudi/pull/2593#discussion_r582448681



##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
##########
@@ -208,12 +210,32 @@ public void bootstrap(Option<Map<String, String>> extraMetadata) {
 
   @Override
   public void commitCompaction(String compactionInstantTime, List<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata) throws IOException {
-    throw new HoodieNotSupportedException("Compaction is not supported yet");
+    HoodieFlinkTable<T> table = HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
+    HoodieCommitMetadata metadata = FlinkCompactHelpers.newInstance().createCompactionMetadata(
+        table, compactionInstantTime, writeStatuses, config.getSchema());
+    extraMetadata.ifPresent(m -> m.forEach(metadata::addMetadata));
+    completeCompaction(metadata, writeStatuses, table, compactionInstantTime);
   }
 
   @Override
-  protected void completeCompaction(HoodieCommitMetadata metadata, List<WriteStatus> writeStatuses, HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table, String compactionCommitTime) {
-    throw new HoodieNotSupportedException("Compaction is not supported yet");
+  public void completeCompaction(HoodieCommitMetadata metadata, List<WriteStatus> writeStatuses, HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table, String compactionCommitTime) {

Review comment:
       This line is too long.

##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieUpsertException;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * A {@link HoodieAppendHandle} that supports append write incrementally(mini-batches).
+ *
+ * <p>For the first mini-batch, it initialize and set up the next file path to write,
+ * but does not close the file writer until all the mini-batches write finish. Each mini-batch
+ * data are appended to this handle, the back-up writer may rollover on condition.
+ *
+ * @param <T> Payload type
+ * @param <I> Input type
+ * @param <K> Key type
+ * @param <O> Output type
+ */
+public class FlinkAppendHandle<T extends HoodieRecordPayload, I, K, O>
+    extends HoodieAppendHandle<T, I, K, O>
+    implements MiniBatchHandle {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkAppendHandle.class);
+  private boolean needBootStrap = true;
+  // Total number of bytes written to file
+  private long sizeInBytes = 0;
+
+  public FlinkAppendHandle(
+      HoodieWriteConfig config,
+      String instantTime,
+      HoodieTable<T, I, K, O> hoodieTable,
+      String partitionPath,
+      String fileId,
+      Iterator<HoodieRecord<T>> recordItr,
+      TaskContextSupplier taskContextSupplier) {
+    super(config, instantTime, hoodieTable, partitionPath, fileId, recordItr, taskContextSupplier);
+  }
+
+  @Override
+  protected boolean needsUpdateLocation() {
+    return false;
+  }
+
+  @Override
+  protected boolean isUpdateRecord(HoodieRecord<T> hoodieRecord) {
+    return hoodieRecord.getCurrentLocation() != null
+        && hoodieRecord.getCurrentLocation().getInstantTime().equals("U");
+  }
+
+  /**
+   * Returns whether there is need to bootstrap this file handle.
+   * E.G. the first time that the handle is created.
+   */
+  public boolean isNeedBootStrap() {
+    return this.needBootStrap;
+  }
+
+  /**
+   * Appends new records into this append handle.
+   * @param recordItr The new records iterator
+   */
+  public void appendNewRecords(Iterator<HoodieRecord<T>> recordItr) {
+    this.recordItr = recordItr;
+  }
+
+  @Override
+  public List<WriteStatus> close() {
+    needBootStrap = false;
+    // flush any remaining records to disk
+    appendDataAndDeleteBlocks(header);
+    for (WriteStatus status: statuses) {
+      long logFileSize = 0;
+      try {
+        logFileSize = FSUtils.getFileSize(fs, new Path(config.getBasePath(), status.getStat().getPath()));
+      } catch (IOException e) {
+        throw new HoodieUpsertException("Failed to get file size for append handle", e);
+      }

Review comment:
       Can we wrap the `try-catch` block around the whole `for` loop, instead of having a `try-catch` inside every iteration?

##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/FlinkCompactHelpers.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.compact;
+
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.table.HoodieTable;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * A flink implementation of {@link AbstractCompactHelpers}.
+ *
+ * @param <T>
+ */
+public class FlinkCompactHelpers<T extends HoodieRecordPayload> extends
+    AbstractCompactHelpers<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
+
+  private FlinkCompactHelpers() {
+  }
+
+  private static class CompactHelperHolder {
+    private static final FlinkCompactHelpers SPARK_COMPACT_HELPERS = new FlinkCompactHelpers();
+  }
+
+  public static FlinkCompactHelpers newInstance() {
+    return CompactHelperHolder.SPARK_COMPACT_HELPERS;

Review comment:
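       Ditto (presumably the same "spark?" naming question: `SPARK_COMPACT_HELPERS` is referenced here in a Flink class).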

##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java
##########
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.compact;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.model.HoodieCompactionOperation;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.client.FlinkTaskContextSupplier;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.CompactionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.io.IOUtils;
+import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static java.util.stream.Collectors.toList;
+
+/**
+ * Compacts a hoodie table with merge on read storage. Computes all possible compactions,
+ * passes it through a CompactionFilter and executes all the compactions and writes a new version of base files and make
+ * a normal commit.
+ *
+ * <p>Note: the compaction logic is invoked through the flink pipeline.
+ */
+@SuppressWarnings("checkstyle:LineLength")
+public class HoodieFlinkMergeOnReadTableCompactor<T extends HoodieRecordPayload> implements HoodieCompactor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieFlinkMergeOnReadTableCompactor.class);
+
+  // Accumulator to keep track of total log files for a table
+  private AtomicLong totalLogFiles;
+  // Accumulator to keep track of total log file slices for a table
+  private AtomicLong totalFileSlices;
+
+  @Override
+  public List<WriteStatus> compact(HoodieEngineContext context, HoodieCompactionPlan compactionPlan,
+                                      HoodieTable hoodieTable, HoodieWriteConfig config, String compactionInstantTime) throws IOException {
+    throw new UnsupportedOperationException("HoodieFlinkMergeOnReadTableCompactor does not support compact directly, "
+        + "the function works as a separate pipeline");
+  }
+
+  public List<WriteStatus> compact(

Review comment:
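       Can we use a unified indentation style here? The parameter list of this `compact` overload is wrapped differently from the one above.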

##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/FlinkCompactHelpers.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.compact;
+
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.table.HoodieTable;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * A flink implementation of {@link AbstractCompactHelpers}.
+ *
+ * @param <T>
+ */
+public class FlinkCompactHelpers<T extends HoodieRecordPayload> extends
+    AbstractCompactHelpers<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
+
+  private FlinkCompactHelpers() {
+  }
+
+  private static class CompactHelperHolder {
+    private static final FlinkCompactHelpers SPARK_COMPACT_HELPERS = new FlinkCompactHelpers();

Review comment:
       Spark? This is the Flink implementation, so should `SPARK_COMPACT_HELPERS` be renamed?

##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
##########
@@ -244,6 +266,47 @@ public void cleanHandles() {
     this.bucketToHandles.clear();
   }
 
+  /**
+   * Get or create a new write handle in order to reuse the file handles.
+   *
+   * @param record        The first record in the bucket
+   * @param isDelta       Whether the table is in MOR mode
+   * @param config        Write config
+   * @param instantTime   The instant time
+   * @param table         The table
+   * @param partitionPath Partition path
+   * @param recordItr     Record iterator
+   * @return Existing write handle or create a new one
+   */
+  private HoodieWriteHandle<?, ?, ?, ?> getOrCreateWriteHandle(
+      HoodieRecord<T> record,
+      boolean isDelta,
+      HoodieWriteConfig config,
+      String instantTime,
+      HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
+      String partitionPath,
+      Iterator<HoodieRecord<T>> recordItr) {
+    final HoodieRecordLocation loc = record.getCurrentLocation();
+    final String fileID = loc.getFileId();
+    final boolean isInsert = loc.getInstantTime().equals("I");
+    if (bucketToHandles.containsKey(fileID)) {
+      return bucketToHandles.get(fileID);
+    }
+    final HoodieWriteHandle<?, ?, ?, ?> writeHandle;
+    if (isDelta) {

Review comment:
       This `if/else` mixes two different dimensions. Can we check `COW/MOR` first, and then check `I/U`? That would be better for readability.

##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java
##########
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.compact;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.model.HoodieCompactionOperation;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.client.FlinkTaskContextSupplier;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.CompactionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.io.IOUtils;
+import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static java.util.stream.Collectors.toList;
+
+/**
+ * Compacts a hoodie table with merge on read storage. Computes all possible compactions,
+ * passes it through a CompactionFilter and executes all the compactions and writes a new version of base files and make
+ * a normal commit.
+ *
+ * <p>Note: the compaction logic is invoked through the flink pipeline.
+ */
+@SuppressWarnings("checkstyle:LineLength")
+public class HoodieFlinkMergeOnReadTableCompactor<T extends HoodieRecordPayload> implements HoodieCompactor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieFlinkMergeOnReadTableCompactor.class);
+
+  // Accumulator to keep track of total log files for a table
+  private AtomicLong totalLogFiles;
+  // Accumulator to keep track of total log file slices for a table
+  private AtomicLong totalFileSlices;
+
+  @Override
+  public List<WriteStatus> compact(HoodieEngineContext context, HoodieCompactionPlan compactionPlan,
+                                      HoodieTable hoodieTable, HoodieWriteConfig config, String compactionInstantTime) throws IOException {
+    throw new UnsupportedOperationException("HoodieFlinkMergeOnReadTableCompactor does not support compact directly, "
+        + "the function works as a separate pipeline");
+  }
+
+  public List<WriteStatus> compact(
+      HoodieFlinkCopyOnWriteTable hoodieCopyOnWriteTable,
+      HoodieTableMetaClient metaClient,
+      HoodieWriteConfig config,
+      CompactionOperation operation,
+      String instantTime) throws IOException {
+    FileSystem fs = metaClient.getFs();
+
+    Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
+    LOG.info("Compacting base " + operation.getDataFileName() + " with delta files " + operation.getDeltaFileNames()
+        + " for commit " + instantTime);
+    // TODO - FIX THIS
+    // Reads the entire avro file. Always only specific blocks should be read from the avro file
+    // (failure recover).
+    // Load all the delta commits since the last compaction commit and get all the blocks to be
+    // loaded and load it using CompositeAvroLogReader
+    // Since a DeltaCommit is not defined yet, reading all the records. revisit this soon.
+    String maxInstantTime = metaClient
+        .getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION,
+            HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION))
+        .filterCompletedInstants().lastInstant().get().getTimestamp();
+    // TODO(danny): make it configurable
+    long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new FlinkTaskContextSupplier(null), config.getProps());
+    LOG.info("MaxMemoryPerCompaction => " + maxMemoryPerCompaction);
+
+    List<String> logFiles = operation.getDeltaFileNames().stream().map(
+        p -> new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), operation.getPartitionPath()), p).toString())
+        .collect(toList());
+    HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(fs)
+        .withBasePath(metaClient.getBasePath())
+        .withLogFilePaths(logFiles)
+        .withReaderSchema(readerSchema)
+        .withLatestInstantTime(maxInstantTime)
+        .withMaxMemorySizeInBytes(maxMemoryPerCompaction)
+        .withReadBlocksLazily(config.getCompactionLazyBlockReadEnabled())
+        .withReverseReader(config.getCompactionReverseLogReadEnabled())
+        .withBufferSize(config.getMaxDFSStreamBufferSize())
+        .withSpillableMapBasePath(config.getSpillableMapBasePath())
+        .build();
+    if (!scanner.iterator().hasNext()) {
+      return new ArrayList<>();
+    }
+
+    Option<HoodieBaseFile> oldDataFileOpt =
+        operation.getBaseFile(metaClient.getBasePath(), operation.getPartitionPath());
+
+    // Compacting is very similar to applying updates to existing file
+    Iterator<List<WriteStatus>> result;
+    // If the dataFile is present, perform updates else perform inserts into a new base file.
+    if (oldDataFileOpt.isPresent()) {
+      result = hoodieCopyOnWriteTable.handleUpdate(instantTime, operation.getPartitionPath(),
+          operation.getFileId(), scanner.getRecords(),
+          oldDataFileOpt.get());
+    } else {
+      result = hoodieCopyOnWriteTable.handleInsert(instantTime, operation.getPartitionPath(), operation.getFileId(),
+          scanner.getRecords());
+    }
+    Iterable<List<WriteStatus>> resultIterable = () -> result;
+    return StreamSupport.stream(resultIterable.spliterator(), false).flatMap(Collection::stream).peek(s -> {
+      s.getStat().setTotalUpdatedRecordsCompacted(scanner.getNumMergedRecordsInLog());
+      s.getStat().setTotalLogFilesCompacted(scanner.getTotalLogFiles());
+      s.getStat().setTotalLogRecords(scanner.getTotalLogRecords());
+      s.getStat().setPartitionPath(operation.getPartitionPath());
+      s.getStat()
+          .setTotalLogSizeCompacted(operation.getMetrics().get(CompactionStrategy.TOTAL_LOG_FILE_SIZE).longValue());
+      s.getStat().setTotalLogBlocks(scanner.getTotalLogBlocks());
+      s.getStat().setTotalCorruptLogBlock(scanner.getTotalCorruptBlocks());
+      s.getStat().setTotalRollbackBlocks(scanner.getTotalRollbacks());
+      RuntimeStats runtimeStats = new RuntimeStats();
+      runtimeStats.setTotalScanTime(scanner.getTotalTimeTakenToReadAndMergeBlocks());
+      s.getStat().setRuntimeStats(runtimeStats);
+    }).collect(toList());
+  }
+
+  @Override
+  public HoodieCompactionPlan generateCompactionPlan(HoodieEngineContext context,
+                                                     HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable,
+                                                     HoodieWriteConfig config, String compactionCommitTime,
+                                                     Set<HoodieFileGroupId> fgIdsInPendingCompactionAndClustering)
+      throws IOException {
+    totalLogFiles = new AtomicLong(0);
+    totalFileSlices = new AtomicLong(0);
+
+    ValidationUtils.checkArgument(hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ,
+        "Can only compact table of type " + HoodieTableType.MERGE_ON_READ + " and not "
+            + hoodieTable.getMetaClient().getTableType().name());
+
+    // TODO : check if maxMemory is not greater than JVM or spark.executor memory

Review comment:
       Spark? This TODO mentions `spark.executor` memory, but this is the Flink compactor.

##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java
##########
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.compact;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.model.HoodieCompactionOperation;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.client.FlinkTaskContextSupplier;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.CompactionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.io.IOUtils;
+import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static java.util.stream.Collectors.toList;
+
+/**
+ * Compacts a hoodie table with merge on read storage. Computes all possible compactions,
+ * passes them through a CompactionFilter, executes all the compactions, writes a new version of the base files
+ * and makes a normal commit.
+ *
+ * <p>Note: the compaction logic is invoked through the flink pipeline.
+ */
+@SuppressWarnings("checkstyle:LineLength")
+public class HoodieFlinkMergeOnReadTableCompactor<T extends HoodieRecordPayload> implements HoodieCompactor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieFlinkMergeOnReadTableCompactor.class);
+
+  // Accumulator to keep track of total log files for a table
+  private AtomicLong totalLogFiles;
+  // Accumulator to keep track of total log file slices for a table
+  private AtomicLong totalFileSlices;
+
+  @Override // Plan-level compact entry point declared by HoodieCompactor; this implementation never compacts here.
+  public List<WriteStatus> compact(HoodieEngineContext context, HoodieCompactionPlan compactionPlan,
+                                      HoodieTable hoodieTable, HoodieWriteConfig config, String compactionInstantTime) throws IOException {
+    throw new UnsupportedOperationException("HoodieFlinkMergeOnReadTableCompactor does not support compact directly, " // always thrown; none of the arguments are read
+        + "the function works as a separate pipeline"); // presumably that pipeline calls the per-operation compact(...) overload of this class - confirm against the caller
+  }
+
+  public List<WriteStatus> compact( // Compacts one CompactionOperation: scans its delta log files, writes the scanned records through the table's update/insert handle, and returns the write statuses.
+      HoodieFlinkCopyOnWriteTable hoodieCopyOnWriteTable, // table whose handleUpdate/handleInsert performs the actual write
+      HoodieTableMetaClient metaClient, // supplies the file system, the base path and the active timeline
+      HoodieWriteConfig config, // supplies the schema, the properties used for the memory budget and the log-reader settings
+      CompactionOperation operation, // the unit of work: partition path, file id, optional base file and delta file names
+      String instantTime) throws IOException { // instantTime is logged and forwarded to handleUpdate/handleInsert
+    FileSystem fs = metaClient.getFs();
+
+    Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema())); // configured write schema with the hoodie metadata fields added
+    LOG.info("Compacting base " + operation.getDataFileName() + " with delta files " + operation.getDeltaFileNames()
+        + " for commit " + instantTime);
+    // TODO - FIX THIS
+    // This currently reads the entire avro log file, whereas only specific blocks should be read
+    // (for failure recovery).
+    // Intended approach: load all the delta commits since the last compaction commit, work out which
+    // blocks need to be loaded, and load them using CompositeAvroLogReader.
+    // Since a DeltaCommit is not defined yet, all the records are read for now. Revisit this soon.
+    String maxInstantTime = metaClient // timestamp of the last completed commit / rollback / delta-commit instant; passed to the scanner as its latest instant time
+        .getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION,
+            HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION))
+        .filterCompletedInstants().lastInstant().get().getTimestamp(); // NOTE(review): get() is unchecked - looks like this assumes at least one completed instant exists; confirm
+    // TODO(danny): make it configurable
+    long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new FlinkTaskContextSupplier(null), config.getProps());
+    LOG.info("MaxMemoryPerCompaction => " + maxMemoryPerCompaction);
+
+    List<String> logFiles = operation.getDeltaFileNames().stream().map( // resolve each delta file name against the operation's partition directory
+        p -> new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), operation.getPartitionPath()), p).toString())
+        .collect(toList());
+    HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(fs)
+        .withBasePath(metaClient.getBasePath())
+        .withLogFilePaths(logFiles)
+        .withReaderSchema(readerSchema)
+        .withLatestInstantTime(maxInstantTime)
+        .withMaxMemorySizeInBytes(maxMemoryPerCompaction)
+        .withReadBlocksLazily(config.getCompactionLazyBlockReadEnabled())
+        .withReverseReader(config.getCompactionReverseLogReadEnabled())
+        .withBufferSize(config.getMaxDFSStreamBufferSize())
+        .withSpillableMapBasePath(config.getSpillableMapBasePath())
+        .build();
+    if (!scanner.iterator().hasNext()) { // the scanner produced no records: return an empty list without calling any write handle
+      return new ArrayList<>();
+    }
+
+    Option<HoodieBaseFile> oldDataFileOpt =
+        operation.getBaseFile(metaClient.getBasePath(), operation.getPartitionPath());
+
+    // Compacting is very similar to applying updates to an existing file.
+    Iterator<List<WriteStatus>> result;
+    // If the base (data) file is present, perform updates; otherwise perform inserts into a new base file.
+    if (oldDataFileOpt.isPresent()) {
+      result = hoodieCopyOnWriteTable.handleUpdate(instantTime, operation.getPartitionPath(),
+          operation.getFileId(), scanner.getRecords(),
+          oldDataFileOpt.get());
+    } else {
+      result = hoodieCopyOnWriteTable.handleInsert(instantTime, operation.getPartitionPath(), operation.getFileId(),
+          scanner.getRecords());
+    }
+    Iterable<List<WriteStatus>> resultIterable = () -> result; // adapt the iterator to an Iterable so it can be streamed; it hands back the same iterator on every call
+    return StreamSupport.stream(resultIterable.spliterator(), false).flatMap(Collection::stream).peek(s -> { // flatten the lists and copy the scanner/operation statistics onto each status
+      s.getStat().setTotalUpdatedRecordsCompacted(scanner.getNumMergedRecordsInLog());
+      s.getStat().setTotalLogFilesCompacted(scanner.getTotalLogFiles());
+      s.getStat().setTotalLogRecords(scanner.getTotalLogRecords());
+      s.getStat().setPartitionPath(operation.getPartitionPath());
+      s.getStat()
+          .setTotalLogSizeCompacted(operation.getMetrics().get(CompactionStrategy.TOTAL_LOG_FILE_SIZE).longValue()); // NOTE(review): dereferenced without a null check - presumably the plan always records this metric; confirm
+      s.getStat().setTotalLogBlocks(scanner.getTotalLogBlocks());
+      s.getStat().setTotalCorruptLogBlock(scanner.getTotalCorruptBlocks());
+      s.getStat().setTotalRollbackBlocks(scanner.getTotalRollbacks());
+      RuntimeStats runtimeStats = new RuntimeStats();
+      runtimeStats.setTotalScanTime(scanner.getTotalTimeTakenToReadAndMergeBlocks());
+      s.getStat().setRuntimeStats(runtimeStats);
+    }).collect(toList());
+  }
+
+  @Override
+  public HoodieCompactionPlan generateCompactionPlan(HoodieEngineContext context,
+                                                     HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable,
+                                                     HoodieWriteConfig config, String compactionCommitTime,
+                                                     Set<HoodieFileGroupId> fgIdsInPendingCompactionAndClustering)
+      throws IOException {
+    totalLogFiles = new AtomicLong(0);
+    totalFileSlices = new AtomicLong(0);
+
+    ValidationUtils.checkArgument(hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ,
+        "Can only compact table of type " + HoodieTableType.MERGE_ON_READ + " and not "
+            + hoodieTable.getMetaClient().getTableType().name());
+
+    // TODO : check if maxMemory is not greater than JVM or spark.executor memory
+    // TODO - rollback any compactions in flight
+    HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
+    LOG.info("Compacting " + metaClient.getBasePath() + " with commit " + compactionCommitTime);
+    List<String> partitionPaths = FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), metaClient.getBasePath());
+
+    // filter the partition paths if needed to reduce list status
+    partitionPaths = config.getCompactionStrategy().filterPartitionPaths(config, partitionPaths);
+
+    if (partitionPaths.isEmpty()) {
+      // In case no partitions could be picked, return no compaction plan
+      return null;
+    }
+
+    SliceView fileSystemView = hoodieTable.getSliceView();
+    LOG.info("Compaction looking for files to compact in " + partitionPaths + " partitions");
+    context.setJobStatus(this.getClass().getSimpleName(), "Looking for files to compact");
+
+    List<HoodieCompactionOperation> operations = context.flatMap(partitionPaths, partitionPath -> fileSystemView
+        .getLatestFileSlices(partitionPath)
+        .filter(slice -> !fgIdsInPendingCompactionAndClustering.contains(slice.getFileGroupId()))
+        .map(s -> {
+          List<HoodieLogFile> logFiles =
+              s.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
+          totalLogFiles.addAndGet(logFiles.size());
+          totalFileSlices.addAndGet(1L);
+          // Avro generated classes are not inheriting Serializable. Using CompactionOperation POJO
+          // for spark Map operations and collecting them finally in Avro generated classes for storing

Review comment:
       spark?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org