Posted to commits@hudi.apache.org by "Jianhui Dong (Jira)" <ji...@apache.org> on 2023/04/03 08:00:00 UTC

[jira] [Created] (HUDI-6022) The method param `instantTime` of org.apache.hudi.table.action.commit.BaseFlinkCommitActionExecutor#handleUpsertPartition is redundant

Jianhui Dong created HUDI-6022:
----------------------------------

             Summary: The method param `instantTime` of org.apache.hudi.table.action.commit.BaseFlinkCommitActionExecutor#handleUpsertPartition is redundant
                 Key: HUDI-6022
                 URL: https://issues.apache.org/jira/browse/HUDI-6022
             Project: Apache Hudi
          Issue Type: Improvement
            Reporter: Jianhui Dong


We already store `instantTime` in the superclass BaseActionExecutor, so there is no need to keep the method param `instantTime`. It would be preferable to remove it to make the code cleaner.
{code:java}
protected Iterator<List<WriteStatus>> handleUpsertPartition(
    String instantTime,
    String partitionPath,
    String fileIdHint,
    BucketType bucketType,
    Iterator recordItr) {
  try {
    if (this.writeHandle instanceof HoodieCreateHandle) {
      // During one checkpoint interval, an insert record could also be updated,
      // for example, for an operation sequence of a record:
      //    I, U,   | U, U
      // - batch1 - | - batch2 -
      // the first batch(batch1) operation triggers an INSERT bucket,
      // the second batch batch2 tries to reuse the same bucket
      // and append instead of UPDATE.
      return handleInsert(fileIdHint, recordItr);
    } else if (this.writeHandle instanceof HoodieMergeHandle) {
      return handleUpdate(partitionPath, fileIdHint, recordItr);
    } else {
      switch (bucketType) {
        case INSERT:
          return handleInsert(fileIdHint, recordItr);
        case UPDATE:
          return handleUpdate(partitionPath, fileIdHint, recordItr);
        default:
          throw new AssertionError();
      }
    }
  } catch (Throwable t) {
    String msg = "Error upsetting bucketType " + bucketType + " for partition :" + partitionPath;
    LOG.error(msg, t);
    throw new HoodieUpsertException(msg, t);
  }
}
{code}
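
A minimal sketch of what the cleanup could look like. It uses simplified, hypothetical stand-in classes (`SketchBaseActionExecutor`, `SketchFlinkCommitActionExecutor`) rather than the real Hudi types, and a `String` in place of `WriteStatus`. The only point it illustrates is that a subclass method can read the inherited `instantTime` field instead of taking it as a parameter.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for the superclass: only the inherited field is modeled.
abstract class SketchBaseActionExecutor {
  protected final String instantTime;

  protected SketchBaseActionExecutor(String instantTime) {
    this.instantTime = instantTime;
  }
}

// Hypothetical stand-in for the Flink commit executor.
class SketchFlinkCommitActionExecutor extends SketchBaseActionExecutor {

  SketchFlinkCommitActionExecutor(String instantTime) {
    super(instantTime);
  }

  // No instantTime parameter: the value comes from the inherited field.
  protected Iterator<List<String>> handleUpsertPartition(String partitionPath, String fileIdHint) {
    String status = instantTime + ":" + partitionPath + ":" + fileIdHint;
    return Collections.singletonList(Collections.singletonList(status)).iterator();
  }
}

class InstantTimeSketch {
  public static void main(String[] args) {
    // The instant time is supplied once, at construction.
    SketchFlinkCommitActionExecutor executor =
        new SketchFlinkCommitActionExecutor("20230403080000");
    Iterator<List<String>> result = executor.handleUpsertPartition("2023/04/03", "file-1");
    System.out.println(result.next());
  }
}
```

With this shape, callers no longer pass the instant time on every call, so a call cannot supply a value that differs from the one the executor was constructed with.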
