You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/02/08 09:46:44 UTC

[GitHub] [hudi] yanghua commented on a change in pull request #2553: [HUDI-1598] Write as minor batches during one checkpoint interval for…

yanghua commented on a change in pull request #2553:
URL: https://github.com/apache/hudi/pull/2553#discussion_r571877484



##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/ExplicitWriteHandleTable.java
##########
@@ -0,0 +1,110 @@
+package org.apache.hudi.table;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.io.HoodieWriteHandle;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+
+import java.util.List;
+
+/**
+ * HoodieTable that need to pass in the
+ * {@link org.apache.hudi.io.HoodieWriteHandle} explicitly.
+ */
+public interface ExplicitWriteHandleTable<T extends HoodieRecordPayload> {

Review comment:
       Can we merge the methods of this interface into `HoodieFlinkTable`?

##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.DefaultSizeEstimator;
+import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@link HoodieMergeHandle} that supports merge write incrementally(mini-batches).
+ *
+ * <p>For a new mini-batch, it initialize and set up the next file path to write,
+ * and closes the file path when the mini-batch write finish. When next mini-batch
+ * write starts, it rolls over to another new file. If all the mini-batches write finish
+ * for a checkpoint round, it renames the last new file path as the desired file name
+ * (name with the expected file ID).
+ *
+ * @param <T> Payload type
+ * @param <I> Input type
+ * @param <K> Key type
+ * @param <O> Output type
+ */
+public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
+    extends HoodieMergeHandle<T, I, K, O>
+    implements MiniBatchHandle {
+
+  private static final Logger LOG = LogManager.getLogger(FlinkMergeHandle.class);
+
+  /**
+   * Records the current file handles number that rolls over.
+   */
+  private int rollNumber = 0;
+  /**
+   * Records the rolled over file paths.
+   */
+  private List<Path> rolloverPaths;
+  /**
+   * Whether it is the first time to generate file handle, E.G. the handle has not rolled over yet.
+   */
+  private boolean needBootStrap = true;
+
+  public FlinkMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
+                          Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
+                          TaskContextSupplier taskContextSupplier) {
+    super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier);
+    rolloverPaths = new ArrayList<>();
+  }
+
+  /**
+   * Called by compactor code path.
+   */
+  public FlinkMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
+                          Map<String, HoodieRecord<T>> keyToNewRecords, String partitionPath, String fileId,
+                          HoodieBaseFile dataFileToBeMerged, TaskContextSupplier taskContextSupplier) {
+    super(config, instantTime, hoodieTable, keyToNewRecords, partitionPath, fileId,
+        dataFileToBeMerged, taskContextSupplier);
+  }
+
+  /**
+   * Use the fileId + "-" + rollNumber as the new fileId of a mini-batch write.
+   */
+  protected String dataFileName() {
+    return FSUtils.makeDataFileName(instantTime, writeToken, fileId + "-" + rollNumber, hoodieTable.getBaseFileExtension());
+  }
+
+  public boolean isNeedBootStrap() {
+    return needBootStrap;
+  }
+
+  @Override
+  public List<WriteStatus> close() {
+    List<WriteStatus> writeStatus = super.close();
+    this.needBootStrap = false;
+    return writeStatus;
+  }
+
+  /**
+   * THe difference with the parent method is that there is no need to set up
+   * locations for the records.
+   *
+   * @param fileId        The file ID
+   * @param newRecordsItr The incremental records iterator
+   */
+  @Override
+  protected void init(String fileId, Iterator<HoodieRecord<T>> newRecordsItr) {
+    try {
+      // Load the new records in a map
+      long memoryForMerge = IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config.getProps());
+      LOG.info("MaxMemoryPerPartitionMerge => " + memoryForMerge);
+      this.keyToNewRecords = new ExternalSpillableMap<>(memoryForMerge, config.getSpillableMapBasePath(),
+          new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(writerSchema));
+    } catch (IOException io) {
+      throw new HoodieIOException("Cannot instantiate an ExternalSpillableMap", io);
+    }
+    while (newRecordsItr.hasNext()) {
+      HoodieRecord<T> record = newRecordsItr.next();
+      // NOTE: Once Records are added to map (spillable-map), DO NOT change it as they won't persist
+      keyToNewRecords.put(record.getRecordKey(), record);
+    }
+    LOG.info("Number of entries in MemoryBasedMap => "

Review comment:
       Could we refactor this with `String.format()`?

##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteFunction.java
##########
@@ -252,6 +270,41 @@ private void initWriteFunction() {
     }
   }
 
