You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/01/28 12:49:12 UTC

[GitHub] [hudi] danny0405 opened a new pull request #2506: [HUDI-1557] Make Flink write pipeline write task scalable

danny0405 opened a new pull request #2506:
URL: https://github.com/apache/hudi/pull/2506


   ## What is the purpose of the pull request
   
   This is the #step 2 of RFC-24:
   https://cwiki.apache.org/confluence/display/HUDI/RFC+-+24%3A+Hoodie+Flink+Writer+Proposal
   
   This PR introduces a BucketAssigner that assigns bucket ID (partition
   path & fileID) for each stream record.
   
   There is no need to look up index and partition the records anymore in
   the following pipeline for these records,
   we actually decide the write target location before the write and each
   record computes its location when the BucketAssigner receives it, thus,
   the indexing is with streaming style.
   
   Computing locations for a batch of records all at a time is resource
   consuming so a pressure to the engine,
   we should avoid that in streaming system.
   
   ## Brief change log
   
     - Add `BucketAssigner`
     - Modify the behavior of `HoodieFlinkWriteClient` and `BaseFlinkCommitActionExecutor`
   
   ## Verify this pull request
   
   Added tests.
   
   ## Committer checklist
   
    - [ ] Has a corresponding JIRA in PR title & commit
    
    - [ ] Commit message is descriptive of the change
    
    - [ ] CI is green
   
    - [ ] Necessary doc changes done or have another open PR
          
    - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] danny0405 commented on pull request #2506: [HUDI-1557] Make Flink write pipeline write task scalable

Posted by GitBox <gi...@apache.org>.
danny0405 commented on pull request #2506:
URL: https://github.com/apache/hudi/pull/2506#issuecomment-773978170


   Force push to re-trigger the CI tests.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] yanghua commented on a change in pull request #2506: [HUDI-1557] Make Flink write pipeline write task scalable

Posted by GitBox <gi...@apache.org>.
yanghua commented on a change in pull request #2506:
URL: https://github.com/apache/hudi/pull/2506#discussion_r568370892



##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
##########
@@ -249,7 +250,17 @@ public String getLastCompletedInstant(String tableType) {
   public void deletePendingInstant(String tableType, String instant) {
     HoodieFlinkTable<T> table = HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
     String commitType = CommitUtils.getCommitActionType(HoodieTableType.valueOf(tableType));
-    table.getMetaClient().getActiveTimeline()
-        .deletePending(new HoodieInstant(HoodieInstant.State.REQUESTED, commitType, instant));
+    HoodieActiveTimeline activeTimeline = table.getMetaClient().getActiveTimeline();
+    activeTimeline.deletePending(new HoodieInstant(HoodieInstant.State.INFLIGHT, commitType, instant));
+    activeTimeline.deletePending(new HoodieInstant(HoodieInstant.State.REQUESTED, commitType, instant));
+  }
+
+  public void transitionRequestedToInflight(String tableType, String inFlightInstant) {

Review comment:
       IMO, `transit` is a verb, while `transition ` is a noum.

##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
##########
@@ -249,7 +250,17 @@ public String getLastCompletedInstant(String tableType) {
   public void deletePendingInstant(String tableType, String instant) {
     HoodieFlinkTable<T> table = HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
     String commitType = CommitUtils.getCommitActionType(HoodieTableType.valueOf(tableType));
-    table.getMetaClient().getActiveTimeline()
-        .deletePending(new HoodieInstant(HoodieInstant.State.REQUESTED, commitType, instant));
+    HoodieActiveTimeline activeTimeline = table.getMetaClient().getActiveTimeline();
+    activeTimeline.deletePending(new HoodieInstant(HoodieInstant.State.INFLIGHT, commitType, instant));

Review comment:
       IMO, it would be better to encapsulate the logic of building `INFLIGHT` and `REQUEST` states into the inner of the `deletePending` method. Currently, the method name is abstract, while the argument is materialization. Otherwise, defining `deleteInflight` sounds better.

##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandleFactory.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+/**
+ * Create handle factory for Flink writer, use the specified fileID directly
+ * because it is unique anyway.
+ */
+public class FlinkCreateHandleFactory<T extends HoodieRecordPayload, I, K, O>

Review comment:
       What's the difference between `CreateHandleFactory` and `FlinkCreateHandleFactory `? It seems they are the same.

##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
##########
@@ -91,104 +92,44 @@ public BaseFlinkCommitActionExecutor(HoodieEngineContext context,
   public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inputRecords) {
     HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>();
 
-    WorkloadProfile profile = null;
-    if (isWorkloadProfileNeeded()) {
-      profile = new WorkloadProfile(buildProfile(inputRecords));
-      LOG.info("Workload profile :" + profile);
-      try {
-        saveWorkloadProfileMetadataToInflight(profile, instantTime);
-      } catch (Exception e) {
-        HoodieTableMetaClient metaClient = table.getMetaClient();
-        HoodieInstant inflightInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, metaClient.getCommitActionType(), instantTime);
-        try {
-          if (!metaClient.getFs().exists(new Path(metaClient.getMetaPath(), inflightInstant.getFileName()))) {
-            throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", e);
-          }
-        } catch (IOException ex) {
-          LOG.error("Check file exists failed");
-          throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", ex);
-        }
-      }
-    }
-
-    final Partitioner partitioner = getPartitioner(profile);
-    Map<Integer, List<HoodieRecord<T>>> partitionedRecords = partition(inputRecords, partitioner);
-
     List<WriteStatus> writeStatuses = new LinkedList<>();
-    partitionedRecords.forEach((partition, records) -> {
-      if (WriteOperationType.isChangingRecords(operationType)) {
-        handleUpsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);
-      } else {
-        handleInsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);
-      }
-    });
+    final HoodieRecord<?> record = inputRecords.get(0);
+    final String partitionPath = record.getPartitionPath();
+    final String fileId = record.getCurrentLocation().getFileId();
+    final BucketType bucketType = record.getCurrentLocation().getInstantTime().equals("I")
+        ? BucketType.INSERT
+        : BucketType.UPDATE;
+    if (WriteOperationType.isChangingRecords(operationType)) {
+      handleUpsertPartition(
+          instantTime,
+          partitionPath,
+          fileId, bucketType,
+          inputRecords.iterator())
+          .forEachRemaining(writeStatuses::addAll);
+    } else {
+      handleInsertPartition(
+          instantTime,
+          partitionPath,
+          fileId,
+          bucketType,
+          inputRecords.iterator())
+          .forEachRemaining(writeStatuses::addAll);
+    }
     updateIndex(writeStatuses, result);
     return result;
   }
 
-  protected void updateIndex(List<WriteStatus> writeStatuses, HoodieWriteMetadata<List<WriteStatus>> result) {
-    Instant indexStartTime = Instant.now();
-    // Update the index back
-    List<WriteStatus> statuses = table.getIndex().updateLocation(writeStatuses, context, table);
-    result.setIndexUpdateDuration(Duration.between(indexStartTime, Instant.now()));
+  protected void updateIndex(List<WriteStatus> statuses, HoodieWriteMetadata<List<WriteStatus>> result) {
+    // No need to update the index because the update happens before the write.

Review comment:
       “No need”, but the method name is still `updateIndex`.

##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
##########
@@ -91,104 +92,44 @@ public BaseFlinkCommitActionExecutor(HoodieEngineContext context,
   public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inputRecords) {
     HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>();
 
-    WorkloadProfile profile = null;
-    if (isWorkloadProfileNeeded()) {
-      profile = new WorkloadProfile(buildProfile(inputRecords));
-      LOG.info("Workload profile :" + profile);
-      try {
-        saveWorkloadProfileMetadataToInflight(profile, instantTime);
-      } catch (Exception e) {
-        HoodieTableMetaClient metaClient = table.getMetaClient();
-        HoodieInstant inflightInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, metaClient.getCommitActionType(), instantTime);
-        try {
-          if (!metaClient.getFs().exists(new Path(metaClient.getMetaPath(), inflightInstant.getFileName()))) {
-            throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", e);
-          }
-        } catch (IOException ex) {
-          LOG.error("Check file exists failed");
-          throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", ex);
-        }
-      }
-    }
-
-    final Partitioner partitioner = getPartitioner(profile);
-    Map<Integer, List<HoodieRecord<T>>> partitionedRecords = partition(inputRecords, partitioner);
-
     List<WriteStatus> writeStatuses = new LinkedList<>();
-    partitionedRecords.forEach((partition, records) -> {
-      if (WriteOperationType.isChangingRecords(operationType)) {
-        handleUpsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);
-      } else {
-        handleInsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);
-      }
-    });
+    final HoodieRecord<?> record = inputRecords.get(0);
+    final String partitionPath = record.getPartitionPath();
+    final String fileId = record.getCurrentLocation().getFileId();
+    final BucketType bucketType = record.getCurrentLocation().getInstantTime().equals("I")

Review comment:
       Can we define another flag to host it or put it into another place? 

##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
##########
@@ -91,104 +92,44 @@ public BaseFlinkCommitActionExecutor(HoodieEngineContext context,
   public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inputRecords) {
     HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>();
 
-    WorkloadProfile profile = null;
-    if (isWorkloadProfileNeeded()) {
-      profile = new WorkloadProfile(buildProfile(inputRecords));
-      LOG.info("Workload profile :" + profile);
-      try {
-        saveWorkloadProfileMetadataToInflight(profile, instantTime);
-      } catch (Exception e) {
-        HoodieTableMetaClient metaClient = table.getMetaClient();
-        HoodieInstant inflightInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, metaClient.getCommitActionType(), instantTime);
-        try {
-          if (!metaClient.getFs().exists(new Path(metaClient.getMetaPath(), inflightInstant.getFileName()))) {
-            throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", e);
-          }
-        } catch (IOException ex) {
-          LOG.error("Check file exists failed");
-          throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", ex);
-        }
-      }
-    }
-
-    final Partitioner partitioner = getPartitioner(profile);
-    Map<Integer, List<HoodieRecord<T>>> partitionedRecords = partition(inputRecords, partitioner);
-
     List<WriteStatus> writeStatuses = new LinkedList<>();
-    partitionedRecords.forEach((partition, records) -> {
-      if (WriteOperationType.isChangingRecords(operationType)) {
-        handleUpsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);
-      } else {
-        handleInsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);
-      }
-    });
+    final HoodieRecord<?> record = inputRecords.get(0);
+    final String partitionPath = record.getPartitionPath();

Review comment:
       Can we define a context object e.g. `excutorContext` to put the information? IMO, extract general information from data is not a good design.

##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
##########
@@ -228,29 +169,36 @@ protected boolean isWorkloadProfileNeeded() {
   }
 
   @SuppressWarnings("unchecked")
