Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/02/22 07:20:05 UTC

[GitHub] [iceberg] openinx opened a new pull request #4189: Flink 1.14: Fix the flaky testHashDistributeMode by controlling rows in one checkpoint precisely

openinx opened a new pull request #4189:
URL: https://github.com/apache/iceberg/pull/4189


   This PR tries to fix the flaky testHashDistributeMode unit test at its root. The following comments explain the context behind the current fix:
   
   1. https://github.com/apache/iceberg/pull/4117#issuecomment-1042701849
   2. https://github.com/apache/iceberg/pull/4117#issuecomment-1042718844
   3. https://github.com/apache/iceberg/issues/2575#issuecomment-1046845868



[GitHub] [iceberg] openinx commented on pull request #4189: Flink 1.14: Fix the flaky testHashDistributeMode by controlling rows in one checkpoint precisely

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #4189:
URL: https://github.com/apache/iceberg/pull/4189#issuecomment-1048484900


   @stevenzwu The root cause is that the previous design could not guarantee that a single checkpoint would commit all rows in a given transaction. [Here](https://github.com/apache/iceberg/issues/2575#issuecomment-1047492144) is another example. That's why this PR now tries to guarantee exactly that.
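   For readers following along, here is a minimal, hypothetical sketch of the idea (not the actual BoundedTestSource in the Iceberg test utilities): a legacy Flink source that emits one batch of rows under the checkpoint lock and then waits for notifyCheckpointComplete before releasing the next batch, so that each batch lands in exactly one checkpoint, assuming checkpointing is enabled with a short interval.
   
   ```
   // Hypothetical sketch, not the real BoundedTestSource: one batch per checkpoint cycle.
   import java.util.List;
   import org.apache.flink.api.common.state.CheckpointListener;
   import org.apache.flink.streaming.api.functions.source.SourceFunction;
   import org.apache.flink.types.Row;
   
   public class OneBatchPerCheckpointSource implements SourceFunction<Row>, CheckpointListener {
     private final List<List<Row>> batches;
     private volatile boolean running = true;
     private volatile int completedCheckpoints = 0;
   
     public OneBatchPerCheckpointSource(List<List<Row>> batches) {
       this.batches = batches;
     }
   
     @Override
     public void run(SourceContext<Row> ctx) throws Exception {
       for (int i = 0; i < batches.size() && running; i++) {
         // Emit the whole batch under the checkpoint lock so no checkpoint barrier
         // can split it: all of its rows belong to the same checkpoint.
         synchronized (ctx.getCheckpointLock()) {
           batches.get(i).forEach(ctx::collect);
         }
         // Hold back the next batch until a checkpoint covering this one completes.
         while (running && completedCheckpoints <= i) {
           Thread.sleep(10);
         }
       }
     }
   
     @Override
     public void notifyCheckpointComplete(long checkpointId) {
       completedCheckpoints++;
     }
   
     @Override
     public void cancel() {
       running = false;
     }
   }
   ```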
   
   The new description looks good to me if you think it's clearer.



[GitHub] [iceberg] yittg commented on a change in pull request #4189: Flink 1.14: Fix the flaky testHashDistributeMode by controlling rows in one checkpoint precisely

Posted by GitBox <gi...@apache.org>.
yittg commented on a change in pull request #4189:
URL: https://github.com/apache/iceberg/pull/4189#discussion_r811748307



##########
File path: flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
##########
@@ -253,15 +258,24 @@ public void testHashDistributeMode() throws Exception {
         "write.format.default", format.name(),
         TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.HASH.modeName()
     );
+
+    // Initialize a BoundedSource table to precisely control that those 9 rows will be emitted in only one checkpoint.
+    List<Row> dataSet = ImmutableList.of(
+        Row.of(1, "aaa"), Row.of(1, "bbb"), Row.of(1, "ccc"),
+        Row.of(2, "aaa"), Row.of(2, "bbb"), Row.of(2, "ccc"),
+        Row.of(3, "aaa"), Row.of(3, "bbb"), Row.of(3, "ccc"));
+    String dataId = BoundedTableFactory.registerDataSet(ImmutableList.of(dataSet));

Review comment:
       Shall we produce more than one checkpoint, and add enough records in each batch instead of enumerating them?
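       If more than one checkpoint is wanted, the `BoundedTableFactory.registerDataSet` call in the diff takes a list of batches, so a sketch (assuming each inner list is emitted in its own checkpoint cycle) might be:
   
   ```
   // Hypothetical: two batches, intended as two separate checkpoint cycles.
   List<Row> firstBatch = ImmutableList.of(Row.of(1, "aaa"), Row.of(2, "aaa"), Row.of(3, "aaa"));
   List<Row> secondBatch = ImmutableList.of(Row.of(1, "bbb"), Row.of(2, "bbb"), Row.of(3, "bbb"));
   String dataId = BoundedTableFactory.registerDataSet(ImmutableList.of(firstBatch, secondBatch));
   ```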

##########
File path: flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
##########
@@ -253,15 +258,24 @@ public void testHashDistributeMode() throws Exception {
         "write.format.default", format.name(),
         TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.HASH.modeName()
     );
+
+    // Initialize a BoundedSource table to precisely control that those 9 rows will be emitted in only one checkpoint.
+    List<Row> dataSet = ImmutableList.of(
+        Row.of(1, "aaa"), Row.of(1, "bbb"), Row.of(1, "ccc"),
+        Row.of(2, "aaa"), Row.of(2, "bbb"), Row.of(2, "ccc"),
+        Row.of(3, "aaa"), Row.of(3, "bbb"), Row.of(3, "ccc"));
+    String dataId = BoundedTableFactory.registerDataSet(ImmutableList.of(dataSet));
+    sql("CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL)" +
+        " WITH ('connector'='BoundedSource', 'data-id'='%s')", SOURCE_TABLE, dataId);
+    Assert.assertEquals("Should have the expected rows", Sets.newHashSet(dataSet),
+        Sets.newHashSet(sql("SELECT * FROM %s", SOURCE_TABLE)));
+
     sql("CREATE TABLE %s(id INT, data VARCHAR) PARTITIONED BY (data) WITH %s",
         tableName, toWithClause(tableProps));
 
     try {
       // Insert data set.
-      sql("INSERT INTO %s VALUES " +
-          "(1, 'aaa'), (1, 'bbb'), (1, 'ccc'), " +
-          "(2, 'aaa'), (2, 'bbb'), (2, 'ccc'), " +
-          "(3, 'aaa'), (3, 'bbb'), (3, 'ccc')", tableName);
+      sql("INSERT INTO %s SELECT * FROM %s", tableName, SOURCE_TABLE);
 
       Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName));
       SimpleDataUtil.assertTableRecords(table, ImmutableList.of(

Review comment:
       Check the records based on `dataSet` instead of enumerating them again?
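       A sketch of that idea, assuming a `SimpleDataUtil.createRecord(id, data)` helper like the ones used elsewhere in these tests (the helper name is an assumption, not verified against this PR):
   
   ```
   // Hypothetical: derive the expected records from dataSet instead of enumerating them again.
   List<Record> expected = dataSet.stream()
       .map(row -> SimpleDataUtil.createRecord((Integer) row.getField(0), (String) row.getField(1)))
       .collect(Collectors.toList());
   SimpleDataUtil.assertTableRecords(table, expected);
   ```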





[GitHub] [iceberg] stevenzwu edited a comment on pull request #4189: Flink 1.14: Fix the flaky testHashDistributeMode by controlling rows in one checkpoint precisely

Posted by GitBox <gi...@apache.org>.
stevenzwu edited a comment on pull request #4189:
URL: https://github.com/apache/iceberg/pull/4189#issuecomment-1048429340


   @openinx I have run the test hundreds of times locally in a test loop like you did before and was never able to reproduce it.
   
   Thinking again about the root cause we discussed in the issue, where we may miss the notifyCheckpointComplete callback: I am not sure this change to BoundedTestSource will help. It is not about how many rows are in one checkpoint cycle. The real issue is that two checkpoint cycles got squashed into one Iceberg commit, and hence there are 2 files for a partition in one Iceberg commit.
   
   I misunderstood the PR earlier. Looks like the change is to make sure we have one checkpoint cycle for all rows to avoid the problem.
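   (To make that failure mode concrete: a hedged sketch of counting data files per partition in the committed table. If two checkpoint cycles are squashed into a single commit, a partition can legitimately show two files here, which is what makes a strict file-count assertion flaky.)
   
   ```
   // Hypothetical sketch: count data files per partition in the table's current snapshot.
   Map<String, Integer> filesPerPartition = Maps.newHashMap();
   try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
     for (FileScanTask task : tasks) {
       filesPerPartition.merge(task.file().partition().toString(), 1, Integer::sum);
     }
   }
   ```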
   
   
   
   



[GitHub] [iceberg] rdblue commented on pull request #4189: Flink 1.14: Fix the flaky testHashDistributeMode by ingesting all rows in one checkpoint cycle.

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #4189:
URL: https://github.com/apache/iceberg/pull/4189#issuecomment-1049189441


   Thanks for fixing the flaky test, @openinx!






[GitHub] [iceberg] stevenzwu commented on pull request #4189: Flink 1.14: Fix the flaky testHashDistributeMode by ingesting all rows in one checkpoint cycle.

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on pull request #4189:
URL: https://github.com/apache/iceberg/pull/4189#issuecomment-1048982506


   @openinx Looks good. Can you merge this? It should be safe.



[GitHub] [iceberg] rdblue merged pull request #4189: Flink 1.14: Fix the flaky testHashDistributeMode by ingesting all rows in one checkpoint cycle.

Posted by GitBox <gi...@apache.org>.
rdblue merged pull request #4189:
URL: https://github.com/apache/iceberg/pull/4189


   



[GitHub] [iceberg] openinx commented on pull request #4189: Flink 1.14: Fix the flaky testHashDistributeMode by controlling rows in one checkpoint precisely

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #4189:
URL: https://github.com/apache/iceberg/pull/4189#issuecomment-1047509123


   I ran this 20 times on my host, and everything seems OK:
   
   ```
   # Run the Flink 1.14 table sink test suite 20 times; stop at the first failure.
   for i in `seq 1 20`; do
       ./gradlew :iceberg-flink:iceberg-flink-1.14_2.12:test --tests "org.apache.iceberg.flink.TestFlinkTableSink"
       if [ $? -ne 0 ] ; then
           exit 1
       fi
   done
   ```




[GitHub] [iceberg] stevenzwu commented on pull request #4189: Flink 1.14: Fix the flaky testHashDistributeMode by controlling rows in one checkpoint precisely

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on pull request #4189:
URL: https://github.com/apache/iceberg/pull/4189#issuecomment-1048429340


   @openinx I have run the test hundreds of times locally and was never able to reproduce it.
   
   Thinking again about the root cause we discussed in the issue, where we may miss the notifyCheckpointComplete callback: I am not sure this change to BoundedTestSource will help. It is not about how many rows are in one checkpoint cycle. The real issue is that two checkpoint cycles got squashed into one Iceberg commit, and hence there are 2 files for a partition in one Iceberg commit. With that assumption, we can never assert on file count.
   
   The assertion is to verify that rows with the same partition value are only written by a single writer task. It is a little hacky/fragile, but maybe we could leverage the naming convention of the data file (which embeds the subtask ID).
   
   ```
     private String generateFilename() {
       // The file name encodes the partition ID, writer task (subtask) ID,
       // operation ID, and a per-task file counter.
       return format.addExtension(
           String.format("%05d-%d-%s-%05d", partitionId, taskId, operationId, fileCount.incrementAndGet()));
     }
   ```
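   A hedged sketch of that idea, assuming file names follow the `%05d-%d-%s-%05d` pattern above, so the task ID is the second dash-separated field (the operation ID comes later, so any dashes it contains do not affect the first two fields):
   
   ```
   // Hypothetical sketch: assert every partition was written by exactly one subtask,
   // parsing the task ID out of each committed data file's name.
   Map<String, Set<Integer>> writersPerPartition = Maps.newHashMap();
   try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
     for (FileScanTask task : tasks) {
       String path = task.file().path().toString();
       String fileName = path.substring(path.lastIndexOf('/') + 1);
       int taskId = Integer.parseInt(fileName.split("-")[1]);
       writersPerPartition
           .computeIfAbsent(task.file().partition().toString(), k -> Sets.newHashSet())
           .add(taskId);
     }
   }
   writersPerPartition.values()
       .forEach(writers -> Assert.assertEquals("One writer per partition", 1, writers.size()));
   ```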
   
   



[GitHub] [iceberg] openinx commented on a change in pull request #4189: Flink 1.14: Fix the flaky testHashDistributeMode by controlling rows in one checkpoint precisely

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #4189:
URL: https://github.com/apache/iceberg/pull/4189#discussion_r811852248



##########
File path: flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
##########
@@ -253,15 +258,24 @@ public void testHashDistributeMode() throws Exception {
         "write.format.default", format.name(),
         TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.HASH.modeName()
     );
+
+    // Initialize a BoundedSource table to precisely control that those 9 rows will be emitted in only one checkpoint.
+    List<Row> dataSet = ImmutableList.of(
+        Row.of(1, "aaa"), Row.of(1, "bbb"), Row.of(1, "ccc"),
+        Row.of(2, "aaa"), Row.of(2, "bbb"), Row.of(2, "ccc"),
+        Row.of(3, "aaa"), Row.of(3, "bbb"), Row.of(3, "ccc"));
+    String dataId = BoundedTableFactory.registerDataSet(ImmutableList.of(dataSet));

Review comment:
       I think a single checkpoint is good enough to validate the PartitionKeySelector. More checkpoints would make the unit test more complex while validating the same thing, in my mind.
   
       Mocking more records for the test data set sounds good to me.
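       For reference, the property being validated, sketched with plain Flink keying (names like `input` are hypothetical, and this is not the actual PartitionKeySelector): rows are keyed by their partition value, so every row with the same `data` value is routed to the same writer subtask.
   
   ```
   // Hypothetical sketch of hash distribution: key rows by the partition column so
   // all rows sharing a `data` value go to the same writer subtask.
   KeySelector<Row, String> byPartitionValue = row -> (String) row.getField(1);
   DataStream<Row> distributed = input.keyBy(byPartitionValue);
   ```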



