Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/12/30 07:02:21 UTC

[GitHub] [iceberg] zhangjun0x01 opened a new pull request #1704: Flink : Fix Repeated Rewrite for RewriteDataFilesAction

zhangjun0x01 opened a new pull request #1704:
URL: https://github.com/apache/iceberg/pull/1704


   This PR fixes #1661.
   
   If a DataFile cannot be combined with other DataFiles into a CombinedScanTask, the resulting CombinedScanTask contains only that one file. We remove these single-file CombinedScanTasks so that the same file is not rewritten (compacted) repeatedly.
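
A minimal sketch of the filtering idea described above, not the actual Iceberg code. `Task` is a hypothetical stand-in for `CombinedScanTask` that only carries file paths, and the names `SingleFileTaskFilter` and `tasksWorthRewriting` are invented for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class SingleFileTaskFilter {
  // Hypothetical stand-in for CombinedScanTask: only the paths of the files it groups.
  static final class Task {
    final List<String> files;

    Task(String... files) {
      this.files = Arrays.asList(files);
    }
  }

  // Keep only tasks that group more than one file. A task holding a single
  // file would rewrite that file into an equivalent copy on every run.
  static List<Task> tasksWorthRewriting(List<Task> planned) {
    return planned.stream()
        .filter(task -> task.files.size() > 1)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<Task> planned = Arrays.asList(
        new Task("big.parquet"),                          // could not be combined
        new Task("small-1.parquet", "small-2.parquet"));  // two small files together
    System.out.println("tasks to rewrite: " + tasksWorthRewriting(planned).size());
  }
}
```

If every planned task holds a single file, the filtered list is empty, which corresponds to the early return of an empty result in the diff.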


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] zhangjun0x01 commented on a change in pull request #1704: Fix Repeated Rewrite for RewriteDataFilesAction

Posted by GitBox <gi...@apache.org>.
zhangjun0x01 commented on a change in pull request #1704:
URL: https://github.com/apache/iceberg/pull/1704#discussion_r517725367



##########
File path: core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
##########
@@ -228,9 +228,18 @@ public RewriteDataFilesActionResult execute() {
         .flatMap(Streams::stream)
         .collect(Collectors.toList());
 
+    // add a filter  to the CombinedScanTask list to avoid repeated rewrite datafile
+    List<CombinedScanTask> fileterCombinedScanTasks =
+        combinedScanTasks.stream().filter(task -> task.files().size() > 1).collect(Collectors.toList());
+
+    if (fileterCombinedScanTasks.isEmpty()) {
+      return RewriteDataFilesActionResult.empty();
+    }
+
     List<DataFile> addedDataFiles = rewriteDataForTasks(combinedScanTasks);
-    List<DataFile> currentDataFiles = filteredGroupedTasks.values().stream()
-        .flatMap(tasks -> tasks.stream().map(FileScanTask::file))
+    List<DataFile> currentDataFiles = fileterCombinedScanTasks.stream()

Review comment:
       Yes, thanks for the suggestion. I have updated it.






[GitHub] [iceberg] openinx commented on a change in pull request #1704: Fix Repeated Rewrite for RewriteDataFilesAction

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1704:
URL: https://github.com/apache/iceberg/pull/1704#discussion_r533268463



##########
File path: flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java
##########
@@ -280,4 +290,79 @@ public void testRewriteLargeTableHasResiduals() throws IOException {
     // Assert the table records as expected.
     SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, expected);
   }
+
+  /**
+   * a test case to test avoid repeate compress
+   * <p>
+   * If datafile cannot be combined to CombinedScanTask with other DataFiles, the size of the CombinedScanTask list size
+   * is 1, so we remove these CombinedScanTasks to avoid compressed repeatedly.
+   * <p>
+   * In this test case,we generated 3 data files and set targetSizeInBytes greater than the largest file size so that it
+   * cannot be  combined a CombinedScanTask with other datafiles. The datafile with the largest file size will not be
+   * compressed.
+   *
+   * @throws IOException IOException
+   */
+  @Test
+  public void testRewriteAvoidRepeateCompress() throws IOException {
+    if (!format.equals(FileFormat.ORC)) {

Review comment:
       Use `Assume.assumeFalse("ORC does not support getting length when file is opening", format.equals(FileFormat.ORC));`






[GitHub] [iceberg] openinx commented on a change in pull request #1704: Fix Repeated Rewrite for RewriteDataFilesAction

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1704:
URL: https://github.com/apache/iceberg/pull/1704#discussion_r522591211



##########
File path: core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
##########
@@ -226,11 +226,16 @@ public RewriteDataFilesActionResult execute() {
           return TableScanUtil.planTasks(splitTasks, targetSizeInBytes, splitLookback, splitOpenFileCost);
         })
         .flatMap(Streams::stream)
+        .filter(task -> task.files().size() > 1)
         .collect(Collectors.toList());
 
+    if (combinedScanTasks.isEmpty()) {
+      return RewriteDataFilesActionResult.empty();
+    }
+
     List<DataFile> addedDataFiles = rewriteDataForTasks(combinedScanTasks);
-    List<DataFile> currentDataFiles = filteredGroupedTasks.values().stream()
-        .flatMap(tasks -> tasks.stream().map(FileScanTask::file))
+    List<DataFile> currentDataFiles = combinedScanTasks.stream()

Review comment:
       Is this change correct? If we have two files, `file1` (200MB) and `file2` (1MB), and the target file size is 128MB, then they will be planned into three combined tasks: 128MB, 72MB, and 1MB. Won't `file1` then occur twice in `currentDataFiles`?
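
To illustrate the concern in general terms, here is a small sketch under stated assumptions: tasks are modelled as plain lists of file paths, and the plan below is hypothetical (it is not the exact plan from the comment). When splits of one file land in different tasks, flat-mapping tasks to files lists that file once per task; collecting into an insertion-ordered set is one possible way to remove the repeats. `SplitFileDedup` and its methods are illustrative names only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;

class SplitFileDedup {
  // Flat-map tasks to file paths; a file whose splits sit in several tasks repeats.
  static List<String> filesPerTask(List<List<String>> tasks) {
    List<String> files = new ArrayList<>();
    for (List<String> task : tasks) {
      files.addAll(task);
    }
    return files;
  }

  // Same flat-map, but each file is kept once, in first-seen order.
  static List<String> distinctFiles(List<List<String>> tasks) {
    return new ArrayList<>(new LinkedHashSet<>(filesPerTask(tasks)));
  }

  public static void main(String[] args) {
    // Hypothetical plan: two splits of file1, each grouped with a small file.
    List<List<String>> tasks = Arrays.asList(
        Arrays.asList("file1", "file2"),
        Arrays.asList("file1", "file3"));
    System.out.println("per task: " + filesPerTask(tasks));
    System.out.println("distinct: " + distinctFiles(tasks));
  }
}
```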






[GitHub] [iceberg] zhangjun0x01 commented on a change in pull request #1704: Fix Repeated Rewrite for RewriteDataFilesAction

Posted by GitBox <gi...@apache.org>.
zhangjun0x01 commented on a change in pull request #1704:
URL: https://github.com/apache/iceberg/pull/1704#discussion_r521887518



##########
File path: core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
##########
@@ -228,9 +228,18 @@ public RewriteDataFilesActionResult execute() {
         .flatMap(Streams::stream)
         .collect(Collectors.toList());
 
+    // add a filter  to the CombinedScanTask list to avoid repeated rewrite datafile
+    List<CombinedScanTask> fileterCombinedScanTasks =
+        combinedScanTasks.stream().filter(task -> task.files().size() > 1).collect(Collectors.toList());

Review comment:
       I have updated the PR and added a test case.






[GitHub] [iceberg] zhangjun0x01 commented on a change in pull request #1704: Fix Repeated Rewrite for RewriteDataFilesAction

Posted by GitBox <gi...@apache.org>.
zhangjun0x01 commented on a change in pull request #1704:
URL: https://github.com/apache/iceberg/pull/1704#discussion_r517724068



##########
File path: core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
##########
@@ -228,9 +228,18 @@ public RewriteDataFilesActionResult execute() {
         .flatMap(Streams::stream)
         .collect(Collectors.toList());
 
+    // add a filter  to the CombinedScanTask list to avoid repeated rewrite datafile
+    List<CombinedScanTask> fileterCombinedScanTasks =
+        combinedScanTasks.stream().filter(task -> task.files().size() > 1).collect(Collectors.toList());

Review comment:
       > Maybe more clear as !(task.files.nonempty)
   
   We need to remove the CombinedScanTasks whose size is 1, so `nonempty` is not the right check.






[GitHub] [iceberg] zhangjun0x01 commented on pull request #1704: Flink : Fix Repeated Rewrite for RewriteDataFilesAction

Posted by GitBox <gi...@apache.org>.
zhangjun0x01 commented on pull request #1704:
URL: https://github.com/apache/iceberg/pull/1704#issuecomment-752300870


   > Patch looks good to me, left few minor comments. Thanks.
   
   Thanks for your review. I have updated the PR.




[GitHub] [iceberg] openinx commented on a change in pull request #1704: Fix Repeated Rewrite for RewriteDataFilesAction

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1704:
URL: https://github.com/apache/iceberg/pull/1704#discussion_r522818592



##########
File path: flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java
##########
@@ -280,4 +280,91 @@ public void testRewriteLargeTableHasResiduals() throws IOException {
     // Assert the table records as expected.
     SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, expected);
   }
+
+  /**
+   * a test case to test avoid repeate compress
+   * <p>
+   * If datafile cannot be combined to CombinedScanTask with other DataFiles, the size of the CombinedScanTask list size
+   * is 1, so we remove these CombinedScanTasks to avoid compressed repeatedly.
+   * <p>
+   * In this test case,we generated 3 data files and set targetSizeInBytes greater than the largest file size so that
+   * it cannot be  combined a CombinedScanTask with other datafiles. The datafile with the largest file size will not be
+   * compressed.
+   * <p>
+   * For the same data, the file sizes of different formats are different. The file sizes of different formats generated
+   * by the data in the current test case are as follows:
+   * <p>
+   *   avro :
+   *  size  file
+   *  408 00000-0-5a218337-1742-4ed1-83d8-55e301da49b8-00001.avro
+   * 2390 00000-0-8f431924-ec8d-4957-a238-b8fe2b136210-00001.avro
+   *  408 00000-0-9c75bcc4-49f0-4722-9528-c1d5faa50fa7-00001.avro
+   *
+   * orc :
+   * size  file
+   * 1626 00000-0-260d42d1-f00f-4c5f-9628-5f41f6395093-00001.orc
+   *  331 00000-0-942fd38b-d7af-4ad2-a985-0e6ccdb4d8d3-00001.orc
+   *  333 00000-0-ad8f2c34-6cf7-43fe-990f-f8f6389d198e-00001.orc
+   *
+   * parquet :
+   * size  file
+   *  611 00000-0-84e1fd63-a840-4a23-983f-5247e9218cbe-00001.parquet
+   *  611 00000-0-91b070f0-7d17-487c-97ec-de0f0b09aa31-00001.parquet
+   * 2691 00000-0-e09c969d-d6ee-4a41-9e42-9dcbf42bc4e1-00001.parquet
+   *
+   * @throws IOException IOException
+   */
+  @Test
+  public void testRewriteAvoidRepeateCompress() throws IOException {
+    List<String> records = Lists.newArrayList();
+    List<Record> expected = Lists.newArrayList();
+    for (int i = 0; i < 500; i++) {
+      String data = String.valueOf(i) + "hello iceberg,hello flink";
+      records.add("(" + i + ",'" + data + "')");

Review comment:
       Yeah, I understand that it's constructing a large string so that we get a large file, which is then packed into a separate `CombinedScanTask` that should not be rewritten. I mean we could just generate the string with something like `Strings.repeat(String.valueOf(i), 10)` instead of concatenating again and again. It's a minor issue.
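
A small sketch of the repeated-string idea. It uses `String.repeat` from the JDK (Java 11 and later) as a stand-in for Guava's `Strings.repeat`, so that the example has no external dependency; the class and method names are illustrative only.

```java
class RepeatedPayload {
  // Build the record payload by repeating the decimal form of i ten times,
  // instead of concatenating string pieces by hand.
  static String payload(int i) {
    return String.valueOf(i).repeat(10);
  }

  public static void main(String[] args) {
    System.out.println(payload(42));
  }
}
```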






[GitHub] [iceberg] zhangjun0x01 commented on a change in pull request #1704: Fix Repeated Rewrite for RewriteDataFilesAction

Posted by GitBox <gi...@apache.org>.
zhangjun0x01 commented on a change in pull request #1704:
URL: https://github.com/apache/iceberg/pull/1704#discussion_r517725071



##########
File path: core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
##########
@@ -228,9 +228,18 @@ public RewriteDataFilesActionResult execute() {
         .flatMap(Streams::stream)
         .collect(Collectors.toList());
 
+    // add a filter  to the CombinedScanTask list to avoid repeated rewrite datafile
+    List<CombinedScanTask> fileterCombinedScanTasks =
+        combinedScanTasks.stream().filter(task -> task.files().size() > 1).collect(Collectors.toList());

Review comment:
       > Could also push this into the build of Combined scan tasks on line 228, rather than doing a new variable
   
   I followed the pattern of the `filteredGroupedTasks` variable above; I think a separate variable may be clearer.






[GitHub] [iceberg] openinx commented on pull request #1704: Flink : Fix Repeated Rewrite for RewriteDataFilesAction

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #1704:
URL: https://github.com/apache/iceberg/pull/1704#issuecomment-752015048


   Patch looks good to me; I left a few minor comments. Thanks.




[GitHub] [iceberg] aokolnychyi edited a comment on pull request #1704: Fix Repeated Rewrite for RewriteDataFilesAction

Posted by GitBox <gi...@apache.org>.
aokolnychyi edited a comment on pull request #1704:
URL: https://github.com/apache/iceberg/pull/1704#issuecomment-736620967


   I'll try to take a look at this PR this week as well, since I wrote a design [doc](https://docs.google.com/document/d/1aXo1VzuXxSuqcTzMLSQdnivMVgtLExgDWUFMvWeXRxc/edit#heading=h.41vc5dxcib1o) for compaction.
   
   SQL extensions are almost done so I should have more time now.




[GitHub] [iceberg] openinx commented on pull request #1704: Flink : Fix Repeated Rewrite for RewriteDataFilesAction

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #1704:
URL: https://github.com/apache/iceberg/pull/1704#issuecomment-752908351


   CI is OK now; I just merged this. Thanks @zhangjun0x01 for contributing.




[GitHub] [iceberg] zhangjun0x01 commented on a change in pull request #1704: Fix Repeated Rewrite for RewriteDataFilesAction

Posted by GitBox <gi...@apache.org>.
zhangjun0x01 commented on a change in pull request #1704:
URL: https://github.com/apache/iceberg/pull/1704#discussion_r522592084



##########
File path: core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
##########
@@ -226,11 +226,16 @@ public RewriteDataFilesActionResult execute() {
           return TableScanUtil.planTasks(splitTasks, targetSizeInBytes, splitLookback, splitOpenFileCost);
         })
         .flatMap(Streams::stream)
+        .filter(task -> task.files().size() > 1)
         .collect(Collectors.toList());
 
+    if (combinedScanTasks.isEmpty()) {
+      return RewriteDataFilesActionResult.empty();
+    }
+
     List<DataFile> addedDataFiles = rewriteDataForTasks(combinedScanTasks);
-    List<DataFile> currentDataFiles = filteredGroupedTasks.values().stream()
-        .flatMap(tasks -> tasks.stream().map(FileScanTask::file))
+    List<DataFile> currentDataFiles = combinedScanTasks.stream()

Review comment:
       If there is a data file whose size is greater than `targetSizeInBytes`, I think it should not be rewritten. I opened a new PR to handle this:
   https://github.com/apache/iceberg/pull/1762






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1704: Fix Repeated Rewrite for RewriteDataFilesAction

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1704:
URL: https://github.com/apache/iceberg/pull/1704#discussion_r517611911



##########
File path: core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
##########
@@ -228,9 +228,18 @@ public RewriteDataFilesActionResult execute() {
         .flatMap(Streams::stream)
         .collect(Collectors.toList());
 
+    // add a filter  to the CombinedScanTask list to avoid repeated rewrite datafile
+    List<CombinedScanTask> fileterCombinedScanTasks =
+        combinedScanTasks.stream().filter(task -> task.files().size() > 1).collect(Collectors.toList());

Review comment:
       Could also push this into the build of Combined scan tasks on line 228, rather than doing a new variable 






[GitHub] [iceberg] openinx commented on pull request #1704: Flink : Fix Repeated Rewrite for RewriteDataFilesAction

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #1704:
URL: https://github.com/apache/iceberg/pull/1704#issuecomment-752524823


   It seems the unit test is still broken. @zhangjun0x01, would you mind taking a look?




[GitHub] [iceberg] openinx commented on a change in pull request #1704: Fix Repeated Rewrite for RewriteDataFilesAction

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1704:
URL: https://github.com/apache/iceberg/pull/1704#discussion_r533279986



##########
File path: flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java
##########
@@ -280,4 +290,79 @@ public void testRewriteLargeTableHasResiduals() throws IOException {
     // Assert the table records as expected.
     SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, expected);
   }
+
+  /**
+   * a test case to test avoid repeate compress
+   * <p>
+   * If datafile cannot be combined to CombinedScanTask with other DataFiles, the size of the CombinedScanTask list size
+   * is 1, so we remove these CombinedScanTasks to avoid compressed repeatedly.
+   * <p>
+   * In this test case,we generated 3 data files and set targetSizeInBytes greater than the largest file size so that it
+   * cannot be  combined a CombinedScanTask with other datafiles. The datafile with the largest file size will not be
+   * compressed.
+   *
+   * @throws IOException IOException
+   */
+  @Test
+  public void testRewriteAvoidRepeateCompress() throws IOException {
+    if (!format.equals(FileFormat.ORC)) {
+      List<Record> expected = Lists.newArrayList();
+      Schema schema = icebergTableUnPartitioned.schema();
+      GenericAppenderFactory genericAppenderFactory = new GenericAppenderFactory(schema);
+      File file = temp.newFile();
+      FileAppender<Record> fileAppender = genericAppenderFactory.newAppender(Files.localOutput(file), format);
+      long filesize = 20000;
+      int count = 0;
+      for (; fileAppender.length() < filesize; count++) {
+        Record record = RECORD.copy();
+        record.setField("id", count);
+        record.setField("data", "iceberg");
+        fileAppender.add(record);
+        expected.add(record);
+      }
+      fileAppender.close();
+
+      DataFile dataFile = DataFiles.builder(icebergTableUnPartitioned.spec())
+          .withPath(file.getAbsolutePath())
+          .withFileSizeInBytes(file.length())
+          .withFormat(format)
+          .withRecordCount(count)
+          .build();
+
+      icebergTableUnPartitioned.newAppend()
+          .appendFile(dataFile)
+          .commit();
+
+      sql("INSERT INTO %s SELECT 1,'a' ", TABLE_NAME_UNPARTITIONED);
+      sql("INSERT INTO %s SELECT 2,'b' ", TABLE_NAME_UNPARTITIONED);
+
+      icebergTableUnPartitioned.refresh();
+
+      CloseableIterable<FileScanTask> tasks = icebergTableUnPartitioned.newScan().planFiles();
+      List<DataFile> dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+      Assert.assertEquals("Should have 3 data files before rewrite", 3, dataFiles.size());
+
+      Actions actions = Actions.forTable(icebergTableUnPartitioned);
+
+      long targetSizeInBytes = file.length() + 10;
+      RewriteDataFilesActionResult result = actions
+          .rewriteDataFiles()
+          .targetSizeInBytes(targetSizeInBytes)
+          .splitOpenFileCost(1)
+          .execute();
+      Assert.assertEquals("Action should rewrite 2 data files", 2, result.deletedDataFiles().size());
+      Assert.assertEquals("Action should add 1 data file", 1, result.addedDataFiles().size());
+
+      icebergTableUnPartitioned.refresh();
+
+      CloseableIterable<FileScanTask> tasks1 = icebergTableUnPartitioned.newScan().planFiles();
+      List<DataFile> dataFiles1 = Lists.newArrayList(CloseableIterable.transform(tasks1, FileScanTask::file));
+      Assert.assertEquals("Should have 2 data files after rewrite", 2, dataFiles1.size());

Review comment:
       I think we should also check that the compaction did not combine the biggest file with one of the two small files. That is, `dataFiles1` should still include `dataFile`.






[GitHub] [iceberg] zhangjun0x01 commented on pull request #1704: Flink : Fix Repeated Rewrite for RewriteDataFilesAction

Posted by GitBox <gi...@apache.org>.
zhangjun0x01 commented on pull request #1704:
URL: https://github.com/apache/iceberg/pull/1704#issuecomment-752565531


   In my GitHub account the CI passed ([link](https://github.com/zhangjun0x01/iceberg/actions/runs/451845876/workflow)), and the test also passes in my IDE. I have encountered a similar problem before; I guess it may be related to the CI environment.




[GitHub] [iceberg] openinx commented on a change in pull request #1704: Fix Repeated Rewrite for RewriteDataFilesAction

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1704:
URL: https://github.com/apache/iceberg/pull/1704#discussion_r522591211



##########
File path: core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
##########
@@ -226,11 +226,16 @@ public RewriteDataFilesActionResult execute() {
           return TableScanUtil.planTasks(splitTasks, targetSizeInBytes, splitLookback, splitOpenFileCost);
         })
         .flatMap(Streams::stream)
+        .filter(task -> task.files().size() > 1)
         .collect(Collectors.toList());
 
+    if (combinedScanTasks.isEmpty()) {
+      return RewriteDataFilesActionResult.empty();
+    }
+
     List<DataFile> addedDataFiles = rewriteDataForTasks(combinedScanTasks);
-    List<DataFile> currentDataFiles = filteredGroupedTasks.values().stream()
-        .flatMap(tasks -> tasks.stream().map(FileScanTask::file))
+    List<DataFile> currentDataFiles = combinedScanTasks.stream()

Review comment:
       Is this change correct? If we have a single `file1` of 200MB and the target file size is 128MB, then it will be planned into two combined tasks: 128MB and 72MB. Won't `file1` then occur twice in `currentDataFiles`?






[GitHub] [iceberg] zhangjun0x01 commented on a change in pull request #1704: Fix Repeated Rewrite for RewriteDataFilesAction

Posted by GitBox <gi...@apache.org>.
zhangjun0x01 commented on a change in pull request #1704:
URL: https://github.com/apache/iceberg/pull/1704#discussion_r523890317



##########
File path: flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java
##########
@@ -280,4 +280,91 @@ public void testRewriteLargeTableHasResiduals() throws IOException {
     // Assert the table records as expected.
     SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, expected);
   }
+
+  /**
+   * a test case to test avoid repeate compress
+   * <p>
+   * If datafile cannot be combined to CombinedScanTask with other DataFiles, the size of the CombinedScanTask list size
+   * is 1, so we remove these CombinedScanTasks to avoid compressed repeatedly.
+   * <p>
+   * In this test case,we generated 3 data files and set targetSizeInBytes greater than the largest file size so that
+   * it cannot be  combined a CombinedScanTask with other datafiles. The datafile with the largest file size will not be
+   * compressed.
+   * <p>
+   * For the same data, the file sizes of different formats are different. The file sizes of different formats generated
+   * by the data in the current test case are as follows:
+   * <p>
+   *   avro :
+   *  size  file
+   *  408 00000-0-5a218337-1742-4ed1-83d8-55e301da49b8-00001.avro
+   * 2390 00000-0-8f431924-ec8d-4957-a238-b8fe2b136210-00001.avro
+   *  408 00000-0-9c75bcc4-49f0-4722-9528-c1d5faa50fa7-00001.avro
+   *
+   * orc :
+   * size  file
+   * 1626 00000-0-260d42d1-f00f-4c5f-9628-5f41f6395093-00001.orc
+   *  331 00000-0-942fd38b-d7af-4ad2-a985-0e6ccdb4d8d3-00001.orc
+   *  333 00000-0-ad8f2c34-6cf7-43fe-990f-f8f6389d198e-00001.orc
+   *
+   * parquet :
+   * size  file
+   *  611 00000-0-84e1fd63-a840-4a23-983f-5247e9218cbe-00001.parquet
+   *  611 00000-0-91b070f0-7d17-487c-97ec-de0f0b09aa31-00001.parquet
+   * 2691 00000-0-e09c969d-d6ee-4a41-9e42-9dcbf42bc4e1-00001.parquet
+   *
+   * @throws IOException IOException
+   */
+  @Test
+  public void testRewriteAvoidRepeateCompress() throws IOException {
+    List<String> records = Lists.newArrayList();
+    List<Record> expected = Lists.newArrayList();
+    for (int i = 0; i < 500; i++) {
+      String data = String.valueOf(i) + "hello iceberg,hello flink";
+      records.add("(" + i + ",'" + data + "')");

Review comment:
       > 2. create a larger file by using FileAppender (write a few records until the file length exceeds the given target file size).
   
   
   I took a look, and this may not be easy to implement. We would need to poll the file length through the `length` method until the file reaches the target file size, and then close the appender.
   
   But for ORC, we cannot get the length of a file that is still open for appending; the length is only available after the file is closed, which is the opposite of what we need.
   
   See the `org.apache.iceberg.orc.OrcFileAppender#length` method:
   
   ```
     @Override
     public long length() {
       Preconditions.checkState(isClosed,
           "Cannot return length while appending to an open file.");
       return file.toInputFile().getLength();
     }
   ```
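
A minimal sketch of the constraint discussed here, in plain Java. `SketchAppender`, `OpenLengthAppender`, `ClosedLengthAppender`, and `FillUntilTarget` are hypothetical names used only for illustration and are not part of Iceberg; the second appender merely imitates the `Preconditions.checkState(isClosed, ...)` behaviour quoted above.

```java
// Hypothetical stand-in for an appender; this is not Iceberg's FileAppender API.
interface SketchAppender {
  void add(String record);

  long length(); // may throw if the format cannot report its length while open

  void close();
}

// Reports a running length while still open, which the fill loop relies on.
class OpenLengthAppender implements SketchAppender {
  private long bytes = 0;

  public void add(String record) {
    bytes += record.length();
  }

  public long length() {
    return bytes;
  }

  public void close() {
  }
}

// Imitates the ORC behaviour quoted above: length is only available after close.
class ClosedLengthAppender implements SketchAppender {
  private long bytes = 0;
  private boolean closed = false;

  public void add(String record) {
    bytes += record.length();
  }

  public long length() {
    if (!closed) {
      throw new IllegalStateException("Cannot return length while appending to an open file.");
    }
    return bytes;
  }

  public void close() {
    closed = true;
  }
}

class FillUntilTarget {
  // Appends records until the reported length reaches targetBytes, then closes
  // the appender and returns how many records were written.
  static int fill(SketchAppender appender, long targetBytes) {
    int count = 0;
    try {
      while (appender.length() < targetBytes) {
        appender.add("iceberg");
        count++;
      }
    } finally {
      appender.close();
    }
    return count;
  }
}
```

With the first appender the loop stops once the reported length passes the target. With the second, the very first `length()` call throws, which is why the fill-until-target approach does not carry over to ORC.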
   






[GitHub] [iceberg] openinx closed pull request #1704: Flink : Fix Repeated Rewrite for RewriteDataFilesAction

Posted by GitBox <gi...@apache.org>.
openinx closed pull request #1704:
URL: https://github.com/apache/iceberg/pull/1704


   




[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1704: Fix Repeated Rewrite for RewriteDataFilesAction

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1704:
URL: https://github.com/apache/iceberg/pull/1704#discussion_r517611651



##########
File path: core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
##########
@@ -228,9 +228,18 @@ public RewriteDataFilesActionResult execute() {
         .flatMap(Streams::stream)
         .collect(Collectors.toList());
 
+    // add a filter  to the CombinedScanTask list to avoid repeated rewrite datafile
+    List<CombinedScanTask> fileterCombinedScanTasks =
+        combinedScanTasks.stream().filter(task -> task.files().size() > 1).collect(Collectors.toList());

Review comment:
       Maybe more clear as !(task.files.nonempty)






[GitHub] [iceberg] openinx commented on a change in pull request #1704: Fix Repeated Rewrite for RewriteDataFilesAction

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1704:
URL: https://github.com/apache/iceberg/pull/1704#discussion_r522822988



##########
File path: flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java
##########
@@ -280,4 +280,91 @@ public void testRewriteLargeTableHasResiduals() throws IOException {
     // Assert the table records as expected.
     SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, expected);
   }
+
+  /**
+   * a test case to test avoid repeate compress
+   * <p>
+   * If datafile cannot be combined to CombinedScanTask with other DataFiles, the size of the CombinedScanTask list size
+   * is 1, so we remove these CombinedScanTasks to avoid compressed repeatedly.
+   * <p>
+   * In this test case,we generated 3 data files and set targetSizeInBytes greater than the largest file size so that
+   * it cannot be  combined a CombinedScanTask with other datafiles. The datafile with the largest file size will not be
+   * compressed.
+   * <p>
+   * For the same data, the file sizes of different formats are different. The file sizes of different formats generated
+   * by the data in the current test case are as follows:
+   * <p>
+   *   avro :
+   *  size  file
+   *  408 00000-0-5a218337-1742-4ed1-83d8-55e301da49b8-00001.avro
+   * 2390 00000-0-8f431924-ec8d-4957-a238-b8fe2b136210-00001.avro
+   *  408 00000-0-9c75bcc4-49f0-4722-9528-c1d5faa50fa7-00001.avro
+   *
+   * orc :
+   * size  file
+   * 1626 00000-0-260d42d1-f00f-4c5f-9628-5f41f6395093-00001.orc
+   *  331 00000-0-942fd38b-d7af-4ad2-a985-0e6ccdb4d8d3-00001.orc
+   *  333 00000-0-ad8f2c34-6cf7-43fe-990f-f8f6389d198e-00001.orc
+   *
+   * parquet :
+   * size  file
+   *  611 00000-0-84e1fd63-a840-4a23-983f-5247e9218cbe-00001.parquet
+   *  611 00000-0-91b070f0-7d17-487c-97ec-de0f0b09aa31-00001.parquet
+   * 2691 00000-0-e09c969d-d6ee-4a41-9e42-9dcbf42bc4e1-00001.parquet
+   *
+   * @throws IOException IOException
+   */
+  @Test
+  public void testRewriteAvoidRepeateCompress() throws IOException {
+    List<String> records = Lists.newArrayList();
+    List<Record> expected = Lists.newArrayList();
+    for (int i = 0; i < 500; i++) {
+      String data = String.valueOf(i) + "hello iceberg,hello flink";
+      records.add("(" + i + ",'" + data + "')");

Review comment:
       About the test case, I think it may be better to generate three data files by using `GenericAppenderFactory`:
   1.  create two small files, each with one record;
   2.  create a larger file by using FileAppender (write a few records until the file length exceeds the given target file size).
   
   Then, in theory, we don't have to set a different threshold for each file format [here](https://github.com/apache/iceberg/pull/1704/files#diff-4a4c7e7c4b4e81c01ecdd9fb53fad72d2b87073ac74ccfeaec0af68ec98816d0R342). The current code is hard to maintain because it seems tricky.
   
    






[GitHub] [iceberg] openinx commented on a change in pull request #1704: Flink : Fix Repeated Rewrite for RewriteDataFilesAction

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1704:
URL: https://github.com/apache/iceberg/pull/1704#discussion_r549636084



##########
File path: flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java
##########
@@ -280,4 +292,82 @@ public void testRewriteLargeTableHasResiduals() throws IOException {
     // Assert the table records as expected.
     SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, expected);
   }
+
+  /**
+   * a test case to test avoid repeate compress
+   * <p>
+   * If datafile cannot be combined to CombinedScanTask with other DataFiles, the size of the CombinedScanTask list size
+   * is 1, so we remove these CombinedScanTasks to avoid compressed repeatedly.
+   * <p>
+   * In this test case,we generated 3 data files and set targetSizeInBytes greater than the largest file size so that it
+   * cannot be  combined a CombinedScanTask with other datafiles. The datafile with the largest file size will not be
+   * compressed.
+   *
+   * @throws IOException IOException
+   */
+  @Test
+  public void testRewriteAvoidRepeateCompress() throws IOException {
+    Assume.assumeFalse("ORC does not support getting length when file is opening", format.equals(FileFormat.ORC));
+    List<Record> expected = Lists.newArrayList();
+    Schema schema = icebergTableUnPartitioned.schema();
+    GenericAppenderFactory genericAppenderFactory = new GenericAppenderFactory(schema);
+    File file = temp.newFile();
+    int count = 0;
+    try (FileAppender<Record> fileAppender = genericAppenderFactory.newAppender(Files.localOutput(file), format)) {
+      long filesize = 20000;
+      for (; fileAppender.length() < filesize; count++) {
+        Record record = RECORD.copy();
+        record.setField("id", count);
+        record.setField("data", "iceberg");

Review comment:
       nit: we could just use:
   
   ```java
   Record record = SimpleDataUtil.createRecord(count, "iceberg");
   ```






[GitHub] [iceberg] openinx commented on a change in pull request #1704: Fix Repeated Rewrite for RewriteDataFilesAction

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1704:
URL: https://github.com/apache/iceberg/pull/1704#discussion_r533263788



##########
File path: core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
##########
@@ -226,11 +226,16 @@ public RewriteDataFilesActionResult execute() {
           return TableScanUtil.planTasks(splitTasks, targetSizeInBytes, splitLookback, splitOpenFileCost);
         })
         .flatMap(Streams::stream)
+        .filter(task -> task.files().size() > 1)
         .collect(Collectors.toList());
 
+    if (combinedScanTasks.isEmpty()) {
+      return RewriteDataFilesActionResult.empty();
+    }
+
     List<DataFile> addedDataFiles = rewriteDataForTasks(combinedScanTasks);
-    List<DataFile> currentDataFiles = filteredGroupedTasks.values().stream()
-        .flatMap(tasks -> tasks.stream().map(FileScanTask::file))
+    List<DataFile> currentDataFiles = combinedScanTasks.stream()

Review comment:
       I think we should not rely on a specific file-selection strategy here, because different selection strategies may result in duplicated files in `currentDataFiles`. The correct way is to remove the duplicated files so that `currentDataFiles` contains the correct files.
   
   I see that the code [here](https://github.com/apache/iceberg/pull/1704/files#diff-a330da6f6014d3154bb0979281e53f2b69c9e32117d0223bdf94d514a05cf559R264) already uses a `set` to remove duplicated files, so it's OK now.
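
The de-duplication point can be illustrated with plain Java collections. This is only a sketch under simplifying assumptions: file paths are modelled as strings rather than `DataFile` objects, each task is a plain list, and `DedupSketch` and `distinctFiles` are hypothetical names that do not appear in the PR.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

class DedupSketch {
  // Flattens the per-task file lists and drops repeated paths, keeping the
  // order in which each path was first seen.
  static Set<String> distinctFiles(List<List<String>> tasks) {
    return tasks.stream()
        .flatMap(List::stream)
        .collect(Collectors.toCollection(LinkedHashSet::new));
  }
}
```

Collecting into a set makes the result independent of how the planner groups files, so the same path appearing in two tasks is only counted once.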






[GitHub] [iceberg] zhangjun0x01 commented on pull request #1704: Fix Repeated Rewrite for RewriteDataFilesAction

Posted by GitBox <gi...@apache.org>.
zhangjun0x01 commented on pull request #1704:
URL: https://github.com/apache/iceberg/pull/1704#issuecomment-724396196


   @openinx could you help me review the PR? Thanks.




[GitHub] [iceberg] openinx commented on pull request #1704: Flink : Fix Repeated Rewrite for RewriteDataFilesAction

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #1704:
URL: https://github.com/apache/iceberg/pull/1704#issuecomment-752352584


   Let's trigger the CI again. 




[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1704: Fix Repeated Rewrite for RewriteDataFilesAction

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1704:
URL: https://github.com/apache/iceberg/pull/1704#discussion_r517610580



##########
File path: core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
##########
@@ -228,9 +228,18 @@ public RewriteDataFilesActionResult execute() {
         .flatMap(Streams::stream)
         .collect(Collectors.toList());
 
+    // add a filter  to the CombinedScanTask list to avoid repeated rewrite datafile
+    List<CombinedScanTask> fileterCombinedScanTasks =

Review comment:
       Typo in variable name






[GitHub] [iceberg] zhangjun0x01 commented on a change in pull request #1704: Fix Repeated Rewrite for RewriteDataFilesAction

Posted by GitBox <gi...@apache.org>.
zhangjun0x01 commented on a change in pull request #1704:
URL: https://github.com/apache/iceberg/pull/1704#discussion_r517725071



##########
File path: core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
##########
@@ -228,9 +228,18 @@ public RewriteDataFilesActionResult execute() {
         .flatMap(Streams::stream)
         .collect(Collectors.toList());
 
+    // add a filter  to the CombinedScanTask list to avoid repeated rewrite datafile
+    List<CombinedScanTask> fileterCombinedScanTasks =
+        combinedScanTasks.stream().filter(task -> task.files().size() > 1).collect(Collectors.toList());

Review comment:
       > Could also push this into the build of Combined scan tasks on line 228, rather than doing a new variable
   
   I modeled this on the `filteredGroupedTasks` variable above; maybe a separate variable is clearer.






[GitHub] [iceberg] openinx merged pull request #1704: Flink : Fix Repeated Rewrite for RewriteDataFilesAction

Posted by GitBox <gi...@apache.org>.
openinx merged pull request #1704:
URL: https://github.com/apache/iceberg/pull/1704


   




[GitHub] [iceberg] aokolnychyi commented on pull request #1704: Fix Repeated Rewrite for RewriteDataFilesAction

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #1704:
URL: https://github.com/apache/iceberg/pull/1704#issuecomment-736620967


   I'll try to take a look at this PR this week as well, since I wrote a design [doc](https://docs.google.com/document/d/1aXo1VzuXxSuqcTzMLSQdnivMVgtLExgDWUFMvWeXRxc/edit#heading=h.41vc5dxcib1o) for compaction.




[GitHub] [iceberg] zhangjun0x01 commented on a change in pull request #1704: Fix Repeated Rewrite for RewriteDataFilesAction

Posted by GitBox <gi...@apache.org>.
zhangjun0x01 commented on a change in pull request #1704:
URL: https://github.com/apache/iceberg/pull/1704#discussion_r522861390



##########
File path: flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java
##########
@@ -280,4 +280,91 @@ public void testRewriteLargeTableHasResiduals() throws IOException {
     // Assert the table records as expected.
     SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, expected);
   }
+
+  /**
+   * a test case to test avoid repeate compress
+   * <p>
+   * If datafile cannot be combined to CombinedScanTask with other DataFiles, the size of the CombinedScanTask list size
+   * is 1, so we remove these CombinedScanTasks to avoid compressed repeatedly.
+   * <p>
+   * In this test case,we generated 3 data files and set targetSizeInBytes greater than the largest file size so that
+   * it cannot be  combined a CombinedScanTask with other datafiles. The datafile with the largest file size will not be
+   * compressed.
+   * <p>
+   * For the same data, the file sizes of different formats are different. The file sizes of different formats generated
+   * by the data in the current test case are as follows:
+   * <p>
+   *   avro :
+   *  size  file
+   *  408 00000-0-5a218337-1742-4ed1-83d8-55e301da49b8-00001.avro
+   * 2390 00000-0-8f431924-ec8d-4957-a238-b8fe2b136210-00001.avro
+   *  408 00000-0-9c75bcc4-49f0-4722-9528-c1d5faa50fa7-00001.avro
+   *
+   * orc :
+   * size  file
+   * 1626 00000-0-260d42d1-f00f-4c5f-9628-5f41f6395093-00001.orc
+   *  331 00000-0-942fd38b-d7af-4ad2-a985-0e6ccdb4d8d3-00001.orc
+   *  333 00000-0-ad8f2c34-6cf7-43fe-990f-f8f6389d198e-00001.orc
+   *
+   * parquet :
+   * size  file
+   *  611 00000-0-84e1fd63-a840-4a23-983f-5247e9218cbe-00001.parquet
+   *  611 00000-0-91b070f0-7d17-487c-97ec-de0f0b09aa31-00001.parquet
+   * 2691 00000-0-e09c969d-d6ee-4a41-9e42-9dcbf42bc4e1-00001.parquet
+   *
+   * @throws IOException IOException
+   */
+  @Test
+  public void testRewriteAvoidRepeateCompress() throws IOException {
+    List<String> records = Lists.newArrayList();
+    List<Record> expected = Lists.newArrayList();
+    for (int i = 0; i < 500; i++) {
+      String data = String.valueOf(i) + "hello iceberg,hello flink";
+      records.add("(" + i + ",'" + data + "')");

Review comment:
       Yes, we should generate a file that meets the target size for each format. I did not find a suitable method to generate such a file at that time. I will take a look at `GenericAppenderFactory`.






[GitHub] [iceberg] openinx commented on a change in pull request #1704: Flink : Fix Repeated Rewrite for RewriteDataFilesAction

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1704:
URL: https://github.com/apache/iceberg/pull/1704#discussion_r549637952



##########
File path: flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java
##########
@@ -280,4 +292,82 @@ public void testRewriteLargeTableHasResiduals() throws IOException {
     // Assert the table records as expected.
     SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, expected);
   }
+
+  /**
+   * a test case to test avoid repeate compress
+   * <p>
+   * If datafile cannot be combined to CombinedScanTask with other DataFiles, the size of the CombinedScanTask list size
+   * is 1, so we remove these CombinedScanTasks to avoid compressed repeatedly.
+   * <p>
+   * In this test case,we generated 3 data files and set targetSizeInBytes greater than the largest file size so that it
+   * cannot be  combined a CombinedScanTask with other datafiles. The datafile with the largest file size will not be
+   * compressed.
+   *
+   * @throws IOException IOException
+   */
+  @Test
+  public void testRewriteAvoidRepeateCompress() throws IOException {
+    Assume.assumeFalse("ORC does not support getting length when file is opening", format.equals(FileFormat.ORC));
+    List<Record> expected = Lists.newArrayList();
+    Schema schema = icebergTableUnPartitioned.schema();
+    GenericAppenderFactory genericAppenderFactory = new GenericAppenderFactory(schema);
+    File file = temp.newFile();
+    int count = 0;
+    try (FileAppender<Record> fileAppender = genericAppenderFactory.newAppender(Files.localOutput(file), format)) {
+      long filesize = 20000;
+      for (; fileAppender.length() < filesize; count++) {
+        Record record = RECORD.copy();
+        record.setField("id", count);
+        record.setField("data", "iceberg");
+        fileAppender.add(record);
+        expected.add(record);
+      }
+    }
+
+    DataFile dataFile = DataFiles.builder(icebergTableUnPartitioned.spec())
+        .withPath(file.getAbsolutePath())
+        .withFileSizeInBytes(file.length())
+        .withFormat(format)
+        .withRecordCount(count)
+        .build();
+
+    icebergTableUnPartitioned.newAppend()
+        .appendFile(dataFile)
+        .commit();
+
+    sql("INSERT INTO %s SELECT 1,'a' ", TABLE_NAME_UNPARTITIONED);
+    sql("INSERT INTO %s SELECT 2,'b' ", TABLE_NAME_UNPARTITIONED);
+
+    icebergTableUnPartitioned.refresh();
+
+    CloseableIterable<FileScanTask> tasks = icebergTableUnPartitioned.newScan().planFiles();
+    List<DataFile> dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Assert.assertEquals("Should have 3 data files before rewrite", 3, dataFiles.size());
+
+    Actions actions = Actions.forTable(icebergTableUnPartitioned);
+
+    long targetSizeInBytes = file.length() + 10;
+    RewriteDataFilesActionResult result = actions
+        .rewriteDataFiles()
+        .targetSizeInBytes(targetSizeInBytes)
+        .splitOpenFileCost(1)
+        .execute();
+    Assert.assertEquals("Action should rewrite 2 data files", 2, result.deletedDataFiles().size());
+    Assert.assertEquals("Action should add 1 data file", 1, result.addedDataFiles().size());
+
+    icebergTableUnPartitioned.refresh();
+
+    CloseableIterable<FileScanTask> tasks1 = icebergTableUnPartitioned.newScan().planFiles();
+    List<DataFile> dataFilesRewrited = Lists.newArrayList(CloseableIterable.transform(tasks1, FileScanTask::file));

Review comment:
       nit:  `Rewrited` -> `Rewrote`,  or we could just name it `newDataFiles`.






[GitHub] [iceberg] openinx commented on a change in pull request #1704: Flink : Fix Repeated Rewrite for RewriteDataFilesAction

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1704:
URL: https://github.com/apache/iceberg/pull/1704#discussion_r549638941



##########
File path: flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java
##########
@@ -280,4 +292,82 @@ public void testRewriteLargeTableHasResiduals() throws IOException {
     // Assert the table records as expected.
     SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, expected);
   }
+
+  /**
+   * a test case to test avoid repeate compress
+   * <p>
+   * If datafile cannot be combined to CombinedScanTask with other DataFiles, the size of the CombinedScanTask list size
+   * is 1, so we remove these CombinedScanTasks to avoid compressed repeatedly.
+   * <p>
+   * In this test case,we generated 3 data files and set targetSizeInBytes greater than the largest file size so that it
+   * cannot be  combined a CombinedScanTask with other datafiles. The datafile with the largest file size will not be
+   * compressed.
+   *
+   * @throws IOException IOException
+   */
+  @Test
+  public void testRewriteAvoidRepeateCompress() throws IOException {
+    Assume.assumeFalse("ORC does not support getting length when file is opening", format.equals(FileFormat.ORC));
+    List<Record> expected = Lists.newArrayList();
+    Schema schema = icebergTableUnPartitioned.schema();
+    GenericAppenderFactory genericAppenderFactory = new GenericAppenderFactory(schema);
+    File file = temp.newFile();
+    int count = 0;
+    try (FileAppender<Record> fileAppender = genericAppenderFactory.newAppender(Files.localOutput(file), format)) {
+      long filesize = 20000;
+      for (; fileAppender.length() < filesize; count++) {
+        Record record = RECORD.copy();
+        record.setField("id", count);
+        record.setField("data", "iceberg");
+        fileAppender.add(record);
+        expected.add(record);
+      }
+    }
+
+    DataFile dataFile = DataFiles.builder(icebergTableUnPartitioned.spec())
+        .withPath(file.getAbsolutePath())
+        .withFileSizeInBytes(file.length())
+        .withFormat(format)
+        .withRecordCount(count)
+        .build();
+
+    icebergTableUnPartitioned.newAppend()
+        .appendFile(dataFile)
+        .commit();
+
+    sql("INSERT INTO %s SELECT 1,'a' ", TABLE_NAME_UNPARTITIONED);
+    sql("INSERT INTO %s SELECT 2,'b' ", TABLE_NAME_UNPARTITIONED);
+
+    icebergTableUnPartitioned.refresh();
+
+    CloseableIterable<FileScanTask> tasks = icebergTableUnPartitioned.newScan().planFiles();
+    List<DataFile> dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Assert.assertEquals("Should have 3 data files before rewrite", 3, dataFiles.size());
+
+    Actions actions = Actions.forTable(icebergTableUnPartitioned);
+
+    long targetSizeInBytes = file.length() + 10;
+    RewriteDataFilesActionResult result = actions
+        .rewriteDataFiles()
+        .targetSizeInBytes(targetSizeInBytes)
+        .splitOpenFileCost(1)
+        .execute();
+    Assert.assertEquals("Action should rewrite 2 data files", 2, result.deletedDataFiles().size());
+    Assert.assertEquals("Action should add 1 data file", 1, result.addedDataFiles().size());
+
+    icebergTableUnPartitioned.refresh();
+
+    CloseableIterable<FileScanTask> tasks1 = icebergTableUnPartitioned.newScan().planFiles();
+    List<DataFile> dataFilesRewrited = Lists.newArrayList(CloseableIterable.transform(tasks1, FileScanTask::file));
+    Assert.assertEquals("Should have 2 data files after rewrite", 2, dataFilesRewrited.size());
+
+    // the biggest file do not be rewrited
+    List rewritedDataFileNames = dataFilesRewrited.stream().map(df -> df.path()).collect(Collectors.toList());

Review comment:
       nit: could use a method reference here.
   
   ```java
       // The biggest file should not be rewritten.
       List<CharSequence> newPaths = newDataFiles.stream().map(ContentFile::path).collect(Collectors.toList());
       Assert.assertTrue(newPaths.contains(file.getAbsolutePath()));
   ```






[GitHub] [iceberg] openinx commented on a change in pull request #1704: Fix Repeated Rewrite for RewriteDataFilesAction

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1704:
URL: https://github.com/apache/iceberg/pull/1704#discussion_r521782358



##########
File path: core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
##########
@@ -228,9 +228,18 @@ public RewriteDataFilesActionResult execute() {
         .flatMap(Streams::stream)
         .collect(Collectors.toList());
 
+    // add a filter  to the CombinedScanTask list to avoid repeated rewrite datafile
+    List<CombinedScanTask> fileterCombinedScanTasks =
+        combinedScanTasks.stream().filter(task -> task.files().size() > 1).collect(Collectors.toList());

Review comment:
       I agree with @RussellSpitzer's comment about "push this into the build of Combined scan tasks on line 228", because that way we don't have to construct the `List<CombinedScanTask>` twice, which is unnecessary.
   
   BTW, would you please add a few unit tests to cover this case? It's great to improve this, but it would be better to fix it together with a UT.
   
   Thanks for the work.
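
As a rough illustration of the suggestion, the filter can sit inside the same stream pipeline that builds the task list, so only one list is collected. The sketch below models each combined task as a plain list of file names; `FilterSketch` and `groupsWorthRewriting` are hypothetical names, not code from the PR.

```java
import java.util.List;
import java.util.stream.Collectors;

class FilterSketch {
  // Keeps only groups that combine more than one file. A group holding a single
  // file would just be rewritten into an equivalent file, so it is skipped.
  static List<List<String>> groupsWorthRewriting(List<List<String>> plannedGroups) {
    return plannedGroups.stream()
        .filter(group -> group.size() > 1)
        .collect(Collectors.toList());
  }
}
```

An empty result means nothing is worth rewriting, which corresponds to the early `RewriteDataFilesActionResult.empty()` return shown in the diff.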






[GitHub] [iceberg] openinx commented on a change in pull request #1704: Fix Repeated Rewrite for RewriteDataFilesAction

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1704:
URL: https://github.com/apache/iceberg/pull/1704#discussion_r533277424



##########
File path: flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java
##########
@@ -280,4 +290,79 @@ public void testRewriteLargeTableHasResiduals() throws IOException {
     // Assert the table records as expected.
     SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, expected);
   }
+
+  /**
+   * a test case to test avoid repeate compress
+   * <p>
+   * If datafile cannot be combined to CombinedScanTask with other DataFiles, the size of the CombinedScanTask list size
+   * is 1, so we remove these CombinedScanTasks to avoid compressed repeatedly.
+   * <p>
+   * In this test case,we generated 3 data files and set targetSizeInBytes greater than the largest file size so that it
+   * cannot be  combined a CombinedScanTask with other datafiles. The datafile with the largest file size will not be
+   * compressed.
+   *
+   * @throws IOException IOException
+   */
+  @Test
+  public void testRewriteAvoidRepeateCompress() throws IOException {
+    if (!format.equals(FileFormat.ORC)) {
+      List<Record> expected = Lists.newArrayList();
+      Schema schema = icebergTableUnPartitioned.schema();
+      GenericAppenderFactory genericAppenderFactory = new GenericAppenderFactory(schema);
+      File file = temp.newFile();
+      FileAppender<Record> fileAppender = genericAppenderFactory.newAppender(Files.localOutput(file), format);
+      long filesize = 20000;
+      int count = 0;
+      for (; fileAppender.length() < filesize; count++) {
+        Record record = RECORD.copy();
+        record.setField("id", count);
+        record.setField("data", "iceberg");
+        fileAppender.add(record);
+        expected.add(record);
+      }
+      fileAppender.close();
+
+      DataFile dataFile = DataFiles.builder(icebergTableUnPartitioned.spec())
+          .withPath(file.getAbsolutePath())
+          .withFileSizeInBytes(file.length())
+          .withFormat(format)
+          .withRecordCount(count)
+          .build();

Review comment:
       Use try-with-resources (`try (...) {}`) to close the `fileAppender`, like this:
   
   ```java
         File file = temp.newFile();
   
         int fileSize = 2000;
         try (FileAppender<Record> fileAppender = genericAppenderFactory.newAppender(Files.localOutput(file), format)) {
           for (int idx = 0; fileAppender.length() < fileSize; idx++) {
             Record record = RECORD.copy();
             record.setField("id", idx);
             record.setField("data", "iceberg");
             fileAppender.add(record);
             expected.add(record);
           }
         }
   
         DataFile dataFile = DataFiles.builder(icebergTableUnPartitioned.spec())
             .withPath(file.getAbsolutePath())
             .withFileSizeInBytes(file.length())
             .withFormat(format)
             .withRecordCount(expected.size())
             .build();
   ```






[GitHub] [iceberg] openinx commented on a change in pull request #1704: Fix Repeated Rewrite for RewriteDataFilesAction

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1704:
URL: https://github.com/apache/iceberg/pull/1704#discussion_r522811571



##########
File path: core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
##########
@@ -226,11 +226,16 @@ public RewriteDataFilesActionResult execute() {
           return TableScanUtil.planTasks(splitTasks, targetSizeInBytes, splitLookback, splitOpenFileCost);
         })
         .flatMap(Streams::stream)
+        .filter(task -> task.files().size() > 1)
         .collect(Collectors.toList());
 
+    if (combinedScanTasks.isEmpty()) {
+      return RewriteDataFilesActionResult.empty();
+    }
+
     List<DataFile> addedDataFiles = rewriteDataForTasks(combinedScanTasks);
-    List<DataFile> currentDataFiles = filteredGroupedTasks.values().stream()
-        .flatMap(tasks -> tasks.stream().map(FileScanTask::file))
+    List<DataFile> currentDataFiles = combinedScanTasks.stream()

Review comment:
       It's true that we may not have to rewrite files whose size is greater than the target file size. But I don't understand why this patch needs to change this line; could you please revert it?






[GitHub] [iceberg] zhangjun0x01 commented on a change in pull request #1704: Fix Repeated Rewrite for RewriteDataFilesAction

Posted by GitBox <gi...@apache.org>.
zhangjun0x01 commented on a change in pull request #1704:
URL: https://github.com/apache/iceberg/pull/1704#discussion_r524838016



##########
File path: flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java
##########
@@ -280,4 +280,91 @@ public void testRewriteLargeTableHasResiduals() throws IOException {
     // Assert the table records as expected.
     SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, expected);
   }
+
+  /**
+   * a test case to test avoid repeate compress
+   * <p>
+   * If datafile cannot be combined to CombinedScanTask with other DataFiles, the size of the CombinedScanTask list size
+   * is 1, so we remove these CombinedScanTasks to avoid compressed repeatedly.
+   * <p>
+   * In this test case,we generated 3 data files and set targetSizeInBytes greater than the largest file size so that
+   * it cannot be  combined a CombinedScanTask with other datafiles. The datafile with the largest file size will not be
+   * compressed.
+   * <p>
+   * For the same data, the file sizes of different formats are different. The file sizes of different formats generated
+   * by the data in the current test case are as follows:
+   * <p>
+   *   avro :
+   *  size  file
+   *  408 00000-0-5a218337-1742-4ed1-83d8-55e301da49b8-00001.avro
+   * 2390 00000-0-8f431924-ec8d-4957-a238-b8fe2b136210-00001.avro
+   *  408 00000-0-9c75bcc4-49f0-4722-9528-c1d5faa50fa7-00001.avro
+   *
+   * orc :
+   * size  file
+   * 1626 00000-0-260d42d1-f00f-4c5f-9628-5f41f6395093-00001.orc
+   *  331 00000-0-942fd38b-d7af-4ad2-a985-0e6ccdb4d8d3-00001.orc
+   *  333 00000-0-ad8f2c34-6cf7-43fe-990f-f8f6389d198e-00001.orc
+   *
+   * parquet :
+   * size  file
+   *  611 00000-0-84e1fd63-a840-4a23-983f-5247e9218cbe-00001.parquet
+   *  611 00000-0-91b070f0-7d17-487c-97ec-de0f0b09aa31-00001.parquet
+   * 2691 00000-0-e09c969d-d6ee-4a41-9e42-9dcbf42bc4e1-00001.parquet
+   *
+   * @throws IOException IOException
+   */
+  @Test
+  public void testRewriteAvoidRepeateCompress() throws IOException {
+    List<String> records = Lists.newArrayList();
+    List<Record> expected = Lists.newArrayList();
+    for (int i = 0; i < 500; i++) {
+      String data = String.valueOf(i) + "hello iceberg,hello flink";
+      records.add("(" + i + ",'" + data + "')");

Review comment:
       I updated the PR to use `GenericAppenderFactory` to generate the data file, except for the `ORC` format.






[GitHub] [iceberg] openinx commented on a change in pull request #1704: Fix Repeated Rewrite for RewriteDataFilesAction

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1704:
URL: https://github.com/apache/iceberg/pull/1704#discussion_r522592834



##########
File path: flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java
##########
@@ -280,4 +280,91 @@ public void testRewriteLargeTableHasResiduals() throws IOException {
     // Assert the table records as expected.
     SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, expected);
   }
+
+  /**
+   * a test case to test avoid repeate compress
+   * <p>
+   * If datafile cannot be combined to CombinedScanTask with other DataFiles, the size of the CombinedScanTask list size
+   * is 1, so we remove these CombinedScanTasks to avoid compressed repeatedly.
+   * <p>
+   * In this test case,we generated 3 data files and set targetSizeInBytes greater than the largest file size so that
+   * it cannot be  combined a CombinedScanTask with other datafiles. The datafile with the largest file size will not be
+   * compressed.
+   * <p>
+   * For the same data, the file sizes of different formats are different. The file sizes of different formats generated
+   * by the data in the current test case are as follows:
+   * <p>
+   *   avro :
+   *  size  file
+   *  408 00000-0-5a218337-1742-4ed1-83d8-55e301da49b8-00001.avro
+   * 2390 00000-0-8f431924-ec8d-4957-a238-b8fe2b136210-00001.avro
+   *  408 00000-0-9c75bcc4-49f0-4722-9528-c1d5faa50fa7-00001.avro
+   *
+   * orc :
+   * size  file
+   * 1626 00000-0-260d42d1-f00f-4c5f-9628-5f41f6395093-00001.orc
+   *  331 00000-0-942fd38b-d7af-4ad2-a985-0e6ccdb4d8d3-00001.orc
+   *  333 00000-0-ad8f2c34-6cf7-43fe-990f-f8f6389d198e-00001.orc
+   *
+   * parquet :
+   * size  file
+   *  611 00000-0-84e1fd63-a840-4a23-983f-5247e9218cbe-00001.parquet
+   *  611 00000-0-91b070f0-7d17-487c-97ec-de0f0b09aa31-00001.parquet
+   * 2691 00000-0-e09c969d-d6ee-4a41-9e42-9dcbf42bc4e1-00001.parquet
+   *
+   * @throws IOException IOException
+   */
+  @Test
+  public void testRewriteAvoidRepeateCompress() throws IOException {
+    List<String> records = Lists.newArrayList();
+    List<Record> expected = Lists.newArrayList();
+    for (int i = 0; i < 500; i++) {
+      String data = String.valueOf(i) + "hello iceberg,hello flink";
+      records.add("(" + i + ",'" + data + "')");

Review comment:
       nit: do we need to concatenate such a complex string for the `data`? It doesn't look elegant.






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1704: Fix Repeated Rewrite for RewriteDataFilesAction

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1704:
URL: https://github.com/apache/iceberg/pull/1704#discussion_r517613310



##########
File path: core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
##########
@@ -228,9 +228,18 @@ public RewriteDataFilesActionResult execute() {
         .flatMap(Streams::stream)
         .collect(Collectors.toList());
 
+    // add a filter  to the CombinedScanTask list to avoid repeated rewrite datafile
+    List<CombinedScanTask> fileterCombinedScanTasks =
+        combinedScanTasks.stream().filter(task -> task.files().size() > 1).collect(Collectors.toList());
+
+    if (fileterCombinedScanTasks.isEmpty()) {
+      return RewriteDataFilesActionResult.empty();
+    }
+
     List<DataFile> addedDataFiles = rewriteDataForTasks(combinedScanTasks);
-    List<DataFile> currentDataFiles = filteredGroupedTasks.values().stream()
-        .flatMap(tasks -> tasks.stream().map(FileScanTask::file))
+    List<DataFile> currentDataFiles = fileterCombinedScanTasks.stream()

Review comment:
       I think the flatmap here is unnecessary since we can flatMap within the map operation instead:
   
   ```
       filteredCombinedScanTasks.stream()
           .flatMap(task -> task.files().stream().map(FileScanTask::file))
           .collect(Collectors.toList())
   ```
   
   This should skip at least one List allocation :) 






[GitHub] [iceberg] zhangjun0x01 commented on pull request #1704: Flink : Fix Repeated Rewrite for RewriteDataFilesAction

Posted by GitBox <gi...@apache.org>.
zhangjun0x01 commented on pull request #1704:
URL: https://github.com/apache/iceberg/pull/1704#issuecomment-752314571


   The CI test failed. I checked it: `org.apache.iceberg.spark.extensions.TestCopyOnWriteDelete` threw an exception. This PR is unrelated to Spark; is it a CI bug?




[GitHub] [iceberg] zhangjun0x01 commented on a change in pull request #1704: Fix Repeated Rewrite for RewriteDataFilesAction

Posted by GitBox <gi...@apache.org>.
zhangjun0x01 commented on a change in pull request #1704:
URL: https://github.com/apache/iceberg/pull/1704#discussion_r522594622



##########
File path: flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java
##########
@@ -280,4 +280,91 @@ public void testRewriteLargeTableHasResiduals() throws IOException {
     // Assert the table records as expected.
     SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, expected);
   }
+
+  /**
+   * a test case to test avoid repeate compress
+   * <p>
+   * If datafile cannot be combined to CombinedScanTask with other DataFiles, the size of the CombinedScanTask list size
+   * is 1, so we remove these CombinedScanTasks to avoid compressed repeatedly.
+   * <p>
+   * In this test case,we generated 3 data files and set targetSizeInBytes greater than the largest file size so that
+   * it cannot be  combined a CombinedScanTask with other datafiles. The datafile with the largest file size will not be
+   * compressed.
+   * <p>
+   * For the same data, the file sizes of different formats are different. The file sizes of different formats generated
+   * by the data in the current test case are as follows:
+   * <p>
+   *   avro :
+   *  size  file
+   *  408 00000-0-5a218337-1742-4ed1-83d8-55e301da49b8-00001.avro
+   * 2390 00000-0-8f431924-ec8d-4957-a238-b8fe2b136210-00001.avro
+   *  408 00000-0-9c75bcc4-49f0-4722-9528-c1d5faa50fa7-00001.avro
+   *
+   * orc :
+   * size  file
+   * 1626 00000-0-260d42d1-f00f-4c5f-9628-5f41f6395093-00001.orc
+   *  331 00000-0-942fd38b-d7af-4ad2-a985-0e6ccdb4d8d3-00001.orc
+   *  333 00000-0-ad8f2c34-6cf7-43fe-990f-f8f6389d198e-00001.orc
+   *
+   * parquet :
+   * size  file
+   *  611 00000-0-84e1fd63-a840-4a23-983f-5247e9218cbe-00001.parquet
+   *  611 00000-0-91b070f0-7d17-487c-97ec-de0f0b09aa31-00001.parquet
+   * 2691 00000-0-e09c969d-d6ee-4a41-9e42-9dcbf42bc4e1-00001.parquet
+   *
+   * @throws IOException IOException
+   */
+  @Test
+  public void testRewriteAvoidRepeateCompress() throws IOException {
+    List<String> records = Lists.newArrayList();
+    List<Record> expected = Lists.newArrayList();
+    for (int i = 0; i < 500; i++) {
+      String data = String.valueOf(i) + "hello iceberg,hello flink";
+      records.add("(" + i + ",'" + data + "')");

Review comment:
       This is to construct one file that is clearly larger than the other two, so that we can compress the two small files without compressing the large one. If the three files are similar in size, we will not be able to do this rewrite.
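
To illustrate why the size gap matters, here is a simplified first-fit packing sketch. It is an assumption-laden stand-in, not Iceberg's actual planning: `TableScanUtil.planTasks` also takes a lookback and an open-file cost, which are ignored here. The class `FirstFitPacking` is hypothetical, and the target of 2500 bytes is picked only for illustration against the Avro sizes listed in the Javadoc above (408, 2390, 408).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified packer: puts each file into the first bin that
// still has room under the target size, opening a new bin otherwise.
final class FirstFitPacking {
  private FirstFitPacking() {
  }

  static List<List<Long>> pack(List<Long> fileSizes, long targetSizeInBytes) {
    List<List<Long>> bins = new ArrayList<>();
    List<Long> binTotals = new ArrayList<>();
    for (long size : fileSizes) {
      // Look for the first existing bin that can take this file.
      int chosen = -1;
      for (int i = 0; i < bins.size(); i++) {
        if (binTotals.get(i) + size <= targetSizeInBytes) {
          chosen = i;
          break;
        }
      }
      // No bin has room, so open a new one.
      if (chosen < 0) {
        bins.add(new ArrayList<>());
        binTotals.add(0L);
        chosen = bins.size() - 1;
      }
      bins.get(chosen).add(size);
      binTotals.set(chosen, binTotals.get(chosen) + size);
    }
    return bins;
  }
}
```

Under these assumptions, packing sizes 408, 2390 and 408 with a target of 2500 gives one bin holding the two small files and one bin holding only the large file, which is the single-file task the filter then skips.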






[GitHub] [iceberg] rdblue commented on pull request #1704: Flink : Fix Repeated Rewrite for RewriteDataFilesAction

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1704:
URL: https://github.com/apache/iceberg/pull/1704#issuecomment-751894495


   @aokolnychyi, @openinx, this seems like a good one to get into the next release. Can you both take a look?




[GitHub] [iceberg] zhangjun0x01 commented on a change in pull request #1704: Fix Repeated Rewrite for RewriteDataFilesAction

Posted by GitBox <gi...@apache.org>.
zhangjun0x01 commented on a change in pull request #1704:
URL: https://github.com/apache/iceberg/pull/1704#discussion_r522857330



##########
File path: core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
##########
@@ -226,11 +226,16 @@ public RewriteDataFilesActionResult execute() {
           return TableScanUtil.planTasks(splitTasks, targetSizeInBytes, splitLookback, splitOpenFileCost);
         })
         .flatMap(Streams::stream)
+        .filter(task -> task.files().size() > 1)
         .collect(Collectors.toList());
 
+    if (combinedScanTasks.isEmpty()) {
+      return RewriteDataFilesActionResult.empty();
+    }
+
     List<DataFile> addedDataFiles = rewriteDataForTasks(combinedScanTasks);
-    List<DataFile> currentDataFiles = filteredGroupedTasks.values().stream()
-        .flatMap(tasks -> tasks.stream().map(FileScanTask::file))
+    List<DataFile> currentDataFiles = combinedScanTasks.stream()

Review comment:
       `filteredGroupedTasks` contains all the data files that match the query conditions. `combinedScanTasks` excludes the data files that are smaller than `targetSizeInBytes` but do not need to be compressed. If we build `currentDataFiles` from `filteredGroupedTasks` instead of `combinedScanTasks`, those large files that do not need to be compressed will be deleted, which produces incorrect results.
   
   When I was working on this PR, I forgot to consider data files whose size is greater than the target file size. I think that is a separate issue, so when I found it I opened a new PR, #1762, to fix it. I think we should either finish #1762 first and then update this PR, or merge #1762 into this PR. What do you think?
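
The point about `currentDataFiles` can be sketched as follows, again with lists of file names standing in for tasks (the class `FilesToReplace` and its method are hypothetical, not Iceberg API). The files to delete are derived only from the tasks that were actually rewritten, so a file that was skipped is never removed from the table:

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in: each inner List<String> is the set of data file
// paths in one combined task.
final class FilesToReplace {
  private FilesToReplace() {
  }

  // Only tasks with more than one file are rewritten, so only their files
  // may be listed for deletion. Files in single-file tasks stay untouched.
  static List<String> fromRewrittenTasks(List<List<String>> allTasks) {
    return allTasks.stream()
        .filter(task -> task.size() > 1)
        .flatMap(List::stream)
        .collect(Collectors.toList());
  }
}
```

If the delete list were built from every planned file instead, a skipped file would be deleted without any rewritten copy replacing it.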