-  protected Iterator<List<WriteStatus>> handleUpsertPartition(String instantTime, Integer partition, Iterator recordItr,
-                                                              Partitioner partitioner) {
-    UpsertPartitioner upsertPartitioner = (UpsertPartitioner) partitioner;
-    BucketInfo binfo = upsertPartitioner.getBucketInfo(partition);
-    BucketType btype = binfo.bucketType;
+  protected Iterator<List<WriteStatus>> handleUpsertPartition(
+      String instantTime,
+      String partitionPath,
+      String fileIdHint,
+      BucketType bucketType,
+      Iterator recordItr) {
     try {
-      if (btype.equals(BucketType.INSERT)) {
-        return handleInsert(binfo.fileIdPrefix, recordItr);
-      } else if (btype.equals(BucketType.UPDATE)) {
-        return handleUpdate(binfo.partitionPath, binfo.fileIdPrefix, recordItr);
-      } else {
-        throw new HoodieUpsertException("Unknown bucketType " + btype + " for partition :" + partition);
+      switch (bucketType) {
+        case INSERT:
+          return handleInsert(fileIdHint, recordItr);
+        case UPDATE:
+          return handleUpdate(partitionPath, fileIdHint, recordItr);
+        default:
+          throw new HoodieUpsertException("Unknown bucketType " + bucketType + " for partition :" + partitionPath);
       }
     } catch (Throwable t) {
-      String msg = "Error upserting bucketType " + btype + " for partition :" + partition;
+      String msg = "Error upserting bucketType " + bucketType + " for partition :" + partitionPath;
       LOG.error(msg, t);
       throw new HoodieUpsertException(msg, t);
     }
   }
 
-  protected Iterator<List<WriteStatus>> handleInsertPartition(String instantTime, Integer partition, Iterator recordItr,
-                                                              Partitioner partitioner) {
-    return handleUpsertPartition(instantTime, partition, recordItr, partitioner);
+  protected Iterator<List<WriteStatus>> handleInsertPartition(

Review comment:
       It seems `handleUpsertPartition ` is enough?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] wangxianghu commented on a change in pull request #2506: [HUDI-1557] Make Flink write pipeline write task scalable

Posted by GitBox <gi...@apache.org>.
wangxianghu commented on a change in pull request #2506:
URL: https://github.com/apache/hudi/pull/2506#discussion_r568607212



##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/state/FlinkInMemoryStateIndex.java
##########
@@ -62,47 +61,14 @@ public FlinkInMemoryStateIndex(HoodieFlinkEngineContext context, HoodieWriteConf
   public List<HoodieRecord<T>> tagLocation(List<HoodieRecord<T>> records,
                                            HoodieEngineContext context,
                                            HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) throws HoodieIndexException {
-    return context.map(records, record -> {
-      try {
-        if (mapState.contains(record.getKey())) {
-          record.unseal();
-          record.setCurrentLocation(mapState.get(record.getKey()));
-          record.seal();
-        }
-      } catch (Exception e) {
-        LOG.error(String.format("Tag record location failed, key = %s, %s", record.getRecordKey(), e.getMessage()));
-      }
-      return record;
-    }, 0);
+    throw new UnsupportedOperationException("No need to tag location for FlinkInMemoryStateIndex");

Review comment:
       > See the document of `BucketAssignerFunction`.
   
   Did you confuse upsert with the update?
   the `tag` operation means to query the data in table to check whether the record is an insert or update, we can not mark all records to update when the operation is UPSERT? 
   Did I miss something?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] yanghua commented on pull request #2506: [HUDI-1557] Make Flink write pipeline write task scalable

Posted by GitBox <gi...@apache.org>.
yanghua commented on pull request #2506:
URL: https://github.com/apache/hudi/pull/2506#issuecomment-773942916


   @danny0405 Can you check CI?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] danny0405 commented on pull request #2506: [HUDI-1557] Make Flink write pipeline write task scalable

Posted by GitBox <gi...@apache.org>.
danny0405 commented on pull request #2506:
URL: https://github.com/apache/hudi/pull/2506#issuecomment-773977179


   > @danny0405 Can you check CI?
   
   Yes, i checked the CI exception and it seems that the exception has no relationship with this PR.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] danny0405 commented on a change in pull request #2506: [HUDI-1557] Make Flink write pipeline write task scalable

Posted by GitBox <gi...@apache.org>.
danny0405 commented on a change in pull request #2506:
URL: https://github.com/apache/hudi/pull/2506#discussion_r569410938



##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignerFunction.java
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator.partitioner;
+
+import org.apache.hudi.client.FlinkTaskContextSupplier;
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.operator.FlinkOptions;
+import org.apache.hudi.table.action.commit.BucketInfo;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * The function to build the write profile incrementally for records within a checkpoint,
+ * it then assigns the bucket with ID using the {@link BucketAssigner}.
+ *
+ * <p>All the records are tagged with HoodieRecordLocation, instead of real instant time,
+ * INSERT record uses "I" and UPSERT record uses "U" as instant time. There is no need to keep
+ * the "real" instant time for each record, the bucket ID (partition path & fileID) actually decides
+ * where the record should write to. The "I" and "U" tag is only used for downstream to decide whether
+ * the data bucket is a INSERT or a UPSERT, we should factor the it out when the underneath writer
+ * supports specifying the bucket type explicitly.
+ *
+ * <p>The output records should then shuffle by the bucket ID and thus do scalable write.
+ *
+ * @see BucketAssigner
+ */
+public class BucketAssignerFunction<K, I, O extends HoodieRecord<?>>
+    extends KeyedProcessFunction<K, I, O>
+    implements CheckpointedFunction, CheckpointListener {
+
+  private MapState<HoodieKey, HoodieRecordLocation> indexState;
+
+  private BucketAssigner bucketAssigner;
+
+  private final Configuration conf;
+
+  private final boolean isChangingRecords;
+
+  public BucketAssignerFunction(Configuration conf) {
+    this.conf = conf;
+    this.isChangingRecords = WriteOperationType.isChangingRecords(
+        WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)));
+  }
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    super.open(parameters);
+    HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
+    HoodieFlinkEngineContext context =
+        new HoodieFlinkEngineContext(
+            new SerializableConfiguration(StreamerUtil.getHadoopConf()),
+            new FlinkTaskContextSupplier(getRuntimeContext()));
+    this.bucketAssigner = new BucketAssigner(
+        context,
+        writeConfig);
+  }
+
+  @Override
+  public void snapshotState(FunctionSnapshotContext context) {
+    this.bucketAssigner.reset();
+  }
+
+  @Override
+  public void initializeState(FunctionInitializationContext context) {
+    MapStateDescriptor<HoodieKey, HoodieRecordLocation> indexStateDesc =
+        new MapStateDescriptor<>(
+            "indexState",
+            TypeInformation.of(HoodieKey.class),
+            TypeInformation.of(HoodieRecordLocation.class));
+    indexState = context.getKeyedStateStore().getMapState(indexStateDesc);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void processElement(I value, Context ctx, Collector<O> out) throws Exception {
+    // 1. put the record into the BucketAssigner;
+    // 2. look up the state for location, if the record has a location, just send it out;
+    // 3. if it is an INSERT, decide the location using the BucketAssigner then send it out.
+    HoodieRecord<?> record = (HoodieRecord<?>) value;
+    final HoodieKey hoodieKey = record.getKey();
+    final BucketInfo bucketInfo;
+    final HoodieRecordLocation location;
+    // Only changing records need looking up the index for the location,
+    // append only records are always recognized as INSERT.
+    if (isChangingRecords && this.indexState.contains(hoodieKey)) {

Review comment:
       Yes, you are right ~




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] wangxianghu commented on a change in pull request #2506: [HUDI-1557] Make Flink write pipeline write task scalable

Posted by GitBox <gi...@apache.org>.
wangxianghu commented on a change in pull request #2506:
URL: https://github.com/apache/hudi/pull/2506#discussion_r568607212



##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/state/FlinkInMemoryStateIndex.java
##########
@@ -62,47 +61,14 @@ public FlinkInMemoryStateIndex(HoodieFlinkEngineContext context, HoodieWriteConf
   public List<HoodieRecord<T>> tagLocation(List<HoodieRecord<T>> records,
                                            HoodieEngineContext context,
                                            HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) throws HoodieIndexException {
-    return context.map(records, record -> {
-      try {
-        if (mapState.contains(record.getKey())) {
-          record.unseal();
-          record.setCurrentLocation(mapState.get(record.getKey()));
-          record.seal();
-        }
-      } catch (Exception e) {
-        LOG.error(String.format("Tag record location failed, key = %s, %s", record.getRecordKey(), e.getMessage()));
-      }
-      return record;
-    }, 0);
+    throw new UnsupportedOperationException("No need to tag location for FlinkInMemoryStateIndex");

Review comment:
       > See the document of `BucketAssignerFunction`.
   
   Did you confuse upsert with the update?
   the `tag` operation means to query the data in table to check whether the record is an insert or update, we can not mark all records to update when the operation is UPSERT? 
   Did I miss something?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] danny0405 commented on a change in pull request #2506: [HUDI-1557] Make Flink write pipeline write task scalable

Posted by GitBox <gi...@apache.org>.
danny0405 commented on a change in pull request #2506:
URL: https://github.com/apache/hudi/pull/2506#discussion_r567776880



##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
##########
@@ -249,7 +250,17 @@ public String getLastCompletedInstant(String tableType) {
   public void deletePendingInstant(String tableType, String instant) {
     HoodieFlinkTable<T> table = HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
     String commitType = CommitUtils.getCommitActionType(HoodieTableType.valueOf(tableType));
-    table.getMetaClient().getActiveTimeline()
-        .deletePending(new HoodieInstant(HoodieInstant.State.REQUESTED, commitType, instant));
+    HoodieActiveTimeline activeTimeline = table.getMetaClient().getActiveTimeline();
+    activeTimeline.deletePending(new HoodieInstant(HoodieInstant.State.INFLIGHT, commitType, instant));
+    activeTimeline.deletePending(new HoodieInstant(HoodieInstant.State.REQUESTED, commitType, instant));
+  }
+
+  public void transitionRequestedToInflight(String tableType, String inFlightInstant) {

Review comment:
       No, `transition` is a verb.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] yanghua commented on pull request #2506: [HUDI-1557] Make Flink write pipeline write task scalable

Posted by GitBox <gi...@apache.org>.
yanghua commented on pull request #2506:
URL: https://github.com/apache/hudi/pull/2506#issuecomment-769764823


   @danny0405 ping us, when you are ready to review.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] yanghua commented on a change in pull request #2506: [HUDI-1557] Make Flink write pipeline write task scalable

Posted by GitBox <gi...@apache.org>.
yanghua commented on a change in pull request #2506:
URL: https://github.com/apache/hudi/pull/2506#discussion_r569905107



##########
File path: hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteFunctionTest.java
##########
@@ -58,22 +59,24 @@
 
   private static final Map<String, String> EXPECTED = new HashMap<>();

Review comment:
       `EXPECTED1`?

##########
File path: hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
##########
@@ -250,4 +260,37 @@ public static void checkRequiredProperties(TypedProperties props, List<String> c
     checkPropNames.forEach(prop ->
         Preconditions.checkState(!props.containsKey(prop), "Required property " + prop + " is missing"));
   }
+
+  /**
+   * Initialize the table if it does not exist.
+   *
+   * @param conf the configuration
+   * @throws IOException if errors happens when writing metadata
+   */
+  public static void initTable(Configuration conf) throws IOException {

Review comment:
       This method is called in many places in one flink job. Do we really call it many times?
   
   If yes, from sematics, rename it to be `initTableIfNotExist` or something like `initTableOnDemand`?

##########
File path: hudi-flink/src/test/java/org/apache/hudi/operator/partitioner/BucketAssignerTest.java
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator.partitioner;
+
+import org.apache.hudi.client.FlinkTaskContextSupplier;
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.operator.utils.TestConfigurations;
+import org.apache.hudi.table.action.commit.BucketInfo;
+import org.apache.hudi.table.action.commit.BucketType;
+import org.apache.hudi.table.action.commit.SmallFile;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.configuration.Configuration;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/** Test cases for {@link BucketAssigner}. */

Review comment:
       Align with the old style of Java's class doc in the project looks better.

##########
File path: hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
##########
@@ -71,8 +71,7 @@
       + "hoodie client, schema provider, key generator and data source. For hoodie client props, sane defaults are "
       + "used, but recommend use to provide basic things like metrics endpoints, hive configs etc. For sources, refer"
       + "to individual classes, for supported properties.")
-  public String propsFilePath =
-      "file://" + System.getProperty("user.dir") + "/src/test/resources/delta-streamer-config/dfs-source.properties";

Review comment:
       Does this config cause any problem? AFAIK, it copied from deltastreamer.

##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/transform/RowDataToHoodieFunction.java
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator.transform;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.keygen.KeyGenerator;
+import org.apache.hudi.operator.FlinkOptions;
+import org.apache.hudi.util.RowDataToAvroConverters;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.IOException;
+
+/** Function that transforms RowData to HoodieRecord. */

Review comment:
       Align the comment style.

##########
File path: hudi-flink/src/test/java/org/apache/hudi/operator/partitioner/BucketAssignerTest.java
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator.partitioner;
+
+import org.apache.hudi.client.FlinkTaskContextSupplier;
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.operator.utils.TestConfigurations;
+import org.apache.hudi.table.action.commit.BucketInfo;
+import org.apache.hudi.table.action.commit.BucketType;
+import org.apache.hudi.table.action.commit.SmallFile;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.configuration.Configuration;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/** Test cases for {@link BucketAssigner}. */
+public class BucketAssignerTest {

Review comment:
       If it's a UT, let us follow the naming rule: `TestXXX`?

##########
File path: hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java
##########
@@ -92,6 +92,19 @@
           TimestampData.fromEpochMillis(8), StringData.fromString("par4"))
   );
 
+  public static List<RowData> DATA_SET_THREE = Arrays.asList(
+      binaryRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,

Review comment:
       All the same rows? Why not build them via a loop?

##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignerFunction.java
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator.partitioner;
+
+import org.apache.hudi.client.FlinkTaskContextSupplier;
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.operator.FlinkOptions;
+import org.apache.hudi.table.action.commit.BucketInfo;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * The function to build the write profile incrementally for records within a checkpoint,
+ * it then assigns the bucket with ID using the {@link BucketAssigner}.
+ *
+ * <p>All the records are tagged with HoodieRecordLocation, instead of real instant time,
+ * INSERT record uses "I" and UPSERT record uses "U" as instant time. There is no need to keep
+ * the "real" instant time for each record, the bucket ID (partition path & fileID) actually decides
+ * where the record should write to. The "I" and "U" tag is only used for downstream to decide whether
+ * the data bucket is a INSERT or a UPSERT, we should factor the it out when the underneath writer
+ * supports specifying the bucket type explicitly.
+ *
+ * <p>The output records should then shuffle by the bucket ID and thus do scalable write.
+ *
+ * @see BucketAssigner
+ */
+public class BucketAssignerFunction<K, I, O extends HoodieRecord<?>>

Review comment:
       `BucketAssignFunction ` sounds better?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] yanghua commented on a change in pull request #2506: [HUDI-1557] Make Flink write pipeline write task scalable

Posted by GitBox <gi...@apache.org>.
yanghua commented on a change in pull request #2506:
URL: https://github.com/apache/hudi/pull/2506#discussion_r567513264



##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/InsertBucket.java
##########
@@ -26,9 +26,9 @@
  */
 public class InsertBucket implements Serializable {
 
-  int bucketNumber;
+  public int bucketNumber;

Review comment:
       ditto

##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/state/FlinkInMemoryStateIndex.java
##########
@@ -62,47 +61,14 @@ public FlinkInMemoryStateIndex(HoodieFlinkEngineContext context, HoodieWriteConf
   public List<HoodieRecord<T>> tagLocation(List<HoodieRecord<T>> records,
                                            HoodieEngineContext context,
                                            HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) throws HoodieIndexException {
-    return context.map(records, record -> {
-      try {
-        if (mapState.contains(record.getKey())) {
-          record.unseal();
-          record.setCurrentLocation(mapState.get(record.getKey()));
-          record.seal();
-        }
-      } catch (Exception e) {
-        LOG.error(String.format("Tag record location failed, key = %s, %s", record.getRecordKey(), e.getMessage()));
-      }
-      return record;
-    }, 0);
+    throw new UnsupportedOperationException("No need to tag location for FlinkInMemoryStateIndex");

Review comment:
       @wangxianghu Do you agree this?

##########
File path: hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
##########
@@ -68,6 +70,9 @@ public static TypedProperties appendKafkaProps(FlinkStreamerConfig config) {
   }
 
   public static TypedProperties getProps(FlinkStreamerConfig cfg) {
+    if (cfg.propsFilePath.equals("")) {

Review comment:
       `String#isEmpty`

##########
File path: hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
##########
@@ -250,4 +259,37 @@ public static void checkRequiredProperties(TypedProperties props, List<String> c
     checkPropNames.forEach(prop ->
         Preconditions.checkState(!props.containsKey(prop), "Required property " + prop + " is missing"));
   }
+
+  /**
+   * Initialize the table if it does not exist.
+   *
+   * @param conf the configuration
+   * @throws IOException if errors happens when writing metadata
+   */
+  public static void initTable(Configuration conf) throws IOException {
+    final String basePath = conf.getString(FlinkOptions.PATH);
+    final org.apache.hadoop.conf.Configuration hadoopConf = StreamerUtil.getHadoopConf();
+    // Hadoop FileSystem
+    try (FileSystem fs = FSUtils.getFs(basePath, hadoopConf)) {
+      if (!fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))) {
+        HoodieTableMetaClient.initTableType(
+            hadoopConf,
+            basePath,
+            HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE)),
+            conf.getString(FlinkOptions.TABLE_NAME),
+            "archived",

Review comment:
       Extract it to be a constant.

##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
##########
@@ -249,7 +250,17 @@ public String getLastCompletedInstant(String tableType) {
   public void deletePendingInstant(String tableType, String instant) {
     HoodieFlinkTable<T> table = HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
     String commitType = CommitUtils.getCommitActionType(HoodieTableType.valueOf(tableType));
-    table.getMetaClient().getActiveTimeline()
-        .deletePending(new HoodieInstant(HoodieInstant.State.REQUESTED, commitType, instant));
+    HoodieActiveTimeline activeTimeline = table.getMetaClient().getActiveTimeline();
+    activeTimeline.deletePending(new HoodieInstant(HoodieInstant.State.INFLIGHT, commitType, instant));
+    activeTimeline.deletePending(new HoodieInstant(HoodieInstant.State.REQUESTED, commitType, instant));
+  }
+
+  public void transitionRequestedToInflight(String tableType, String inFlightInstant) {

Review comment:
       `transition` is a noum. `transform` sounds better? I know there is a method `HoodieActiveTimeline#transitionRequestedToInflight`

##########
File path: hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
##########
@@ -250,4 +259,37 @@ public static void checkRequiredProperties(TypedProperties props, List<String> c
     checkPropNames.forEach(prop ->
         Preconditions.checkState(!props.containsKey(prop), "Required property " + prop + " is missing"));
   }
+
+  /**
+   * Initialize the table if it does not exist.
+   *
+   * @param conf the configuration
+   * @throws IOException if errors happens when writing metadata
+   */
+  public static void initTable(Configuration conf) throws IOException {
+    final String basePath = conf.getString(FlinkOptions.PATH);
+    final org.apache.hadoop.conf.Configuration hadoopConf = StreamerUtil.getHadoopConf();
+    // Hadoop FileSystem
+    try (FileSystem fs = FSUtils.getFs(basePath, hadoopConf)) {
+      if (!fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))) {
+        HoodieTableMetaClient.initTableType(
+            hadoopConf,
+            basePath,
+            HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE)),
+            conf.getString(FlinkOptions.TABLE_NAME),
+            "archived",
+            conf.getString(FlinkOptions.PAYLOAD_CLASS),
+            1);
+        LOG.info("Table initialized");

Review comment:
       More information to be provided? e.g. `basePath`...?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketInfo.java
##########
@@ -19,15 +19,16 @@
 package org.apache.hudi.table.action.commit;
 
 import java.io.Serializable;
+import java.util.Objects;
 
 /**
  * Helper class for a bucket's type (INSERT and UPDATE) and its file location.
  */
 public class BucketInfo implements Serializable {
 
-  BucketType bucketType;
-  String fileIdPrefix;
-  String partitionPath;
+  public BucketType bucketType;

Review comment:
       Can we use `getter/setter` pattern instead of using public fields?

##########
File path: hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
##########
@@ -250,4 +259,37 @@ public static void checkRequiredProperties(TypedProperties props, List<String> c
     checkPropNames.forEach(prop ->
         Preconditions.checkState(!props.containsKey(prop), "Required property " + prop + " is missing"));
   }
+
+  /**
+   * Initialize the table if it does not exist.
+   *
+   * @param conf the configuration
+   * @throws IOException if errors happens when writing metadata
+   */
+  public static void initTable(Configuration conf) throws IOException {
+    final String basePath = conf.getString(FlinkOptions.PATH);

Review comment:
       IMO, `FlinkOptions.BASE_PATH` is more readable?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] yanghua commented on pull request #2506: [HUDI-1557] Make Flink write pipeline write task scalable

Posted by GitBox <gi...@apache.org>.
yanghua commented on pull request #2506:
URL: https://github.com/apache/hudi/pull/2506#issuecomment-774425358


   @garyli1019 Will land this PR towards tomorrow morning, any concerns please let us know!


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] yanghua commented on a change in pull request #2506: [HUDI-1557] Make Flink write pipeline write task scalable

Posted by GitBox <gi...@apache.org>.
yanghua commented on a change in pull request #2506:
URL: https://github.com/apache/hudi/pull/2506#discussion_r568409884



##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
##########
@@ -249,7 +250,17 @@ public String getLastCompletedInstant(String tableType) {
   public void deletePendingInstant(String tableType, String instant) {
     HoodieFlinkTable<T> table = HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
     String commitType = CommitUtils.getCommitActionType(HoodieTableType.valueOf(tableType));
-    table.getMetaClient().getActiveTimeline()
-        .deletePending(new HoodieInstant(HoodieInstant.State.REQUESTED, commitType, instant));
+    HoodieActiveTimeline activeTimeline = table.getMetaClient().getActiveTimeline();
+    activeTimeline.deletePending(new HoodieInstant(HoodieInstant.State.INFLIGHT, commitType, instant));

Review comment:
       IMO, it would be better to encapsulate the logic of building `INFLIGHT` and `REQUEST` states into the inner of the `deletePending` method. Currently, the method name is abstract, while the argument is materialization. Otherwise, defining `deleteInflight` sounds better.

##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandleFactory.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+/**
+ * Create handle factory for Flink writer, use the specified fileID directly
+ * because it is unique anyway.
+ */
+public class FlinkCreateHandleFactory<T extends HoodieRecordPayload, I, K, O>

Review comment:
       What's the difference between `CreateHandleFactory` and `FlinkCreateHandleFactory `? It seems they are the same.

##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
##########
@@ -91,104 +92,44 @@ public BaseFlinkCommitActionExecutor(HoodieEngineContext context,
   public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inputRecords) {
     HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>();
 
-    WorkloadProfile profile = null;
-    if (isWorkloadProfileNeeded()) {
-      profile = new WorkloadProfile(buildProfile(inputRecords));
-      LOG.info("Workload profile :" + profile);
-      try {
-        saveWorkloadProfileMetadataToInflight(profile, instantTime);
-      } catch (Exception e) {
-        HoodieTableMetaClient metaClient = table.getMetaClient();
-        HoodieInstant inflightInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, metaClient.getCommitActionType(), instantTime);
-        try {
-          if (!metaClient.getFs().exists(new Path(metaClient.getMetaPath(), inflightInstant.getFileName()))) {
-            throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", e);
-          }
-        } catch (IOException ex) {
-          LOG.error("Check file exists failed");
-          throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", ex);
-        }
-      }
-    }
-
-    final Partitioner partitioner = getPartitioner(profile);
-    Map<Integer, List<HoodieRecord<T>>> partitionedRecords = partition(inputRecords, partitioner);
-
     List<WriteStatus> writeStatuses = new LinkedList<>();
-    partitionedRecords.forEach((partition, records) -> {
-      if (WriteOperationType.isChangingRecords(operationType)) {
-        handleUpsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);
-      } else {
-        handleInsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);
-      }
-    });
+    final HoodieRecord<?> record = inputRecords.get(0);
+    final String partitionPath = record.getPartitionPath();
+    final String fileId = record.getCurrentLocation().getFileId();
+    final BucketType bucketType = record.getCurrentLocation().getInstantTime().equals("I")
+        ? BucketType.INSERT
+        : BucketType.UPDATE;
+    if (WriteOperationType.isChangingRecords(operationType)) {
+      handleUpsertPartition(
+          instantTime,
+          partitionPath,
+          fileId, bucketType,
+          inputRecords.iterator())
+          .forEachRemaining(writeStatuses::addAll);
+    } else {
+      handleInsertPartition(
+          instantTime,
+          partitionPath,
+          fileId,
+          bucketType,
+          inputRecords.iterator())
+          .forEachRemaining(writeStatuses::addAll);
+    }
     updateIndex(writeStatuses, result);
     return result;
   }
 
-  protected void updateIndex(List<WriteStatus> writeStatuses, HoodieWriteMetadata<List<WriteStatus>> result) {
-    Instant indexStartTime = Instant.now();
-    // Update the index back
-    List<WriteStatus> statuses = table.getIndex().updateLocation(writeStatuses, context, table);
-    result.setIndexUpdateDuration(Duration.between(indexStartTime, Instant.now()));
+  protected void updateIndex(List<WriteStatus> statuses, HoodieWriteMetadata<List<WriteStatus>> result) {
+    // No need to update the index because the update happens before the write.

Review comment:
       “No need”, but the method name is still `updateIndex`.

##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
##########
@@ -91,104 +92,44 @@ public BaseFlinkCommitActionExecutor(HoodieEngineContext context,
   public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inputRecords) {
     HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>();
 
-    WorkloadProfile profile = null;
-    if (isWorkloadProfileNeeded()) {
-      profile = new WorkloadProfile(buildProfile(inputRecords));
-      LOG.info("Workload profile :" + profile);
-      try {
-        saveWorkloadProfileMetadataToInflight(profile, instantTime);
-      } catch (Exception e) {
-        HoodieTableMetaClient metaClient = table.getMetaClient();
-        HoodieInstant inflightInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, metaClient.getCommitActionType(), instantTime);
-        try {
-          if (!metaClient.getFs().exists(new Path(metaClient.getMetaPath(), inflightInstant.getFileName()))) {
-            throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", e);
-          }
-        } catch (IOException ex) {
-          LOG.error("Check file exists failed");
-          throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", ex);
-        }
-      }
-    }
-
-    final Partitioner partitioner = getPartitioner(profile);
-    Map<Integer, List<HoodieRecord<T>>> partitionedRecords = partition(inputRecords, partitioner);
-
     List<WriteStatus> writeStatuses = new LinkedList<>();
-    partitionedRecords.forEach((partition, records) -> {
-      if (WriteOperationType.isChangingRecords(operationType)) {
-        handleUpsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);
-      } else {
-        handleInsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);
-      }
-    });
+    final HoodieRecord<?> record = inputRecords.get(0);
+    final String partitionPath = record.getPartitionPath();
+    final String fileId = record.getCurrentLocation().getFileId();
+    final BucketType bucketType = record.getCurrentLocation().getInstantTime().equals("I")

Review comment:
       Can we define another flag to host it or put it into another place? 

##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
##########
@@ -91,104 +92,44 @@ public BaseFlinkCommitActionExecutor(HoodieEngineContext context,
   public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inputRecords) {
     HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>();
 
-    WorkloadProfile profile = null;
-    if (isWorkloadProfileNeeded()) {
-      profile = new WorkloadProfile(buildProfile(inputRecords));
-      LOG.info("Workload profile :" + profile);
-      try {
-        saveWorkloadProfileMetadataToInflight(profile, instantTime);
-      } catch (Exception e) {
-        HoodieTableMetaClient metaClient = table.getMetaClient();
-        HoodieInstant inflightInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, metaClient.getCommitActionType(), instantTime);
-        try {
-          if (!metaClient.getFs().exists(new Path(metaClient.getMetaPath(), inflightInstant.getFileName()))) {
-            throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", e);
-          }
-        } catch (IOException ex) {
-          LOG.error("Check file exists failed");
-          throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", ex);
-        }
-      }
-    }
-
-    final Partitioner partitioner = getPartitioner(profile);
-    Map<Integer, List<HoodieRecord<T>>> partitionedRecords = partition(inputRecords, partitioner);
-
     List<WriteStatus> writeStatuses = new LinkedList<>();
-    partitionedRecords.forEach((partition, records) -> {
-      if (WriteOperationType.isChangingRecords(operationType)) {
-        handleUpsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);
-      } else {
-        handleInsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);
-      }
-    });
+    final HoodieRecord<?> record = inputRecords.get(0);
+    final String partitionPath = record.getPartitionPath();

Review comment:
       Can we define a context object e.g. `excutorContext` to put the information? IMO, extract general information from data is not a good design.

##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
##########
@@ -228,29 +169,36 @@ protected boolean isWorkloadProfileNeeded() {
   }
 
   @SuppressWarnings("unchecked")
-  protected Iterator<List<WriteStatus>> handleUpsertPartition(String instantTime, Integer partition, Iterator recordItr,
-                                                              Partitioner partitioner) {
-    UpsertPartitioner upsertPartitioner = (UpsertPartitioner) partitioner;
-    BucketInfo binfo = upsertPartitioner.getBucketInfo(partition);
-    BucketType btype = binfo.bucketType;
+  protected Iterator<List<WriteStatus>> handleUpsertPartition(
+      String instantTime,
+      String partitionPath,
+      String fileIdHint,
+      BucketType bucketType,
+      Iterator recordItr) {
     try {
-      if (btype.equals(BucketType.INSERT)) {
-        return handleInsert(binfo.fileIdPrefix, recordItr);
-      } else if (btype.equals(BucketType.UPDATE)) {
-        return handleUpdate(binfo.partitionPath, binfo.fileIdPrefix, recordItr);
-      } else {
-        throw new HoodieUpsertException("Unknown bucketType " + btype + " for partition :" + partition);
+      switch (bucketType) {
+        case INSERT:
+          return handleInsert(fileIdHint, recordItr);
+        case UPDATE:
+          return handleUpdate(partitionPath, fileIdHint, recordItr);
+        default:
+          throw new HoodieUpsertException("Unknown bucketType " + bucketType + " for partition :" + partitionPath);
       }
     } catch (Throwable t) {
-      String msg = "Error upserting bucketType " + btype + " for partition :" + partition;
+      String msg = "Error upserting bucketType " + bucketType + " for partition :" + partitionPath;
       LOG.error(msg, t);
       throw new HoodieUpsertException(msg, t);
     }
   }
 
-  protected Iterator<List<WriteStatus>> handleInsertPartition(String instantTime, Integer partition, Iterator recordItr,
-                                                              Partitioner partitioner) {
-    return handleUpsertPartition(instantTime, partition, recordItr, partitioner);
+  protected Iterator<List<WriteStatus>> handleInsertPartition(

Review comment:
       It seems `handleUpsertPartition ` is enough?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] yanghua edited a comment on pull request #2506: [HUDI-1557] Make Flink write pipeline write task scalable

Posted by GitBox <gi...@apache.org>.
yanghua edited a comment on pull request #2506:
URL: https://github.com/apache/hudi/pull/2506#issuecomment-773100796


   @garyli1019 @wangxianghu Any review input? Let us land this PR at the end time towards tomorrow night?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] yanghua commented on a change in pull request #2506: [HUDI-1557] Make Flink write pipeline write task scalable

Posted by GitBox <gi...@apache.org>.
yanghua commented on a change in pull request #2506:
URL: https://github.com/apache/hudi/pull/2506#discussion_r569539307



##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
##########
@@ -91,104 +92,44 @@ public BaseFlinkCommitActionExecutor(HoodieEngineContext context,
   public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inputRecords) {
     HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>();
 
-    WorkloadProfile profile = null;
-    if (isWorkloadProfileNeeded()) {
-      profile = new WorkloadProfile(buildProfile(inputRecords));
-      LOG.info("Workload profile :" + profile);
-      try {
-        saveWorkloadProfileMetadataToInflight(profile, instantTime);
-      } catch (Exception e) {
-        HoodieTableMetaClient metaClient = table.getMetaClient();
-        HoodieInstant inflightInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, metaClient.getCommitActionType(), instantTime);
-        try {
-          if (!metaClient.getFs().exists(new Path(metaClient.getMetaPath(), inflightInstant.getFileName()))) {
-            throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", e);
-          }
-        } catch (IOException ex) {
-          LOG.error("Check file exists failed");
-          throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", ex);
-        }
-      }
-    }
-
-    final Partitioner partitioner = getPartitioner(profile);
-    Map<Integer, List<HoodieRecord<T>>> partitionedRecords = partition(inputRecords, partitioner);
-
     List<WriteStatus> writeStatuses = new LinkedList<>();
-    partitionedRecords.forEach((partition, records) -> {
-      if (WriteOperationType.isChangingRecords(operationType)) {
-        handleUpsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);
-      } else {
-        handleInsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);
-      }
-    });
+    final HoodieRecord<?> record = inputRecords.get(0);
+    final String partitionPath = record.getPartitionPath();

Review comment:
       OK, so file a Jira issue to track this work?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] danny0405 commented on a change in pull request #2506: [HUDI-1557] Make Flink write pipeline write task scalable

Posted by GitBox <gi...@apache.org>.
danny0405 commented on a change in pull request #2506:
URL: https://github.com/apache/hudi/pull/2506#discussion_r567769737



##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/state/FlinkInMemoryStateIndex.java
##########
@@ -62,47 +61,14 @@ public FlinkInMemoryStateIndex(HoodieFlinkEngineContext context, HoodieWriteConf
   public List<HoodieRecord<T>> tagLocation(List<HoodieRecord<T>> records,
                                            HoodieEngineContext context,
                                            HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) throws HoodieIndexException {
-    return context.map(records, record -> {
-      try {
-        if (mapState.contains(record.getKey())) {
-          record.unseal();
-          record.setCurrentLocation(mapState.get(record.getKey()));
-          record.seal();
-        }
-      } catch (Exception e) {
-        LOG.error(String.format("Tag record location failed, key = %s, %s", record.getRecordKey(), e.getMessage()));
-      }
-      return record;
-    }, 0);
+    throw new UnsupportedOperationException("No need to tag location for FlinkInMemoryStateIndex");

Review comment:
       See the document of `BucketAssignerFunction`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] yanghua commented on pull request #2506: [HUDI-1557] Make Flink write pipeline write task scalable

Posted by GitBox <gi...@apache.org>.
yanghua commented on pull request #2506:
URL: https://github.com/apache/hudi/pull/2506#issuecomment-773100796


   @garyli1019 @garyli1019 Any review input? Let us land this PR at the end time towards tomorrow night?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] yanghua edited a comment on pull request #2506: [HUDI-1557] Make Flink write pipeline write task scalable

Posted by GitBox <gi...@apache.org>.
yanghua edited a comment on pull request #2506:
URL: https://github.com/apache/hudi/pull/2506#issuecomment-773100796


   @garyli1019 @wangxianghu Any review input? Let us land this PR at the end time towards tomorrow night?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] danny0405 commented on a change in pull request #2506: [HUDI-1557] Make Flink write pipeline write task scalable

Posted by GitBox <gi...@apache.org>.
danny0405 commented on a change in pull request #2506:
URL: https://github.com/apache/hudi/pull/2506#discussion_r569409952



##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandleFactory.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+/**
+ * Create handle factory for Flink writer, use the specified fileID directly
+ * because it is unique anyway.
+ */
+public class FlinkCreateHandleFactory<T extends HoodieRecordPayload, I, K, O>

Review comment:
       I said the reason in the document, they are different in rolling over new files, `FlinkCreateHandleFactory` always uses the file handle name that we specified and never roll over with a number suffix.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] danny0405 commented on a change in pull request #2506: [HUDI-1557] Make Flink write pipeline write task scalable

Posted by GitBox <gi...@apache.org>.
danny0405 commented on a change in pull request #2506:
URL: https://github.com/apache/hudi/pull/2506#discussion_r567774280



##########
File path: hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
##########
@@ -250,4 +259,37 @@ public static void checkRequiredProperties(TypedProperties props, List<String> c
     checkPropNames.forEach(prop ->
         Preconditions.checkState(!props.containsKey(prop), "Required property " + prop + " is missing"));
   }
+
+  /**
+   * Initialize the table if it does not exist.
+   *
+   * @param conf the configuration
+   * @throws IOException if errors happens when writing metadata
+   */
+  public static void initTable(Configuration conf) throws IOException {
+    final String basePath = conf.getString(FlinkOptions.PATH);

Review comment:
       I want to keep it sync with Spark data source, e.g. `org.apache.hudi.DefaultSource` line 62.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] garyli1019 commented on a change in pull request #2506: [HUDI-1557] Make Flink write pipeline write task scalable

Posted by GitBox <gi...@apache.org>.
garyli1019 commented on a change in pull request #2506:
URL: https://github.com/apache/hudi/pull/2506#discussion_r570686747



##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/state/FlinkInMemoryStateIndex.java
##########
@@ -62,47 +61,14 @@ public FlinkInMemoryStateIndex(HoodieFlinkEngineContext context, HoodieWriteConf
   public List<HoodieRecord<T>> tagLocation(List<HoodieRecord<T>> records,
                                            HoodieEngineContext context,
                                            HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) throws HoodieIndexException {
-    return context.map(records, record -> {
-      try {
-        if (mapState.contains(record.getKey())) {
-          record.unseal();
-          record.setCurrentLocation(mapState.get(record.getKey()));
-          record.seal();
-        }
-      } catch (Exception e) {
-        LOG.error(String.format("Tag record location failed, key = %s, %s", record.getRecordKey(), e.getMessage()));
-      }
-      return record;
-    }, 0);
+    throw new UnsupportedOperationException("No need to tag location for FlinkInMemoryStateIndex");

Review comment:
       nit: Can we add some comments about the indexing here and pointing to `BucketAssignerFunction`? This will be helpful for others to read the code. It's fine now, we can do this on your next PR.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] danny0405 commented on a change in pull request #2506: [HUDI-1557] Make Flink write pipeline write task scalable

Posted by GitBox <gi...@apache.org>.
danny0405 commented on a change in pull request #2506:
URL: https://github.com/apache/hudi/pull/2506#discussion_r569979918



##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
##########
@@ -91,104 +92,44 @@ public BaseFlinkCommitActionExecutor(HoodieEngineContext context,
   public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inputRecords) {
     HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>();
 
-    WorkloadProfile profile = null;
-    if (isWorkloadProfileNeeded()) {
-      profile = new WorkloadProfile(buildProfile(inputRecords));
-      LOG.info("Workload profile :" + profile);
-      try {
-        saveWorkloadProfileMetadataToInflight(profile, instantTime);
-      } catch (Exception e) {
-        HoodieTableMetaClient metaClient = table.getMetaClient();
-        HoodieInstant inflightInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, metaClient.getCommitActionType(), instantTime);
-        try {
-          if (!metaClient.getFs().exists(new Path(metaClient.getMetaPath(), inflightInstant.getFileName()))) {
-            throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", e);
-          }
-        } catch (IOException ex) {
-          LOG.error("Check file exists failed");
-          throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", ex);
-        }
-      }
-    }
-
-    final Partitioner partitioner = getPartitioner(profile);
-    Map<Integer, List<HoodieRecord<T>>> partitionedRecords = partition(inputRecords, partitioner);
-
     List<WriteStatus> writeStatuses = new LinkedList<>();
-    partitionedRecords.forEach((partition, records) -> {
-      if (WriteOperationType.isChangingRecords(operationType)) {
-        handleUpsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);
-      } else {
-        handleInsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);
-      }
-    });
+    final HoodieRecord<?> record = inputRecords.get(0);
+    final String partitionPath = record.getPartitionPath();

Review comment:
       Fired an issue to track this https://issues.apache.org/jira/browse/HUDI-1581

##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
##########
@@ -91,104 +92,44 @@ public BaseFlinkCommitActionExecutor(HoodieEngineContext context,
   public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inputRecords) {
     HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>();
 
-    WorkloadProfile profile = null;
-    if (isWorkloadProfileNeeded()) {
-      profile = new WorkloadProfile(buildProfile(inputRecords));
-      LOG.info("Workload profile :" + profile);
-      try {
-        saveWorkloadProfileMetadataToInflight(profile, instantTime);
-      } catch (Exception e) {
-        HoodieTableMetaClient metaClient = table.getMetaClient();
-        HoodieInstant inflightInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, metaClient.getCommitActionType(), instantTime);
-        try {
-          if (!metaClient.getFs().exists(new Path(metaClient.getMetaPath(), inflightInstant.getFileName()))) {
-            throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", e);
-          }
-        } catch (IOException ex) {
-          LOG.error("Check file exists failed");
-          throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", ex);
-        }
-      }
-    }
-
-    final Partitioner partitioner = getPartitioner(profile);
-    Map<Integer, List<HoodieRecord<T>>> partitionedRecords = partition(inputRecords, partitioner);
-
     List<WriteStatus> writeStatuses = new LinkedList<>();
-    partitionedRecords.forEach((partition, records) -> {
-      if (WriteOperationType.isChangingRecords(operationType)) {
-        handleUpsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);
-      } else {
-        handleInsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);
-      }
-    });
+    final HoodieRecord<?> record = inputRecords.get(0);
+    final String partitionPath = record.getPartitionPath();
+    final String fileId = record.getCurrentLocation().getFileId();
+    final BucketType bucketType = record.getCurrentLocation().getInstantTime().equals("I")