+  /**
+   * Tool to detect if to flush out the existing buffer.
+   * Sampling the record to compute the size with 0.01 percentage.
+   */
+  private static class BufferSizeDetector {
+    private final Random random = new Random(47);
+
+    private final double batchSizeBytes;
+
+    private long lastRecordSize = -1L;
+    private long totalSize = 0L;
+
+    BufferSizeDetector(double batchSizeMb) {
+      this.batchSizeBytes = batchSizeMb * 1024 * 1024;

Review comment:
       We should add some validity checks here, since the config option does not indicate its unit.

##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteFunction.java
##########
@@ -252,6 +270,41 @@ private void initWriteFunction() {
     }
   }
 
+  /**
+   * Tool to detect if to flush out the existing buffer.
+   * Sampling the record to compute the size with 0.01 percentage.
+   */
+  private static class BufferSizeDetector {
+    private final Random random = new Random(47);
+
+    private final double batchSizeBytes;
+
+    private long lastRecordSize = -1L;
+    private long totalSize = 0L;
+
+    BufferSizeDetector(double batchSizeMb) {
+      this.batchSizeBytes = batchSizeMb * 1024 * 1024;
+    }
+
+    boolean detect(Object record) {
+      if (lastRecordSize == -1 || sampling()) {
+        lastRecordSize = ObjectSizeCalculator.getObjectSize(record);
+      }
+      totalSize += lastRecordSize;
+      return totalSize > this.batchSizeBytes;
+    }
+
+    boolean sampling() {
+      // 0.01 sampling percentage
+      return random.nextInt(100) == 1;

Review comment:
       Can we extract it into a constant field?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
##########
@@ -132,6 +132,13 @@ public Schema getWriterSchema() {
     return writerSchema;
   }
 
+  /**
+   * Returns the data file name.
+   */
+  protected String dataFileName() {

Review comment:
       Would `generateDataFileName` be a better name?

##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java
##########
@@ -153,6 +152,14 @@ public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> r
   public void checkpointComplete(long checkpointId) {
     // start to commit the instant.
     checkAndCommitWithRetry();
+    // start new instant.
+    startInstant();
+  }
+
+  private void startInstant() {
+    this.inFlightInstant = this.writeClient.startCommit();
+    this.writeClient.transitionRequestedToInflight(conf.getString(FlinkOptions.TABLE_TYPE), this.inFlightInstant);
+    LOG.info("Create instant [{}]", this.inFlightInstant);

Review comment:
       Could this log message provide more information?

##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/MiniBatchHandle.java
##########
@@ -0,0 +1,10 @@
+package org.apache.hudi.io;
+
+/** Hoodie write handle that supports write as mini-batch. */

Review comment:
       Could we use the unified Java style here?

##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.DefaultSizeEstimator;
+import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@link HoodieMergeHandle} that supports merge write incrementally(mini-batches).
+ *
+ * <p>For a new mini-batch, it initialize and set up the next file path to write,
+ * and closes the file path when the mini-batch write finish. When next mini-batch
+ * write starts, it rolls over to another new file. If all the mini-batches write finish
+ * for a checkpoint round, it renames the last new file path as the desired file name
+ * (name with the expected file ID).
+ *
+ * @param <T> Payload type
+ * @param <I> Input type
+ * @param <K> Key type
+ * @param <O> Output type
+ */
+public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
+    extends HoodieMergeHandle<T, I, K, O>
+    implements MiniBatchHandle {
+
+  private static final Logger LOG = LogManager.getLogger(FlinkMergeHandle.class);
+
+  /**
+   * Records the current file handles number that rolls over.
+   */
+  private int rollNumber = 0;
+  /**
+   * Records the rolled over file paths.
+   */
+  private List<Path> rolloverPaths;
+  /**
+   * Whether it is the first time to generate file handle, E.G. the handle has not rolled over yet.
+   */
+  private boolean needBootStrap = true;
+
+  public FlinkMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
+                          Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
+                          TaskContextSupplier taskContextSupplier) {
+    super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier);
+    rolloverPaths = new ArrayList<>();
+  }
+
+  /**
+   * Called by compactor code path.
+   */
+  public FlinkMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
+                          Map<String, HoodieRecord<T>> keyToNewRecords, String partitionPath, String fileId,
+                          HoodieBaseFile dataFileToBeMerged, TaskContextSupplier taskContextSupplier) {
+    super(config, instantTime, hoodieTable, keyToNewRecords, partitionPath, fileId,
+        dataFileToBeMerged, taskContextSupplier);
+  }
+
+  /**
+   * Use the fileId + "-" + rollNumber as the new fileId of a mini-batch write.
+   */
+  protected String dataFileName() {
+    return FSUtils.makeDataFileName(instantTime, writeToken, fileId + "-" + rollNumber, hoodieTable.getBaseFileExtension());
+  }
+
+  public boolean isNeedBootStrap() {
+    return needBootStrap;
+  }
+
+  @Override
+  public List<WriteStatus> close() {
+    List<WriteStatus> writeStatus = super.close();
+    this.needBootStrap = false;
+    return writeStatus;
+  }
+
+  /**
+   * THe difference with the parent method is that there is no need to set up

Review comment:
       `The`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org