Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/01/12 07:12:05 UTC

[GitHub] [hudi] loukey-lj opened a new pull request #2434: [HUDI-1511] InstantGenerateOperator support multiple parallelism

loukey-lj opened a new pull request #2434:
URL: https://github.com/apache/hudi/pull/2434


   InstantGenerateOperator now supports a parallelism greater than 1.
   When the operator runs with more than one subtask, subtask 0 acts as the main subtask, and only the main subtask creates new instants.
   A new instant is created only if at least one subtask received data during the current checkpoint. To track this, every subtask creates a temporary marker file
   whose name is made up of the checkpoint ID, the subtask index and the number of records it received.
   The main subtask checks every subtask's marker file and parses its name to decide whether a new instant should be created.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] codecov-io commented on pull request #2434: [HUDI-1511] InstantGenerateOperator support multiple parallelism

Posted by GitBox <gi...@apache.org>.
codecov-io commented on pull request #2434:
URL: https://github.com/apache/hudi/pull/2434#issuecomment-758857465


   # [Codecov](https://codecov.io/gh/apache/hudi/pull/2434?src=pr&el=h1) Report
   > Merging [#2434](https://codecov.io/gh/apache/hudi/pull/2434?src=pr&el=desc) (2c97612) into [master](https://codecov.io/gh/apache/hudi/commit/56866a11fe8b7a0ef8340f221da30c83c72b85da?el=desc) (56866a1) will **decrease** coverage by `43.87%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/2434/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/hudi/pull/2434?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master   #2434       +/-   ##
   ============================================
   - Coverage     53.56%   9.68%   -43.88%     
   + Complexity     2774      48     -2726     
   ============================================
     Files           348      53      -295     
     Lines         16117    1930    -14187     
     Branches       1641     230     -1411     
   ============================================
   - Hits           8633     187     -8446     
   + Misses         6785    1730     -5055     
   + Partials        699      13      -686     
   ```
   
   | Flag | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | hudicli | `?` | `?` | |
   | hudiclient | `?` | `?` | |
   | hudicommon | `?` | `?` | |
   | hudihadoopmr | `?` | `?` | |
   | hudispark | `?` | `?` | |
   | huditimelineservice | `?` | `?` | |
   | hudiutilities | `9.68% <ø> (-60.38%)` | `0.00 <ø> (ø)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2434?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...va/org/apache/hudi/utilities/IdentitySplitter.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL0lkZW50aXR5U3BsaXR0ZXIuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-2.00%)` | |
   | [...va/org/apache/hudi/utilities/schema/SchemaSet.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFTZXQuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-3.00%)` | |
   | [...a/org/apache/hudi/utilities/sources/RowSource.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUm93U291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
   | [.../org/apache/hudi/utilities/sources/AvroSource.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQXZyb1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
   | [.../org/apache/hudi/utilities/sources/JsonSource.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvblNvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
   | [...rg/apache/hudi/utilities/sources/CsvDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQ3N2REZTU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-10.00%)` | |
   | [...g/apache/hudi/utilities/sources/JsonDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkRGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
   | [...apache/hudi/utilities/sources/JsonKafkaSource.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkthZmthU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-6.00%)` | |
   | [...pache/hudi/utilities/sources/ParquetDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUGFycXVldERGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-5.00%)` | |
   | [...lities/schema/SchemaProviderWithPostProcessor.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFQcm92aWRlcldpdGhQb3N0UHJvY2Vzc29yLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-3.00%)` | |
   | ... and [327 more](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree-more) | |
   



[GitHub] [hudi] wangxianghu commented on a change in pull request #2434: [HUDI-1511] InstantGenerateOperator support multiple parallelism

Posted by GitBox <gi...@apache.org>.
wangxianghu commented on a change in pull request #2434:
URL: https://github.com/apache/hudi/pull/2434#discussion_r555724094



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
##########
@@ -420,6 +421,12 @@ public static HoodieTableMetaClient initTableAndGetMetaClient(Configuration hado
       }
     }
 
+    // Always create instantGenerateFolder which is needed for InstantGenerateOperator
+    final Path instantGenerateFolder = new Path(basePath, HoodieTableMetaClient.INSTANT_GENERATE_FOLDER_NAME);

Review comment:
       > @wangxianghu Do we have a better place to put this change? It's in the common package.
   
   How about `StreamerUtil`? This operation is only used in `HoodieFlinkStreamer`. WDYT, @loukey-lj?





[GitHub] [hudi] loukey-lj commented on a change in pull request #2434: [HUDI-1511] InstantGenerateOperator support multiple parallelism

Posted by GitBox <gi...@apache.org>.
loukey-lj commented on a change in pull request #2434:
URL: https://github.com/apache/hudi/pull/2434#discussion_r555797905



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
##########
@@ -71,6 +71,7 @@
 
   private static final long serialVersionUID = 1L;
   private static final Logger LOG = LogManager.getLogger(HoodieTableMetaClient.class);
+  public static final String INSTANT_GENERATE_FOLDER_NAME = "instant_generate";

Review comment:
       Moved to `InstantGenerateOperator`.





[GitHub] [hudi] loukey-lj commented on a change in pull request #2434: [HUDI-1511] InstantGenerateOperator support multiple parallelism

Posted by GitBox <gi...@apache.org>.
loukey-lj commented on a change in pull request #2434:
URL: https://github.com/apache/hudi/pull/2434#discussion_r559543672



##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -102,65 +106,76 @@ public void open() throws Exception {
     // Hadoop FileSystem
     fs = FSUtils.getFs(cfg.targetBasePath, serializableHadoopConf.get());
 
-    TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null);
+    if (isMain) {
+      TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null);
 
-    // writeClient
-    writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true);
+      // writeClient
+      writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true);
 
-    // init table, create it if not exists.
-    initTable();
+      // init table, create it if not exists.
+      initTable();
+
+      // create instant marker directory
+      createInstantMarkerDir();
+    }
   }
 
   @Override
   public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
     super.prepareSnapshotPreBarrier(checkpointId);
-    // check whether the last instant is completed, if not, wait 10s and then throws an exception
-    if (!StringUtils.isNullOrEmpty(latestInstant)) {
-      doCheck();
-      // last instant completed, set it empty
-      latestInstant = "";
-    }
+    int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+    String instantMarkerFileName = String.format("%d_%d_%d", indexOfThisSubtask, checkpointId, batchSize.get());
+    Path path = new Path(new Path(HoodieTableMetaClient.AUXILIARYFOLDER_NAME, INSTANT_MARKER_FOLDER_NAME), instantMarkerFileName);
+    // each subtask creates its own instant marker file
+    fs.create(path, true);
+    LOG.info("Subtask [{}] at checkpoint [{}] created generate file [{}]", indexOfThisSubtask, checkpointId, instantMarkerFileName);
+    if (isMain) {
+      boolean receivedDataInCurrentCP = checkReceivedData(checkpointId);
+      // check whether the last instant is completed; if not, wait 10s and then throw an exception
+      if (!StringUtils.isNullOrEmpty(latestInstant)) {
+        doCheck();
+        // last instant completed, set it empty
+        latestInstant = "";
+      }
 
-    // no data no new instant
-    if (!bufferedRecords.isEmpty()) {
-      latestInstant = startNewInstant(checkpointId);
+      // no data no new instant
+      if (receivedDataInCurrentCP) {
+        latestInstant = startNewInstant(checkpointId);
+      }
     }
   }
 
   @Override
   public void initializeState(StateInitializationContext context) throws Exception {
-    // instantState
-    ListStateDescriptor<String> latestInstantStateDescriptor = new ListStateDescriptor<String>("latestInstant", String.class);
-    latestInstantState = context.getOperatorStateStore().getListState(latestInstantStateDescriptor);
-
-    // recordState
-    ListStateDescriptor<StreamRecord> recordsStateDescriptor = new ListStateDescriptor<StreamRecord>("recordsState", StreamRecord.class);
-    recordsState = context.getOperatorStateStore().getListState(recordsStateDescriptor);
-
-    if (context.isRestored()) {
-      Iterator<String> latestInstantIterator = latestInstantState.get().iterator();
-      latestInstantIterator.forEachRemaining(x -> latestInstant = x);
-      LOG.info("InstantGenerateOperator initializeState get latestInstant [{}]", latestInstant);
-
-      Iterator<StreamRecord> recordIterator = recordsState.get().iterator();
-      bufferedRecords.clear();
-      recordIterator.forEachRemaining(x -> bufferedRecords.add(x));
+    isMain = getRuntimeContext().getIndexOfThisSubtask() == 0;
+    if (isMain) {
+      // instantState
+      ListStateDescriptor<String> latestInstantStateDescriptor = new ListStateDescriptor<>("latestInstant", String.class);
+      latestInstantState = context.getOperatorStateStore().getListState(latestInstantStateDescriptor);
+
+      if (context.isRestored()) {
+        Iterator<String> latestInstantIterator = latestInstantState.get().iterator();
+        latestInstantIterator.forEachRemaining(x -> latestInstant = x);
+        LOG.info("Restoring the latest instant [{}] from the state", latestInstant);
+      }
     }
   }
 
   @Override
   public void snapshotState(StateSnapshotContext functionSnapshotContext) throws Exception {
-    if (latestInstantList.isEmpty()) {
-      latestInstantList.add(latestInstant);
+    long checkpointId = functionSnapshotContext.getCheckpointId();
+    if (isMain) {
+      LOG.info("Update latest instant [{}] records size [{}] checkpointId [{}]", latestInstant, batchSize, checkpointId);
+      if (latestInstantList.isEmpty()) {
+        latestInstantList.add(latestInstant);
+      } else {
+        latestInstantList.set(0, latestInstant);
+      }
+      latestInstantState.update(latestInstantList);
     } else {
-      latestInstantList.set(0, latestInstant);
+      LOG.info("Records size [{}] checkpointId [{}]", batchSize, checkpointId);

Review comment:
       done
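The diff calls `checkReceivedData(checkpointId)` but does not show its body. The sketch below is one possible shape for that check. It makes several assumptions: it uses the local filesystem via `java.nio.file` instead of Hadoop's `FileSystem`, and it reuses the `retryTimes`/`retryInterval` idea from the operator's fields so the main subtask can wait until every subtask has written its marker. `MarkerDirectoryCheck` and its methods are hypothetical, not the PR's implementation.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;

// Hypothetical local-filesystem version of the marker check. It assumes the
// directory holds only marker files named "<subtaskIndex>_<checkpointId>_<recordCount>".
final class MarkerDirectoryCheck {

  private MarkerDirectoryCheck() {
  }

  // Called by every subtask at the pre-barrier. An existing marker is overwritten.
  static void writeMarker(Path dir, int subtaskIndex, long checkpointId, long recordCount)
      throws IOException {
    Files.createDirectories(dir);
    String name = subtaskIndex + "_" + checkpointId + "_" + recordCount;
    Files.write(dir.resolve(name), new byte[0]);
  }

  // Main subtask only: waits until all subtasks have reported for the checkpoint,
  // then returns whether any of them received records.
  static boolean checkReceivedData(Path dir, long checkpointId, int parallelism,
      int retryTimes, long retryIntervalMs) throws IOException, InterruptedException {
    for (int attempt = 0; attempt <= retryTimes; attempt++) {
      Map<Integer, Long> countsBySubtask = new HashMap<>();
      try (Stream<Path> files = Files.list(dir)) {
        files.forEach(p -> {
          String[] parts = p.getFileName().toString().split("_");
          if (parts.length == 3 && Long.parseLong(parts[1]) == checkpointId) {
            countsBySubtask.put(Integer.parseInt(parts[0]), Long.parseLong(parts[2]));
          }
        });
      }
      if (countsBySubtask.size() >= parallelism) {
        return countsBySubtask.values().stream().anyMatch(c -> c > 0);
      }
      if (attempt < retryTimes) {
        Thread.sleep(retryIntervalMs);
      }
    }
    throw new IllegalStateException("Not all subtasks wrote a marker for checkpoint " + checkpointId);
  }
}
```

Failing loudly when markers are missing mirrors the existing `doCheck()` behaviour of throwing after a wait. Whether the PR actually waits for all subtasks is not shown in this thread.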





[GitHub] [hudi] loukey-lj commented on a change in pull request #2434: [HUDI-1511] InstantGenerateOperator support multiple parallelism

Posted by GitBox <gi...@apache.org>.
loukey-lj commented on a change in pull request #2434:
URL: https://github.com/apache/hudi/pull/2434#discussion_r556286285



##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -71,22 +72,25 @@
   private String latestInstant = "";
   private List<String> latestInstantList = new ArrayList<>(1);
   private transient ListState<String> latestInstantState;
-  private List<StreamRecord> bufferedRecords = new LinkedList();
-  private transient ListState<StreamRecord> recordsState;
   private Integer retryTimes;
   private Integer retryInterval;
+  private static final String UNDERLINE = "_";
+  private static final String INSTANT_GENERATE_FOLDER_NAME = "instant_generate_tmp";
+  private transient boolean isMain = false;
+  private transient volatile long batchSize = 0L;
 
   @Override
   public void processElement(StreamRecord<HoodieRecord> streamRecord) throws Exception {
     if (streamRecord.getValue() != null) {
-      bufferedRecords.add(streamRecord);
       output.collect(streamRecord);
+      batchSize++;
     }
   }
 
   @Override
   public void open() throws Exception {
     super.open();
+    isMain = getRuntimeContext().getIndexOfThisSubtask() == 0;

Review comment:
       This was computed in both `open` and `initializeState`. Since `initializeState` runs before `open`, I removed the assignment from `open`.
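The following plain-Java model shows why computing `isMain` once in `initializeState` is enough. It has no Flink dependencies and only mimics the call order the comment relies on: the task initializes operator state, then opens the operator, then feeds it elements. `SubtaskLifecycleSketch` and its `start` runner are hypothetical stand-ins, not Flink or Hudi APIs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the operator that models only its lifecycle order.
class SubtaskLifecycleSketch {
  final List<String> calls = new ArrayList<>();
  private final int subtaskIndex;
  boolean isMain;
  long batchSize;

  SubtaskLifecycleSketch(int subtaskIndex) {
    this.subtaskIndex = subtaskIndex;
  }

  void initializeState() {
    // Computed once here; open() runs later and can rely on it.
    isMain = subtaskIndex == 0;
    calls.add("initializeState");
  }

  void open() {
    // No need to recompute isMain: it was already set by initializeState().
    calls.add("open(isMain=" + isMain + ")");
  }

  void processElement(Object value) {
    // Mirrors the PR: count non-null records instead of buffering them.
    if (value != null) {
      batchSize++;
    }
  }

  // Mimics the task runner: state is initialized before the operator is opened.
  static SubtaskLifecycleSketch start(int subtaskIndex) {
    SubtaskLifecycleSketch op = new SubtaskLifecycleSketch(subtaskIndex);
    op.initializeState();
    op.open();
    return op;
  }
}
```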





[GitHub] [hudi] codecov-io edited a comment on pull request #2434: [HUDI-1511] InstantGenerateOperator support multiple parallelism

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #2434:
URL: https://github.com/apache/hudi/pull/2434#issuecomment-758857465


   # [Codecov](https://codecov.io/gh/apache/hudi/pull/2434?src=pr&el=h1) Report
   > Merging [#2434](https://codecov.io/gh/apache/hudi/pull/2434?src=pr&el=desc) (ed0b594) into [master](https://codecov.io/gh/apache/hudi/commit/56866a11fe8b7a0ef8340f221da30c83c72b85da?el=desc) (56866a1) will **decrease** coverage by `43.87%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/2434/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/hudi/pull/2434?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master   #2434       +/-   ##
   ============================================
   - Coverage     53.56%   9.68%   -43.88%     
   + Complexity     2774      48     -2726     
   ============================================
     Files           348      53      -295     
     Lines         16117    1930    -14187     
     Branches       1641     230     -1411     
   ============================================
   - Hits           8633     187     -8446     
   + Misses         6785    1730     -5055     
   + Partials        699      13      -686     
   ```
   
   | Flag | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | hudicli | `?` | `?` | |
   | hudiclient | `?` | `?` | |
   | hudicommon | `?` | `?` | |
   | hudihadoopmr | `?` | `?` | |
   | hudispark | `?` | `?` | |
   | huditimelineservice | `?` | `?` | |
   | hudiutilities | `9.68% <ø> (-60.38%)` | `0.00 <ø> (ø)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2434?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...va/org/apache/hudi/utilities/IdentitySplitter.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL0lkZW50aXR5U3BsaXR0ZXIuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-2.00%)` | |
   | [...va/org/apache/hudi/utilities/schema/SchemaSet.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFTZXQuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-3.00%)` | |
   | [...a/org/apache/hudi/utilities/sources/RowSource.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUm93U291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
   | [.../org/apache/hudi/utilities/sources/AvroSource.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQXZyb1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
   | [.../org/apache/hudi/utilities/sources/JsonSource.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvblNvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
   | [...rg/apache/hudi/utilities/sources/CsvDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQ3N2REZTU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-10.00%)` | |
   | [...g/apache/hudi/utilities/sources/JsonDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkRGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
   | [...apache/hudi/utilities/sources/JsonKafkaSource.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkthZmthU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-6.00%)` | |
   | [...pache/hudi/utilities/sources/ParquetDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUGFycXVldERGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-5.00%)` | |
   | [...lities/schema/SchemaProviderWithPostProcessor.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFQcm92aWRlcldpdGhQb3N0UHJvY2Vzc29yLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-3.00%)` | |
   | ... and [328 more](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree-more) | |
   



[GitHub] [hudi] codecov-io edited a comment on pull request #2434: [HUDI-1511] InstantGenerateOperator support multiple parallelism

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #2434:
URL: https://github.com/apache/hudi/pull/2434#issuecomment-758857465


   # [Codecov](https://codecov.io/gh/apache/hudi/pull/2434?src=pr&el=h1) Report
   > Merging [#2434](https://codecov.io/gh/apache/hudi/pull/2434?src=pr&el=desc) (53d9942) into [master](https://codecov.io/gh/apache/hudi/commit/56866a11fe8b7a0ef8340f221da30c83c72b85da?el=desc) (56866a1) will **decrease** coverage by `43.87%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/2434/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/hudi/pull/2434?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master   #2434       +/-   ##
   ============================================
   - Coverage     53.56%   9.68%   -43.88%     
   + Complexity     2774      48     -2726     
   ============================================
     Files           348      53      -295     
     Lines         16117    1930    -14187     
     Branches       1641     230     -1411     
   ============================================
   - Hits           8633     187     -8446     
   + Misses         6785    1730     -5055     
   + Partials        699      13      -686     
   ```
   
   | Flag | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | hudicli | `?` | `?` | |
   | hudiclient | `?` | `?` | |
   | hudicommon | `?` | `?` | |
   | hudihadoopmr | `?` | `?` | |
   | hudispark | `?` | `?` | |
   | huditimelineservice | `?` | `?` | |
   | hudiutilities | `9.68% <ø> (-60.38%)` | `0.00 <ø> (ø)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2434?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...va/org/apache/hudi/utilities/IdentitySplitter.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL0lkZW50aXR5U3BsaXR0ZXIuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-2.00%)` | |
   | [...va/org/apache/hudi/utilities/schema/SchemaSet.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFTZXQuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-3.00%)` | |
   | [...a/org/apache/hudi/utilities/sources/RowSource.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUm93U291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
   | [.../org/apache/hudi/utilities/sources/AvroSource.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQXZyb1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
   | [.../org/apache/hudi/utilities/sources/JsonSource.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvblNvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
   | [...rg/apache/hudi/utilities/sources/CsvDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQ3N2REZTU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-10.00%)` | |
   | [...g/apache/hudi/utilities/sources/JsonDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkRGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
   | [...apache/hudi/utilities/sources/JsonKafkaSource.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkthZmthU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-6.00%)` | |
   | [...pache/hudi/utilities/sources/ParquetDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUGFycXVldERGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-5.00%)` | |
   | [...lities/schema/SchemaProviderWithPostProcessor.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFQcm92aWRlcldpdGhQb3N0UHJvY2Vzc29yLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-3.00%)` | |
   | ... and [327 more](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree-more) | |
   



[GitHub] [hudi] codecov-io edited a comment on pull request #2434: [HUDI-1511] InstantGenerateOperator support multiple parallelism

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #2434:
URL: https://github.com/apache/hudi/pull/2434#issuecomment-758857465


   # [Codecov](https://codecov.io/gh/apache/hudi/pull/2434?src=pr&el=h1) Report
   > Merging [#2434](https://codecov.io/gh/apache/hudi/pull/2434?src=pr&el=desc) (cd1909f) into [master](https://codecov.io/gh/apache/hudi/commit/56866a11fe8b7a0ef8340f221da30c83c72b85da?el=desc) (56866a1) will **decrease** coverage by `43.87%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/2434/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/hudi/pull/2434?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master   #2434       +/-   ##
   ============================================
   - Coverage     53.56%   9.68%   -43.88%     
   + Complexity     2774      48     -2726     
   ============================================
     Files           348      53      -295     
     Lines         16117    1930    -14187     
     Branches       1641     230     -1411     
   ============================================
   - Hits           8633     187     -8446     
   + Misses         6785    1730     -5055     
   + Partials        699      13      -686     
   ```
   
   | Flag | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | hudicli | `?` | `?` | |
   | hudiclient | `?` | `?` | |
   | hudicommon | `?` | `?` | |
   | hudihadoopmr | `?` | `?` | |
   | hudispark | `?` | `?` | |
   | huditimelineservice | `?` | `?` | |
   | hudiutilities | `9.68% <ø> (-60.38%)` | `0.00 <ø> (ø)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2434?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...va/org/apache/hudi/utilities/IdentitySplitter.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL0lkZW50aXR5U3BsaXR0ZXIuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-2.00%)` | |
   | [...va/org/apache/hudi/utilities/schema/SchemaSet.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFTZXQuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-3.00%)` | |
   | [...a/org/apache/hudi/utilities/sources/RowSource.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUm93U291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
   | [.../org/apache/hudi/utilities/sources/AvroSource.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQXZyb1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
   | [.../org/apache/hudi/utilities/sources/JsonSource.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvblNvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
   | [...rg/apache/hudi/utilities/sources/CsvDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQ3N2REZTU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-10.00%)` | |
   | [...g/apache/hudi/utilities/sources/JsonDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkRGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
   | [...apache/hudi/utilities/sources/JsonKafkaSource.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkthZmthU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-6.00%)` | |
   | [...pache/hudi/utilities/sources/ParquetDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUGFycXVldERGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-5.00%)` | |
   | [...lities/schema/SchemaProviderWithPostProcessor.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFQcm92aWRlcldpdGhQb3N0UHJvY2Vzc29yLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-3.00%)` | |
   | ... and [328 more](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree-more) | |
   





[GitHub] [hudi] yanghua commented on a change in pull request #2434: [HUDI-1511] InstantGenerateOperator support multiple parallelism

Posted by GitBox <gi...@apache.org>.
yanghua commented on a change in pull request #2434:
URL: https://github.com/apache/hudi/pull/2434#discussion_r555703829



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
##########
@@ -71,6 +71,7 @@
 
   private static final long serialVersionUID = 1L;
   private static final Logger LOG = LogManager.getLogger(HoodieTableMetaClient.class);
+  public static final String INSTANT_GENERATE_FOLDER_NAME = "instant_generate";

Review comment:
       Would renaming it to `instant_generate_tmp` be better?

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
##########
@@ -202,7 +203,7 @@ public String getTempFolderPath() {
 
   /**
    * Returns Marker folder path.
-   * 

Review comment:
       Unnecessary change; please revert it.

##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -63,104 +63,159 @@
 
   private static final Logger LOG = LoggerFactory.getLogger(InstantGenerateOperator.class);
   public static final String NAME = "InstantGenerateOperator";
-
+  private static final String UNDERLINE = "_";
   private HoodieFlinkStreamer.Config cfg;
   private HoodieFlinkWriteClient writeClient;
   private SerializableConfiguration serializableHadoopConf;
   private transient FileSystem fs;
   private String latestInstant = "";
   private List<String> latestInstantList = new ArrayList<>(1);
   private transient ListState<String> latestInstantState;
-  private List<StreamRecord> bufferedRecords = new LinkedList();
-  private transient ListState<StreamRecord> recordsState;
   private Integer retryTimes;
   private Integer retryInterval;
+  private transient boolean isMain = false;

Review comment:
       `realGenerator`?

##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -63,104 +63,159 @@
 
   private static final Logger LOG = LoggerFactory.getLogger(InstantGenerateOperator.class);
   public static final String NAME = "InstantGenerateOperator";
-
+  private static final String UNDERLINE = "_";
   private HoodieFlinkStreamer.Config cfg;
   private HoodieFlinkWriteClient writeClient;
   private SerializableConfiguration serializableHadoopConf;
   private transient FileSystem fs;
   private String latestInstant = "";
   private List<String> latestInstantList = new ArrayList<>(1);
   private transient ListState<String> latestInstantState;
-  private List<StreamRecord> bufferedRecords = new LinkedList();
-  private transient ListState<StreamRecord> recordsState;
   private Integer retryTimes;
   private Integer retryInterval;
+  private transient boolean isMain = false;
+  private transient volatile long batchSize = 0;
 
   @Override
   public void processElement(StreamRecord<HoodieRecord> streamRecord) throws Exception {
     if (streamRecord.getValue() != null) {
-      bufferedRecords.add(streamRecord);
       output.collect(streamRecord);
+      batchSize++;
     }
   }
 
   @Override
   public void open() throws Exception {
     super.open();
+    isMain = getRuntimeContext().getIndexOfThisSubtask() == 0;
+
     // get configs from runtimeContext
     cfg = (HoodieFlinkStreamer.Config) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
 
-    // retry times
-    retryTimes = Integer.valueOf(cfg.blockRetryTime);
-
-    // retry interval
-    retryInterval = Integer.valueOf(cfg.blockRetryInterval);
-
     // hadoopConf
     serializableHadoopConf = new SerializableConfiguration(StreamerUtil.getHadoopConf());
 
     // Hadoop FileSystem
     fs = FSUtils.getFs(cfg.targetBasePath, serializableHadoopConf.get());
 
-    TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null);
+    if (isMain) {
+      // retry times
+      retryTimes = Integer.valueOf(cfg.blockRetryTime);
 
-    // writeClient
-    writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true);
+      // retry interval
+      retryInterval = Integer.valueOf(cfg.blockRetryInterval);
 
-    // init table, create it if not exists.
-    initTable();
+      TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null);
+
+      // writeClient
+      writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true);
+
+      // init table, create it if not exists.
+      initTable();
+    }
   }
 
   @Override
   public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
     super.prepareSnapshotPreBarrier(checkpointId);
-    // check whether the last instant is completed, if not, wait 10s and then throws an exception
-    if (!StringUtils.isNullOrEmpty(latestInstant)) {
-      doCheck();
-      // last instant completed, set it empty
-      latestInstant = "";
+    int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+    String instantGenerateInfoFileName = indexOfThisSubtask + UNDERLINE + checkpointId + UNDERLINE + batchSize;

Review comment:
       Could we use `String.format()` here instead?
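Below is a minimal sketch of the `String.format()` suggestion. The file name layout `<subtaskIndex>_<checkpointId>_<recordCount>` is the one described in the PR. The `MarkerFileName` class, its `format`/`parse` helpers and the `Locale.ROOT` argument are assumptions made for illustration and are not code from the PR.

```java
import java.util.Locale;

/** Hypothetical helper for marker names of the form "<subtaskIndex>_<checkpointId>_<recordCount>". */
final class MarkerFileName {
  private static final String DELIMITER = "_";

  final int subtaskIndex;
  final long checkpointId;
  final long recordCount;

  private MarkerFileName(int subtaskIndex, long checkpointId, long recordCount) {
    this.subtaskIndex = subtaskIndex;
    this.checkpointId = checkpointId;
    this.recordCount = recordCount;
  }

  /** Builds the name with String.format instead of string concatenation. */
  static String format(int subtaskIndex, long checkpointId, long recordCount) {
    return String.format(Locale.ROOT, "%d_%d_%d", subtaskIndex, checkpointId, recordCount);
  }

  /** Parses a name produced by format(); throws if it does not have exactly three numeric fields. */
  static MarkerFileName parse(String name) {
    String[] parts = name.split(DELIMITER);
    if (parts.length != 3) {
      throw new IllegalArgumentException("Unexpected marker file name: " + name);
    }
    return new MarkerFileName(
        Integer.parseInt(parts[0]), Long.parseLong(parts[1]), Long.parseLong(parts[2]));
  }
}
```

Passing `Locale.ROOT` stops `%d` from rendering locale-specific digits, so the main subtask can always parse the names the other subtasks write.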

##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -63,104 +63,159 @@
 
   private static final Logger LOG = LoggerFactory.getLogger(InstantGenerateOperator.class);
   public static final String NAME = "InstantGenerateOperator";
-
+  private static final String UNDERLINE = "_";
   private HoodieFlinkStreamer.Config cfg;
   private HoodieFlinkWriteClient writeClient;
   private SerializableConfiguration serializableHadoopConf;
   private transient FileSystem fs;
   private String latestInstant = "";
   private List<String> latestInstantList = new ArrayList<>(1);
   private transient ListState<String> latestInstantState;
-  private List<StreamRecord> bufferedRecords = new LinkedList();
-  private transient ListState<StreamRecord> recordsState;
   private Integer retryTimes;
   private Integer retryInterval;
+  private transient boolean isMain = false;
+  private transient volatile long batchSize = 0;
 
   @Override
   public void processElement(StreamRecord<HoodieRecord> streamRecord) throws Exception {
     if (streamRecord.getValue() != null) {
-      bufferedRecords.add(streamRecord);
       output.collect(streamRecord);
+      batchSize++;
     }
   }
 
   @Override
   public void open() throws Exception {
     super.open();
+    isMain = getRuntimeContext().getIndexOfThisSubtask() == 0;
+
     // get configs from runtimeContext
     cfg = (HoodieFlinkStreamer.Config) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
 
-    // retry times
-    retryTimes = Integer.valueOf(cfg.blockRetryTime);
-
-    // retry interval
-    retryInterval = Integer.valueOf(cfg.blockRetryInterval);
-
     // hadoopConf
     serializableHadoopConf = new SerializableConfiguration(StreamerUtil.getHadoopConf());
 
     // Hadoop FileSystem
     fs = FSUtils.getFs(cfg.targetBasePath, serializableHadoopConf.get());
 
-    TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null);
+    if (isMain) {
+      // retry times
+      retryTimes = Integer.valueOf(cfg.blockRetryTime);
 
-    // writeClient
-    writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true);
+      // retry interval
+      retryInterval = Integer.valueOf(cfg.blockRetryInterval);

Review comment:
       ditto?

##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -63,104 +63,159 @@
 
   private static final Logger LOG = LoggerFactory.getLogger(InstantGenerateOperator.class);
   public static final String NAME = "InstantGenerateOperator";
-
+  private static final String UNDERLINE = "_";
   private HoodieFlinkStreamer.Config cfg;
   private HoodieFlinkWriteClient writeClient;
   private SerializableConfiguration serializableHadoopConf;
   private transient FileSystem fs;
   private String latestInstant = "";
   private List<String> latestInstantList = new ArrayList<>(1);
   private transient ListState<String> latestInstantState;
-  private List<StreamRecord> bufferedRecords = new LinkedList();
-  private transient ListState<StreamRecord> recordsState;
   private Integer retryTimes;
   private Integer retryInterval;
+  private transient boolean isMain = false;
+  private transient volatile long batchSize = 0;

Review comment:
       `recordCounter`?
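The PR description says the marker carries the number of records received in the current checkpoint. Below is a minimal sketch of how the renamed counter might behave if it is meant to count per checkpoint; the class and method names are hypothetical. The hunks quoted here do not show `batchSize` being reset after the marker file is written. Without a reset, every checkpoint after the first non-empty one would report a positive count.

```java
/** Hypothetical per-checkpoint record counter for a single subtask. */
final class CheckpointRecordCounter {
  private long recordsInCurrentCheckpoint = 0L;

  /** Called once per non-null record, as processElement does in the PR. */
  void onRecord() {
    recordsInCurrentCheckpoint++;
  }

  /** Called from prepareSnapshotPreBarrier: returns this checkpoint's count and starts a new window. */
  long snapshotAndReset() {
    long count = recordsInCurrentCheckpoint;
    recordsInCurrentCheckpoint = 0L;
    return count;
  }
}
```

The sketch drops `volatile` on the assumption that `processElement` and `prepareSnapshotPreBarrier` are both invoked from the operator's task thread.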

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
##########
@@ -272,7 +273,7 @@ public HoodieWrapperFileSystem getFs() {
 
   /**
    * Return raw file-system.
-   * 

Review comment:
       ditto

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
##########
@@ -420,6 +421,12 @@ public static HoodieTableMetaClient initTableAndGetMetaClient(Configuration hado
       }
     }
 
+    // Always create instantGenerateFolder which is needed for InstantGenerateOperator
+    final Path instantGenerateFolder = new Path(basePath, HoodieTableMetaClient.INSTANT_GENERATE_FOLDER_NAME);

Review comment:
       @wangxianghu Is there a better place to put this change? This is the common package.

##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -18,6 +18,18 @@
 
 package org.apache.hudi.operator;
 
+import org.apache.flink.api.common.state.ListState;

Review comment:
       Let's revert the unnecessary changes. Please also pay attention to the import order.

##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -63,104 +63,159 @@
 
   private static final Logger LOG = LoggerFactory.getLogger(InstantGenerateOperator.class);
   public static final String NAME = "InstantGenerateOperator";
-
+  private static final String UNDERLINE = "_";
   private HoodieFlinkStreamer.Config cfg;
   private HoodieFlinkWriteClient writeClient;
   private SerializableConfiguration serializableHadoopConf;
   private transient FileSystem fs;
   private String latestInstant = "";
   private List<String> latestInstantList = new ArrayList<>(1);
   private transient ListState<String> latestInstantState;
-  private List<StreamRecord> bufferedRecords = new LinkedList();
-  private transient ListState<StreamRecord> recordsState;
   private Integer retryTimes;
   private Integer retryInterval;
+  private transient boolean isMain = false;
+  private transient volatile long batchSize = 0;
 
   @Override
   public void processElement(StreamRecord<HoodieRecord> streamRecord) throws Exception {
     if (streamRecord.getValue() != null) {
-      bufferedRecords.add(streamRecord);
       output.collect(streamRecord);
+      batchSize++;
     }
   }
 
   @Override
   public void open() throws Exception {
     super.open();
+    isMain = getRuntimeContext().getIndexOfThisSubtask() == 0;
+
     // get configs from runtimeContext
     cfg = (HoodieFlinkStreamer.Config) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
 
-    // retry times
-    retryTimes = Integer.valueOf(cfg.blockRetryTime);
-
-    // retry interval
-    retryInterval = Integer.valueOf(cfg.blockRetryInterval);
-
     // hadoopConf
     serializableHadoopConf = new SerializableConfiguration(StreamerUtil.getHadoopConf());
 
     // Hadoop FileSystem
     fs = FSUtils.getFs(cfg.targetBasePath, serializableHadoopConf.get());
 
-    TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null);
+    if (isMain) {
+      // retry times
+      retryTimes = Integer.valueOf(cfg.blockRetryTime);
 
-    // writeClient
-    writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true);
+      // retry interval
+      retryInterval = Integer.valueOf(cfg.blockRetryInterval);
 
-    // init table, create it if not exists.
-    initTable();
+      TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null);
+
+      // writeClient
+      writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true);
+
+      // init table, create it if not exists.
+      initTable();
+    }
   }
 
   @Override
   public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
     super.prepareSnapshotPreBarrier(checkpointId);
-    // check whether the last instant is completed, if not, wait 10s and then throws an exception
-    if (!StringUtils.isNullOrEmpty(latestInstant)) {
-      doCheck();
-      // last instant completed, set it empty
-      latestInstant = "";
+    int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+    String instantGenerateInfoFileName = indexOfThisSubtask + UNDERLINE + checkpointId + UNDERLINE + batchSize;
+    Path path = new Path(HoodieTableMetaClient.INSTANT_GENERATE_FOLDER_NAME,instantGenerateInfoFileName);
+    // mk generate file by each subtask
+    fs.create(path,true);
+    LOG.info("subtask [{}] at checkpoint [{}] created generate file [{}]",indexOfThisSubtask,checkpointId,instantGenerateInfoFileName);
+    if (isMain) {
+      boolean hasData = generateFilePasre(checkpointId);

Review comment:
       `receivedDataInCurrentCP`?

##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -63,104 +63,159 @@
 
   private static final Logger LOG = LoggerFactory.getLogger(InstantGenerateOperator.class);
   public static final String NAME = "InstantGenerateOperator";
-
+  private static final String UNDERLINE = "_";
   private HoodieFlinkStreamer.Config cfg;
   private HoodieFlinkWriteClient writeClient;
   private SerializableConfiguration serializableHadoopConf;
   private transient FileSystem fs;
   private String latestInstant = "";
   private List<String> latestInstantList = new ArrayList<>(1);
   private transient ListState<String> latestInstantState;
-  private List<StreamRecord> bufferedRecords = new LinkedList();
-  private transient ListState<StreamRecord> recordsState;
   private Integer retryTimes;
   private Integer retryInterval;
+  private transient boolean isMain = false;
+  private transient volatile long batchSize = 0;
 
   @Override
   public void processElement(StreamRecord<HoodieRecord> streamRecord) throws Exception {
     if (streamRecord.getValue() != null) {
-      bufferedRecords.add(streamRecord);
       output.collect(streamRecord);
+      batchSize++;
     }
   }
 
   @Override
   public void open() throws Exception {
     super.open();
+    isMain = getRuntimeContext().getIndexOfThisSubtask() == 0;
+
     // get configs from runtimeContext
     cfg = (HoodieFlinkStreamer.Config) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
 
-    // retry times
-    retryTimes = Integer.valueOf(cfg.blockRetryTime);
-
-    // retry interval
-    retryInterval = Integer.valueOf(cfg.blockRetryInterval);
-
     // hadoopConf
     serializableHadoopConf = new SerializableConfiguration(StreamerUtil.getHadoopConf());
 
     // Hadoop FileSystem
     fs = FSUtils.getFs(cfg.targetBasePath, serializableHadoopConf.get());
 
-    TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null);
+    if (isMain) {
+      // retry times
+      retryTimes = Integer.valueOf(cfg.blockRetryTime);

Review comment:
       We may need to verify whether this config option is still valuable.

##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -63,104 +63,159 @@
 
   private static final Logger LOG = LoggerFactory.getLogger(InstantGenerateOperator.class);
   public static final String NAME = "InstantGenerateOperator";
-
+  private static final String UNDERLINE = "_";

Review comment:
       Would `DELIMITER` be a better name?

##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -63,104 +63,159 @@
 
   private static final Logger LOG = LoggerFactory.getLogger(InstantGenerateOperator.class);
   public static final String NAME = "InstantGenerateOperator";
-
+  private static final String UNDERLINE = "_";
   private HoodieFlinkStreamer.Config cfg;
   private HoodieFlinkWriteClient writeClient;
   private SerializableConfiguration serializableHadoopConf;
   private transient FileSystem fs;
   private String latestInstant = "";
   private List<String> latestInstantList = new ArrayList<>(1);
   private transient ListState<String> latestInstantState;
-  private List<StreamRecord> bufferedRecords = new LinkedList();
-  private transient ListState<StreamRecord> recordsState;
   private Integer retryTimes;
   private Integer retryInterval;
+  private transient boolean isMain = false;
+  private transient volatile long batchSize = 0;
 
   @Override
   public void processElement(StreamRecord<HoodieRecord> streamRecord) throws Exception {
     if (streamRecord.getValue() != null) {
-      bufferedRecords.add(streamRecord);
       output.collect(streamRecord);
+      batchSize++;
     }
   }
 
   @Override
   public void open() throws Exception {
     super.open();
+    isMain = getRuntimeContext().getIndexOfThisSubtask() == 0;
+
     // get configs from runtimeContext
     cfg = (HoodieFlinkStreamer.Config) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
 
-    // retry times
-    retryTimes = Integer.valueOf(cfg.blockRetryTime);
-
-    // retry interval
-    retryInterval = Integer.valueOf(cfg.blockRetryInterval);
-
     // hadoopConf
     serializableHadoopConf = new SerializableConfiguration(StreamerUtil.getHadoopConf());
 
     // Hadoop FileSystem
     fs = FSUtils.getFs(cfg.targetBasePath, serializableHadoopConf.get());
 
-    TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null);
+    if (isMain) {
+      // retry times
+      retryTimes = Integer.valueOf(cfg.blockRetryTime);
 
-    // writeClient
-    writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true);
+      // retry interval
+      retryInterval = Integer.valueOf(cfg.blockRetryInterval);
 
-    // init table, create it if not exists.
-    initTable();
+      TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null);
+
+      // writeClient
+      writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true);
+
+      // init table, create it if not exists.
+      initTable();
+    }
   }
 
   @Override
   public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
     super.prepareSnapshotPreBarrier(checkpointId);
-    // check whether the last instant is completed, if not, wait 10s and then throws an exception
-    if (!StringUtils.isNullOrEmpty(latestInstant)) {
-      doCheck();
-      // last instant completed, set it empty
-      latestInstant = "";
+    int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+    String instantGenerateInfoFileName = indexOfThisSubtask + UNDERLINE + checkpointId + UNDERLINE + batchSize;
+    Path path = new Path(HoodieTableMetaClient.INSTANT_GENERATE_FOLDER_NAME,instantGenerateInfoFileName);
+    // mk generate file by each subtask
+    fs.create(path,true);
+    LOG.info("subtask [{}] at checkpoint [{}] created generate file [{}]",indexOfThisSubtask,checkpointId,instantGenerateInfoFileName);
+    if (isMain) {
+      boolean hasData = generateFilePasre(checkpointId);
+      // check whether the last instant is completed, if not, wait 10s and then throws an exception
+      if (!StringUtils.isNullOrEmpty(latestInstant)) {
+        doCheck();
+        // last instant completed, set it empty
+        latestInstant = "";
+      }
+
+      // no data no new instant
+      if (hasData) {
+        latestInstant = startNewInstant(checkpointId);
+      }
+    }
+
+  }
+
+  private boolean generateFilePasre(long checkpointId) throws InterruptedException, IOException {

Review comment:
       `checkReceivedData`?
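Below is a filesystem-free sketch of the decision that `checkReceivedData` makes. It works on marker file names instead of `FileStatus` objects, and `ReceivedDataCheck` and its methods are hypothetical. Instead of matching names with `contains("_%d_")`, it parses the checkpoint field and compares it exactly.

```java
import java.util.Collection;

/** Hypothetical, filesystem-free version of the checkReceivedData decision. */
final class ReceivedDataCheck {
  private ReceivedDataCheck() {}

  /** Counts markers written for checkpointId, i.e. how many subtasks have reported. */
  static int reportedSubtasks(Collection<String> markerNames, long checkpointId) {
    int count = 0;
    for (String name : markerNames) {
      String[] parts = name.split("_");
      if (parts.length == 3 && Long.parseLong(parts[1]) == checkpointId) {
        count++;
      }
    }
    return count;
  }

  /** True if any marker for checkpointId reports a positive record count. */
  static boolean receivedData(Collection<String> markerNames, long checkpointId) {
    for (String name : markerNames) {
      String[] parts = name.split("_");
      if (parts.length != 3) {
        continue; // skip names that do not have the three expected fields
      }
      // Exact comparison of the checkpoint field, then check the record count.
      if (Long.parseLong(parts[1]) == checkpointId && Long.parseLong(parts[2]) > 0) {
        return true;
      }
    }
    return false;
  }
}
```

In the operator, the main subtask would keep listing the marker folder until `reportedSubtasks(...)` equals `getNumberOfParallelSubtasks()`. It would then call `receivedData(...)` to decide whether to start a new instant.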







[GitHub] [hudi] loukey-lj commented on a change in pull request #2434: [HUDI-1511] InstantGenerateOperator support multiple parallelism

Posted by GitBox <gi...@apache.org>.
loukey-lj commented on a change in pull request #2434:
URL: https://github.com/apache/hudi/pull/2434#discussion_r556285084



##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -222,4 +234,59 @@ public void close() throws Exception {
       fs.close();
     }
   }
+
+  private boolean checkReceivedData(long checkpointId) throws InterruptedException, IOException {
+    int numberOfParallelSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
+    FileStatus[] fileStatuses = null;
+    Path generatePath = new Path(INSTANT_GENERATE_FOLDER_NAME);
+    int tryTimes = 1;
+    // waiting all subtask create generate file ready
+    while (true) {
+      Thread.sleep(500L);
+      fileStatuses = fs.listStatus(generatePath, new PathFilter() {
+        @Override
+        public boolean accept(Path pathname) {
+          return pathname.getName().contains(String.format("_%d_",checkpointId));
+        }
+      });
+
+      // is ready
+      if (fileStatuses != null && fileStatuses.length == numberOfParallelSubtasks) {
+        break;
+      }
+
+      if (tryTimes >= 5) {
+        LOG.warn("waiting generate file, checkpointId [{}]",checkpointId);
+        tryTimes = 0;
+      }
+    }
+
+    boolean hasData = false;
+    // judge whether has data in this checkpoint and delete tmp file.
+    for (FileStatus fileStatus : fileStatuses) {
+      Path path = fileStatus.getPath();
+      String name = path.getName();
+      // has data
+      if (Long.parseLong(name.split(UNDERLINE)[2]) > 0) {
+        hasData = true;
+        break;
+      }
+    }
+
+    // delete all tmp file
+    fileStatuses = fs.listStatus(generatePath);
+    for (FileStatus fileStatus : fileStatuses) {
+      fs.delete(fileStatus.getPath());
+    }
+
+    return hasData;
+  }
+
+  private void createinstantGenerateTmpFolder() throws IOException {

Review comment:
       ok
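In this revision, the wait loop only exits once every marker is found, and `tryTimes` is never incremented (a later revision adds `tryTimes++`). Below is a minimal sketch of a bounded variant, assuming a timeout is acceptable. `MarkerWait` and `awaitAllMarkers` are hypothetical, and the `IntSupplier` stands in for counting the results of `fs.listStatus(...)`.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.IntSupplier;

/** Hypothetical bounded version of the "wait until every subtask wrote its marker" loop. */
final class MarkerWait {
  private MarkerWait() {}

  /**
   * Polls countReported until it reaches expected, sleeping pollMillis between attempts.
   * Returns false on timeout or interruption instead of looping forever.
   */
  static boolean awaitAllMarkers(IntSupplier countReported, int expected,
                                 long pollMillis, long timeoutMillis) {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    int attempts = 0;
    while (countReported.getAsInt() < expected) {
      if (System.nanoTime() - deadline >= 0) {
        return false; // give up; the caller decides whether to fail the checkpoint
      }
      attempts++; // unlike the quoted loop, the counter actually advances
      if (attempts % 5 == 0) {
        System.err.printf("still waiting for marker files after %d attempts%n", attempts);
      }
      try {
        Thread.sleep(pollMillis);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt flag for the caller
        return false;
      }
    }
    return true;
  }
}
```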

##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -222,4 +234,59 @@ public void close() throws Exception {
       fs.close();
     }
   }
+
+  private boolean checkReceivedData(long checkpointId) throws InterruptedException, IOException {
+    int numberOfParallelSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
+    FileStatus[] fileStatuses = null;
+    Path generatePath = new Path(INSTANT_GENERATE_FOLDER_NAME);
+    int tryTimes = 1;
+    // waiting all subtask create generate file ready
+    while (true) {
+      Thread.sleep(500L);
+      fileStatuses = fs.listStatus(generatePath, new PathFilter() {
+        @Override
+        public boolean accept(Path pathname) {
+          return pathname.getName().contains(String.format("_%d_",checkpointId));
+        }
+      });
+
+      // is ready
+      if (fileStatuses != null && fileStatuses.length == numberOfParallelSubtasks) {
+        break;
+      }
+
+      if (tryTimes >= 5) {
+        LOG.warn("waiting generate file, checkpointId [{}]",checkpointId);
+        tryTimes = 0;
+      }
+    }
+
+    boolean hasData = false;

Review comment:
       ok

##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -102,65 +106,73 @@ public void open() throws Exception {
     // Hadoop FileSystem
     fs = FSUtils.getFs(cfg.targetBasePath, serializableHadoopConf.get());
 
-    TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null);
+    if (isMain) {
+      TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null);
 
-    // writeClient
-    writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true);
+      // writeClient
+      writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true);
 
-    // init table, create it if not exists.
-    initTable();
+      // init table, create it if not exists.
+      initTable();
+
+      // create instantGenerateTmpFolder
+      createinstantGenerateTmpFolder();
+    }
   }
 
   @Override
   public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
     super.prepareSnapshotPreBarrier(checkpointId);
-    // check whether the last instant is completed, if not, wait 10s and then throws an exception
-    if (!StringUtils.isNullOrEmpty(latestInstant)) {
-      doCheck();
-      // last instant completed, set it empty
-      latestInstant = "";
-    }
+    int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+    String instantGenerateInfoFileName = String.format("%d_%d_%d",indexOfThisSubtask,checkpointId,batchSize);
+    Path path = new Path(INSTANT_GENERATE_FOLDER_NAME,instantGenerateInfoFileName);
+    // mk generate file by each subtask
+    fs.create(path,true);
+    LOG.info("subtask [{}] at checkpoint [{}] created generate file [{}]",indexOfThisSubtask,checkpointId,instantGenerateInfoFileName);
+    if (isMain) {
+      boolean receivedDataInCurrentCP = checkReceivedData(checkpointId);
+      // check whether the last instant is completed, if not, wait 10s and then throws an exception
+      if (!StringUtils.isNullOrEmpty(latestInstant)) {
+        doCheck();
+        // last instant completed, set it empty
+        latestInstant = "";
+      }
 
-    // no data no new instant
-    if (!bufferedRecords.isEmpty()) {
-      latestInstant = startNewInstant(checkpointId);
+      // no data no new instant
+      if (receivedDataInCurrentCP) {
+        latestInstant = startNewInstant(checkpointId);
+      }
     }
   }
 
   @Override
   public void initializeState(StateInitializationContext context) throws Exception {
-    // instantState
-    ListStateDescriptor<String> latestInstantStateDescriptor = new ListStateDescriptor<String>("latestInstant", String.class);
-    latestInstantState = context.getOperatorStateStore().getListState(latestInstantStateDescriptor);
-
-    // recordState
-    ListStateDescriptor<StreamRecord> recordsStateDescriptor = new ListStateDescriptor<StreamRecord>("recordsState", StreamRecord.class);
-    recordsState = context.getOperatorStateStore().getListState(recordsStateDescriptor);
-
-    if (context.isRestored()) {
-      Iterator<String> latestInstantIterator = latestInstantState.get().iterator();
-      latestInstantIterator.forEachRemaining(x -> latestInstant = x);
-      LOG.info("InstantGenerateOperator initializeState get latestInstant [{}]", latestInstant);
-
-      Iterator<StreamRecord> recordIterator = recordsState.get().iterator();
-      bufferedRecords.clear();
-      recordIterator.forEachRemaining(x -> bufferedRecords.add(x));
+    isMain = getRuntimeContext().getIndexOfThisSubtask() == 0;
+    if (isMain) {
+      // instantState
+      ListStateDescriptor<String> latestInstantStateDescriptor = new ListStateDescriptor<String>("latestInstant", String.class);
+      latestInstantState = context.getOperatorStateStore().getListState(latestInstantStateDescriptor);
+
+      if (context.isRestored()) {
+        Iterator<String> latestInstantIterator = latestInstantState.get().iterator();
+        latestInstantIterator.forEachRemaining(x -> latestInstant = x);
+        LOG.info("InstantGenerateOperator initializeState get latestInstant [{}]", latestInstant);

Review comment:
       ok







[GitHub] [hudi] yanghua commented on a change in pull request #2434: [HUDI-1511] InstantGenerateOperator support multiple parallelism

Posted by GitBox <gi...@apache.org>.
yanghua commented on a change in pull request #2434:
URL: https://github.com/apache/hudi/pull/2434#discussion_r557472212



##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -222,4 +237,60 @@ public void close() throws Exception {
       fs.close();
     }
   }
+
+  private boolean checkReceivedData(long checkpointId) throws InterruptedException, IOException {
+    int numberOfParallelSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
+    FileStatus[] fileStatuses;
+    Path instantMarkerPath = new Path(HoodieTableMetaClient.AUXILIARYFOLDER_NAME, INSTANT_MARKER_FOLDER_NAME);
+    int tryTimes = 1;
+    // waiting all subtask create marker file ready
+    while (true) {
+      Thread.sleep(500L);
+      fileStatuses = fs.listStatus(instantMarkerPath, new PathFilter() {
+        @Override
+        public boolean accept(Path pathname) {
+          return pathname.getName().contains(String.format("_%d_", checkpointId));
+        }
+      });
+
+      // is ready
+      if (fileStatuses != null && fileStatuses.length == numberOfParallelSubtasks) {
+        break;
+      }
+
+      if (tryTimes >= 5) {
+        LOG.warn("waiting generate file, checkpointId [{}]", checkpointId);
+        tryTimes = 0;
+      }
+      tryTimes++;
+    }
+
+    boolean receivedData = false;
+    // judge whether has data in this checkpoint and delete maker file.
+    for (FileStatus fileStatus : fileStatuses) {
+      Path path = fileStatus.getPath();
+      String name = path.getName();
+      // has data
+      if (Long.parseLong(name.split(UNDERLINE)[2]) > 0) {
+        receivedData = true;
+        break;
+      }
+    }
+
+    // delete all marker file
+    fileStatuses = fs.listStatus(instantMarkerPath);
+    for (FileStatus fileStatus : fileStatuses) {
+      fs.delete(fileStatus.getPath(), true);
+    }
+
+    return receivedData;
+  }
+
+  private void createInstantMarkerDir() throws IOException {
+    // Always create instantMarkerFolder which is needed for InstantGenerateOperator
+    final Path instantMarkerFolder = new Path(new Path(cfg.targetBasePath, HoodieTableMetaClient.AUXILIARYFOLDER_NAME), INSTANT_MARKER_FOLDER_NAME);

Review comment:
       `Paths.get()`
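For readers following the design, the handshake in `checkReceivedData` depends on two things. Each subtask encodes `<subtaskIndex>_<checkpointId>_<recordCount>` into a marker file name, and subtask 0 parses those names. The sketch below shows that encoding in isolation. It works on plain file-name strings instead of a Hadoop `FileSystem` listing, and the `InstantMarkers` class and its methods are hypothetical helpers, not part of Hudi.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper illustrating the marker-name protocol discussed in this PR;
// it is not part of the Hudi API and works on plain file names only.
final class InstantMarkers {

  private InstantMarkers() {
  }

  /** Builds a marker name such as "3_42_100": subtask 3, checkpoint 42, 100 records. */
  static String markerName(int subtaskIndex, long checkpointId, long recordCount) {
    return String.format("%d_%d_%d", subtaskIndex, checkpointId, recordCount);
  }

  /** Parses the checkpoint id (middle field) out of a marker name. */
  static long checkpointOf(String markerName) {
    return Long.parseLong(markerName.split("_")[1]);
  }

  /** Parses the record count (last field) out of a marker name. */
  static long recordCountOf(String markerName) {
    return Long.parseLong(markerName.split("_")[2]);
  }

  /** True once every parallel subtask has written its marker for the checkpoint. */
  static boolean allMarkersPresent(List<String> names, long checkpointId, int parallelism) {
    long reported = names.stream().filter(n -> checkpointOf(n) == checkpointId).count();
    return reported == parallelism;
  }

  /** True if at least one subtask counted records during the checkpoint. */
  static boolean receivedData(List<String> names, long checkpointId) {
    return names.stream()
        .filter(n -> checkpointOf(n) == checkpointId)
        .anyMatch(n -> recordCountOf(n) > 0);
  }

  public static void main(String[] args) {
    List<String> names = Arrays.asList(markerName(0, 7, 0), markerName(1, 7, 12), markerName(0, 6, 3));
    System.out.println(allMarkersPresent(names, 7, 2)); // prints "true"
    System.out.println(receivedData(names, 7));         // prints "true"
  }
}
```

For names with exactly two underscores, parsing the middle field gives the same result as the `contains("_<checkpointId>_")` filter in the diff. The parsed version simply states the intent more directly.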




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] yanghua commented on a change in pull request #2434: [HUDI-1511] InstantGenerateOperator support multiple parallelism

Posted by GitBox <gi...@apache.org>.
yanghua commented on a change in pull request #2434:
URL: https://github.com/apache/hudi/pull/2434#discussion_r557347825



##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -71,16 +72,18 @@
   private String latestInstant = "";
   private List<String> latestInstantList = new ArrayList<>(1);
   private transient ListState<String> latestInstantState;
-  private List<StreamRecord> bufferedRecords = new LinkedList();
-  private transient ListState<StreamRecord> recordsState;
   private Integer retryTimes;
   private Integer retryInterval;
+  private static final String UNDERLINE = "_";
+  private static final String INSTANT_GENERATE_FOLDER_NAME = "instant_generate_tmp";

Review comment:
       sounds good







[GitHub] [hudi] yanghua merged pull request #2434: [HUDI-1511] InstantGenerateOperator support multiple parallelism

Posted by GitBox <gi...@apache.org>.
yanghua merged pull request #2434:
URL: https://github.com/apache/hudi/pull/2434


   





[GitHub] [hudi] loukey-lj commented on a change in pull request #2434: [HUDI-1511] InstantGenerateOperator support multiple parallelism

Posted by GitBox <gi...@apache.org>.
loukey-lj commented on a change in pull request #2434:
URL: https://github.com/apache/hudi/pull/2434#discussion_r555808114



##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -63,104 +63,159 @@
 
   private static final Logger LOG = LoggerFactory.getLogger(InstantGenerateOperator.class);
   public static final String NAME = "InstantGenerateOperator";
-
+  private static final String UNDERLINE = "_";
   private HoodieFlinkStreamer.Config cfg;
   private HoodieFlinkWriteClient writeClient;
   private SerializableConfiguration serializableHadoopConf;
   private transient FileSystem fs;
   private String latestInstant = "";
   private List<String> latestInstantList = new ArrayList<>(1);
   private transient ListState<String> latestInstantState;
-  private List<StreamRecord> bufferedRecords = new LinkedList();
-  private transient ListState<StreamRecord> recordsState;
   private Integer retryTimes;
   private Integer retryInterval;
+  private transient boolean isMain = false;
+  private transient volatile long batchSize = 0;
 
   @Override
   public void processElement(StreamRecord<HoodieRecord> streamRecord) throws Exception {
     if (streamRecord.getValue() != null) {
-      bufferedRecords.add(streamRecord);
       output.collect(streamRecord);
+      batchSize++;
     }
   }
 
   @Override
   public void open() throws Exception {
     super.open();
+    isMain = getRuntimeContext().getIndexOfThisSubtask() == 0;
+
     // get configs from runtimeContext
     cfg = (HoodieFlinkStreamer.Config) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
 
-    // retry times
-    retryTimes = Integer.valueOf(cfg.blockRetryTime);
-
-    // retry interval
-    retryInterval = Integer.valueOf(cfg.blockRetryInterval);
-
     // hadoopConf
     serializableHadoopConf = new SerializableConfiguration(StreamerUtil.getHadoopConf());
 
     // Hadoop FileSystem
     fs = FSUtils.getFs(cfg.targetBasePath, serializableHadoopConf.get());
 
-    TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null);
+    if (isMain) {
+      // retry times
+      retryTimes = Integer.valueOf(cfg.blockRetryTime);
 
-    // writeClient
-    writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true);
+      // retry interval
+      retryInterval = Integer.valueOf(cfg.blockRetryInterval);
 
-    // init table, create it if not exists.
-    initTable();
+      TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null);
+
+      // writeClient
+      writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true);
+
+      // init table, create it if not exists.
+      initTable();
+    }
   }
 
   @Override
   public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
     super.prepareSnapshotPreBarrier(checkpointId);
-    // check whether the last instant is completed, if not, wait 10s and then throws an exception
-    if (!StringUtils.isNullOrEmpty(latestInstant)) {
-      doCheck();
-      // last instant completed, set it empty
-      latestInstant = "";
+    int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+    String instantGenerateInfoFileName = indexOfThisSubtask + UNDERLINE + checkpointId + UNDERLINE + batchSize;
+    Path path = new Path(HoodieTableMetaClient.INSTANT_GENERATE_FOLDER_NAME,instantGenerateInfoFileName);
+    // mk generate file by each subtask
+    fs.create(path,true);
+    LOG.info("subtask [{}] at checkpoint [{}] created generate file [{}]",indexOfThisSubtask,checkpointId,instantGenerateInfoFileName);
+    if (isMain) {
+      boolean hasData = generateFilePasre(checkpointId);

Review comment:
       agree







[GitHub] [hudi] vinothchandar commented on pull request #2434: [HUDI-1511] InstantGenerateOperator support multiple parallelism

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #2434:
URL: https://github.com/apache/hudi/pull/2434#issuecomment-764820382


   @yanghua should be good to land now if you are happy with it





[GitHub] [hudi] loukey-lj commented on a change in pull request #2434: [HUDI-1511] InstantGenerateOperator support multiple parallelism

Posted by GitBox <gi...@apache.org>.
loukey-lj commented on a change in pull request #2434:
URL: https://github.com/apache/hudi/pull/2434#discussion_r557820490



##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -102,65 +105,76 @@ public void open() throws Exception {
     // Hadoop FileSystem
     fs = FSUtils.getFs(cfg.targetBasePath, serializableHadoopConf.get());
 
-    TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null);
+    if (isMain) {
+      TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null);
 
-    // writeClient
-    writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true);
+      // writeClient
+      writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true);
 
-    // init table, create it if not exists.
-    initTable();
+      // init table, create it if not exists.
+      initTable();
+
+      // create instantGenerateTmpFolder

Review comment:
       Changed to 'create instant marker directory'.

##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -222,4 +236,59 @@ public void close() throws Exception {
       fs.close();
     }
   }
+
+  private boolean checkReceivedData(long checkpointId) throws InterruptedException, IOException {
+    int numberOfParallelSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
+    FileStatus[] fileStatuses = null;
+    Path generatePath = new Path(INSTANT_GENERATE_FOLDER_NAME);
+    int tryTimes = 1;
+    // waiting all subtask create generate file ready
+    while (true) {
+      Thread.sleep(500L);
+      fileStatuses = fs.listStatus(generatePath, new PathFilter() {
+        @Override
+        public boolean accept(Path pathname) {
+          return pathname.getName().contains(String.format("_%d_", checkpointId));

Review comment:
       ok

##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -222,4 +236,59 @@ public void close() throws Exception {
       fs.close();
     }
   }
+
+  private boolean checkReceivedData(long checkpointId) throws InterruptedException, IOException {
+    int numberOfParallelSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
+    FileStatus[] fileStatuses = null;
+    Path generatePath = new Path(INSTANT_GENERATE_FOLDER_NAME);

Review comment:
       agree







[GitHub] [hudi] yanghua commented on pull request #2434: [HUDI-1511] InstantGenerateOperator support multiple parallelism

Posted by GitBox <gi...@apache.org>.
yanghua commented on pull request #2434:
URL: https://github.com/apache/hudi/pull/2434#issuecomment-765047901


   > @yanghua should be good to land now if you are happy with it
   
   ack, thanks.





[GitHub] [hudi] wangxianghu commented on a change in pull request #2434: [HUDI-1511] InstantGenerateOperator support multiple parallelism

Posted by GitBox <gi...@apache.org>.
wangxianghu commented on a change in pull request #2434:
URL: https://github.com/apache/hudi/pull/2434#discussion_r555734808



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
##########
@@ -71,6 +71,7 @@
 
   private static final long serialVersionUID = 1L;
   private static final Logger LOG = LogManager.getLogger(HoodieTableMetaClient.class);
+  public static final String INSTANT_GENERATE_FOLDER_NAME = "instant_generate";

Review comment:
       Moving it to the `hudi-flink` module would be better.







[GitHub] [hudi] loukey-lj closed pull request #2434: [HUDI-1511] InstantGenerateOperator support multiple parallelism

Posted by GitBox <gi...@apache.org>.
loukey-lj closed pull request #2434:
URL: https://github.com/apache/hudi/pull/2434


   





[GitHub] [hudi] wangxianghu commented on pull request #2434: [HUDI-1511] InstantGenerateOperator support multiple parallelism

Posted by GitBox <gi...@apache.org>.
wangxianghu commented on pull request #2434:
URL: https://github.com/apache/hudi/pull/2434#issuecomment-762579586


   > @loukey-lj Thanks for your effort! LGTM now. @wangxianghu Do you still have any concerns?
   
   Nothing else, thanks @loukey-lj @yanghua






[GitHub] [hudi] yanghua commented on a change in pull request #2434: [HUDI-1511] InstantGenerateOperator support multiple parallelism

Posted by GitBox <gi...@apache.org>.
yanghua commented on a change in pull request #2434:
URL: https://github.com/apache/hudi/pull/2434#discussion_r556537098



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
##########
@@ -202,7 +203,7 @@ public String getTempFolderPath() {
 
   /**
    * Returns Marker folder path.
-   * 

Review comment:
       So, could you check it with the `git diff` command?







[GitHub] [hudi] yanghua commented on a change in pull request #2434: [HUDI-1511] InstantGenerateOperator support multiple parallelism

Posted by GitBox <gi...@apache.org>.
yanghua commented on a change in pull request #2434:
URL: https://github.com/apache/hudi/pull/2434#discussion_r556192852



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
##########
@@ -202,7 +203,7 @@ public String getTempFolderPath() {
 
   /**
    * Returns Marker folder path.
-   * 

Review comment:
       Did not change?







[GitHub] [hudi] yanghua commented on pull request #2434: [HUDI-1511] InstantGenerateOperator support multiple parallelism

Posted by GitBox <gi...@apache.org>.
yanghua commented on pull request #2434:
URL: https://github.com/apache/hudi/pull/2434#issuecomment-763669861


   Hi @vinothchandar, can we merge this PR now?





[GitHub] [hudi] loukey-lj commented on a change in pull request #2434: [HUDI-1511] InstantGenerateOperator support multiple parallelism

Posted by GitBox <gi...@apache.org>.
loukey-lj commented on a change in pull request #2434:
URL: https://github.com/apache/hudi/pull/2434#discussion_r555808215



##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -63,104 +63,159 @@
 
   private static final Logger LOG = LoggerFactory.getLogger(InstantGenerateOperator.class);
   public static final String NAME = "InstantGenerateOperator";
-
+  private static final String UNDERLINE = "_";
   private HoodieFlinkStreamer.Config cfg;
   private HoodieFlinkWriteClient writeClient;
   private SerializableConfiguration serializableHadoopConf;
   private transient FileSystem fs;
   private String latestInstant = "";
   private List<String> latestInstantList = new ArrayList<>(1);
   private transient ListState<String> latestInstantState;
-  private List<StreamRecord> bufferedRecords = new LinkedList();
-  private transient ListState<StreamRecord> recordsState;
   private Integer retryTimes;
   private Integer retryInterval;
+  private transient boolean isMain = false;
+  private transient volatile long batchSize = 0;
 
   @Override
   public void processElement(StreamRecord<HoodieRecord> streamRecord) throws Exception {
     if (streamRecord.getValue() != null) {
-      bufferedRecords.add(streamRecord);
       output.collect(streamRecord);
+      batchSize++;
     }
   }
 
   @Override
   public void open() throws Exception {
     super.open();
+    isMain = getRuntimeContext().getIndexOfThisSubtask() == 0;
+
     // get configs from runtimeContext
     cfg = (HoodieFlinkStreamer.Config) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
 
-    // retry times
-    retryTimes = Integer.valueOf(cfg.blockRetryTime);
-
-    // retry interval
-    retryInterval = Integer.valueOf(cfg.blockRetryInterval);
-
     // hadoopConf
     serializableHadoopConf = new SerializableConfiguration(StreamerUtil.getHadoopConf());
 
     // Hadoop FileSystem
     fs = FSUtils.getFs(cfg.targetBasePath, serializableHadoopConf.get());
 
-    TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null);
+    if (isMain) {
+      // retry times
+      retryTimes = Integer.valueOf(cfg.blockRetryTime);
 
-    // writeClient
-    writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true);
+      // retry interval
+      retryInterval = Integer.valueOf(cfg.blockRetryInterval);
 
-    // init table, create it if not exists.
-    initTable();
+      TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null);
+
+      // writeClient
+      writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true);
+
+      // init table, create it if not exists.
+      initTable();
+    }
   }
 
   @Override
   public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
     super.prepareSnapshotPreBarrier(checkpointId);
-    // check whether the last instant is completed, if not, wait 10s and then throws an exception
-    if (!StringUtils.isNullOrEmpty(latestInstant)) {
-      doCheck();
-      // last instant completed, set it empty
-      latestInstant = "";
+    int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+    String instantGenerateInfoFileName = indexOfThisSubtask + UNDERLINE + checkpointId + UNDERLINE + batchSize;
+    Path path = new Path(HoodieTableMetaClient.INSTANT_GENERATE_FOLDER_NAME,instantGenerateInfoFileName);
+    // mk generate file by each subtask
+    fs.create(path,true);
+    LOG.info("subtask [{}] at checkpoint [{}] created generate file [{}]",indexOfThisSubtask,checkpointId,instantGenerateInfoFileName);
+    if (isMain) {
+      boolean hasData = generateFilePasre(checkpointId);
+      // check whether the last instant is completed, if not, wait 10s and then throws an exception
+      if (!StringUtils.isNullOrEmpty(latestInstant)) {
+        doCheck();
+        // last instant completed, set it empty
+        latestInstant = "";
+      }
+
+      // no data no new instant
+      if (hasData) {
+        latestInstant = startNewInstant(checkpointId);
+      }
+    }
+
+  }
+
+  private boolean generateFilePasre(long checkpointId) throws InterruptedException, IOException {

Review comment:
       agree
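The wait loop in `checkReceivedData` (quoted earlier in this thread) sleeps 500 ms per iteration. It keeps looping until every subtask's marker for the checkpoint appears, and it only logs a warning periodically. If a bounded wait were preferred, it could look roughly like the sketch below. `MarkerWaiter`, the `Supplier`-based listing, and the timeout are assumptions for illustration. In the operator, the supplier would wrap the checkpoint-filtered `fs.listStatus(...)` call.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical sketch: poll a marker listing until all subtasks have reported,
// giving up after a deadline instead of looping forever.
final class MarkerWaiter {

  private MarkerWaiter() {
  }

  static List<String> awaitMarkers(Supplier<List<String>> listMarkers,
                                   int expectedCount,
                                   Duration pollInterval,
                                   Duration timeout) throws InterruptedException, TimeoutException {
    long deadline = System.nanoTime() + timeout.toNanos();
    while (true) {
      // The supplier is assumed to return only markers for the current checkpoint.
      List<String> markers = listMarkers.get();
      if (markers != null && markers.size() == expectedCount) {
        return markers;
      }
      // Overflow-safe comparison against the deadline.
      if (System.nanoTime() - deadline >= 0) {
        throw new TimeoutException("Expected " + expectedCount + " markers but saw "
            + (markers == null ? 0 : markers.size()));
      }
      Thread.sleep(pollInterval.toMillis());
    }
  }
}
```

Failing the checkpoint with a timeout surfaces a missing subtask marker as an error. The unbounded loop in the diff would instead block the main subtask indefinitely.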







[GitHub] [hudi] vinothchandar commented on pull request #2434: [HUDI-1511] InstantGenerateOperator support multiple parallelism

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #2434:
URL: https://github.com/apache/hudi/pull/2434#issuecomment-763784997


   @yanghua please wait till I make the release branch. We also need to stabilize CI before landing anything more; otherwise, failures will be very hard to track down.





[GitHub] [hudi] wangxianghu commented on a change in pull request #2434: [HUDI-1511] InstantGenerateOperator support multiple parallelism

Posted by GitBox <gi...@apache.org>.
wangxianghu commented on a change in pull request #2434:
URL: https://github.com/apache/hudi/pull/2434#discussion_r557021821



##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -71,16 +72,18 @@
   private String latestInstant = "";
   private List<String> latestInstantList = new ArrayList<>(1);
   private transient ListState<String> latestInstantState;
-  private List<StreamRecord> bufferedRecords = new LinkedList();
-  private transient ListState<StreamRecord> recordsState;
   private Integer retryTimes;
   private Integer retryInterval;
+  private static final String UNDERLINE = "_";
+  private static final String INSTANT_GENERATE_FOLDER_NAME = "instant_generate_tmp";

Review comment:
       `INSTANT_GENERATE_FOLDER_NAME = ".instant_marker"`?
   WDYT, @yanghua?







[GitHub] [hudi] loukey-lj commented on a change in pull request #2434: [HUDI-1511] InstantGenerateOperator support multiple parallelism

Posted by GitBox <gi...@apache.org>.
loukey-lj commented on a change in pull request #2434:
URL: https://github.com/apache/hudi/pull/2434#discussion_r555798148



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
##########
@@ -202,7 +203,7 @@ public String getTempFolderPath() {
 
   /**
    * Returns Marker folder path.
-   * 

Review comment:
       ok








[GitHub] [hudi] yanghua commented on a change in pull request #2434: [HUDI-1511] InstantGenerateOperator support multiple parallelism

Posted by GitBox <gi...@apache.org>.
yanghua commented on a change in pull request #2434:
URL: https://github.com/apache/hudi/pull/2434#discussion_r557462779



##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -71,16 +73,18 @@
   private String latestInstant = "";
   private List<String> latestInstantList = new ArrayList<>(1);
   private transient ListState<String> latestInstantState;
-  private List<StreamRecord> bufferedRecords = new LinkedList();
-  private transient ListState<StreamRecord> recordsState;
   private Integer retryTimes;
   private Integer retryInterval;
+  private static final String UNDERLINE = "_";
+  private static final String INSTANT_MARKER_FOLDER_NAME = ".instant_marker";
+  private transient boolean isMain = false;
+  private transient AtomicLong batchSize = new AtomicLong(0);

Review comment:
       I still think `cpRecordCounter` sounds better.
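The excerpt does not show where `batchSize` is reset between checkpoints. A per-checkpoint counter following the suggested `cpRecordCounter` name could look like the sketch below. The class, its method names, and the reset-on-read behaviour are assumptions. The `AtomicLong` mirrors the diff. A plain `long` may be enough if, as in Flink's task-thread model, `processElement` and `prepareSnapshotPreBarrier` run on the same thread.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of a per-checkpoint record counter; the field name follows
// the review suggestion `cpRecordCounter`, not the merged code.
final class CheckpointRecordCounter {

  private final AtomicLong cpRecordCounter = new AtomicLong(0);

  /** Called for every non-null record, e.g. from processElement. */
  void onRecord() {
    cpRecordCounter.incrementAndGet();
  }

  /**
   * Returns the number of records seen since the previous call and resets the
   * counter, so each marker file reports only the current checkpoint's records.
   */
  long snapshotAndReset() {
    return cpRecordCounter.getAndSet(0);
  }
}
```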

##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -102,65 +106,76 @@ public void open() throws Exception {
     // Hadoop FileSystem
     fs = FSUtils.getFs(cfg.targetBasePath, serializableHadoopConf.get());
 
-    TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null);
+    if (isMain) {
+      TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null);
 
-    // writeClient
-    writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true);
+      // writeClient
+      writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true);
 
-    // init table, create it if not exists.
-    initTable();
+      // init table, create it if not exists.
+      initTable();
+
+      // create instant marker directory
+      createInstantMarkerDir();
+    }
   }
 
   @Override
   public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
     super.prepareSnapshotPreBarrier(checkpointId);
-    // check whether the last instant is completed, if not, wait 10s and then throws an exception
-    if (!StringUtils.isNullOrEmpty(latestInstant)) {
-      doCheck();
-      // last instant completed, set it empty
-      latestInstant = "";
-    }
+    int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+    String instantMarkerFileName = String.format("%d_%d_%d", indexOfThisSubtask, checkpointId, batchSize.get());
+    Path path = new Path(new Path(HoodieTableMetaClient.AUXILIARYFOLDER_NAME, INSTANT_MARKER_FOLDER_NAME), instantMarkerFileName);

Review comment:
       Using `Paths.get(string, string, ...)` looks better?
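On the `Paths.get(...)` suggestion, one caveat may be worth weighing. `java.nio.file.Paths` joins segments with the local platform separator. Hadoop's `Path`, which `fs.create` and `fs.listStatus` use here, always uses `/`, and nested `new Path(parent, child)` already composes that way. The sketch below shows the difference. The segment values are illustrative, not Hudi's actual folder constants.

```java
import java.nio.file.Paths;

// Sketch comparing two ways to join marker path segments into a string.
// Segment values are illustrative; they are not the real Hudi folder constants.
final class MarkerPathJoin {

  private MarkerPathJoin() {
  }

  // java.nio joins with the local file system's separator (a backslash on Windows),
  // which may not match what HDFS or object stores expect.
  static String withNio(String first, String... more) {
    return Paths.get(first, more).toString();
  }

  // Hadoop paths always use "/", so joining with "/" explicitly stays portable.
  static String withSlash(String... segments) {
    return String.join("/", segments);
  }

  public static void main(String[] args) {
    System.out.println(withNio("aux", ".instant_marker", "0_1_10"));
    System.out.println(withSlash("aux", ".instant_marker", "0_1_10")); // prints "aux/.instant_marker/0_1_10"
  }
}
```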

##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -102,65 +106,76 @@ public void open() throws Exception {
     // Hadoop FileSystem
     fs = FSUtils.getFs(cfg.targetBasePath, serializableHadoopConf.get());
 
-    TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null);
+    if (isMain) {
+      TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null);
 
-    // writeClient
-    writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true);
+      // writeClient
+      writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true);
 
-    // init table, create it if not exists.
-    initTable();
+      // init table, create it if not exists.
+      initTable();
+
+      // create instant marker directory
+      createInstantMarkerDir();
+    }
   }
 
   @Override
   public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
     super.prepareSnapshotPreBarrier(checkpointId);
-    // check whether the last instant is completed, if not, wait 10s and then throws an exception
-    if (!StringUtils.isNullOrEmpty(latestInstant)) {
-      doCheck();
-      // last instant completed, set it empty
-      latestInstant = "";
-    }
+    int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+    String instantMarkerFileName = String.format("%d_%d_%d", indexOfThisSubtask, checkpointId, batchSize.get());
+    Path path = new Path(new Path(HoodieTableMetaClient.AUXILIARYFOLDER_NAME, INSTANT_MARKER_FOLDER_NAME), instantMarkerFileName);
+    // mk generate file by each subtask
+    fs.create(path, true);
+    LOG.info("Subtask [{}] at checkpoint [{}] created generate file [{}]", indexOfThisSubtask, checkpointId, instantMarkerFileName);
+    if (isMain) {
+      boolean receivedDataInCurrentCP = checkReceivedData(checkpointId);
+      // check whether the last instant is completed, if not, wait 10s and then throws an exception
+      if (!StringUtils.isNullOrEmpty(latestInstant)) {
+        doCheck();
+        // last instant completed, set it empty
+        latestInstant = "";
+      }
 
-    // no data no new instant
-    if (!bufferedRecords.isEmpty()) {
-      latestInstant = startNewInstant(checkpointId);
+      // no data no new instant
+      if (receivedDataInCurrentCP) {
+        latestInstant = startNewInstant(checkpointId);
+      }
     }
   }
 
   @Override
   public void initializeState(StateInitializationContext context) throws Exception {
-    // instantState
-    ListStateDescriptor<String> latestInstantStateDescriptor = new ListStateDescriptor<String>("latestInstant", String.class);
-    latestInstantState = context.getOperatorStateStore().getListState(latestInstantStateDescriptor);
-
-    // recordState
-    ListStateDescriptor<StreamRecord> recordsStateDescriptor = new ListStateDescriptor<StreamRecord>("recordsState", StreamRecord.class);
-    recordsState = context.getOperatorStateStore().getListState(recordsStateDescriptor);
-
-    if (context.isRestored()) {
-      Iterator<String> latestInstantIterator = latestInstantState.get().iterator();
-      latestInstantIterator.forEachRemaining(x -> latestInstant = x);
-      LOG.info("InstantGenerateOperator initializeState get latestInstant [{}]", latestInstant);
-
-      Iterator<StreamRecord> recordIterator = recordsState.get().iterator();
-      bufferedRecords.clear();
-      recordIterator.forEachRemaining(x -> bufferedRecords.add(x));
+    isMain = getRuntimeContext().getIndexOfThisSubtask() == 0;
+    if (isMain) {
+      // instantState
+      ListStateDescriptor<String> latestInstantStateDescriptor = new ListStateDescriptor<>("latestInstant", String.class);
+      latestInstantState = context.getOperatorStateStore().getListState(latestInstantStateDescriptor);
+
+      if (context.isRestored()) {
+        Iterator<String> latestInstantIterator = latestInstantState.get().iterator();
+        latestInstantIterator.forEachRemaining(x -> latestInstant = x);
+        LOG.info("Restoring the latest instant [{}] from the state", latestInstant);
+      }
     }
   }
 
   @Override
   public void snapshotState(StateSnapshotContext functionSnapshotContext) throws Exception {
-    if (latestInstantList.isEmpty()) {
-      latestInstantList.add(latestInstant);
+    long checkpointId = functionSnapshotContext.getCheckpointId();
+    if (isMain) {
+      LOG.info("Update latest instant [{}] records size [{}] checkpointId [{}]", latestInstant, batchSize, checkpointId);
+      if (latestInstantList.isEmpty()) {
+        latestInstantList.add(latestInstant);
+      } else {
+        latestInstantList.set(0, latestInstant);
+      }
+      latestInstantState.update(latestInstantList);
     } else {
-      latestInstantList.set(0, latestInstant);
+      LOG.info("Records size [{}] checkpointId [{}]", batchSize, checkpointId);

Review comment:
       `Task instance %d received %d records in checkpoint [%d]` looks better?
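For reference, this is how the suggested wording renders. The operator logs through SLF4J `{}` placeholders (see the `LOG.info` calls in the diff), so the `%d` form only applies if the message is built with `String.format`. The class name and sample values below are illustrative and do not come from the PR.

```java
public class CheckpointLogMessage {

    // Builds the suggested message from the subtask index, the number of
    // records seen since the previous checkpoint, and the checkpoint id.
    static String format(int subtaskIndex, long recordCount, long checkpointId) {
        return String.format("Task instance %d received %d records in checkpoint [%d]",
                subtaskIndex, recordCount, checkpointId);
    }

    public static void main(String[] args) {
        // prints: Task instance 3 received 42 records in checkpoint [7]
        System.out.println(format(3, 42L, 7L));
    }
}
```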

##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -102,65 +105,76 @@ public void open() throws Exception {
     // Hadoop FileSystem
     fs = FSUtils.getFs(cfg.targetBasePath, serializableHadoopConf.get());
 
-    TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null);
+    if (isMain) {
+      TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null);

Review comment:
       wdyt about this review suggestion?

##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -222,4 +236,59 @@ public void close() throws Exception {
       fs.close();
     }
   }
+
+  private boolean checkReceivedData(long checkpointId) throws InterruptedException, IOException {
+    int numberOfParallelSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
+    FileStatus[] fileStatuses = null;
+    Path generatePath = new Path(INSTANT_GENERATE_FOLDER_NAME);
+    int tryTimes = 1;
+    // waiting all subtask create generate file ready
+    while (true) {
+      Thread.sleep(500L);
+      fileStatuses = fs.listStatus(generatePath, new PathFilter() {
+        @Override
+        public boolean accept(Path pathname) {
+          return pathname.getName().contains(String.format("_%d_", checkpointId));

Review comment:
       Can this suggestion be accepted?

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
##########
@@ -202,7 +203,7 @@ public String getTempFolderPath() {
 
   /**
    * Returns Marker folder path.
-   * 

Review comment:
      This still exists. Adding a space after `*` may solve the issue.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
##########
@@ -272,7 +273,7 @@ public HoodieWrapperFileSystem getFs() {
 
   /**
    * Return raw file-system.
-   * 

Review comment:
      Or just revert this file's changes with git.

##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -222,4 +237,60 @@ public void close() throws Exception {
       fs.close();
     }
   }
+
+  private boolean checkReceivedData(long checkpointId) throws InterruptedException, IOException {
+    int numberOfParallelSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
+    FileStatus[] fileStatuses;
+    Path instantMarkerPath = new Path(HoodieTableMetaClient.AUXILIARYFOLDER_NAME, INSTANT_MARKER_FOLDER_NAME);
+    int tryTimes = 1;
+    // waiting all subtask create marker file ready
+    while (true) {
+      Thread.sleep(500L);
+      fileStatuses = fs.listStatus(instantMarkerPath, new PathFilter() {
+        @Override
+        public boolean accept(Path pathname) {
+          return pathname.getName().contains(String.format("_%d_", checkpointId));
+        }
+      });
+
+      // is ready
+      if (fileStatuses != null && fileStatuses.length == numberOfParallelSubtasks) {
+        break;
+      }
+
+      if (tryTimes >= 5) {
+        LOG.warn("waiting generate file, checkpointId [{}]", checkpointId);
+        tryTimes = 0;
+      }
+      tryTimes++;
+    }
+
+    boolean receivedData = false;
+    // judge whether has data in this checkpoint and delete maker file.
+    for (FileStatus fileStatus : fileStatuses) {
+      Path path = fileStatus.getPath();
+      String name = path.getName();
+      // has data
+      if (Long.parseLong(name.split(UNDERLINE)[2]) > 0) {
+        receivedData = true;
+        break;
+      }
+    }
+
+    // delete all marker file
+    fileStatuses = fs.listStatus(instantMarkerPath);
+    for (FileStatus fileStatus : fileStatuses) {
+      fs.delete(fileStatus.getPath(), true);
+    }
+
+    return receivedData;
+  }
+
+  private void createInstantMarkerDir() throws IOException {
+    // Always create instantMarkerFolder which is needed for InstantGenerateOperator
+    final Path instantMarkerFolder = new Path(new Path(cfg.targetBasePath, HoodieTableMetaClient.AUXILIARYFOLDER_NAME), INSTANT_MARKER_FOLDER_NAME);

Review comment:
       `Paths.get()`
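A caveat on this suggestion, sketched under assumptions: `java.nio.file.Paths.get()` returns a `java.nio.file.Path` bound to the local default file system, not an `org.apache.hadoop.fs.Path`, and on Unix-like systems it collapses the `//` that follows a URI scheme such as `hdfs://`. The sketch contrasts it with a plain string join. The constant values and the `join` helper are hypothetical stand-ins, not Hudi's actual definitions.

```java
import java.nio.file.Paths;

public class MarkerDirPath {

    // Hypothetical values; the real constants live in HoodieTableMetaClient and
    // InstantGenerateOperator and may differ.
    static final String AUX_FOLDER = ".hoodie/.aux";
    static final String MARKER_FOLDER = "instant_marker";

    // Joins segments with '/' without touching a URI scheme such as "hdfs://".
    static String join(String base, String... segments) {
        StringBuilder sb = new StringBuilder(base);
        for (String segment : segments) {
            if (sb.length() > 0 && sb.charAt(sb.length() - 1) != '/') {
                sb.append('/');
            }
            sb.append(segment);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String base = "hdfs://namenode:8020/warehouse/t1";
        // java.nio treats the string as a local path; on Unix-like systems the
        // redundant slash after "hdfs:" is removed.
        System.out.println(Paths.get(base, AUX_FOLDER, MARKER_FOLDER));
        // A plain string join keeps the scheme intact.
        System.out.println(join(base, AUX_FOLDER, MARKER_FOLDER));
    }
}
```

Hadoop's `new Path(Path parent, String child)` constructor, which the diff already uses, resolves the child against the parent URI. It may therefore be the safer choice when `cfg.targetBasePath` can be a remote URI.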

##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -222,4 +234,59 @@ public void close() throws Exception {
       fs.close();
     }
   }
+
+  private boolean checkReceivedData(long checkpointId) throws InterruptedException, IOException {
+    int numberOfParallelSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
+    FileStatus[] fileStatuses = null;
+    Path generatePath = new Path(INSTANT_GENERATE_FOLDER_NAME);
+    int tryTimes = 1;
+    // waiting all subtask create generate file ready
+    while (true) {
+      Thread.sleep(500L);
+      fileStatuses = fs.listStatus(generatePath, new PathFilter() {
+        @Override
+        public boolean accept(Path pathname) {
+          return pathname.getName().contains(String.format("_%d_",checkpointId));
+        }
+      });
+
+      // is ready
+      if (fileStatuses != null && fileStatuses.length == numberOfParallelSubtasks) {
+        break;
+      }
+
+      if (tryTimes >= 5) {

Review comment:
       ?







[GitHub] [hudi] yanghua commented on a change in pull request #2434: [HUDI-1511] InstantGenerateOperator support multiple parallelism

Posted by GitBox <gi...@apache.org>.
yanghua commented on a change in pull request #2434:
URL: https://github.com/apache/hudi/pull/2434#discussion_r557465318



##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -102,65 +106,76 @@ public void open() throws Exception {
     // Hadoop FileSystem
     fs = FSUtils.getFs(cfg.targetBasePath, serializableHadoopConf.get());
 
-    TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null);
+    if (isMain) {
+      TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null);
 
-    // writeClient
-    writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true);
+      // writeClient
+      writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true);
 
-    // init table, create it if not exists.
-    initTable();
+      // init table, create it if not exists.
+      initTable();
+
+      // create instant marker directory
+      createInstantMarkerDir();
+    }
   }
 
   @Override
   public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
     super.prepareSnapshotPreBarrier(checkpointId);
-    // check whether the last instant is completed, if not, wait 10s and then throws an exception
-    if (!StringUtils.isNullOrEmpty(latestInstant)) {
-      doCheck();
-      // last instant completed, set it empty
-      latestInstant = "";
-    }
+    int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+    String instantMarkerFileName = String.format("%d_%d_%d", indexOfThisSubtask, checkpointId, batchSize.get());
+    Path path = new Path(new Path(HoodieTableMetaClient.AUXILIARYFOLDER_NAME, INSTANT_MARKER_FOLDER_NAME), instantMarkerFileName);

Review comment:
       Using `Paths.get(string, string, ...)` looks better?
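This line may have a second issue. `createInstantMarkerDir` resolves the folder under `cfg.targetBasePath`, but this path omits the base path. If the markers are meant to land in that same folder, which is an assumption, the Hadoop `FileSystem` would likely resolve this relative path against its working directory instead. The sketch below builds the `<subtask>_<checkpoint>_<count>` marker name used in the diff and resolves it under a base path with `Paths.get(String, String...)`. It only illustrates local paths, and the folder names are hypothetical.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

public class MarkerFilePath {

    static final String DELIMITER = "_";

    // Marker name layout taken from the diff: <subtaskIndex>_<checkpointId>_<recordCount>.
    static String markerFileName(int subtaskIndex, long checkpointId, long recordCount) {
        return subtaskIndex + DELIMITER + checkpointId + DELIMITER + recordCount;
    }

    // Resolves the marker under the table base path rather than a working directory.
    static Path markerFile(String basePath, String auxFolder, String markerFolder, String fileName) {
        return Paths.get(basePath, auxFolder, markerFolder, fileName);
    }

    public static void main(String[] args) {
        Path marker = markerFile("/tmp/t1", ".hoodie/.aux", "instant_marker",
                markerFileName(2, 15L, 0L));
        // prints /tmp/t1/.hoodie/.aux/instant_marker/2_15_0 on Unix-like systems
        System.out.println(marker);
    }
}
```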







[GitHub] [hudi] codecov-io edited a comment on pull request #2434: [HUDI-1511] InstantGenerateOperator support multiple parallelism

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #2434:
URL: https://github.com/apache/hudi/pull/2434#issuecomment-758857465









[GitHub] [hudi] loukey-lj commented on a change in pull request #2434: [HUDI-1511] InstantGenerateOperator support multiple parallelism

Posted by GitBox <gi...@apache.org>.
loukey-lj commented on a change in pull request #2434:
URL: https://github.com/apache/hudi/pull/2434#discussion_r555807220



##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -63,104 +63,159 @@
 
   private static final Logger LOG = LoggerFactory.getLogger(InstantGenerateOperator.class);
   public static final String NAME = "InstantGenerateOperator";
-
+  private static final String UNDERLINE = "_";
   private HoodieFlinkStreamer.Config cfg;
   private HoodieFlinkWriteClient writeClient;
   private SerializableConfiguration serializableHadoopConf;
   private transient FileSystem fs;
   private String latestInstant = "";
   private List<String> latestInstantList = new ArrayList<>(1);
   private transient ListState<String> latestInstantState;
-  private List<StreamRecord> bufferedRecords = new LinkedList();
-  private transient ListState<StreamRecord> recordsState;
   private Integer retryTimes;
   private Integer retryInterval;
+  private transient boolean isMain = false;
+  private transient volatile long batchSize = 0;
 
   @Override
   public void processElement(StreamRecord<HoodieRecord> streamRecord) throws Exception {
     if (streamRecord.getValue() != null) {
-      bufferedRecords.add(streamRecord);
       output.collect(streamRecord);
+      batchSize++;
     }
   }
 
   @Override
   public void open() throws Exception {
     super.open();
+    isMain = getRuntimeContext().getIndexOfThisSubtask() == 0;
+
     // get configs from runtimeContext
     cfg = (HoodieFlinkStreamer.Config) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
 
-    // retry times
-    retryTimes = Integer.valueOf(cfg.blockRetryTime);
-
-    // retry interval
-    retryInterval = Integer.valueOf(cfg.blockRetryInterval);
-
     // hadoopConf
     serializableHadoopConf = new SerializableConfiguration(StreamerUtil.getHadoopConf());
 
     // Hadoop FileSystem
     fs = FSUtils.getFs(cfg.targetBasePath, serializableHadoopConf.get());
 
-    TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null);
+    if (isMain) {
+      // retry times
+      retryTimes = Integer.valueOf(cfg.blockRetryTime);

Review comment:
      The config has a default value.

##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -63,104 +63,159 @@
 
   private static final Logger LOG = LoggerFactory.getLogger(InstantGenerateOperator.class);
   public static final String NAME = "InstantGenerateOperator";
-
+  private static final String UNDERLINE = "_";
   private HoodieFlinkStreamer.Config cfg;
   private HoodieFlinkWriteClient writeClient;
   private SerializableConfiguration serializableHadoopConf;
   private transient FileSystem fs;
   private String latestInstant = "";
   private List<String> latestInstantList = new ArrayList<>(1);
   private transient ListState<String> latestInstantState;
-  private List<StreamRecord> bufferedRecords = new LinkedList();
-  private transient ListState<StreamRecord> recordsState;
   private Integer retryTimes;
   private Integer retryInterval;
+  private transient boolean isMain = false;
+  private transient volatile long batchSize = 0;
 
   @Override
   public void processElement(StreamRecord<HoodieRecord> streamRecord) throws Exception {
     if (streamRecord.getValue() != null) {
-      bufferedRecords.add(streamRecord);
       output.collect(streamRecord);
+      batchSize++;
     }
   }
 
   @Override
   public void open() throws Exception {
     super.open();
+    isMain = getRuntimeContext().getIndexOfThisSubtask() == 0;
+
     // get configs from runtimeContext
     cfg = (HoodieFlinkStreamer.Config) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
 
-    // retry times
-    retryTimes = Integer.valueOf(cfg.blockRetryTime);
-
-    // retry interval
-    retryInterval = Integer.valueOf(cfg.blockRetryInterval);
-
     // hadoopConf
     serializableHadoopConf = new SerializableConfiguration(StreamerUtil.getHadoopConf());
 
     // Hadoop FileSystem
     fs = FSUtils.getFs(cfg.targetBasePath, serializableHadoopConf.get());
 
-    TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null);
+    if (isMain) {
+      // retry times
+      retryTimes = Integer.valueOf(cfg.blockRetryTime);
 
-    // writeClient
-    writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true);
+      // retry interval
+      retryInterval = Integer.valueOf(cfg.blockRetryInterval);

Review comment:
      The config has a default value.







[GitHub] [hudi] yanghua commented on a change in pull request #2434: [HUDI-1511] InstantGenerateOperator support multiple parallelism

Posted by GitBox <gi...@apache.org>.
yanghua commented on a change in pull request #2434:
URL: https://github.com/apache/hudi/pull/2434#discussion_r556194645



##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -102,65 +106,73 @@ public void open() throws Exception {
     // Hadoop FileSystem
     fs = FSUtils.getFs(cfg.targetBasePath, serializableHadoopConf.get());
 
-    TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null);
+    if (isMain) {
+      TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null);
 
-    // writeClient
-    writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true);
+      // writeClient
+      writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true);
 
-    // init table, create it if not exists.
-    initTable();
+      // init table, create it if not exists.
+      initTable();
+
+      // create instantGenerateTmpFolder
+      createinstantGenerateTmpFolder();
+    }
   }
 
   @Override
   public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
     super.prepareSnapshotPreBarrier(checkpointId);
-    // check whether the last instant is completed, if not, wait 10s and then throws an exception
-    if (!StringUtils.isNullOrEmpty(latestInstant)) {
-      doCheck();
-      // last instant completed, set it empty
-      latestInstant = "";
-    }
+    int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+    String instantGenerateInfoFileName = String.format("%d_%d_%d",indexOfThisSubtask,checkpointId,batchSize);

Review comment:
      Many lines have the same issue. Please add a space after each comma: `xxx,xxx` -> `xxx, xxx`.

##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -222,4 +234,59 @@ public void close() throws Exception {
       fs.close();
     }
   }
+
+  private boolean checkReceivedData(long checkpointId) throws InterruptedException, IOException {
+    int numberOfParallelSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
+    FileStatus[] fileStatuses = null;
+    Path generatePath = new Path(INSTANT_GENERATE_FOLDER_NAME);
+    int tryTimes = 1;
+    // waiting all subtask create generate file ready
+    while (true) {
+      Thread.sleep(500L);
+      fileStatuses = fs.listStatus(generatePath, new PathFilter() {
+        @Override
+        public boolean accept(Path pathname) {
+          return pathname.getName().contains(String.format("_%d_",checkpointId));
+        }
+      });
+
+      // is ready
+      if (fileStatuses != null && fileStatuses.length == numberOfParallelSubtasks) {
+        break;
+      }
+
+      if (tryTimes >= 5) {
+        LOG.warn("waiting generate file, checkpointId [{}]",checkpointId);
+        tryTimes = 0;
+      }
+    }
+
+    boolean hasData = false;
+    // judge whether has data in this checkpoint and delete tmp file.
+    for (FileStatus fileStatus : fileStatuses) {
+      Path path = fileStatus.getPath();
+      String name = path.getName();
+      // has data
+      if (Long.parseLong(name.split(UNDERLINE)[2]) > 0) {
+        hasData = true;
+        break;
+      }
+    }
+
+    // delete all tmp file
+    fileStatuses = fs.listStatus(generatePath);
+    for (FileStatus fileStatus : fileStatuses) {
+      fs.delete(fileStatus.getPath());
+    }
+
+    return hasData;
+  }
+
+  private void createinstantGenerateTmpFolder() throws IOException {

Review comment:
       -> `createInstantGenerateTmpDir`?

##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -222,4 +234,59 @@ public void close() throws Exception {
       fs.close();
     }
   }
+
+  private boolean checkReceivedData(long checkpointId) throws InterruptedException, IOException {
+    int numberOfParallelSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
+    FileStatus[] fileStatuses = null;
+    Path generatePath = new Path(INSTANT_GENERATE_FOLDER_NAME);
+    int tryTimes = 1;
+    // waiting all subtask create generate file ready
+    while (true) {
+      Thread.sleep(500L);
+      fileStatuses = fs.listStatus(generatePath, new PathFilter() {
+        @Override
+        public boolean accept(Path pathname) {
+          return pathname.getName().contains(String.format("_%d_",checkpointId));
+        }
+      });
+
+      // is ready
+      if (fileStatuses != null && fileStatuses.length == numberOfParallelSubtasks) {
+        break;
+      }
+
+      if (tryTimes >= 5) {
+        LOG.warn("waiting generate file, checkpointId [{}]",checkpointId);
+        tryTimes = 0;
+      }
+    }
+
+    boolean hasData = false;

Review comment:
       -> `receivedData`?
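The decision that the renamed `receivedData` flag captures can be sketched as a pure function over marker file names. A new instant is needed if any subtask reports a positive record count for the checkpoint. This sketch compares the parsed checkpoint field exactly instead of relying on a substring match. It also skips names that are not three numeric fields, which is an assumption of the sketch; the diff's `Long.parseLong` call would throw on such names.

```java
import java.util.Arrays;
import java.util.List;

public class ReceivedDataCheck {

    static final String DELIMITER = "_";

    // Returns true if any marker written for checkpointId reports a positive record count.
    // Marker names follow the diff's layout: <subtaskIndex>_<checkpointId>_<recordCount>.
    static boolean receivedData(List<String> markerNames, long checkpointId) {
        for (String name : markerNames) {
            String[] parts = name.split(DELIMITER);
            if (parts.length != 3) {
                continue; // not a marker file; ignore it
            }
            try {
                long markerCheckpoint = Long.parseLong(parts[1]);
                long recordCount = Long.parseLong(parts[2]);
                if (markerCheckpoint == checkpointId && recordCount > 0) {
                    return true;
                }
            } catch (NumberFormatException e) {
                // non-numeric fields: not a marker file; ignore it
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("0_7_0", "1_7_12", "2_7_0", "0_8_5");
        System.out.println(receivedData(names, 7L)); // true: subtask 1 saw 12 records
        System.out.println(receivedData(Arrays.asList("0_7_0", "1_7_0"), 7L)); // false
    }
}
```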

##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -71,22 +72,25 @@
   private String latestInstant = "";
   private List<String> latestInstantList = new ArrayList<>(1);
   private transient ListState<String> latestInstantState;
-  private List<StreamRecord> bufferedRecords = new LinkedList();
-  private transient ListState<StreamRecord> recordsState;
   private Integer retryTimes;
   private Integer retryInterval;
+  private static final String UNDERLINE = "_";
+  private static final String INSTANT_GENERATE_FOLDER_NAME = "instant_generate_tmp";
+  private transient boolean isMain = false;
+  private transient volatile long batchSize = 0L;
 
   @Override
   public void processElement(StreamRecord<HoodieRecord> streamRecord) throws Exception {
     if (streamRecord.getValue() != null) {
-      bufferedRecords.add(streamRecord);
       output.collect(streamRecord);
+      batchSize++;
     }
   }
 
   @Override
   public void open() throws Exception {
     super.open();
+    isMain = getRuntimeContext().getIndexOfThisSubtask() == 0;

Review comment:
      We call `getRuntimeContext().getIndexOfThisSubtask()` several times. Can we define a variable to store it?
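Here is a minimal sketch of that caching: read the subtask index and parallelism once and keep them in fields. `SubtaskContext` is a hypothetical stand-in for the two Flink `RuntimeContext` getters the diff calls, so the example runs without Flink. The sketch also assumes Flink's usual operator lifecycle, in which `initializeState()` runs before `open()`. Under that assumption, the values would be captured in `initializeState()` so that both methods can share them.

```java
public class SubtaskIdentity {

    // Hypothetical stand-in for the two Flink RuntimeContext getters used in the diff.
    interface SubtaskContext {
        int getIndexOfThisSubtask();

        int getNumberOfParallelSubtasks();
    }

    private final int indexOfThisSubtask;
    private final int numberOfParallelSubtasks;
    private final boolean isMain;

    // Reads the context once; the operator keeps this object in a field instead
    // of calling the getters repeatedly.
    SubtaskIdentity(SubtaskContext ctx) {
        this.indexOfThisSubtask = ctx.getIndexOfThisSubtask();
        this.numberOfParallelSubtasks = ctx.getNumberOfParallelSubtasks();
        // Subtask 0 is the main task, the only one that starts new instants.
        this.isMain = indexOfThisSubtask == 0;
    }

    int indexOfThisSubtask() {
        return indexOfThisSubtask;
    }

    int numberOfParallelSubtasks() {
        return numberOfParallelSubtasks;
    }

    boolean isMain() {
        return isMain;
    }

    // Convenience factory for examples and tests.
    static SubtaskIdentity of(int index, int parallelism) {
        return new SubtaskIdentity(new SubtaskContext() {
            @Override
            public int getIndexOfThisSubtask() {
                return index;
            }

            @Override
            public int getNumberOfParallelSubtasks() {
                return parallelism;
            }
        });
    }

    public static void main(String[] args) {
        SubtaskIdentity id = SubtaskIdentity.of(0, 4);
        // prints: true 0/4
        System.out.println(id.isMain() + " " + id.indexOfThisSubtask() + "/" + id.numberOfParallelSubtasks());
    }
}
```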

##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -102,65 +106,73 @@ public void open() throws Exception {
     // Hadoop FileSystem
     fs = FSUtils.getFs(cfg.targetBasePath, serializableHadoopConf.get());
 
-    TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null);
+    if (isMain) {
+      TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null);
 
-    // writeClient
-    writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true);
+      // writeClient
+      writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true);
 
-    // init table, create it if not exists.
-    initTable();
+      // init table, create it if not exists.
+      initTable();
+
+      // create instantGenerateTmpFolder
+      createinstantGenerateTmpFolder();
+    }
   }
 
   @Override
   public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
     super.prepareSnapshotPreBarrier(checkpointId);
-    // check whether the last instant is completed, if not, wait 10s and then throws an exception
-    if (!StringUtils.isNullOrEmpty(latestInstant)) {
-      doCheck();
-      // last instant completed, set it empty
-      latestInstant = "";
-    }
+    int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+    String instantGenerateInfoFileName = String.format("%d_%d_%d",indexOfThisSubtask,checkpointId,batchSize);
+    Path path = new Path(INSTANT_GENERATE_FOLDER_NAME,instantGenerateInfoFileName);
+    // mk generate file by each subtask
+    fs.create(path,true);
+    LOG.info("subtask [{}] at checkpoint [{}] created generate file [{}]",indexOfThisSubtask,checkpointId,instantGenerateInfoFileName);
+    if (isMain) {
+      boolean receivedDataInCurrentCP = checkReceivedData(checkpointId);
+      // check whether the last instant is completed, if not, wait 10s and then throws an exception
+      if (!StringUtils.isNullOrEmpty(latestInstant)) {
+        doCheck();
+        // last instant completed, set it empty
+        latestInstant = "";
+      }
 
-    // no data no new instant
-    if (!bufferedRecords.isEmpty()) {
-      latestInstant = startNewInstant(checkpointId);
+      // no data no new instant
+      if (receivedDataInCurrentCP) {
+        latestInstant = startNewInstant(checkpointId);
+      }
     }
   }
 
   @Override
   public void initializeState(StateInitializationContext context) throws Exception {
-    // instantState
-    ListStateDescriptor<String> latestInstantStateDescriptor = new ListStateDescriptor<String>("latestInstant", String.class);
-    latestInstantState = context.getOperatorStateStore().getListState(latestInstantStateDescriptor);
-
-    // recordState
-    ListStateDescriptor<StreamRecord> recordsStateDescriptor = new ListStateDescriptor<StreamRecord>("recordsState", StreamRecord.class);
-    recordsState = context.getOperatorStateStore().getListState(recordsStateDescriptor);
-
-    if (context.isRestored()) {
-      Iterator<String> latestInstantIterator = latestInstantState.get().iterator();
-      latestInstantIterator.forEachRemaining(x -> latestInstant = x);
-      LOG.info("InstantGenerateOperator initializeState get latestInstant [{}]", latestInstant);
-
-      Iterator<StreamRecord> recordIterator = recordsState.get().iterator();
-      bufferedRecords.clear();
-      recordIterator.forEachRemaining(x -> bufferedRecords.add(x));
+    isMain = getRuntimeContext().getIndexOfThisSubtask() == 0;
+    if (isMain) {
+      // instantState
+      ListStateDescriptor<String> latestInstantStateDescriptor = new ListStateDescriptor<String>("latestInstant", String.class);
+      latestInstantState = context.getOperatorStateStore().getListState(latestInstantStateDescriptor);
+
+      if (context.isRestored()) {
+        Iterator<String> latestInstantIterator = latestInstantState.get().iterator();
+        latestInstantIterator.forEachRemaining(x -> latestInstant = x);
+        LOG.info("InstantGenerateOperator initializeState get latestInstant [{}]", latestInstant);

Review comment:
      `Restoring the latest instant [{}] from the state.` sounds better?







[GitHub] [hudi] yanghua commented on a change in pull request #2434: [HUDI-1511] InstantGenerateOperator support multiple parallelism

Posted by GitBox <gi...@apache.org>.
yanghua commented on a change in pull request #2434:
URL: https://github.com/apache/hudi/pull/2434#discussion_r556193603



##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -222,4 +234,59 @@ public void close() throws Exception {
       fs.close();
     }
   }
+
+  private boolean checkReceivedData(long checkpointId) throws InterruptedException, IOException {
+    int numberOfParallelSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
+    FileStatus[] fileStatuses = null;
+    Path generatePath = new Path(INSTANT_GENERATE_FOLDER_NAME);
+    int tryTimes = 1;
+    // waiting all subtask create generate file ready
+    while (true) {
+      Thread.sleep(500L);
+      fileStatuses = fs.listStatus(generatePath, new PathFilter() {
+        @Override
+        public boolean accept(Path pathname) {
+          return pathname.getName().contains(String.format("_%d_",checkpointId));
+        }
+      });
+
+      // is ready
+      if (fileStatuses != null && fileStatuses.length == numberOfParallelSubtasks) {
+        break;
+      }
+
+      if (tryTimes >= 5) {

Review comment:
       Let's make it configurable?
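One way to make the wait configurable is sketched below under assumptions. The loop in the diff polls every 500 ms with no upper bound and only logs a warning every five attempts, so the main subtask keeps waiting indefinitely if one marker never appears. The sketch bounds the wait with a poll interval and an attempt limit and throws a `TimeoutException` when the limit is reached. Both parameters are hypothetical and would come from the job config. The sketch also uses `java.nio` on a local directory as a stand-in for Hadoop's `FileSystem.listStatus(Path, PathFilter)`.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;

public class MarkerWaiter {

    // Hypothetical tunables; in the operator they would come from the job config.
    private final long pollIntervalMs;
    private final int maxAttempts;

    MarkerWaiter(long pollIntervalMs, int maxAttempts) {
        this.pollIntervalMs = pollIntervalMs;
        this.maxAttempts = maxAttempts;
    }

    // Polls dir until `expected` markers named <subtask>_<checkpointId>_<count> exist,
    // giving up with a TimeoutException after maxAttempts listings.
    List<Path> await(Path dir, long checkpointId, int expected)
            throws IOException, InterruptedException, TimeoutException {
        // Each name has exactly two '_', so this glob matches only the checkpoint field.
        String glob = "*_" + checkpointId + "_*";
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            List<Path> markers = new ArrayList<>();
            try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, glob)) {
                for (Path marker : stream) {
                    markers.add(marker);
                }
            }
            if (markers.size() == expected) {
                return markers;
            }
            if (attempt < maxAttempts) {
                Thread.sleep(pollIntervalMs);
            }
        }
        throw new TimeoutException("Expected " + expected + " markers for checkpoint "
                + checkpointId + " but they did not all appear after " + maxAttempts + " attempts");
    }

    public static void main(String[] args) throws Exception {
        Path dir = Files.createTempDirectory("instant_marker");
        Files.createFile(dir.resolve("0_7_0"));
        Files.createFile(dir.resolve("1_7_12"));
        Files.createFile(dir.resolve("0_8_5"));
        // Two subtasks have reported for checkpoint 7, so this prints 2 without waiting.
        System.out.println(new MarkerWaiter(100L, 5).await(dir, 7L, 2).size());
    }
}
```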
   
   







[GitHub] [hudi] loukey-lj commented on a change in pull request #2434: [HUDI-1511] InstantGenerateOperator support multiple parallelism

Posted by GitBox <gi...@apache.org>.
loukey-lj commented on a change in pull request #2434:
URL: https://github.com/apache/hudi/pull/2434#discussion_r559543409



##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -102,65 +109,79 @@ public void open() throws Exception {
     // Hadoop FileSystem
     fs = FSUtils.getFs(cfg.targetBasePath, serializableHadoopConf.get());
 
-    TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null);
+    if (isMain) {
+      TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(runtimeContext);
 
-    // writeClient
-    writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true);
+      // writeClient
+      writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true);
 
-    // init table, create it if not exists.
-    initTable();
+      // init table, create it if not exists.
+      initTable();
+
+      // create instant marker directory
+      createInstantMarkerDir();
+    }
   }
 
   @Override
   public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
     super.prepareSnapshotPreBarrier(checkpointId);
-    // check whether the last instant is completed, if not, wait 10s and then throws an exception
-    if (!StringUtils.isNullOrEmpty(latestInstant)) {
-      doCheck();
-      // last instant completed, set it empty
-      latestInstant = "";
-    }
+    String instantMarkerFileName = String.format("%d%s%d%s%d", indexOfThisSubtask, DELIMITER, checkpointId, DELIMITER, recordCounter.get());
+    Path path = new Path(new Path(HoodieTableMetaClient.AUXILIARYFOLDER_NAME, INSTANT_MARKER_FOLDER_NAME), instantMarkerFileName);
+    // mk generate file by each subtask

Review comment:
       done

##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -222,4 +243,60 @@ public void close() throws Exception {
       fs.close();
     }
   }
+
+  private boolean checkReceivedData(long checkpointId) throws InterruptedException, IOException {
+    int numberOfParallelSubtasks = runtimeContext.getNumberOfParallelSubtasks();
+    FileStatus[] fileStatuses;
+    Path instantMarkerPath = new Path(HoodieTableMetaClient.AUXILIARYFOLDER_NAME, INSTANT_MARKER_FOLDER_NAME);
+    int tryTimes = 1;
+    // waiting all subtask create marker file ready
+    while (true) {
+      Thread.sleep(500L);
+      fileStatuses = fs.listStatus(instantMarkerPath, new PathFilter() {
+        @Override
+        public boolean accept(Path pathname) {
+          return pathname.getName().contains(String.format("%s%d%s", DELIMITER, checkpointId, DELIMITER));
+        }
+      });
+
+      // is ready
+      if (fileStatuses != null && fileStatuses.length == numberOfParallelSubtasks) {
+        break;
+      }
+
+      if (tryTimes >= 5) {
+        LOG.warn("waiting generate file, checkpointId [{}]", checkpointId);

Review comment:
       done

##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -102,65 +109,79 @@ public void open() throws Exception {
     // Hadoop FileSystem
     fs = FSUtils.getFs(cfg.targetBasePath, serializableHadoopConf.get());
 
-    TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null);
+    if (isMain) {
+      TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(runtimeContext);
 
-    // writeClient
-    writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true);
+      // writeClient
+      writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true);
 
-    // init table, create it if not exists.
-    initTable();
+      // init table, create it if not exists.
+      initTable();
+
+      // create instant marker directory
+      createInstantMarkerDir();
+    }
   }
 
   @Override
   public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
     super.prepareSnapshotPreBarrier(checkpointId);
-    // check whether the last instant is completed, if not, wait 10s and then throws an exception
-    if (!StringUtils.isNullOrEmpty(latestInstant)) {
-      doCheck();
-      // last instant completed, set it empty
-      latestInstant = "";
-    }
+    String instantMarkerFileName = String.format("%d%s%d%s%d", indexOfThisSubtask, DELIMITER, checkpointId, DELIMITER, recordCounter.get());
+    Path path = new Path(new Path(HoodieTableMetaClient.AUXILIARYFOLDER_NAME, INSTANT_MARKER_FOLDER_NAME), instantMarkerFileName);
+    // mk generate file by each subtask
+    fs.create(path, true);
+    LOG.info("Subtask [{}] at checkpoint [{}] created generate file [{}]", indexOfThisSubtask, checkpointId, instantMarkerFileName);

Review comment:
       done

##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -102,65 +109,79 @@ public void open() throws Exception {
     // Hadoop FileSystem
     fs = FSUtils.getFs(cfg.targetBasePath, serializableHadoopConf.get());
 
-    TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null);
+    if (isMain) {
+      TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(runtimeContext);
 
-    // writeClient
-    writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true);
+      // writeClient
+      writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true);
 
-    // init table, create it if not exists.
-    initTable();
+      // init table, create it if not exists.
+      initTable();
+
+      // create instant marker directory
+      createInstantMarkerDir();
+    }
   }
 
   @Override
   public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
     super.prepareSnapshotPreBarrier(checkpointId);
-    // check whether the last instant is completed, if not, wait 10s and then throws an exception
-    if (!StringUtils.isNullOrEmpty(latestInstant)) {
-      doCheck();
-      // last instant completed, set it empty
-      latestInstant = "";
-    }
+    String instantMarkerFileName = String.format("%d%s%d%s%d", indexOfThisSubtask, DELIMITER, checkpointId, DELIMITER, recordCounter.get());
+    Path path = new Path(new Path(HoodieTableMetaClient.AUXILIARYFOLDER_NAME, INSTANT_MARKER_FOLDER_NAME), instantMarkerFileName);
+    // mk generate file by each subtask
+    fs.create(path, true);
+    LOG.info("Subtask [{}] at checkpoint [{}] created generate file [{}]", indexOfThisSubtask, checkpointId, instantMarkerFileName);
+    if (isMain) {
+      boolean receivedDataInCurrentCP = checkReceivedData(checkpointId);
+      // check whether the last instant is completed, if not, wait 10s and then throws an exception
+      if (!StringUtils.isNullOrEmpty(latestInstant)) {
+        doCheck();
+        // last instant completed, set it empty
+        latestInstant = "";
+      }
 
-    // no data no new instant
-    if (!bufferedRecords.isEmpty()) {
-      latestInstant = startNewInstant(checkpointId);
+      // no data no new instant

Review comment:
       done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] loukey-lj commented on pull request #2434: [HUDI-1511] InstantGenerateOperator support multiple parallelism

Posted by GitBox <gi...@apache.org>.
loukey-lj commented on pull request #2434:
URL: https://github.com/apache/hudi/pull/2434#issuecomment-758500356


   Reopened with new changes.





[GitHub] [hudi] vinothchandar commented on pull request #2434: [HUDI-1511] InstantGenerateOperator support multiple parallelism

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #2434:
URL: https://github.com/apache/hudi/pull/2434#issuecomment-764820382


   @yanghua should be good to land now if you are happy with it





[GitHub] [hudi] loukey-lj commented on a change in pull request #2434: [HUDI-1511] InstantGenerateOperator support multiple parallelism

Posted by GitBox <gi...@apache.org>.
loukey-lj commented on a change in pull request #2434:
URL: https://github.com/apache/hudi/pull/2434#discussion_r557818230



##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -222,4 +234,59 @@ public void close() throws Exception {
       fs.close();
     }
   }
+
+  private boolean checkReceivedData(long checkpointId) throws InterruptedException, IOException {
+    int numberOfParallelSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
+    FileStatus[] fileStatuses = null;
+    Path generatePath = new Path(INSTANT_GENERATE_FOLDER_NAME);
+    int tryTimes = 1;
+    // waiting all subtask create generate file ready
+    while (true) {
+      Thread.sleep(500L);
+      fileStatuses = fs.listStatus(generatePath, new PathFilter() {
+        @Override
+        public boolean accept(Path pathname) {
+          return pathname.getName().contains(String.format("_%d_",checkpointId));
+        }
+      });
+
+      // is ready
+      if (fileStatuses != null && fileStatuses.length == numberOfParallelSubtasks) {
+        break;
+      }
+
+      if (tryTimes >= 5) {

Review comment:
       Yes, I also found this bug. It was fixed yesterday by adding `tryTimes++;`.







[GitHub] [hudi] loukey-lj commented on a change in pull request #2434: [HUDI-1511] InstantGenerateOperator support multiple parallelism

Posted by GitBox <gi...@apache.org>.
loukey-lj commented on a change in pull request #2434:
URL: https://github.com/apache/hudi/pull/2434#discussion_r557821706



##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -71,16 +73,18 @@
   private String latestInstant = "";
   private List<String> latestInstantList = new ArrayList<>(1);
   private transient ListState<String> latestInstantState;
-  private List<StreamRecord> bufferedRecords = new LinkedList();
-  private transient ListState<StreamRecord> recordsState;
   private Integer retryTimes;
   private Integer retryInterval;
+  private static final String UNDERLINE = "_";
+  private static final String INSTANT_MARKER_FOLDER_NAME = ".instant_marker";
+  private transient boolean isMain = false;
+  private transient AtomicLong batchSize = new AtomicLong(0);

Review comment:
       agree
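A minimal sketch of the per-checkpoint record counter discussed here. It assumes the count should be cleared after each marker file is written; the diff only shows `recordCounter.get()`, so the reset is an assumption. `RecordCounterSketch`, `onRecord` and `drainForCheckpoint` are hypothetical names, not Hudi API. `AtomicLong.getAndSet(0)` reads and clears the count in a single step.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper illustrating a per-checkpoint record counter.
final class RecordCounterSketch {
  private final AtomicLong recordCounter = new AtomicLong(0);

  // Called once per non-null record, like processElement in the diff.
  void onRecord() {
    recordCounter.incrementAndGet();
  }

  // Returns the number of records seen since the last checkpoint and resets
  // the counter, so each marker file reports only its own checkpoint (assumption).
  long drainForCheckpoint() {
    return recordCounter.getAndSet(0);
  }
}
```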







[GitHub] [hudi] loukey-lj commented on a change in pull request #2434: [HUDI-1511] InstantGenerateOperator support multiple parallelism

Posted by GitBox <gi...@apache.org>.
loukey-lj commented on a change in pull request #2434:
URL: https://github.com/apache/hudi/pull/2434#discussion_r558324652



##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -102,65 +105,76 @@ public void open() throws Exception {
     // Hadoop FileSystem
     fs = FSUtils.getFs(cfg.targetBasePath, serializableHadoopConf.get());
 
-    TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null);
+    if (isMain) {
+      TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null);

Review comment:
       done







[GitHub] [hudi] yanghua commented on a change in pull request #2434: [HUDI-1511] InstantGenerateOperator support multiple parallelism

Posted by GitBox <gi...@apache.org>.
yanghua commented on a change in pull request #2434:
URL: https://github.com/apache/hudi/pull/2434#discussion_r556975899



##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -102,65 +105,76 @@ public void open() throws Exception {
     // Hadoop FileSystem
     fs = FSUtils.getFs(cfg.targetBasePath, serializableHadoopConf.get());
 
-    TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null);
+    if (isMain) {
+      TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null);

Review comment:
       IMO, we can pass the `RuntimeContext` instance to `FlinkTaskContextSupplier` here instead of `null`, right?

##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -102,65 +105,76 @@ public void open() throws Exception {
     // Hadoop FileSystem
     fs = FSUtils.getFs(cfg.targetBasePath, serializableHadoopConf.get());
 
-    TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null);
+    if (isMain) {
+      TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null);
 
-    // writeClient
-    writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true);
+      // writeClient
+      writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true);
 
-    // init table, create it if not exists.
-    initTable();
+      // init table, create it if not exists.
+      initTable();
+
+      // create instantGenerateTmpFolder

Review comment:
       About the comment: we'd better not just repeat the method name. Maybe `create temp folder for generating instant` is better?

##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -222,4 +236,59 @@ public void close() throws Exception {
       fs.close();
     }
   }
+
+  private boolean checkReceivedData(long checkpointId) throws InterruptedException, IOException {
+    int numberOfParallelSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
+    FileStatus[] fileStatuses = null;
+    Path generatePath = new Path(INSTANT_GENERATE_FOLDER_NAME);
+    int tryTimes = 1;
+    // waiting all subtask create generate file ready
+    while (true) {
+      Thread.sleep(500L);
+      fileStatuses = fs.listStatus(generatePath, new PathFilter() {
+        @Override
+        public boolean accept(Path pathname) {
+          return pathname.getName().contains(String.format("_%d_", checkpointId));

Review comment:
       You have already defined the separator as `UNDERLINE`, so it would be better to use that constant here. WDYT?
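A sketch of building and matching marker names with a single shared delimiter constant, assuming the `<subtaskIndex>_<checkpointId>_<recordCount>` layout used in the diff. `MarkerFileNames` and its methods are hypothetical. Instead of a substring match on the file name, `belongsTo` parses the name and compares the checkpoint field directly.

```java
import java.util.Optional;

// Hypothetical helper: builds and parses marker names of the form
// <subtaskIndex><DELIMITER><checkpointId><DELIMITER><recordCount>.
final class MarkerFileNames {
  static final String DELIMITER = "_";

  // Parsed form of a marker name.
  static final class Marker {
    final int subtaskIndex;
    final long checkpointId;
    final long recordCount;

    Marker(int subtaskIndex, long checkpointId, long recordCount) {
      this.subtaskIndex = subtaskIndex;
      this.checkpointId = checkpointId;
      this.recordCount = recordCount;
    }
  }

  static String build(int subtaskIndex, long checkpointId, long recordCount) {
    return String.format("%d%s%d%s%d", subtaskIndex, DELIMITER, checkpointId, DELIMITER, recordCount);
  }

  // Returns empty for names that do not consist of exactly three numeric fields.
  static Optional<Marker> parse(String name) {
    String[] parts = name.split(DELIMITER);
    if (parts.length != 3) {
      return Optional.empty();
    }
    try {
      return Optional.of(new Marker(
          Integer.parseInt(parts[0]), Long.parseLong(parts[1]), Long.parseLong(parts[2])));
    } catch (NumberFormatException e) {
      return Optional.empty();
    }
  }

  // Compares the parsed checkpoint field instead of matching a substring.
  static boolean belongsTo(String name, long checkpointId) {
    return parse(name).map(m -> m.checkpointId == checkpointId).orElse(false);
  }
}
```

With a valid three-field name, the substring filter in the diff also works, because only the middle field is wrapped by two delimiters. Comparing the parsed field makes that intent explicit and lets malformed names be rejected.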

##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -222,4 +234,59 @@ public void close() throws Exception {
       fs.close();
     }
   }
+
+  private boolean checkReceivedData(long checkpointId) throws InterruptedException, IOException {
+    int numberOfParallelSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
+    FileStatus[] fileStatuses = null;
+    Path generatePath = new Path(INSTANT_GENERATE_FOLDER_NAME);
+    int tryTimes = 1;
+    // waiting all subtask create generate file ready
+    while (true) {
+      Thread.sleep(500L);
+      fileStatuses = fs.listStatus(generatePath, new PathFilter() {
+        @Override
+        public boolean accept(Path pathname) {
+          return pathname.getName().contains(String.format("_%d_",checkpointId));
+        }
+      });
+
+      // is ready
+      if (fileStatuses != null && fileStatuses.length == numberOfParallelSubtasks) {
+        break;
+      }
+
+      if (tryTimes >= 5) {

Review comment:
       Did you check this logic? How does `tryTimes` ever reach 5?







[GitHub] [hudi] yanghua commented on a change in pull request #2434: [HUDI-1511] InstantGenerateOperator support multiple parallelism

Posted by GitBox <gi...@apache.org>.
yanghua commented on a change in pull request #2434:
URL: https://github.com/apache/hudi/pull/2434#discussion_r559626692



##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -222,4 +242,60 @@ public void close() throws Exception {
       fs.close();
     }
   }
+
+  private boolean checkReceivedData(long checkpointId) throws InterruptedException, IOException {
+    int numberOfParallelSubtasks = runtimeContext.getNumberOfParallelSubtasks();
+    FileStatus[] fileStatuses;
+    Path instantMarkerPath = new Path(HoodieTableMetaClient.AUXILIARYFOLDER_NAME, INSTANT_MARKER_FOLDER_NAME);
+    int tryTimes = 1;
+    // waiting all subtask create marker file ready
+    while (true) {
+      Thread.sleep(500L);
+      fileStatuses = fs.listStatus(instantMarkerPath, new PathFilter() {
+        @Override
+        public boolean accept(Path pathname) {
+          return pathname.getName().contains(String.format("%s%d%s", DELIMITER, checkpointId, DELIMITER));
+        }
+      });
+
+      // is ready
+      if (fileStatuses != null && fileStatuses.length == numberOfParallelSubtasks) {
+        break;
+      }
+
+      if (tryTimes >= 5) {
+        LOG.warn("waiting marker file, checkpointId [{}]", checkpointId);
+        tryTimes = 0;

Review comment:
       Is there a bug here? When `tryTimes >= 5`, it is only reset to 0 and the loop does not break. If that is intended, why do we need this counter at all?
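If the counter is meant to stop the wait rather than just rate-limit the warning log, a bounded poll could look like this sketch. `BoundedMarkerWait` is a hypothetical name, and `IntSupplier countMarkers` stands in for listing the marker directory (the diff uses `fs.listStatus` with a `PathFilter`). The method returns `false` on timeout or interruption so the caller can decide whether to log, skip the instant, or fail.

```java
import java.util.function.IntSupplier;

// Hypothetical sketch of a bounded wait: poll until the expected number of
// marker files is visible, or give up after maxAttempts polls.
final class BoundedMarkerWait {
  static boolean awaitMarkers(IntSupplier countMarkers, int expected,
                              int maxAttempts, long sleepMillis) {
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      if (countMarkers.getAsInt() == expected) {
        return true; // every subtask has written its marker
      }
      try {
        Thread.sleep(sleepMillis);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt status
        return false;
      }
    }
    return false; // timed out; the caller decides how to react
  }
}
```

Resetting the counter to 0 without breaking, as in the diff, keeps waiting indefinitely and only controls how often the warning is printed. That is valid if an unbounded wait is acceptable, but then the counter is a log throttle rather than a retry limit.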







[GitHub] [hudi] loukey-lj commented on a change in pull request #2434: [HUDI-1511] InstantGenerateOperator support multiple parallelism

Posted by GitBox <gi...@apache.org>.
loukey-lj commented on a change in pull request #2434:
URL: https://github.com/apache/hudi/pull/2434#discussion_r556284970



##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -102,65 +106,73 @@ public void open() throws Exception {
     // Hadoop FileSystem
     fs = FSUtils.getFs(cfg.targetBasePath, serializableHadoopConf.get());
 
-    TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null);
+    if (isMain) {
+      TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null);
 
-    // writeClient
-    writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true);
+      // writeClient
+      writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true);
 
-    // init table, create it if not exists.
-    initTable();
+      // init table, create it if not exists.
+      initTable();
+
+      // create instantGenerateTmpFolder
+      createinstantGenerateTmpFolder();
+    }
   }
 
   @Override
   public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
     super.prepareSnapshotPreBarrier(checkpointId);
-    // check whether the last instant is completed, if not, wait 10s and then throws an exception
-    if (!StringUtils.isNullOrEmpty(latestInstant)) {
-      doCheck();
-      // last instant completed, set it empty
-      latestInstant = "";
-    }
+    int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+    String instantGenerateInfoFileName = String.format("%d_%d_%d",indexOfThisSubtask,checkpointId,batchSize);

Review comment:
       Formatted.







[GitHub] [hudi] yanghua commented on a change in pull request #2434: [HUDI-1511] InstantGenerateOperator support multiple parallelism

Posted by GitBox <gi...@apache.org>.
yanghua commented on a change in pull request #2434:
URL: https://github.com/apache/hudi/pull/2434#discussion_r559070413



##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -102,65 +109,79 @@ public void open() throws Exception {
     // Hadoop FileSystem
     fs = FSUtils.getFs(cfg.targetBasePath, serializableHadoopConf.get());
 
-    TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null);
+    if (isMain) {
+      TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(runtimeContext);
 
-    // writeClient
-    writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true);
+      // writeClient
+      writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true);
 
-    // init table, create it if not exists.
-    initTable();
+      // init table, create it if not exists.
+      initTable();
+
+      // create instant marker directory
+      createInstantMarkerDir();
+    }
   }
 
   @Override
   public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
     super.prepareSnapshotPreBarrier(checkpointId);
-    // check whether the last instant is completed, if not, wait 10s and then throws an exception
-    if (!StringUtils.isNullOrEmpty(latestInstant)) {
-      doCheck();
-      // last instant completed, set it empty
-      latestInstant = "";
-    }
+    String instantMarkerFileName = String.format("%d%s%d%s%d", indexOfThisSubtask, DELIMITER, checkpointId, DELIMITER, recordCounter.get());
+    Path path = new Path(new Path(HoodieTableMetaClient.AUXILIARYFOLDER_NAME, INSTANT_MARKER_FOLDER_NAME), instantMarkerFileName);
+    // mk generate file by each subtask
+    fs.create(path, true);
+    LOG.info("Subtask [{}] at checkpoint [{}] created generate file [{}]", indexOfThisSubtask, checkpointId, instantMarkerFileName);

Review comment:
       `generate` -> `marker`?

##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -222,4 +243,60 @@ public void close() throws Exception {
       fs.close();
     }
   }
+
+  private boolean checkReceivedData(long checkpointId) throws InterruptedException, IOException {
+    int numberOfParallelSubtasks = runtimeContext.getNumberOfParallelSubtasks();
+    FileStatus[] fileStatuses;
+    Path instantMarkerPath = new Path(HoodieTableMetaClient.AUXILIARYFOLDER_NAME, INSTANT_MARKER_FOLDER_NAME);
+    int tryTimes = 1;
+    // waiting all subtask create marker file ready
+    while (true) {
+      Thread.sleep(500L);
+      fileStatuses = fs.listStatus(instantMarkerPath, new PathFilter() {
+        @Override
+        public boolean accept(Path pathname) {
+          return pathname.getName().contains(String.format("%s%d%s", DELIMITER, checkpointId, DELIMITER));
+        }
+      });
+
+      // is ready
+      if (fileStatuses != null && fileStatuses.length == numberOfParallelSubtasks) {
+        break;
+      }
+
+      if (tryTimes >= 5) {
+        LOG.warn("waiting generate file, checkpointId [{}]", checkpointId);

Review comment:
       `generate` -> `marker`

##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -102,65 +109,79 @@ public void open() throws Exception {
     // Hadoop FileSystem
     fs = FSUtils.getFs(cfg.targetBasePath, serializableHadoopConf.get());
 
-    TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null);
+    if (isMain) {
+      TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(runtimeContext);
 
-    // writeClient
-    writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true);
+      // writeClient
+      writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true);
 
-    // init table, create it if not exists.
-    initTable();
+      // init table, create it if not exists.
+      initTable();
+
+      // create instant marker directory
+      createInstantMarkerDir();
+    }
   }
 
   @Override
   public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
     super.prepareSnapshotPreBarrier(checkpointId);
-    // check whether the last instant is completed, if not, wait 10s and then throws an exception
-    if (!StringUtils.isNullOrEmpty(latestInstant)) {
-      doCheck();
-      // last instant completed, set it empty
-      latestInstant = "";
-    }
+    String instantMarkerFileName = String.format("%d%s%d%s%d", indexOfThisSubtask, DELIMITER, checkpointId, DELIMITER, recordCounter.get());
+    Path path = new Path(new Path(HoodieTableMetaClient.AUXILIARYFOLDER_NAME, INSTANT_MARKER_FOLDER_NAME), instantMarkerFileName);
+    // mk generate file by each subtask
+    fs.create(path, true);
+    LOG.info("Subtask [{}] at checkpoint [{}] created generate file [{}]", indexOfThisSubtask, checkpointId, instantMarkerFileName);
+    if (isMain) {
+      boolean receivedDataInCurrentCP = checkReceivedData(checkpointId);
+      // check whether the last instant is completed, if not, wait 10s and then throws an exception
+      if (!StringUtils.isNullOrEmpty(latestInstant)) {
+        doCheck();
+        // last instant completed, set it empty
+        latestInstant = "";
+      }
 
-    // no data no new instant
-    if (!bufferedRecords.isEmpty()) {
-      latestInstant = startNewInstant(checkpointId);
+      // no data no new instant

Review comment:
       IMO, we can move `boolean receivedDataInCurrentCP = checkReceivedData(checkpointId);` down to this line, so the method is only called where its result is actually used.
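A sketch of the decision the main subtask makes at this point. As described in the PR, it creates a new instant only if at least one subtask reported records for the current checkpoint. `InstantDecision.receivedData` is a hypothetical helper that works on already-listed marker names in the assumed `<subtaskIndex>_<checkpointId>_<recordCount>` format. Calling it only inside the "no data, no new instant" branch, after `doCheck()`, matches the reviewer's suggestion.

```java
import java.util.List;

// Hypothetical sketch: start a new instant only if at least one subtask
// reported records for this checkpoint.
final class InstantDecision {
  static final String DELIMITER = "_";

  // Marker names are assumed to be <subtaskIndex>_<checkpointId>_<recordCount>.
  // Malformed numeric fields throw NumberFormatException in this sketch.
  static boolean receivedData(List<String> markerNames, long checkpointId) {
    for (String name : markerNames) {
      String[] parts = name.split(DELIMITER);
      if (parts.length == 3
          && Long.parseLong(parts[1]) == checkpointId
          && Long.parseLong(parts[2]) > 0) {
        return true;
      }
    }
    return false;
  }
}
```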

##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -102,65 +109,79 @@ public void open() throws Exception {
     // Hadoop FileSystem
     fs = FSUtils.getFs(cfg.targetBasePath, serializableHadoopConf.get());
 
-    TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null);
+    if (isMain) {
+      TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(runtimeContext);
 
-    // writeClient
-    writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true);
+      // writeClient
+      writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true);
 
-    // init table, create it if not exists.
-    initTable();
+      // init table, create it if not exists.
+      initTable();
+
+      // create instant marker directory
+      createInstantMarkerDir();
+    }
   }
 
   @Override
   public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
     super.prepareSnapshotPreBarrier(checkpointId);
-    // check whether the last instant is completed, if not, wait 10s and then throws an exception
-    if (!StringUtils.isNullOrEmpty(latestInstant)) {
-      doCheck();
-      // last instant completed, set it empty
-      latestInstant = "";
-    }
+    String instantMarkerFileName = String.format("%d%s%d%s%d", indexOfThisSubtask, DELIMITER, checkpointId, DELIMITER, recordCounter.get());
+    Path path = new Path(new Path(HoodieTableMetaClient.AUXILIARYFOLDER_NAME, INSTANT_MARKER_FOLDER_NAME), instantMarkerFileName);
+    // mk generate file by each subtask

Review comment:
       `generate` -> `marker`?







[GitHub] [hudi] loukey-lj commented on a change in pull request #2434: [HUDI-1511] InstantGenerateOperator support multiple parallelism

Posted by GitBox <gi...@apache.org>.
loukey-lj commented on a change in pull request #2434:
URL: https://github.com/apache/hudi/pull/2434#discussion_r555807899



##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -63,104 +63,159 @@
 
   private static final Logger LOG = LoggerFactory.getLogger(InstantGenerateOperator.class);
   public static final String NAME = "InstantGenerateOperator";
-
+  private static final String UNDERLINE = "_";
   private HoodieFlinkStreamer.Config cfg;
   private HoodieFlinkWriteClient writeClient;
   private SerializableConfiguration serializableHadoopConf;
   private transient FileSystem fs;
   private String latestInstant = "";
   private List<String> latestInstantList = new ArrayList<>(1);
   private transient ListState<String> latestInstantState;
-  private List<StreamRecord> bufferedRecords = new LinkedList();
-  private transient ListState<StreamRecord> recordsState;
   private Integer retryTimes;
   private Integer retryInterval;
+  private transient boolean isMain = false;
+  private transient volatile long batchSize = 0;
 
   @Override
   public void processElement(StreamRecord<HoodieRecord> streamRecord) throws Exception {
     if (streamRecord.getValue() != null) {
-      bufferedRecords.add(streamRecord);
       output.collect(streamRecord);
+      batchSize++;
     }
   }
 
   @Override
   public void open() throws Exception {
     super.open();
+    isMain = getRuntimeContext().getIndexOfThisSubtask() == 0;
+
     // get configs from runtimeContext
     cfg = (HoodieFlinkStreamer.Config) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
 
-    // retry times
-    retryTimes = Integer.valueOf(cfg.blockRetryTime);
-
-    // retry interval
-    retryInterval = Integer.valueOf(cfg.blockRetryInterval);
-
     // hadoopConf
     serializableHadoopConf = new SerializableConfiguration(StreamerUtil.getHadoopConf());
 
     // Hadoop FileSystem
     fs = FSUtils.getFs(cfg.targetBasePath, serializableHadoopConf.get());
 
-    TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null);
+    if (isMain) {
+      // retry times
+      retryTimes = Integer.valueOf(cfg.blockRetryTime);
 
-    // writeClient
-    writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true);
+      // retry interval
+      retryInterval = Integer.valueOf(cfg.blockRetryInterval);
 
-    // init table, create it if not exists.
-    initTable();
+      TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null);
+
+      // writeClient
+      writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true);
+
+      // init table, create it if not exists.
+      initTable();
+    }
   }
 
   @Override
   public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
     super.prepareSnapshotPreBarrier(checkpointId);
-    // check whether the last instant is completed, if not, wait 10s and then throws an exception
-    if (!StringUtils.isNullOrEmpty(latestInstant)) {
-      doCheck();
-      // last instant completed, set it empty
-      latestInstant = "";
+    int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+    String instantGenerateInfoFileName = indexOfThisSubtask + UNDERLINE + checkpointId + UNDERLINE + batchSize;

Review comment:
       agree







[GitHub] [hudi] codecov-io edited a comment on pull request #2434: [HUDI-1511] InstantGenerateOperator support multiple parallelism

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #2434:
URL: https://github.com/apache/hudi/pull/2434#issuecomment-758857465









[GitHub] [hudi] wangxianghu commented on a change in pull request #2434: [HUDI-1511] InstantGenerateOperator support multiple parallelism

Posted by GitBox <gi...@apache.org>.
wangxianghu commented on a change in pull request #2434:
URL: https://github.com/apache/hudi/pull/2434#discussion_r557020337



##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -222,4 +236,59 @@ public void close() throws Exception {
       fs.close();
     }
   }
+
+  private boolean checkReceivedData(long checkpointId) throws InterruptedException, IOException {
+    int numberOfParallelSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
+    FileStatus[] fileStatuses = null;
+    Path generatePath = new Path(INSTANT_GENERATE_FOLDER_NAME);

Review comment:
       @loukey-lj How about moving this path to `.hoodie/.aux`?
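According to the PR description, the main subtask (index 0) parses every subtask's marker file to decide whether to create a new instant, and it creates one only if some subtask received data in the current checkpoint. The minimal sketch below shows that decision. It assumes the `<subtaskIndex>_<checkpointId>_<batchSize>` naming and assumes the file names have already been listed; the real operator would list them with Hadoop `FileSystem#listStatus`. `ReceivedDataCheck` and its `Optional` return convention are hypothetical.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical sketch of the main subtask's decision (not the PR's actual code).
// Input: marker file names already listed from the marker folder.
final class ReceivedDataCheck {

  /**
   * Returns Optional.empty() if not every subtask has reported for checkpointId yet
   * (caller should wait and retry); otherwise, whether any subtask received records.
   * Assumes every three-part name was produced by the <index>_<checkpoint>_<size> format.
   */
  static Optional<Boolean> checkReceivedData(
      List<String> markerFileNames, long checkpointId, int numberOfParallelSubtasks) {
    Set<Integer> reportedSubtasks = new HashSet<>();
    boolean anyRecords = false;
    for (String name : markerFileNames) {
      String[] parts = name.split("_");
      // Skip malformed names and markers left over from other checkpoints.
      if (parts.length != 3 || Long.parseLong(parts[1]) != checkpointId) {
        continue;
      }
      reportedSubtasks.add(Integer.parseInt(parts[0]));
      anyRecords |= Long.parseLong(parts[2]) > 0;
    }
    if (reportedSubtasks.size() < numberOfParallelSubtasks) {
      return Optional.empty();
    }
    return Optional.of(anyRecords);
  }

  public static void main(String[] args) {
    List<String> names = List.of("0_7_0", "1_7_25", "0_6_100");
    System.out.println(checkReceivedData(names, 7L, 2)); // Optional[true]
    System.out.println(checkReceivedData(names, 7L, 3)); // Optional.empty
  }
}
```

Returning `Optional.empty()` keeps two cases apart. In the first, not every subtask has reported yet, so the caller should wait and retry; the `InterruptedException` in the real method signature suggests it sleeps between attempts. In the second, every subtask has reported but none received data, so no instant should be created.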





[GitHub] [hudi] loukey-lj commented on a change in pull request #2434: [HUDI-1511] InstantGenerateOperator support multiple parallelism

Posted by GitBox <gi...@apache.org>.
loukey-lj commented on a change in pull request #2434:
URL: https://github.com/apache/hudi/pull/2434#discussion_r555808717



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
##########
@@ -272,7 +273,7 @@ public HoodieWrapperFileSystem getFs() {
 
   /**
    * Return raw file-system.
-   * 

Review comment:
       ok





[GitHub] [hudi] codecov-io edited a comment on pull request #2434: [HUDI-1511] InstantGenerateOperator support multiple parallelism

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #2434:
URL: https://github.com/apache/hudi/pull/2434#issuecomment-758857465


   # [Codecov](https://codecov.io/gh/apache/hudi/pull/2434?src=pr&el=h1) Report
   > Merging [#2434](https://codecov.io/gh/apache/hudi/pull/2434?src=pr&el=desc) (cd1909f) into [master](https://codecov.io/gh/apache/hudi/commit/56866a11fe8b7a0ef8340f221da30c83c72b85da?el=desc) (56866a1) will **decrease** coverage by `43.87%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/2434/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/hudi/pull/2434?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master   #2434       +/-   ##
   ============================================
   - Coverage     53.56%   9.68%   -43.88%     
   + Complexity     2774      48     -2726     
   ============================================
     Files           348      53      -295     
     Lines         16117    1930    -14187     
     Branches       1641     230     -1411     
   ============================================
   - Hits           8633     187     -8446     
   + Misses         6785    1730     -5055     
   + Partials        699      13      -686     
   ```
   
   | Flag | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | hudicli | `?` | `?` | |
   | hudiclient | `100.00% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudicommon | `?` | `?` | |
   | hudihadoopmr | `?` | `?` | |
   | hudispark | `?` | `?` | |
   | huditimelineservice | `?` | `?` | |
   | hudiutilities | `9.68% <ø> (-60.38%)` | `0.00 <ø> (ø)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2434?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...va/org/apache/hudi/utilities/IdentitySplitter.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL0lkZW50aXR5U3BsaXR0ZXIuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-2.00%)` | |
   | [...va/org/apache/hudi/utilities/schema/SchemaSet.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFTZXQuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-3.00%)` | |
   | [...a/org/apache/hudi/utilities/sources/RowSource.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUm93U291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
   | [.../org/apache/hudi/utilities/sources/AvroSource.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQXZyb1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
   | [.../org/apache/hudi/utilities/sources/JsonSource.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvblNvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
   | [...rg/apache/hudi/utilities/sources/CsvDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQ3N2REZTU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-10.00%)` | |
   | [...g/apache/hudi/utilities/sources/JsonDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkRGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
   | [...apache/hudi/utilities/sources/JsonKafkaSource.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkthZmthU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-6.00%)` | |
   | [...pache/hudi/utilities/sources/ParquetDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUGFycXVldERGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-5.00%)` | |
   | [...lities/schema/SchemaProviderWithPostProcessor.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFQcm92aWRlcldpdGhQb3N0UHJvY2Vzc29yLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-3.00%)` | |
   | ... and [328 more](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree-more) | |
   



[GitHub] [hudi] loukey-lj commented on a change in pull request #2434: [HUDI-1511] InstantGenerateOperator support multiple parallelism

Posted by GitBox <gi...@apache.org>.
loukey-lj commented on a change in pull request #2434:
URL: https://github.com/apache/hudi/pull/2434#discussion_r555806718



##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -18,6 +18,18 @@
 
 package org.apache.hudi.operator;
 
+import org.apache.flink.api.common.state.ListState;

Review comment:
       reverted 





[GitHub] [hudi] wangxianghu commented on a change in pull request #2434: [HUDI-1511] InstantGenerateOperator support multiple parallelism

Posted by GitBox <gi...@apache.org>.
wangxianghu commented on a change in pull request #2434:
URL: https://github.com/apache/hudi/pull/2434#discussion_r557021821



##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -71,16 +72,18 @@
   private String latestInstant = "";
   private List<String> latestInstantList = new ArrayList<>(1);
   private transient ListState<String> latestInstantState;
-  private List<StreamRecord> bufferedRecords = new LinkedList();
-  private transient ListState<StreamRecord> recordsState;
   private Integer retryTimes;
   private Integer retryInterval;
+  private static final String UNDERLINE = "_";
+  private static final String INSTANT_GENERATE_FOLDER_NAME = "instant_generate_tmp";

Review comment:
       How about `INSTANT_GENERATE_FOLDER_NAME = ".instant_marker"`?
   WDYT, @yanghua?
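As written in `checkReceivedData`, `new Path(INSTANT_GENERATE_FOLDER_NAME)` is a relative Hadoop path. Hadoop would resolve it against the file system's working directory, not the table base path. The sketch below shows the layout that results from combining the two review suggestions: the `.hoodie/.aux` location and the `.instant_marker` folder name. It uses `java.nio.file.Path` in place of `org.apache.hadoop.fs.Path` only so that it runs without Hadoop. `MarkerFolderLayout` and the example base path are hypothetical.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical layout sketch: where marker files would land if the folder were
// renamed to ".instant_marker" and moved under ".hoodie/.aux", as suggested in review.
// java.nio.file.Path stands in for org.apache.hadoop.fs.Path purely for illustration.
final class MarkerFolderLayout {
  static final String METAFOLDER_NAME = ".hoodie";
  static final String AUX_FOLDER_NAME = ".aux";
  static final String INSTANT_GENERATE_FOLDER_NAME = ".instant_marker";

  /** Resolves <basePath>/.hoodie/.aux/.instant_marker, anchored at the table base path. */
  static Path markerFolder(String basePath) {
    return Paths.get(basePath, METAFOLDER_NAME, AUX_FOLDER_NAME, INSTANT_GENERATE_FOLDER_NAME);
  }

  public static void main(String[] args) {
    Path folder = markerFolder("/tmp/hudi_table");
    // A marker for subtask 0, checkpoint 42, 1000 records.
    System.out.println(folder.resolve("0_42_1000"));
  }
}
```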





[GitHub] [hudi] loukey-lj commented on a change in pull request #2434: [HUDI-1511] InstantGenerateOperator support multiple parallelism

Posted by GitBox <gi...@apache.org>.
loukey-lj commented on a change in pull request #2434:
URL: https://github.com/apache/hudi/pull/2434#discussion_r556284611



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
##########
@@ -202,7 +203,7 @@ public String getTempFolderPath() {
 
   /**
    * Returns Marker folder path.
-   * 

Review comment:
       Comparing my local HoodieTableMetaClient with the Hudi master code, I found no changes. A little bit weird.





[GitHub] [hudi] codecov-io edited a comment on pull request #2434: [HUDI-1511] InstantGenerateOperator support multiple parallelism

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #2434:
URL: https://github.com/apache/hudi/pull/2434#issuecomment-758857465


   # [Codecov](https://codecov.io/gh/apache/hudi/pull/2434?src=pr&el=h1) Report
   > Merging [#2434](https://codecov.io/gh/apache/hudi/pull/2434?src=pr&el=desc) (b60989d) into [master](https://codecov.io/gh/apache/hudi/commit/56866a11fe8b7a0ef8340f221da30c83c72b85da?el=desc) (56866a1) will **decrease** coverage by `43.87%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/2434/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/hudi/pull/2434?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master   #2434       +/-   ##
   ============================================
   - Coverage     53.56%   9.68%   -43.88%     
   + Complexity     2774      48     -2726     
   ============================================
     Files           348      53      -295     
     Lines         16117    1930    -14187     
     Branches       1641     230     -1411     
   ============================================
   - Hits           8633     187     -8446     
   + Misses         6785    1730     -5055     
   + Partials        699      13      -686     
   ```
   
   | Flag | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | hudicli | `?` | `?` | |
   | hudiclient | `?` | `?` | |
   | hudicommon | `?` | `?` | |
   | hudihadoopmr | `?` | `?` | |
   | hudispark | `?` | `?` | |
   | huditimelineservice | `?` | `?` | |
   | hudiutilities | `9.68% <ø> (-60.38%)` | `0.00 <ø> (ø)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2434?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...va/org/apache/hudi/utilities/IdentitySplitter.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL0lkZW50aXR5U3BsaXR0ZXIuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-2.00%)` | |
   | [...va/org/apache/hudi/utilities/schema/SchemaSet.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFTZXQuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-3.00%)` | |
   | [...a/org/apache/hudi/utilities/sources/RowSource.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUm93U291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
   | [.../org/apache/hudi/utilities/sources/AvroSource.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQXZyb1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
   | [.../org/apache/hudi/utilities/sources/JsonSource.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvblNvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
   | [...rg/apache/hudi/utilities/sources/CsvDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQ3N2REZTU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-10.00%)` | |
   | [...g/apache/hudi/utilities/sources/JsonDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkRGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
   | [...apache/hudi/utilities/sources/JsonKafkaSource.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkthZmthU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-6.00%)` | |
   | [...pache/hudi/utilities/sources/ParquetDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUGFycXVldERGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-5.00%)` | |
   | [...lities/schema/SchemaProviderWithPostProcessor.java](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFQcm92aWRlcldpdGhQb3N0UHJvY2Vzc29yLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-3.00%)` | |
   | ... and [328 more](https://codecov.io/gh/apache/hudi/pull/2434/diff?src=pr&el=tree-more) | |
   