Review comment:
       Fired an issue to track this https://issues.apache.org/jira/browse/HUDI-1581

##########
File path: hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
##########
@@ -250,4 +260,37 @@ public static void checkRequiredProperties(TypedProperties props, List<String> c
     checkPropNames.forEach(prop ->
         Preconditions.checkState(!props.containsKey(prop), "Required property " + prop + " is missing"));
   }
+
+  /**
+   * Initialize the table if it does not exist.
+   *
+   * @param conf the configuration
+   * @throws IOException if errors happens when writing metadata
+   */
+  public static void initTable(Configuration conf) throws IOException {

Review comment:
       Rename to `initTableIfNotExists`.

##########
File path: hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
##########
@@ -71,8 +71,7 @@
       + "hoodie client, schema provider, key generator and data source. For hoodie client props, sane defaults are "
       + "used, but recommend use to provide basic things like metrics endpoints, hive configs etc. For sources, refer"
       + "to individual classes, for supported properties.")
-  public String propsFilePath =
-      "file://" + System.getProperty("user.dir") + "/src/test/resources/delta-streamer-config/dfs-source.properties";

Review comment:
       The path does not exists and it throws when fetch the properties, use empty string as the default to skip using empty properties too.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] codecov-io edited a comment on pull request #2506: [HUDI-1557] Make Flink write pipeline write task scalable

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #2506:
URL: https://github.com/apache/hudi/pull/2506#issuecomment-769390677






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] yanghua commented on a change in pull request #2506: [HUDI-1557] Make Flink write pipeline write task scalable

