Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/03/11 06:11:04 UTC

[GitHub] [hudi] wxplovecc opened a new pull request #5018: [HUDI-3559] Fix Flink bucket index `NoSuchElementException` with COW table type, caused by the deduplicateRecords method in FlinkWriteHelper running out of order

wxplovecc opened a new pull request #5018:
URL: https://github.com/apache/hudi/pull/5018


   ## *Tips*
   - *Thank you very much for contributing to Apache Hudi.*
   - *Please review https://hudi.apache.org/contribute/how-to-contribute before opening a pull request.*
   
   ## What is the purpose of the pull request
   
   This pull request prevents the deduplicateRecords method in FlinkWriteHelper from returning records out of order.
   
   ## Brief change log
   
     - Capture whether the first incoming record carries the insert flag (`"I"`) before deduplication in `FlinkWriteHelper#deduplicateRecords`, and restore it on the deduplicated list (see the sketch below).
     - Add `ITTestDataStreamWrite#testCopyOnWriteBucketIndex` to cover the Flink bucket index on a COPY_ON_WRITE table.
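   
   A rough sketch of the approach, simplified from the diff excerpts quoted in the review comments below (`dedupe` here is a hypothetical stand-in for the grouping-and-reduction pipeline, not a real helper in the patch):
   
   ```java
   // "I" marks an insert bucket in the record's current location.
   final boolean hasInsert = records.get(0).getCurrentLocation().getInstantTime().equals("I");
   
   // Hypothetical helper standing in for the groupingBy + reduce pipeline,
   // whose output order is not guaranteed to match the input order.
   List<HoodieRecord<T>> recordList = dedupe(records);
   
   if (hasInsert) {
     // The reduction may have shuffled the list, so re-mark the first record
     // as the insert before handing the list back.
     recordList.get(0).getCurrentLocation().setInstantTime("I");
   }
   return recordList;
   ```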
   
   ## Verify this pull request
   
   This change added tests and can be verified as follows:
   
     - Added `ITTestDataStreamWrite#testCopyOnWriteBucketIndex` to verify the fix for the bucket index on a COPY_ON_WRITE table.
   
   ## Committer checklist
   
    - [ ] Has a corresponding JIRA in PR title & commit
    
    - [ ] Commit message is descriptive of the change
    
    - [ ] CI is green
   
    - [ ] Necessary doc changes done or have another open PR
          
 - [ ] For large changes, please consider breaking them into sub-tasks under an umbrella JIRA.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] danny0405 closed pull request #5018: [HUDI-3559] Fix Flink bucket index `NoSuchElementException` with COW table type, caused by the deduplicateRecords method in FlinkWriteHelper running out of order

Posted by GitBox <gi...@apache.org>.
danny0405 closed pull request #5018:
URL: https://github.com/apache/hudi/pull/5018


   





[GitHub] [hudi] danny0405 commented on a change in pull request #5018: [HUDI-3559] Fix Flink bucket index `NoSuchElementException` with COW table type, caused by the deduplicateRecords method in FlinkWriteHelper running out of order

Posted by GitBox <gi...@apache.org>.
danny0405 commented on a change in pull request #5018:
URL: https://github.com/apache/hudi/pull/5018#discussion_r830818740



##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
##########
@@ -91,13 +91,14 @@ public static FlinkWriteHelper newInstance() {
   @Override
   public List<HoodieRecord<T>> deduplicateRecords(
       List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism) {
+    final boolean hasInsert = records.get(0).getCurrentLocation().getInstantTime().equals("I");

Review comment:
       The `keyedRecords` can be made more efficient:
   ```java
    Map<String, List<HoodieRecord<T>>> keyedRecords = records.stream()
            .collect(Collectors.groupingBy(record -> record.getKey().getRecordKey()));
   ```
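    
    Note that the single-argument `Collectors.groupingBy` collects into a `HashMap`, whose iteration order is unspecified, which is exactly the property this PR works around. A minimal sketch of an order-preserving variant, assuming `getRecordKey()` returns a `String`:
    ```java
    Map<String, List<HoodieRecord<T>>> keyedRecords = records.stream()
            .collect(Collectors.groupingBy(
                    record -> record.getKey().getRecordKey(), // classifier
                    LinkedHashMap::new,                       // keep first-seen key order
                    Collectors.toList()));                    // downstream collector
    ```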







[GitHub] [hudi] danny0405 commented on a change in pull request #5018: [HUDI-3559] Fix Flink bucket index `NoSuchElementException` with COW table type, caused by the deduplicateRecords method in FlinkWriteHelper running out of order

Posted by GitBox <gi...@apache.org>.
danny0405 commented on a change in pull request #5018:
URL: https://github.com/apache/hudi/pull/5018#discussion_r824461742



##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
##########
@@ -113,5 +114,10 @@ public static FlinkWriteHelper newInstance() {
       hoodieRecord.setCurrentLocation(rec1.getCurrentLocation());
       return hoodieRecord;
     }).orElse(null)).filter(Objects::nonNull).collect(Collectors.toList());
+
+    if (hasInsert) {
+      recordList.get(0).getCurrentLocation().setInstantTime("I");
+    }
+    return recordList;

Review comment:
    In line 114 we already reset the location, so each record list under the same key should keep the same instant time type after the reduction. So why is this set needed?







[GitHub] [hudi] garyli1019 commented on a change in pull request #5018: [HUDI-3559] Fix Flink bucket index `NoSuchElementException` with COW table type, caused by the deduplicateRecords method in FlinkWriteHelper running out of order

Posted by GitBox <gi...@apache.org>.
garyli1019 commented on a change in pull request #5018:
URL: https://github.com/apache/hudi/pull/5018#discussion_r830582362



##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
##########
@@ -91,13 +91,14 @@ public static FlinkWriteHelper newInstance() {
   @Override
   public List<HoodieRecord<T>> deduplicateRecords(
       List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism) {
+    final boolean hasInsert = records.get(0).getCurrentLocation().getInstantTime().equals("I");

Review comment:
    How about renaming this to `isInsertBucket` and adding a comment to explain why we need it?

##########
File path: hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
##########
@@ -191,6 +191,65 @@ public void testMergeOnReadWriteWithCompaction(String indexType) throws Exceptio
     TestData.checkWrittenFullData(tempFile, EXPECTED);
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = {"BUCKET"})
+  public void testCopyOnWriteBucketIndex(String indexType) throws Exception {
+    int parallelism = 4;
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.setString(FlinkOptions.INDEX_TYPE, indexType);
+    conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 1);
+    conf.setString(FlinkOptions.INDEX_KEY_FIELD, "id");
+    conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
+    conf.setString(FlinkOptions.TABLE_TYPE, HoodieTableType.COPY_ON_WRITE.name());
+    StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+    execEnv.getConfig().disableObjectReuse();
+    execEnv.setParallelism(parallelism);
+    // set up checkpoint interval
+    execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
+    execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+
+    // Read from file source
+    RowType rowType =
+            (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
+                    .getLogicalType();
+
+    JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
+            rowType,
+            InternalTypeInfo.of(rowType),
+            false,
+            true,
+            TimestampFormat.ISO_8601
+    );
+    String sourcePath = Objects.requireNonNull(Thread.currentThread()
+            .getContextClassLoader().getResource("test_source.data")).toString();
+
+    TextInputFormat format = new TextInputFormat(new Path(sourcePath));
+    format.setFilesFilter(FilePathFilter.createDefaultFilter());
+    TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
+    format.setCharsetName("UTF-8");
+
+    DataStream<RowData> dataStream = execEnv
+            // use PROCESS_CONTINUOUSLY mode to trigger checkpoint
+            .readFile(format, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, typeInfo)
+            .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
+            .setParallelism(parallelism);
+
+    DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream);
+    DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
+    Pipelines.clean(conf, pipeline);
+    JobClient client = execEnv.executeAsync(execEnv.getStreamGraph());
+    if (client.getJobStatus().get() != JobStatus.FAILED) {
+      try {
+        TimeUnit.SECONDS.sleep(20); // wait long enough for the compaction to finish

Review comment:
    Is this sleep still needed if we test COW? There is no separate compaction to wait for on a COPY_ON_WRITE table.

##########
File path: hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
##########
@@ -191,6 +191,65 @@ public void testMergeOnReadWriteWithCompaction(String indexType) throws Exceptio
     TestData.checkWrittenFullData(tempFile, EXPECTED);
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = {"BUCKET"})
+  public void testCopyOnWriteBucketIndex(String indexType) throws Exception {

Review comment:
    Can we use this test for the COW table and include the state index as well?








[GitHub] [hudi] garyli1019 commented on a change in pull request #5018: [HUDI-3559] Fix Flink bucket index `NoSuchElementException` with COW table type, caused by the deduplicateRecords method in FlinkWriteHelper running out of order

Posted by GitBox <gi...@apache.org>.
garyli1019 commented on a change in pull request #5018:
URL: https://github.com/apache/hudi/pull/5018#discussion_r830581948



##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
##########
@@ -113,5 +114,10 @@ public static FlinkWriteHelper newInstance() {
       hoodieRecord.setCurrentLocation(rec1.getCurrentLocation());
       return hoodieRecord;
     }).orElse(null)).filter(Objects::nonNull).collect(Collectors.toList());
+
+    if (hasInsert) {
+      recordList.get(0).getCurrentLocation().setInstantTime("I");
+    }
+    return recordList;

Review comment:
    I wrote a test locally and found that the order of the list changed after the reduction: <id1, id2> became <id2, id1> somehow, so it's not tied to a single record.








[GitHub] [hudi] hudi-bot commented on pull request #5018: [HUDI-3559] Fix Flink bucket index `NoSuchElementException` with COW table type, caused by the deduplicateRecords method in FlinkWriteHelper running out of order

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on pull request #5018:
URL: https://github.com/apache/hudi/pull/5018#issuecomment-1064807946


   ## CI report:
   
   * b9e437b2c2942ba29945d1d21c7e214e350e4333 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=6825) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>





[GitHub] [hudi] hudi-bot commented on pull request #5018: [HUDI-3559] Fix Flink bucket index `NoSuchElementException` with COW table type, caused by the deduplicateRecords method in FlinkWriteHelper running out of order

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on pull request #5018:
URL: https://github.com/apache/hudi/pull/5018#issuecomment-1064901069


   ## CI report:
   
   * b9e437b2c2942ba29945d1d21c7e214e350e4333 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=6825) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>





[GitHub] [hudi] hudi-bot commented on pull request #5018: [HUDI-3559] Fix Flink bucket index `NoSuchElementException` with COW table type, caused by the deduplicateRecords method in FlinkWriteHelper running out of order

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on pull request #5018:
URL: https://github.com/apache/hudi/pull/5018#issuecomment-1064806485


   ## CI report:
   
   * b9e437b2c2942ba29945d1d21c7e214e350e4333 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>





[GitHub] [hudi] danny0405 commented on a change in pull request #5018: [HUDI-3559] Fix Flink bucket index `NoSuchElementException` with COW table type, caused by the deduplicateRecords method in FlinkWriteHelper running out of order

Posted by GitBox <gi...@apache.org>.
danny0405 commented on a change in pull request #5018:
URL: https://github.com/apache/hudi/pull/5018#discussion_r830810428



##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
##########
@@ -113,5 +114,10 @@ public static FlinkWriteHelper newInstance() {
       hoodieRecord.setCurrentLocation(rec1.getCurrentLocation());
       return hoodieRecord;
     }).orElse(null)).filter(Objects::nonNull).collect(Collectors.toList());
+
+    if (hasInsert) {
+      recordList.get(0).getCurrentLocation().setInstantTime("I");
+    }
+    return recordList;

Review comment:
    Yes, `Map::values` does not guarantee the sequence. The state-index-based writer has no such problem because it assigns the instants "I" and "U" based on the buckets of the last checkpoint, and reuses those buckets within one checkpoint.
    
    This fix is necessary to make the bucket index path more robust.
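    
    A minimal, self-contained sketch of the ordering pitfall, with made-up keys and values:
    ```java
    import java.util.HashMap;
    import java.util.Map;
    
    public class MapOrderDemo {
        public static void main(String[] args) {
            // HashMap iterates in an order derived from key hashing, not insertion
            // order, so values() may yield <id2, id1> even though <id1, id2> was put in.
            Map<String, String> byKey = new HashMap<>();
            byKey.put("id1", "first");
            byKey.put("id2", "second");
            System.out.println(byKey.values()); // order is unspecified
        }
    }
    ```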