Posted by GitBox <gi...@apache.org>.
yanghua commented on a change in pull request #2506:
URL: https://github.com/apache/hudi/pull/2506#discussion_r569535044



##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
##########
@@ -249,7 +250,17 @@ public String getLastCompletedInstant(String tableType) {
   public void deletePendingInstant(String tableType, String instant) {
     HoodieFlinkTable<T> table = HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
     String commitType = CommitUtils.getCommitActionType(HoodieTableType.valueOf(tableType));
-    table.getMetaClient().getActiveTimeline()
-        .deletePending(new HoodieInstant(HoodieInstant.State.REQUESTED, commitType, instant));
+    HoodieActiveTimeline activeTimeline = table.getMetaClient().getActiveTimeline();
+    activeTimeline.deletePending(new HoodieInstant(HoodieInstant.State.INFLIGHT, commitType, instant));
+    activeTimeline.deletePending(new HoodieInstant(HoodieInstant.State.REQUESTED, commitType, instant));
+  }
+
+  public void transitionRequestedToInflight(String tableType, String inFlightInstant) {

Review comment:
       Fine, let us stop arguing. After consulting the English dictionary, it seems that `transit` and `transition` have both the attributes of nouns and verbs, but they have slightly different meanings. Here I don’t want to entangle this focus anymore, let us spend our energy on more meaningful things. WDYT?
   
   +1 for staying the same here.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] yanghua commented on pull request #2506: [HUDI-1557] Make Flink write pipeline write task scalable

Posted by GitBox <gi...@apache.org>.
yanghua commented on pull request #2506:
URL: https://github.com/apache/hudi/pull/2506#issuecomment-773100796


   @garyli1019 @garyli1019 Any review input? Let us land this PR at the end time towards tomorrow night?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] danny0405 commented on a change in pull request #2506: [HUDI-1557] Make Flink write pipeline write task scalable

Posted by GitBox <gi...@apache.org>.
danny0405 commented on a change in pull request #2506:
URL: https://github.com/apache/hudi/pull/2506#discussion_r569412413



##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
##########
@@ -91,104 +92,44 @@ public BaseFlinkCommitActionExecutor(HoodieEngineContext context,
   public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inputRecords) {
     HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>();
 
-    WorkloadProfile profile = null;
-    if (isWorkloadProfileNeeded()) {
-      profile = new WorkloadProfile(buildProfile(inputRecords));
-      LOG.info("Workload profile :" + profile);
-      try {
-        saveWorkloadProfileMetadataToInflight(profile, instantTime);
-      } catch (Exception e) {
-        HoodieTableMetaClient metaClient = table.getMetaClient();
-        HoodieInstant inflightInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, metaClient.getCommitActionType(), instantTime);
-        try {
-          if (!metaClient.getFs().exists(new Path(metaClient.getMetaPath(), inflightInstant.getFileName()))) {
-            throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", e);
-          }
-        } catch (IOException ex) {
-          LOG.error("Check file exists failed");
-          throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", ex);
-        }
-      }
-    }
-
-    final Partitioner partitioner = getPartitioner(profile);
-    Map<Integer, List<HoodieRecord<T>>> partitionedRecords = partition(inputRecords, partitioner);
-
     List<WriteStatus> writeStatuses = new LinkedList<>();
-    partitionedRecords.forEach((partition, records) -> {
-      if (WriteOperationType.isChangingRecords(operationType)) {
-        handleUpsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);
-      } else {
-        handleInsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);
-      }
-    });
+    final HoodieRecord<?> record = inputRecords.get(0);
+    final String partitionPath = record.getPartitionPath();
+    final String fileId = record.getCurrentLocation().getFileId();
+    final BucketType bucketType = record.getCurrentLocation().getInstantTime().equals("I")

Review comment:
       We can when we refactor the writer interface to take a specified bucket type, currently this is enough, it has the full functionality but does not bring in extra burden to the engine.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] yanghua commented on a change in pull request #2506: [HUDI-1557] Make Flink write pipeline write task scalable

Posted by GitBox <gi...@apache.org>.
yanghua commented on a change in pull request #2506:
URL: https://github.com/apache/hudi/pull/2506#discussion_r569872885



##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
##########
@@ -91,104 +92,44 @@ public BaseFlinkCommitActionExecutor(HoodieEngineContext context,
   public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inputRecords) {
     HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>();
 
-    WorkloadProfile profile = null;
-    if (isWorkloadProfileNeeded()) {
-      profile = new WorkloadProfile(buildProfile(inputRecords));
-      LOG.info("Workload profile :" + profile);
-      try {
-        saveWorkloadProfileMetadataToInflight(profile, instantTime);
-      } catch (Exception e) {
-        HoodieTableMetaClient metaClient = table.getMetaClient();
-        HoodieInstant inflightInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, metaClient.getCommitActionType(), instantTime);
-        try {
-          if (!metaClient.getFs().exists(new Path(metaClient.getMetaPath(), inflightInstant.getFileName()))) {
-            throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", e);
-          }
-        } catch (IOException ex) {
-          LOG.error("Check file exists failed");
-          throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", ex);
-        }
-      }
-    }
-
-    final Partitioner partitioner = getPartitioner(profile);
-    Map<Integer, List<HoodieRecord<T>>> partitionedRecords = partition(inputRecords, partitioner);
-
     List<WriteStatus> writeStatuses = new LinkedList<>();
-    partitionedRecords.forEach((partition, records) -> {
-      if (WriteOperationType.isChangingRecords(operationType)) {
-        handleUpsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);
-      } else {
-        handleInsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);
-      }
-    });
+    final HoodieRecord<?> record = inputRecords.get(0);
+    final String partitionPath = record.getPartitionPath();
+    final String fileId = record.getCurrentLocation().getFileId();
+    final BucketType bucketType = record.getCurrentLocation().getInstantTime().equals("I")

Review comment:
       File an issue?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] wangxianghu commented on a change in pull request #2506: [HUDI-1557] Make Flink write pipeline write task scalable

Posted by GitBox <gi...@apache.org>.
wangxianghu commented on a change in pull request #2506:
URL: https://github.com/apache/hudi/pull/2506#discussion_r568607212



##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/state/FlinkInMemoryStateIndex.java
##########
@@ -62,47 +61,14 @@ public FlinkInMemoryStateIndex(HoodieFlinkEngineContext context, HoodieWriteConf
   public List<HoodieRecord<T>> tagLocation(List<HoodieRecord<T>> records,
                                            HoodieEngineContext context,
                                            HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) throws HoodieIndexException {
-    return context.map(records, record -> {
-      try {
-        if (mapState.contains(record.getKey())) {
-          record.unseal();
-          record.setCurrentLocation(mapState.get(record.getKey()));
-          record.seal();
-        }
-      } catch (Exception e) {
-        LOG.error(String.format("Tag record location failed, key = %s, %s", record.getRecordKey(), e.getMessage()));
-      }
-      return record;
-    }, 0);
+    throw new UnsupportedOperationException("No need to tag location for FlinkInMemoryStateIndex");

Review comment:
       > See the document of `BucketAssignerFunction`.
   
   Did you confuse upsert with the update?
   the `tag` operation means to query the data in table to check whether the record is an insert or update, we can not mark all records to update when the operation is UPSERT? 
   Did I miss something?

##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/state/FlinkInMemoryStateIndex.java
##########
@@ -62,47 +61,14 @@ public FlinkInMemoryStateIndex(HoodieFlinkEngineContext context, HoodieWriteConf
   public List<HoodieRecord<T>> tagLocation(List<HoodieRecord<T>> records,
                                            HoodieEngineContext context,
                                            HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) throws HoodieIndexException {
-    return context.map(records, record -> {
-      try {
-        if (mapState.contains(record.getKey())) {
-          record.unseal();
-          record.setCurrentLocation(mapState.get(record.getKey()));
-          record.seal();
-        }
-      } catch (Exception e) {
-        LOG.error(String.format("Tag record location failed, key = %s, %s", record.getRecordKey(), e.getMessage()));
-      }
-      return record;
-    }, 0);
+    throw new UnsupportedOperationException("No need to tag location for FlinkInMemoryStateIndex");

Review comment:
       > See the document of `BucketAssignerFunction`.
   
   Did you confuse upsert with the update?
   the `tag` operation means to query the data in table to check whether the record is an insert or update, we can not mark all records to update when the operation is UPSERT? 
   Did I miss something?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] codecov-io edited a comment on pull request #2506: [HUDI-1557] Make Flink write pipeline write task scalable

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #2506:
URL: https://github.com/apache/hudi/pull/2506#issuecomment-769390677


   # [Codecov](https://codecov.io/gh/apache/hudi/pull/2506?src=pr&el=h1) Report
   > Merging [#2506](https://codecov.io/gh/apache/hudi/pull/2506?src=pr&el=desc) (fb4e0f8) into [master](https://codecov.io/gh/apache/hudi/commit/bc0325f6ea0a734f106f21a2fcd4ead413a6cf7b?el=desc) (bc0325f) will **decrease** coverage by `40.57%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/2506/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/hudi/pull/2506?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master   #2506       +/-   ##
   ============================================
   - Coverage     50.26%   9.68%   -40.58%     
   + Complexity     3119      48     -3071     
   ============================================
     Files           430      53      -377     
     Lines         19565    1930    -17635     
     Branches       2004     230     -1774     
   ============================================
   - Hits           9835     187     -9648     
   + Misses         8925    1730     -7195     
   + Partials        805      13      -792     
   ```
   
   | Flag | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | hudicli | `?` | `?` | |
   | hudiclient | `?` | `?` | |
   | hudicommon | `?` | `?` | |
   | hudiflink | `?` | `?` | |
   | hudihadoopmr | `?` | `?` | |
   | hudisparkdatasource | `?` | `?` | |
   | hudisync | `?` | `?` | |
   | huditimelineservice | `?` | `?` | |
   | hudiutilities | `9.68% <ø> (-59.75%)` | `0.00 <ø> (ø)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2506?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...va/org/apache/hudi/utilities/IdentitySplitter.java](https://codecov.io/gh/apache/hudi/pull/2506/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL0lkZW50aXR5U3BsaXR0ZXIuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-2.00%)` | |
   | [...va/org/apache/hudi/utilities/schema/SchemaSet.java](https://codecov.io/gh/apache/hudi/pull/2506/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFTZXQuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-3.00%)` | |
   | [...a/org/apache/hudi/utilities/sources/RowSource.java](https://codecov.io/gh/apache/hudi/pull/2506/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUm93U291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
   | [.../org/apache/hudi/utilities/sources/AvroSource.java](https://codecov.io/gh/apache/hudi/pull/2506/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQXZyb1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
   | [.../org/apache/hudi/utilities/sources/JsonSource.java](https://codecov.io/gh/apache/hudi/pull/2506/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvblNvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
   | [...rg/apache/hudi/utilities/sources/CsvDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2506/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQ3N2REZTU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-10.00%)` | |
   | [...g/apache/hudi/utilities/sources/JsonDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2506/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkRGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
   | [...apache/hudi/utilities/sources/JsonKafkaSource.java](https://codecov.io/gh/apache/hudi/pull/2506/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkthZmthU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-6.00%)` | |
   | [...pache/hudi/utilities/sources/ParquetDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2506/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUGFycXVldERGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-5.00%)` | |
   | [...lities/schema/SchemaProviderWithPostProcessor.java](https://codecov.io/gh/apache/hudi/pull/2506/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFQcm92aWRlcldpdGhQb3N0UHJvY2Vzc29yLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
   | ... and [397 more](https://codecov.io/gh/apache/hudi/pull/2506/diff?src=pr&el=tree-more) | |
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] danny0405 commented on a change in pull request #2506: [HUDI-1557] Make Flink write pipeline write task scalable

Posted by GitBox <gi...@apache.org>.
danny0405 commented on a change in pull request #2506:
URL: https://github.com/apache/hudi/pull/2506#discussion_r569980444



##########
File path: hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
##########
@@ -250,4 +260,37 @@ public static void checkRequiredProperties(TypedProperties props, List<String> c
     checkPropNames.forEach(prop ->
         Preconditions.checkState(!props.containsKey(prop), "Required property " + prop + " is missing"));
   }
+
+  /**
+   * Initialize the table if it does not exist.
+   *
+   * @param conf the configuration
+   * @throws IOException if errors happens when writing metadata
+   */
+  public static void initTable(Configuration conf) throws IOException {

Review comment:
       Rename to `initTableIfNotExists`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] danny0405 commented on a change in pull request #2506: [HUDI-1557] Make Flink write pipeline write task scalable

Posted by GitBox <gi...@apache.org>.
danny0405 commented on a change in pull request #2506:
URL: https://github.com/apache/hudi/pull/2506#discussion_r569419104



##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
##########
@@ -91,104 +92,44 @@ public BaseFlinkCommitActionExecutor(HoodieEngineContext context,
   public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inputRecords) {
     HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>();
 
-    WorkloadProfile profile = null;
-    if (isWorkloadProfileNeeded()) {
-      profile = new WorkloadProfile(buildProfile(inputRecords));
-      LOG.info("Workload profile :" + profile);
-      try {
-        saveWorkloadProfileMetadataToInflight(profile, instantTime);
-      } catch (Exception e) {
-        HoodieTableMetaClient metaClient = table.getMetaClient();
-        HoodieInstant inflightInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, metaClient.getCommitActionType(), instantTime);
-        try {
-          if (!metaClient.getFs().exists(new Path(metaClient.getMetaPath(), inflightInstant.getFileName()))) {
-            throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", e);
-          }
-        } catch (IOException ex) {
-          LOG.error("Check file exists failed");
-          throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", ex);
-        }
-      }
-    }
-
-    final Partitioner partitioner = getPartitioner(profile);
-    Map<Integer, List<HoodieRecord<T>>> partitionedRecords = partition(inputRecords, partitioner);
-
     List<WriteStatus> writeStatuses = new LinkedList<>();
-    partitionedRecords.forEach((partition, records) -> {
-      if (WriteOperationType.isChangingRecords(operationType)) {
-        handleUpsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);
-      } else {
-        handleInsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);
-      }
-    });
+    final HoodieRecord<?> record = inputRecords.get(0);
+    final String partitionPath = record.getPartitionPath();
+    final String fileId = record.getCurrentLocation().getFileId();
+    final BucketType bucketType = record.getCurrentLocation().getInstantTime().equals("I")
+        ? BucketType.INSERT
+        : BucketType.UPDATE;
+    if (WriteOperationType.isChangingRecords(operationType)) {
+      handleUpsertPartition(
+          instantTime,
+          partitionPath,
+          fileId, bucketType,
+          inputRecords.iterator())
+          .forEachRemaining(writeStatuses::addAll);
+    } else {
+      handleInsertPartition(
+          instantTime,
+          partitionPath,
+          fileId,
+          bucketType,
+          inputRecords.iterator())
+          .forEachRemaining(writeStatuses::addAll);
+    }
     updateIndex(writeStatuses, result);
     return result;
   }
 
-  protected void updateIndex(List<WriteStatus> writeStatuses, HoodieWriteMetadata<List<WriteStatus>> result) {
-    Instant indexStartTime = Instant.now();
-    // Update the index back
-    List<WriteStatus> statuses = table.getIndex().updateLocation(writeStatuses, context, table);
-    result.setIndexUpdateDuration(Duration.between(indexStartTime, Instant.now()));
+  protected void updateIndex(List<WriteStatus> statuses, HoodieWriteMetadata<List<WriteStatus>> result) {
+    // No need to update the index because the update happens before the write.

Review comment:
       Rename to `setUpWriteMetadata`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] danny0405 commented on a change in pull request #2506: [HUDI-1557] Make Flink write pipeline write task scalable

Posted by GitBox <gi...@apache.org>.
danny0405 commented on a change in pull request #2506:
URL: https://github.com/apache/hudi/pull/2506#discussion_r569408599



##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
##########
@@ -249,7 +250,17 @@ public String getLastCompletedInstant(String tableType) {
   public void deletePendingInstant(String tableType, String instant) {
     HoodieFlinkTable<T> table = HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
     String commitType = CommitUtils.getCommitActionType(HoodieTableType.valueOf(tableType));
-    table.getMetaClient().getActiveTimeline()
-        .deletePending(new HoodieInstant(HoodieInstant.State.REQUESTED, commitType, instant));
+    HoodieActiveTimeline activeTimeline = table.getMetaClient().getActiveTimeline();
+    activeTimeline.deletePending(new HoodieInstant(HoodieInstant.State.INFLIGHT, commitType, instant));
+    activeTimeline.deletePending(new HoodieInstant(HoodieInstant.State.REQUESTED, commitType, instant));
+  }
+
+  public void transitionRequestedToInflight(String tableType, String inFlightInstant) {

Review comment:
       ` transit` is a noun.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] yanghua merged pull request #2506: [HUDI-1557] Make Flink write pipeline write task scalable

Posted by GitBox <gi...@apache.org>.
yanghua merged pull request #2506:
URL: https://github.com/apache/hudi/pull/2506


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] danny0405 commented on a change in pull request #2506: [HUDI-1557] Make Flink write pipeline write task scalable

Posted by GitBox <gi...@apache.org>.
danny0405 commented on a change in pull request #2506:
URL: https://github.com/apache/hudi/pull/2506#discussion_r569988398



##########
File path: hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
##########
@@ -71,8 +71,7 @@
       + "hoodie client, schema provider, key generator and data source. For hoodie client props, sane defaults are "
       + "used, but recommend use to provide basic things like metrics endpoints, hive configs etc. For sources, refer"
       + "to individual classes, for supported properties.")
-  public String propsFilePath =
-      "file://" + System.getProperty("user.dir") + "/src/test/resources/delta-streamer-config/dfs-source.properties";

Review comment:
       The path does not exists and it throws when fetch the properties, use empty string as the default to skip using empty properties too.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] wangxianghu commented on a change in pull request #2506: [HUDI-1557] Make Flink write pipeline write task scalable

Posted by GitBox <gi...@apache.org>.
wangxianghu commented on a change in pull request #2506:
URL: https://github.com/apache/hudi/pull/2506#discussion_r567765521



##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/state/FlinkInMemoryStateIndex.java
##########
@@ -62,47 +61,14 @@ public FlinkInMemoryStateIndex(HoodieFlinkEngineContext context, HoodieWriteConf
   public List<HoodieRecord<T>> tagLocation(List<HoodieRecord<T>> records,
                                            HoodieEngineContext context,
                                            HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) throws HoodieIndexException {
-    return context.map(records, record -> {
-      try {
-        if (mapState.contains(record.getKey())) {
-          record.unseal();
-          record.setCurrentLocation(mapState.get(record.getKey()));
-          record.seal();
-        }
-      } catch (Exception e) {
-        LOG.error(String.format("Tag record location failed, key = %s, %s", record.getRecordKey(), e.getMessage()));
-      }
-      return record;
-    }, 0);
+    throw new UnsupportedOperationException("No need to tag location for FlinkInMemoryStateIndex");

Review comment:
       @danny0405 if we do not query the location, how to identify the operation per record is insert or update




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] codecov-io commented on pull request #2506: [HUDI-1557] Make Flink write pipeline write task scalable

Posted by GitBox <gi...@apache.org>.
codecov-io commented on pull request #2506:
URL: https://github.com/apache/hudi/pull/2506#issuecomment-769390677


   # [Codecov](https://codecov.io/gh/apache/hudi/pull/2506?src=pr&el=h1) Report
   > Merging [#2506](https://codecov.io/gh/apache/hudi/pull/2506?src=pr&el=desc) (ed3f2f8) into [master](https://codecov.io/gh/apache/hudi/commit/bc0325f6ea0a734f106f21a2fcd4ead413a6cf7b?el=desc) (bc0325f) will **decrease** coverage by `40.57%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/2506/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/hudi/pull/2506?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master   #2506       +/-   ##
   ============================================
   - Coverage     50.26%   9.68%   -40.58%     
   + Complexity     3119      48     -3071     
   ============================================
     Files           430      53      -377     
     Lines         19565    1930    -17635     
     Branches       2004     230     -1774     
   ============================================
   - Hits           9835     187     -9648     
   + Misses         8925    1730     -7195     
   + Partials        805      13      -792     
   ```
   
   | Flag | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | hudicli | `?` | `?` | |
   | hudiclient | `?` | `?` | |
   | hudicommon | `?` | `?` | |
   | hudiflink | `?` | `?` | |
   | hudihadoopmr | `?` | `?` | |
   | hudisparkdatasource | `?` | `?` | |
   | hudisync | `?` | `?` | |
   | huditimelineservice | `?` | `?` | |
   | hudiutilities | `9.68% <ø> (-59.75%)` | `0.00 <ø> (ø)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2506?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...va/org/apache/hudi/utilities/IdentitySplitter.java](https://codecov.io/gh/apache/hudi/pull/2506/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL0lkZW50aXR5U3BsaXR0ZXIuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-2.00%)` | |
   | [...va/org/apache/hudi/utilities/schema/SchemaSet.java](https://codecov.io/gh/apache/hudi/pull/2506/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFTZXQuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-3.00%)` | |
   | [...a/org/apache/hudi/utilities/sources/RowSource.java](https://codecov.io/gh/apache/hudi/pull/2506/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUm93U291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
   | [.../org/apache/hudi/utilities/sources/AvroSource.java](https://codecov.io/gh/apache/hudi/pull/2506/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQXZyb1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
   | [.../org/apache/hudi/utilities/sources/JsonSource.java](https://codecov.io/gh/apache/hudi/pull/2506/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvblNvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
   | [...rg/apache/hudi/utilities/sources/CsvDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2506/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQ3N2REZTU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-10.00%)` | |
   | [...g/apache/hudi/utilities/sources/JsonDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2506/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkRGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
   | [...apache/hudi/utilities/sources/JsonKafkaSource.java](https://codecov.io/gh/apache/hudi/pull/2506/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkthZmthU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-6.00%)` | |
   | [...pache/hudi/utilities/sources/ParquetDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2506/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUGFycXVldERGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-5.00%)` | |
   | [...lities/schema/SchemaProviderWithPostProcessor.java](https://codecov.io/gh/apache/hudi/pull/2506/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFQcm92aWRlcldpdGhQb3N0UHJvY2Vzc29yLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
   | ... and [397 more](https://codecov.io/gh/apache/hudi/pull/2506/diff?src=pr&el=tree-more) | |
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] danny0405 commented on pull request #2506: [HUDI-1557] Make Flink write pipeline write task scalable

Posted by GitBox <gi...@apache.org>.
danny0405 commented on pull request #2506:
URL: https://github.com/apache/hudi/pull/2506#issuecomment-770526630


   > @danny0405 ping us, when you are ready to review.
   
   @yanghua @garyli1019 Yes, this PR is ready to review, thanks for the review if you have time.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] garyli1019 commented on a change in pull request #2506: [HUDI-1557] Make Flink write pipeline write task scalable

Posted by GitBox <gi...@apache.org>.
garyli1019 commented on a change in pull request #2506:
URL: https://github.com/apache/hudi/pull/2506#discussion_r569099335



##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignerFunction.java
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator.partitioner;
+
+import org.apache.hudi.client.FlinkTaskContextSupplier;
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.operator.FlinkOptions;
+import org.apache.hudi.table.action.commit.BucketInfo;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * The function to build the write profile incrementally for records within a checkpoint,
+ * it then assigns the bucket with ID using the {@link BucketAssigner}.
+ *
+ * <p>All the records are tagged with HoodieRecordLocation, instead of real instant time,
+ * INSERT record uses "I" and UPSERT record uses "U" as instant time. There is no need to keep
+ * the "real" instant time for each record, the bucket ID (partition path & fileID) actually decides
+ * where the record should write to. The "I" and "U" tag is only used for downstream to decide whether
+ * the data bucket is a INSERT or a UPSERT, we should factor the it out when the underneath writer
+ * supports specifying the bucket type explicitly.
+ *
+ * <p>The output records should then shuffle by the bucket ID and thus do scalable write.
+ *
+ * @see BucketAssigner
+ */
+public class BucketAssignerFunction<K, I, O extends HoodieRecord<?>>
+    extends KeyedProcessFunction<K, I, O>
+    implements CheckpointedFunction, CheckpointListener {
+
+  private MapState<HoodieKey, HoodieRecordLocation> indexState;
+
+  private BucketAssigner bucketAssigner;
+
+  private final Configuration conf;
+
+  private final boolean isChangingRecords;
+
+  public BucketAssignerFunction(Configuration conf) {
+    this.conf = conf;
+    this.isChangingRecords = WriteOperationType.isChangingRecords(
+        WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)));
+  }
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    super.open(parameters);
+    HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
+    HoodieFlinkEngineContext context =
+        new HoodieFlinkEngineContext(
+            new SerializableConfiguration(StreamerUtil.getHadoopConf()),
+            new FlinkTaskContextSupplier(getRuntimeContext()));
+    this.bucketAssigner = new BucketAssigner(
+        context,
+        writeConfig);
+  }
+
+  @Override
+  public void snapshotState(FunctionSnapshotContext context) {
+    this.bucketAssigner.reset();
+  }
+
+  @Override
+  public void initializeState(FunctionInitializationContext context) {
+    MapStateDescriptor<HoodieKey, HoodieRecordLocation> indexStateDesc =
+        new MapStateDescriptor<>(
+            "indexState",
+            TypeInformation.of(HoodieKey.class),
+            TypeInformation.of(HoodieRecordLocation.class));
+    indexState = context.getKeyedStateStore().getMapState(indexStateDesc);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void processElement(I value, Context ctx, Collector<O> out) throws Exception {
+    // 1. put the record into the BucketAssigner;
+    // 2. look up the state for location, if the record has a location, just send it out;
+    // 3. if it is an INSERT, decide the location using the BucketAssigner then send it out.
+    HoodieRecord<?> record = (HoodieRecord<?>) value;
+    final HoodieKey hoodieKey = record.getKey();
+    final BucketInfo bucketInfo;
+    final HoodieRecordLocation location;
+    // Only changing records need looking up the index for the location,
+    // append only records are always recognized as INSERT.
+    if (isChangingRecords && this.indexState.contains(hoodieKey)) {

Review comment:
       IIUC the `tagLocation` was moved to here?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] danny0405 commented on a change in pull request #2506: [HUDI-1557] Make Flink write pipeline write task scalable

Posted by GitBox <gi...@apache.org>.
danny0405 commented on a change in pull request #2506:
URL: https://github.com/apache/hudi/pull/2506#discussion_r569413446



##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
##########
@@ -91,104 +92,44 @@ public BaseFlinkCommitActionExecutor(HoodieEngineContext context,
   public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inputRecords) {
     HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>();
 
-    WorkloadProfile profile = null;
-    if (isWorkloadProfileNeeded()) {
-      profile = new WorkloadProfile(buildProfile(inputRecords));
-      LOG.info("Workload profile :" + profile);
-      try {
-        saveWorkloadProfileMetadataToInflight(profile, instantTime);
-      } catch (Exception e) {
-        HoodieTableMetaClient metaClient = table.getMetaClient();
-        HoodieInstant inflightInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, metaClient.getCommitActionType(), instantTime);
-        try {
-          if (!metaClient.getFs().exists(new Path(metaClient.getMetaPath(), inflightInstant.getFileName()))) {
-            throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", e);
-          }
-        } catch (IOException ex) {
-          LOG.error("Check file exists failed");
-          throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", ex);
-        }
-      }
-    }
-
-    final Partitioner partitioner = getPartitioner(profile);
-    Map<Integer, List<HoodieRecord<T>>> partitionedRecords = partition(inputRecords, partitioner);
-
     List<WriteStatus> writeStatuses = new LinkedList<>();
-    partitionedRecords.forEach((partition, records) -> {
-      if (WriteOperationType.isChangingRecords(operationType)) {
-        handleUpsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);
-      } else {
-        handleInsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);
-      }
-    });
+    final HoodieRecord<?> record = inputRecords.get(0);
+    final String partitionPath = record.getPartitionPath();

Review comment:
       We can, but this needs refactoring to the executor interface, it's sad that i have no time for that now ~,  we can fire an issue for a future work.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] danny0405 commented on a change in pull request #2506: [HUDI-1557] Make Flink write pipeline write task scalable

Posted by GitBox <gi...@apache.org>.
danny0405 commented on a change in pull request #2506:
URL: https://github.com/apache/hudi/pull/2506#discussion_r569979918



##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
##########
@@ -91,104 +92,44 @@ public BaseFlinkCommitActionExecutor(HoodieEngineContext context,
   public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inputRecords) {
     HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>();
 
-    WorkloadProfile profile = null;
-    if (isWorkloadProfileNeeded()) {
-      profile = new WorkloadProfile(buildProfile(inputRecords));
-      LOG.info("Workload profile :" + profile);
-      try {
-        saveWorkloadProfileMetadataToInflight(profile, instantTime);
-      } catch (Exception e) {
-        HoodieTableMetaClient metaClient = table.getMetaClient();
-        HoodieInstant inflightInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, metaClient.getCommitActionType(), instantTime);
-        try {
-          if (!metaClient.getFs().exists(new Path(metaClient.getMetaPath(), inflightInstant.getFileName()))) {
-            throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", e);
-          }
-        } catch (IOException ex) {
-          LOG.error("Check file exists failed");
-          throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", ex);
-        }
-      }
-    }
-
-    final Partitioner partitioner = getPartitioner(profile);
-    Map<Integer, List<HoodieRecord<T>>> partitionedRecords = partition(inputRecords, partitioner);
-
     List<WriteStatus> writeStatuses = new LinkedList<>();
-    partitionedRecords.forEach((partition, records) -> {
-      if (WriteOperationType.isChangingRecords(operationType)) {
-        handleUpsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);
-      } else {
-        handleInsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);
-      }
-    });
+    final HoodieRecord<?> record = inputRecords.get(0);
+    final String partitionPath = record.getPartitionPath();

Review comment:
       Fired an issue to track this https://issues.apache.org/jira/browse/HUDI-1581




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] yanghua commented on a change in pull request #2506: [HUDI-1557] Make Flink write pipeline write task scalable

Posted by GitBox <gi...@apache.org>.
yanghua commented on a change in pull request #2506:
URL: https://github.com/apache/hudi/pull/2506#discussion_r568370892



##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
##########
@@ -249,7 +250,17 @@ public String getLastCompletedInstant(String tableType) {
   public void deletePendingInstant(String tableType, String instant) {
     HoodieFlinkTable<T> table = HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
     String commitType = CommitUtils.getCommitActionType(HoodieTableType.valueOf(tableType));
-    table.getMetaClient().getActiveTimeline()
-        .deletePending(new HoodieInstant(HoodieInstant.State.REQUESTED, commitType, instant));
+    HoodieActiveTimeline activeTimeline = table.getMetaClient().getActiveTimeline();
+    activeTimeline.deletePending(new HoodieInstant(HoodieInstant.State.INFLIGHT, commitType, instant));
+    activeTimeline.deletePending(new HoodieInstant(HoodieInstant.State.REQUESTED, commitType, instant));
+  }
+
+  public void transitionRequestedToInflight(String tableType, String inFlightInstant) {

Review comment:
       IMO, `transit` is a verb, while `transition ` is a noum.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] danny0405 commented on a change in pull request #2506: [HUDI-1557] Make Flink write pipeline write task scalable

Posted by GitBox <gi...@apache.org>.
danny0405 commented on a change in pull request #2506:
URL: https://github.com/apache/hudi/pull/2506#discussion_r569980040



##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
##########
@@ -91,104 +92,44 @@ public BaseFlinkCommitActionExecutor(HoodieEngineContext context,
   public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inputRecords) {
     HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>();
 
-    WorkloadProfile profile = null;
-    if (isWorkloadProfileNeeded()) {
-      profile = new WorkloadProfile(buildProfile(inputRecords));
-      LOG.info("Workload profile :" + profile);
-      try {
-        saveWorkloadProfileMetadataToInflight(profile, instantTime);
-      } catch (Exception e) {
-        HoodieTableMetaClient metaClient = table.getMetaClient();
-        HoodieInstant inflightInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, metaClient.getCommitActionType(), instantTime);
-        try {
-          if (!metaClient.getFs().exists(new Path(metaClient.getMetaPath(), inflightInstant.getFileName()))) {
-            throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", e);
-          }
-        } catch (IOException ex) {
-          LOG.error("Check file exists failed");
-          throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", ex);
-        }
-      }
-    }
-
-    final Partitioner partitioner = getPartitioner(profile);
-    Map<Integer, List<HoodieRecord<T>>> partitionedRecords = partition(inputRecords, partitioner);
-
     List<WriteStatus> writeStatuses = new LinkedList<>();
-    partitionedRecords.forEach((partition, records) -> {
-      if (WriteOperationType.isChangingRecords(operationType)) {
-        handleUpsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);
-      } else {
-        handleInsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);
-      }
-    });
+    final HoodieRecord<?> record = inputRecords.get(0);
+    final String partitionPath = record.getPartitionPath();
+    final String fileId = record.getCurrentLocation().getFileId();
+    final BucketType bucketType = record.getCurrentLocation().getInstantTime().equals("I")

Review comment:
       Fired an issue to track this https://issues.apache.org/jira/browse/HUDI-1581




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] danny0405 commented on a change in pull request #2506: [HUDI-1557] Make Flink write pipeline write task scalable

Posted by GitBox <gi...@apache.org>.
danny0405 commented on a change in pull request #2506:
URL: https://github.com/apache/hudi/pull/2506#discussion_r569411072



##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
##########
@@ -228,29 +169,36 @@ protected boolean isWorkloadProfileNeeded() {
   }
 
   @SuppressWarnings("unchecked")
-  protected Iterator<List<WriteStatus>> handleUpsertPartition(String instantTime, Integer partition, Iterator recordItr,
-                                                              Partitioner partitioner) {
-    UpsertPartitioner upsertPartitioner = (UpsertPartitioner) partitioner;
-    BucketInfo binfo = upsertPartitioner.getBucketInfo(partition);
-    BucketType btype = binfo.bucketType;
+  protected Iterator<List<WriteStatus>> handleUpsertPartition(
+      String instantTime,
+      String partitionPath,
+      String fileIdHint,
+      BucketType bucketType,
+      Iterator recordItr) {
     try {
-      if (btype.equals(BucketType.INSERT)) {
-        return handleInsert(binfo.fileIdPrefix, recordItr);
-      } else if (btype.equals(BucketType.UPDATE)) {
-        return handleUpdate(binfo.partitionPath, binfo.fileIdPrefix, recordItr);
-      } else {
-        throw new HoodieUpsertException("Unknown bucketType " + btype + " for partition :" + partition);
+      switch (bucketType) {
+        case INSERT:
+          return handleInsert(fileIdHint, recordItr);
+        case UPDATE:
+          return handleUpdate(partitionPath, fileIdHint, recordItr);
+        default:
+          throw new HoodieUpsertException("Unknown bucketType " + bucketType + " for partition :" + partitionPath);
       }
     } catch (Throwable t) {
-      String msg = "Error upserting bucketType " + btype + " for partition :" + partition;
+      String msg = "Error upserting bucketType " + bucketType + " for partition :" + partitionPath;
       LOG.error(msg, t);
       throw new HoodieUpsertException(msg, t);
     }
   }
 
-  protected Iterator<List<WriteStatus>> handleInsertPartition(String instantTime, Integer partition, Iterator recordItr,
-                                                              Partitioner partitioner) {
-    return handleUpsertPartition(instantTime, partition, recordItr, partitioner);
+  protected Iterator<List<WriteStatus>> handleInsertPartition(

Review comment:
       Yes.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org