You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/02/02 06:48:51 UTC

[GitHub] [iceberg] Stephen-Robin opened a new pull request #2196: Core: Data loss after compaction #2195

Stephen-Robin opened a new pull request #2196:
URL: https://github.com/apache/iceberg/pull/2196


   For details, please refer to https://github.com/apache/iceberg/issues/2195


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on pull request #2196: Core: Data loss after compaction #2195

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #2196:
URL: https://github.com/apache/iceberg/pull/2196#issuecomment-772951941


   Thank you for fixing this, @Stephen-Robin! I've tagged this for inclusion in the 0.11.1 patch release. We should probably do that soon since we have a correctness bug.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on pull request #2196: Core: Data loss after compaction #2195

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on pull request #2196:
URL: https://github.com/apache/iceberg/pull/2196#issuecomment-772653959


   lgtm, @aokolnychyi do you have any comments?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] zhangjun0x01 commented on pull request #2196: Core: Data loss after compaction #2195

Posted by GitBox <gi...@apache.org>.
zhangjun0x01 commented on pull request #2196:
URL: https://github.com/apache/iceberg/pull/2196#issuecomment-772240207


   > 2. When the split target size is set to such as 256M, rewriting large files after splitting will not significantly increase the load of hdfs.
   
   yes,I thought wrong
   
   > 3. After splitting into small files, it is convenient for  filtering by  file metadata in manifest.
   
   it sounds make sense
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] Stephen-Robin commented on a change in pull request #2196: Core: Data loss after compaction #2195

Posted by GitBox <gi...@apache.org>.
Stephen-Robin commented on a change in pull request #2196:
URL: https://github.com/apache/iceberg/pull/2196#discussion_r569640921



##########
File path: spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java
##########
@@ -311,6 +320,76 @@ public void testRewriteLargeTableHasResiduals() {
     Assert.assertEquals("Rows must match", records, actualRecords);
   }
 
+  @Test
+  public void testRewriteDataFilesForLargeFile() throws IOException, AnalysisException {
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> options = Maps.newHashMap();
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+    Assert.assertNull("Table must be empty", table.currentSnapshot());
+
+    List<Record> excepted = Lists.newArrayList();
+    Record baseRecord = GenericRecord.create(SCHEMA);
+
+    GenericAppenderFactory genericAppenderFactory = new GenericAppenderFactory(SCHEMA);
+    int count = 0;
+    File file = temp.newFile();
+    try (FileAppender<Record> fileAppender =
+            genericAppenderFactory.newAppender(Files.localOutput(file), FileFormat.PARQUET)) {
+      int fileSize = 10000;
+      for (; fileAppender.length() < fileSize; count++) {
+        Record record = baseRecord.copy();
+        record.setField("c1", count);
+        record.setField("c2", "foo" + count);
+        record.setField("c3", "bar" + count);
+        fileAppender.add(record);
+        excepted.add(record);
+      }
+    }

Review comment:
       done

##########
File path: spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java
##########
@@ -311,6 +320,76 @@ public void testRewriteLargeTableHasResiduals() {
     Assert.assertEquals("Rows must match", records, actualRecords);
   }
 
+  @Test
+  public void testRewriteDataFilesForLargeFile() throws IOException, AnalysisException {
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> options = Maps.newHashMap();
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+    Assert.assertNull("Table must be empty", table.currentSnapshot());
+
+    List<Record> excepted = Lists.newArrayList();
+    Record baseRecord = GenericRecord.create(SCHEMA);
+
+    GenericAppenderFactory genericAppenderFactory = new GenericAppenderFactory(SCHEMA);
+    int count = 0;
+    File file = temp.newFile();
+    try (FileAppender<Record> fileAppender =
+            genericAppenderFactory.newAppender(Files.localOutput(file), FileFormat.PARQUET)) {
+      int fileSize = 10000;
+      for (; fileAppender.length() < fileSize; count++) {
+        Record record = baseRecord.copy();
+        record.setField("c1", count);
+        record.setField("c2", "foo" + count);
+        record.setField("c3", "bar" + count);
+        fileAppender.add(record);
+        excepted.add(record);
+      }
+    }
+
+    DataFile dataFile = DataFiles.builder(table.spec())
+        .withPath(file.getAbsolutePath())
+        .withFileSizeInBytes(file.length())
+        .withFormat(FileFormat.PARQUET)
+        .withRecordCount(count)
+        .build();
+
+    table.newAppend().appendFile(dataFile).commit();
+
+    List<ThreeColumnRecord> records1 = Lists.newArrayList(
+        new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB"),
+        new ThreeColumnRecord(1, "DDDDDDDDDD", "DDDD")
+    );
+    writeRecords(records1);
+
+    table.refresh();
+
+    CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
+    List<DataFile> dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Assert.assertEquals("Should have 3 scan tasks before rewrite", 3, dataFiles.size());

Review comment:
       done

##########
File path: spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java
##########
@@ -311,6 +320,76 @@ public void testRewriteLargeTableHasResiduals() {
     Assert.assertEquals("Rows must match", records, actualRecords);
   }
 
+  @Test
+  public void testRewriteDataFilesForLargeFile() throws IOException, AnalysisException {
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> options = Maps.newHashMap();
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+    Assert.assertNull("Table must be empty", table.currentSnapshot());
+
+    List<Record> excepted = Lists.newArrayList();
+    Record baseRecord = GenericRecord.create(SCHEMA);
+
+    GenericAppenderFactory genericAppenderFactory = new GenericAppenderFactory(SCHEMA);
+    int count = 0;
+    File file = temp.newFile();
+    try (FileAppender<Record> fileAppender =
+            genericAppenderFactory.newAppender(Files.localOutput(file), FileFormat.PARQUET)) {
+      int fileSize = 10000;
+      for (; fileAppender.length() < fileSize; count++) {
+        Record record = baseRecord.copy();
+        record.setField("c1", count);
+        record.setField("c2", "foo" + count);
+        record.setField("c3", "bar" + count);
+        fileAppender.add(record);
+        excepted.add(record);
+      }
+    }
+
+    DataFile dataFile = DataFiles.builder(table.spec())
+        .withPath(file.getAbsolutePath())
+        .withFileSizeInBytes(file.length())
+        .withFormat(FileFormat.PARQUET)
+        .withRecordCount(count)
+        .build();
+
+    table.newAppend().appendFile(dataFile).commit();
+
+    List<ThreeColumnRecord> records1 = Lists.newArrayList(
+        new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB"),
+        new ThreeColumnRecord(1, "DDDDDDDDDD", "DDDD")
+    );
+    writeRecords(records1);
+
+    table.refresh();
+
+    CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
+    List<DataFile> dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Assert.assertEquals("Should have 3 scan tasks before rewrite", 3, dataFiles.size());

Review comment:
       yes.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] Stephen-Robin commented on a change in pull request #2196: Core: Data loss after compaction #2195

Posted by GitBox <gi...@apache.org>.
Stephen-Robin commented on a change in pull request #2196:
URL: https://github.com/apache/iceberg/pull/2196#discussion_r569641455



##########
File path: spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java
##########
@@ -311,6 +320,76 @@ public void testRewriteLargeTableHasResiduals() {
     Assert.assertEquals("Rows must match", records, actualRecords);
   }
 
+  @Test
+  public void testRewriteDataFilesForLargeFile() throws IOException, AnalysisException {
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> options = Maps.newHashMap();
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+    Assert.assertNull("Table must be empty", table.currentSnapshot());
+
+    List<Record> excepted = Lists.newArrayList();
+    Record baseRecord = GenericRecord.create(SCHEMA);
+
+    GenericAppenderFactory genericAppenderFactory = new GenericAppenderFactory(SCHEMA);
+    int count = 0;
+    File file = temp.newFile();
+    try (FileAppender<Record> fileAppender =
+            genericAppenderFactory.newAppender(Files.localOutput(file), FileFormat.PARQUET)) {
+      int fileSize = 10000;
+      for (; fileAppender.length() < fileSize; count++) {
+        Record record = baseRecord.copy();
+        record.setField("c1", count);
+        record.setField("c2", "foo" + count);
+        record.setField("c3", "bar" + count);
+        fileAppender.add(record);
+        excepted.add(record);
+      }
+    }
+
+    DataFile dataFile = DataFiles.builder(table.spec())
+        .withPath(file.getAbsolutePath())
+        .withFileSizeInBytes(file.length())
+        .withFormat(FileFormat.PARQUET)
+        .withRecordCount(count)
+        .build();
+
+    table.newAppend().appendFile(dataFile).commit();
+
+    List<ThreeColumnRecord> records1 = Lists.newArrayList(
+        new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB"),
+        new ThreeColumnRecord(1, "DDDDDDDDDD", "DDDD")
+    );
+    writeRecords(records1);
+
+    table.refresh();
+
+    CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
+    List<DataFile> dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Assert.assertEquals("Should have 3 scan tasks before rewrite", 3, dataFiles.size());
+
+    spark.read().format("iceberg").load(tableLocation).createTempView("rows");
+    long originalNumRecords = spark.read().format("iceberg").load(tableLocation).count();
+    List<Object[]> originalRecords = sql("SELECT * from rows sort by c2");
+
+    Actions actions = Actions.forTable(table);
+
+    long targetSizeInBytes = file.length() - 10;

Review comment:
       I just don’t understand this very well, I need to set splitTargetSize to a value smaller than the largest file

##########
File path: spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java
##########
@@ -311,6 +320,76 @@ public void testRewriteLargeTableHasResiduals() {
     Assert.assertEquals("Rows must match", records, actualRecords);
   }
 
+  @Test
+  public void testRewriteDataFilesForLargeFile() throws IOException, AnalysisException {
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> options = Maps.newHashMap();
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+    Assert.assertNull("Table must be empty", table.currentSnapshot());
+
+    List<Record> excepted = Lists.newArrayList();
+    Record baseRecord = GenericRecord.create(SCHEMA);
+
+    GenericAppenderFactory genericAppenderFactory = new GenericAppenderFactory(SCHEMA);
+    int count = 0;
+    File file = temp.newFile();
+    try (FileAppender<Record> fileAppender =
+            genericAppenderFactory.newAppender(Files.localOutput(file), FileFormat.PARQUET)) {
+      int fileSize = 10000;
+      for (; fileAppender.length() < fileSize; count++) {
+        Record record = baseRecord.copy();
+        record.setField("c1", count);
+        record.setField("c2", "foo" + count);
+        record.setField("c3", "bar" + count);
+        fileAppender.add(record);
+        excepted.add(record);
+      }
+    }
+
+    DataFile dataFile = DataFiles.builder(table.spec())
+        .withPath(file.getAbsolutePath())
+        .withFileSizeInBytes(file.length())
+        .withFormat(FileFormat.PARQUET)
+        .withRecordCount(count)
+        .build();
+
+    table.newAppend().appendFile(dataFile).commit();
+
+    List<ThreeColumnRecord> records1 = Lists.newArrayList(
+        new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB"),
+        new ThreeColumnRecord(1, "DDDDDDDDDD", "DDDD")
+    );
+    writeRecords(records1);
+
+    table.refresh();
+
+    CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
+    List<DataFile> dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Assert.assertEquals("Should have 3 scan tasks before rewrite", 3, dataFiles.size());
+
+    spark.read().format("iceberg").load(tableLocation).createTempView("rows");
+    long originalNumRecords = spark.read().format("iceberg").load(tableLocation).count();
+    List<Object[]> originalRecords = sql("SELECT * from rows sort by c2");
+
+    Actions actions = Actions.forTable(table);
+
+    long targetSizeInBytes = file.length() - 10;
+    RewriteDataFilesActionResult result = actions
+        .rewriteDataFiles()
+        .targetSizeInBytes(targetSizeInBytes)
+        .splitOpenFileCost(1)
+        .execute();
+
+    Assert.assertEquals("Action should delete 4 data files", 4, result.deletedDataFiles().size());

Review comment:
       yeah, I'm wondering if I need to perform deduplication

##########
File path: spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java
##########
@@ -311,6 +320,76 @@ public void testRewriteLargeTableHasResiduals() {
     Assert.assertEquals("Rows must match", records, actualRecords);
   }
 
+  @Test
+  public void testRewriteDataFilesForLargeFile() throws IOException, AnalysisException {
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> options = Maps.newHashMap();
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+    Assert.assertNull("Table must be empty", table.currentSnapshot());
+
+    List<Record> excepted = Lists.newArrayList();
+    Record baseRecord = GenericRecord.create(SCHEMA);
+
+    GenericAppenderFactory genericAppenderFactory = new GenericAppenderFactory(SCHEMA);
+    int count = 0;
+    File file = temp.newFile();
+    try (FileAppender<Record> fileAppender =
+            genericAppenderFactory.newAppender(Files.localOutput(file), FileFormat.PARQUET)) {
+      int fileSize = 10000;
+      for (; fileAppender.length() < fileSize; count++) {
+        Record record = baseRecord.copy();
+        record.setField("c1", count);
+        record.setField("c2", "foo" + count);
+        record.setField("c3", "bar" + count);
+        fileAppender.add(record);
+        excepted.add(record);
+      }
+    }
+
+    DataFile dataFile = DataFiles.builder(table.spec())
+        .withPath(file.getAbsolutePath())
+        .withFileSizeInBytes(file.length())
+        .withFormat(FileFormat.PARQUET)
+        .withRecordCount(count)
+        .build();
+
+    table.newAppend().appendFile(dataFile).commit();
+
+    List<ThreeColumnRecord> records1 = Lists.newArrayList(
+        new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB"),
+        new ThreeColumnRecord(1, "DDDDDDDDDD", "DDDD")
+    );
+    writeRecords(records1);
+
+    table.refresh();
+
+    CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
+    List<DataFile> dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Assert.assertEquals("Should have 3 scan tasks before rewrite", 3, dataFiles.size());
+
+    spark.read().format("iceberg").load(tableLocation).createTempView("rows");
+    long originalNumRecords = spark.read().format("iceberg").load(tableLocation).count();
+    List<Object[]> originalRecords = sql("SELECT * from rows sort by c2");
+
+    Actions actions = Actions.forTable(table);
+
+    long targetSizeInBytes = file.length() - 10;
+    RewriteDataFilesActionResult result = actions
+        .rewriteDataFiles()
+        .targetSizeInBytes(targetSizeInBytes)
+        .splitOpenFileCost(1)
+        .execute();
+
+    Assert.assertEquals("Action should delete 4 data files", 4, result.deletedDataFiles().size());
+    Assert.assertEquals("Action should add 2 data files", 2, result.addedDataFiles().size());
+
+    long postRewriteNumRecords = spark.read().format("iceberg").load(tableLocation).count();
+    List<Object[]> rewrittenRecords = sql("SELECT * from rows sort by c2");

Review comment:
       done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2196: Core: Data loss after compaction #2195

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2196:
URL: https://github.com/apache/iceberg/pull/2196#discussion_r569588414



##########
File path: spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java
##########
@@ -311,6 +320,76 @@ public void testRewriteLargeTableHasResiduals() {
     Assert.assertEquals("Rows must match", records, actualRecords);
   }
 
+  @Test
+  public void testRewriteDataFilesForLargeFile() throws IOException, AnalysisException {
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> options = Maps.newHashMap();
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+    Assert.assertNull("Table must be empty", table.currentSnapshot());
+
+    List<Record> excepted = Lists.newArrayList();
+    Record baseRecord = GenericRecord.create(SCHEMA);
+
+    GenericAppenderFactory genericAppenderFactory = new GenericAppenderFactory(SCHEMA);
+    int count = 0;
+    File file = temp.newFile();
+    try (FileAppender<Record> fileAppender =
+            genericAppenderFactory.newAppender(Files.localOutput(file), FileFormat.PARQUET)) {
+      int fileSize = 10000;
+      for (; fileAppender.length() < fileSize; count++) {
+        Record record = baseRecord.copy();
+        record.setField("c1", count);
+        record.setField("c2", "foo" + count);
+        record.setField("c3", "bar" + count);
+        fileAppender.add(record);
+        excepted.add(record);
+      }
+    }
+
+    DataFile dataFile = DataFiles.builder(table.spec())
+        .withPath(file.getAbsolutePath())
+        .withFileSizeInBytes(file.length())
+        .withFormat(FileFormat.PARQUET)
+        .withRecordCount(count)
+        .build();
+
+    table.newAppend().appendFile(dataFile).commit();
+
+    List<ThreeColumnRecord> records1 = Lists.newArrayList(
+        new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB"),
+        new ThreeColumnRecord(1, "DDDDDDDDDD", "DDDD")
+    );
+    writeRecords(records1);
+
+    table.refresh();
+
+    CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
+    List<DataFile> dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Assert.assertEquals("Should have 3 scan tasks before rewrite", 3, dataFiles.size());
+
+    spark.read().format("iceberg").load(tableLocation).createTempView("rows");
+    long originalNumRecords = spark.read().format("iceberg").load(tableLocation).count();
+    List<Object[]> originalRecords = sql("SELECT * from rows sort by c2");
+
+    Actions actions = Actions.forTable(table);
+
+    long targetSizeInBytes = file.length() - 10;
+    RewriteDataFilesActionResult result = actions
+        .rewriteDataFiles()
+        .targetSizeInBytes(targetSizeInBytes)
+        .splitOpenFileCost(1)
+        .execute();
+
+    Assert.assertEquals("Action should delete 4 data files", 4, result.deletedDataFiles().size());
+    Assert.assertEquals("Action should add 2 data files", 2, result.addedDataFiles().size());
+
+    long postRewriteNumRecords = spark.read().format("iceberg").load(tableLocation).count();
+    List<Object[]> rewrittenRecords = sql("SELECT * from rows sort by c2");

Review comment:
       I'm not very comfortable with using the same view to load original and rewritten records. We also don't need to create a view or use SQL to do this because the rest of the tests use `ThreeColumnRecord`. Could you just get a DataFrame, sort by c2, convert it to `Dataset<ThreeColumnRecord>`, and collect the results? That seems simpler.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue merged pull request #2196: Core: Data loss after compaction #2195

Posted by GitBox <gi...@apache.org>.
rdblue merged pull request #2196:
URL: https://github.com/apache/iceberg/pull/2196


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2196: Core: Data loss after compaction #2195

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2196:
URL: https://github.com/apache/iceberg/pull/2196#discussion_r569874354



##########
File path: spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java
##########
@@ -311,6 +320,76 @@ public void testRewriteLargeTableHasResiduals() {
     Assert.assertEquals("Rows must match", records, actualRecords);
   }
 
+  @Test
+  public void testRewriteDataFilesForLargeFile() throws IOException, AnalysisException {
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> options = Maps.newHashMap();
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+    Assert.assertNull("Table must be empty", table.currentSnapshot());
+
+    List<Record> excepted = Lists.newArrayList();
+    Record baseRecord = GenericRecord.create(SCHEMA);
+
+    GenericAppenderFactory genericAppenderFactory = new GenericAppenderFactory(SCHEMA);
+    int count = 0;
+    File file = temp.newFile();
+    try (FileAppender<Record> fileAppender =
+            genericAppenderFactory.newAppender(Files.localOutput(file), FileFormat.PARQUET)) {
+      int fileSize = 10000;
+      for (; fileAppender.length() < fileSize; count++) {
+        Record record = baseRecord.copy();
+        record.setField("c1", count);
+        record.setField("c2", "foo" + count);
+        record.setField("c3", "bar" + count);
+        fileAppender.add(record);
+        excepted.add(record);
+      }
+    }
+
+    DataFile dataFile = DataFiles.builder(table.spec())
+        .withPath(file.getAbsolutePath())
+        .withFileSizeInBytes(file.length())
+        .withFormat(FileFormat.PARQUET)
+        .withRecordCount(count)
+        .build();
+
+    table.newAppend().appendFile(dataFile).commit();
+
+    List<ThreeColumnRecord> records1 = Lists.newArrayList(
+        new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB"),
+        new ThreeColumnRecord(1, "DDDDDDDDDD", "DDDD")
+    );
+    writeRecords(records1);
+
+    table.refresh();
+
+    CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
+    List<DataFile> dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Assert.assertEquals("Should have 3 scan tasks before rewrite", 3, dataFiles.size());
+
+    spark.read().format("iceberg").load(tableLocation).createTempView("rows");
+    long originalNumRecords = spark.read().format("iceberg").load(tableLocation).count();
+    List<Object[]> originalRecords = sql("SELECT * from rows sort by c2");
+
+    Actions actions = Actions.forTable(table);
+
+    long targetSizeInBytes = file.length() - 10;
+    RewriteDataFilesActionResult result = actions
+        .rewriteDataFiles()
+        .targetSizeInBytes(targetSizeInBytes)
+        .splitOpenFileCost(1)
+        .execute();
+
+    Assert.assertEquals("Action should delete 4 data files", 4, result.deletedDataFiles().size());

Review comment:
       Let's fix this one later.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on pull request #2196: Core: Data loss after compaction #2195

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #2196:
URL: https://github.com/apache/iceberg/pull/2196#issuecomment-772096804


   @aokolnychyi, can you take a look at this?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2196: Core: Data loss after compaction #2195

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2196:
URL: https://github.com/apache/iceberg/pull/2196#discussion_r569585399



##########
File path: spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java
##########
@@ -311,6 +320,76 @@ public void testRewriteLargeTableHasResiduals() {
     Assert.assertEquals("Rows must match", records, actualRecords);
   }
 
+  @Test
+  public void testRewriteDataFilesForLargeFile() throws IOException, AnalysisException {
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> options = Maps.newHashMap();
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+    Assert.assertNull("Table must be empty", table.currentSnapshot());
+
+    List<Record> excepted = Lists.newArrayList();
+    Record baseRecord = GenericRecord.create(SCHEMA);
+
+    GenericAppenderFactory genericAppenderFactory = new GenericAppenderFactory(SCHEMA);
+    int count = 0;
+    File file = temp.newFile();
+    try (FileAppender<Record> fileAppender =
+            genericAppenderFactory.newAppender(Files.localOutput(file), FileFormat.PARQUET)) {
+      int fileSize = 10000;
+      for (; fileAppender.length() < fileSize; count++) {
+        Record record = baseRecord.copy();
+        record.setField("c1", count);
+        record.setField("c2", "foo" + count);
+        record.setField("c3", "bar" + count);
+        fileAppender.add(record);
+        excepted.add(record);
+      }
+    }

Review comment:
       Why does this create an appender instead of using `writeRecords`?
   
   I think this could easily find the largest data file from `planFiles` and base the length on that instead.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] Stephen-Robin edited a comment on pull request #2196: Core: Data loss after compaction #2195

Posted by GitBox <gi...@apache.org>.
Stephen-Robin edited a comment on pull request #2196:
URL: https://github.com/apache/iceberg/pull/2196#issuecomment-772160296


   > I am sorry, this is a known bug,I had found the bug when I did the Rewrite Action,and I had open a PR #1762 , just not merged ,the purpose of this rewrite Action is to compaction small files, so I think it is more reasonable to exclude data files which size > the target size during table scan.
   
   @zhangjun0x01  
   Hi zhangjun, I found that large files exceeding the threshold were filtered out in PR.[#1762](https://github.com/apache/iceberg/pull/1762/files) , Perhapsd the rewrite data file operation should not only includes small file merging, but also large files should be segmented and rewritten. This PR has already rewritten large files after segmentation. What do you think about this, and thanks rdblue for pushing this issue.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] Stephen-Robin commented on pull request #2196: Core: Data loss after compaction #2195

Posted by GitBox <gi...@apache.org>.
Stephen-Robin commented on pull request #2196:
URL: https://github.com/apache/iceberg/pull/2196#issuecomment-772200752


   > @Stephen-Robin Splitting a large file into multiple small files will make the Rewrite Action consume more resources, and too many small files are not friendly to hdfs. what do you think about ?
   
   I think it makes sense to split large files into smaller files. 
   1. the user’s goal is to get the expected file size.
   2. When the split target size is set to such as 256M, rewriting large files after splitting will not significantly increase the load of hdfs.
   3. After splitting into small files, it is convenient for  filtering by  file metadata in manifest.
   4. We can  also add an option to let users decide whether to split large files


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on pull request #2196: Core: Data loss after compaction #2195

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #2196:
URL: https://github.com/apache/iceberg/pull/2196#issuecomment-772096804


   @aokolnychyi, can you take a look at this?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] zhangjun0x01 commented on pull request #2196: Core: Data loss after compaction #2195

Posted by GitBox <gi...@apache.org>.
zhangjun0x01 commented on pull request #2196:
URL: https://github.com/apache/iceberg/pull/2196#issuecomment-772186229


   > @zhangjun0x01, next time please highlight that there is a correctness problem. I didn't know that #1762 fixed a correctness problem or we would have prioritized it and made sure it was in the 0.11.0 release. Thanks for pointing us to that issue, we will take a look at both alternative solutions.
   
   I'm very sorry, this problem should be resolved before 0.11.0 release. When I submitted this pr, I requested openinx to help me to review, maybe he be busy, I forgot this pr when 0.11.0  was released, I must pay attention to this problem next time.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2196: Core: Data loss after compaction #2195

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2196:
URL: https://github.com/apache/iceberg/pull/2196#discussion_r569730890



##########
File path: spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java
##########
@@ -311,6 +320,76 @@ public void testRewriteLargeTableHasResiduals() {
     Assert.assertEquals("Rows must match", records, actualRecords);
   }
 
+  @Test
+  public void testRewriteDataFilesForLargeFile() throws IOException, AnalysisException {
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> options = Maps.newHashMap();
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+    Assert.assertNull("Table must be empty", table.currentSnapshot());
+
+    List<Record> excepted = Lists.newArrayList();
+    Record baseRecord = GenericRecord.create(SCHEMA);
+
+    GenericAppenderFactory genericAppenderFactory = new GenericAppenderFactory(SCHEMA);
+    int count = 0;
+    File file = temp.newFile();
+    try (FileAppender<Record> fileAppender =
+            genericAppenderFactory.newAppender(Files.localOutput(file), FileFormat.PARQUET)) {
+      int fileSize = 10000;
+      for (; fileAppender.length() < fileSize; count++) {
+        Record record = baseRecord.copy();
+        record.setField("c1", count);
+        record.setField("c2", "foo" + count);
+        record.setField("c3", "bar" + count);
+        fileAppender.add(record);
+        excepted.add(record);
+      }
+    }
+
+    DataFile dataFile = DataFiles.builder(table.spec())
+        .withPath(file.getAbsolutePath())
+        .withFileSizeInBytes(file.length())
+        .withFormat(FileFormat.PARQUET)
+        .withRecordCount(count)
+        .build();
+
+    table.newAppend().appendFile(dataFile).commit();
+
+    List<ThreeColumnRecord> records1 = Lists.newArrayList(
+        new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB"),
+        new ThreeColumnRecord(1, "DDDDDDDDDD", "DDDD")
+    );
+    writeRecords(records1);
+
+    table.refresh();
+
+    CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
+    List<DataFile> dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Assert.assertEquals("Should have 3 scan tasks before rewrite", 3, dataFiles.size());
+
+    spark.read().format("iceberg").load(tableLocation).createTempView("rows");
+    long originalNumRecords = spark.read().format("iceberg").load(tableLocation).count();
+    List<Object[]> originalRecords = sql("SELECT * from rows sort by c2");
+
+    Actions actions = Actions.forTable(table);
+
+    long targetSizeInBytes = file.length() - 10;
+    RewriteDataFilesActionResult result = actions
+        .rewriteDataFiles()
+        .targetSizeInBytes(targetSizeInBytes)
+        .splitOpenFileCost(1)
+        .execute();
+
+    Assert.assertEquals("Action should delete 4 data files", 4, result.deletedDataFiles().size());

Review comment:
       I wonder if this is an error in RewriteDatafile Actions, where it should use a set to collect the deleted files. Like we are getting 1 record of "delete" for each split in the large file.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2196: Core: Data loss after compaction #2195

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2196:
URL: https://github.com/apache/iceberg/pull/2196#discussion_r569874827



##########
File path: spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java
##########
@@ -311,6 +315,56 @@ public void testRewriteLargeTableHasResiduals() {
     Assert.assertEquals("Rows must match", records, actualRecords);
   }
 
+  @Test
+  public void testRewriteDataFilesForLargeFile() throws AnalysisException {
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> options = Maps.newHashMap();
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+    Assert.assertNull("Table must be empty", table.currentSnapshot());
+
+    List<ThreeColumnRecord> records1 = Lists.newArrayList();
+
+    IntStream.range(0, 2000).forEach(i -> records1.add(new ThreeColumnRecord(i, "foo" + i, "bar" + i)));
+    Dataset<Row> df = spark.createDataFrame(records1, ThreeColumnRecord.class).repartition(1);
+    writeDF(df);
+
+    List<ThreeColumnRecord> records2 = Lists.newArrayList(
+        new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB"),
+        new ThreeColumnRecord(1, "DDDDDDDDDD", "DDDD")
+    );
+    writeRecords(records2);
+
+    table.refresh();
+
+    CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
+    List<DataFile> dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    DataFile maxSizeFile = Collections.max(dataFiles, Comparator.comparingLong(DataFile::fileSizeInBytes));

Review comment:
       This looks good.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2196: Core: Data loss after compaction #2195

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2196:
URL: https://github.com/apache/iceberg/pull/2196#discussion_r569589820



##########
File path: spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java
##########
@@ -311,6 +320,76 @@ public void testRewriteLargeTableHasResiduals() {
     Assert.assertEquals("Rows must match", records, actualRecords);
   }
 
+  @Test
+  public void testRewriteDataFilesForLargeFile() throws IOException, AnalysisException {
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> options = Maps.newHashMap();
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+    Assert.assertNull("Table must be empty", table.currentSnapshot());
+
+    List<Record> excepted = Lists.newArrayList();
+    Record baseRecord = GenericRecord.create(SCHEMA);
+
+    GenericAppenderFactory genericAppenderFactory = new GenericAppenderFactory(SCHEMA);
+    int count = 0;
+    File file = temp.newFile();
+    try (FileAppender<Record> fileAppender =
+            genericAppenderFactory.newAppender(Files.localOutput(file), FileFormat.PARQUET)) {
+      int fileSize = 10000;
+      for (; fileAppender.length() < fileSize; count++) {
+        Record record = baseRecord.copy();
+        record.setField("c1", count);
+        record.setField("c2", "foo" + count);
+        record.setField("c3", "bar" + count);
+        fileAppender.add(record);
+        excepted.add(record);
+      }
+    }
+
+    DataFile dataFile = DataFiles.builder(table.spec())
+        .withPath(file.getAbsolutePath())
+        .withFileSizeInBytes(file.length())
+        .withFormat(FileFormat.PARQUET)
+        .withRecordCount(count)
+        .build();
+
+    table.newAppend().appendFile(dataFile).commit();
+
+    List<ThreeColumnRecord> records1 = Lists.newArrayList(
+        new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB"),
+        new ThreeColumnRecord(1, "DDDDDDDDDD", "DDDD")
+    );
+    writeRecords(records1);
+
+    table.refresh();
+
+    CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
+    List<DataFile> dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Assert.assertEquals("Should have 3 scan tasks before rewrite", 3, dataFiles.size());
+
+    spark.read().format("iceberg").load(tableLocation).createTempView("rows");
+    long originalNumRecords = spark.read().format("iceberg").load(tableLocation).count();
+    List<Object[]> originalRecords = sql("SELECT * from rows sort by c2");
+
+    Actions actions = Actions.forTable(table);
+
+    long targetSizeInBytes = file.length() - 10;
+    RewriteDataFilesActionResult result = actions
+        .rewriteDataFiles()
+        .targetSizeInBytes(targetSizeInBytes)
+        .splitOpenFileCost(1)
+        .execute();
+
+    Assert.assertEquals("Action should delete 4 data files", 4, result.deletedDataFiles().size());

Review comment:
       What are the 4 data files that this action deletes? Is one of the 3 from above duplicated?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] Stephen-Robin commented on pull request #2196: Core: Data loss after compaction #2195

Posted by GitBox <gi...@apache.org>.
Stephen-Robin commented on pull request #2196:
URL: https://github.com/apache/iceberg/pull/2196#issuecomment-772160296


   > I am sorry, this is a known bug,I had found the bug when I did the Rewrite Action,and I had open a PR #1762 , just not merged ,the purpose of this rewrite Action is to compaction small files, so I think it is more reasonable to exclude data files which size > the target size during table scan.
   
   @zhangjun0x01  
   Hi zhangju, I found that large files exceeding the threshold were filtered out in PR.[#1762](https://github.com/apache/iceberg/pull/1762/files) , Perhapsd the rewrite data file operation should not only includes small file merging, but also large files should be segmented and rewritten. This PR has already rewritten large files after segmentation. What do you think about this, and thanks rdblue for pushing this issue.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] zhangjun0x01 commented on pull request #2196: Core: Data loss after compaction #2195

Posted by GitBox <gi...@apache.org>.
zhangjun0x01 commented on pull request #2196:
URL: https://github.com/apache/iceberg/pull/2196#issuecomment-772175903


   @Stephen-Robin I think there is no need to split the large file, because if the file size exceeds the target size, it will be automatically split into multiple `CombinedScanTasks` when reading, and read concurrently, instead of having a task to read the large file. Splitting a large file into multiple small files will make the Rewrite Action consume more resources, and too many small files are not friendly to hdfs. what do you think about ?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on pull request #2196: Core: Data loss after compaction #2195

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #2196:
URL: https://github.com/apache/iceberg/pull/2196#issuecomment-772146944


   @zhangjun0x01, next time please highlight that there is a correctness problem. I didn't know that #1762 fixed a correctness problem or we would have prioritized it and made sure it was in the 0.11.0 release. Thanks for pointing us to that issue, we will take a look at both alternative solutions.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2196: Core: Data loss after compaction #2195

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2196:
URL: https://github.com/apache/iceberg/pull/2196#discussion_r569582496



##########
File path: spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java
##########
@@ -311,6 +320,76 @@ public void testRewriteLargeTableHasResiduals() {
     Assert.assertEquals("Rows must match", records, actualRecords);
   }
 
+  @Test
+  public void testRewriteDataFilesForLargeFile() throws IOException, AnalysisException {
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> options = Maps.newHashMap();
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+    Assert.assertNull("Table must be empty", table.currentSnapshot());
+
+    List<Record> excepted = Lists.newArrayList();
+    Record baseRecord = GenericRecord.create(SCHEMA);
+
+    GenericAppenderFactory genericAppenderFactory = new GenericAppenderFactory(SCHEMA);
+    int count = 0;
+    File file = temp.newFile();
+    try (FileAppender<Record> fileAppender =
+            genericAppenderFactory.newAppender(Files.localOutput(file), FileFormat.PARQUET)) {
+      int fileSize = 10000;
+      for (; fileAppender.length() < fileSize; count++) {
+        Record record = baseRecord.copy();
+        record.setField("c1", count);
+        record.setField("c2", "foo" + count);
+        record.setField("c3", "bar" + count);
+        fileAppender.add(record);
+        excepted.add(record);
+      }
+    }
+
+    DataFile dataFile = DataFiles.builder(table.spec())
+        .withPath(file.getAbsolutePath())
+        .withFileSizeInBytes(file.length())
+        .withFormat(FileFormat.PARQUET)
+        .withRecordCount(count)
+        .build();
+
+    table.newAppend().appendFile(dataFile).commit();
+
+    List<ThreeColumnRecord> records1 = Lists.newArrayList(
+        new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB"),
+        new ThreeColumnRecord(1, "DDDDDDDDDD", "DDDD")
+    );
+    writeRecords(records1);
+
+    table.refresh();
+
+    CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
+    List<DataFile> dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Assert.assertEquals("Should have 3 scan tasks before rewrite", 3, dataFiles.size());
+
+    spark.read().format("iceberg").load(tableLocation).createTempView("rows");
+    long originalNumRecords = spark.read().format("iceberg").load(tableLocation).count();
+    List<Object[]> originalRecords = sql("SELECT * from rows sort by c2");
+
+    Actions actions = Actions.forTable(table);
+
+    long targetSizeInBytes = file.length() - 10;

Review comment:
       It would be easier to understand if this were set up where `file` is used to create `dataFile`. Or, this could use the length of `dataFile` instead so we don't need to make sure `dataFile` and `file` have the same length.

##########
File path: spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java
##########
@@ -311,6 +320,76 @@ public void testRewriteLargeTableHasResiduals() {
     Assert.assertEquals("Rows must match", records, actualRecords);
   }
 
+  @Test
+  public void testRewriteDataFilesForLargeFile() throws IOException, AnalysisException {
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> options = Maps.newHashMap();
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+    Assert.assertNull("Table must be empty", table.currentSnapshot());
+
+    List<Record> excepted = Lists.newArrayList();
+    Record baseRecord = GenericRecord.create(SCHEMA);
+
+    GenericAppenderFactory genericAppenderFactory = new GenericAppenderFactory(SCHEMA);
+    int count = 0;
+    File file = temp.newFile();
+    try (FileAppender<Record> fileAppender =
+            genericAppenderFactory.newAppender(Files.localOutput(file), FileFormat.PARQUET)) {
+      int fileSize = 10000;
+      for (; fileAppender.length() < fileSize; count++) {
+        Record record = baseRecord.copy();
+        record.setField("c1", count);
+        record.setField("c2", "foo" + count);
+        record.setField("c3", "bar" + count);
+        fileAppender.add(record);
+        excepted.add(record);
+      }
+    }
+
+    DataFile dataFile = DataFiles.builder(table.spec())
+        .withPath(file.getAbsolutePath())
+        .withFileSizeInBytes(file.length())
+        .withFormat(FileFormat.PARQUET)
+        .withRecordCount(count)
+        .build();
+
+    table.newAppend().appendFile(dataFile).commit();
+
+    List<ThreeColumnRecord> records1 = Lists.newArrayList(
+        new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB"),
+        new ThreeColumnRecord(1, "DDDDDDDDDD", "DDDD")
+    );
+    writeRecords(records1);
+
+    table.refresh();
+
+    CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
+    List<DataFile> dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Assert.assertEquals("Should have 3 scan tasks before rewrite", 3, dataFiles.size());

Review comment:
       Minor: I think we should refer to these in the context as "files" not "tasks" because tasks are usually what we get after splitting and combining.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2196: Core: Data loss after compaction #2195

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2196:
URL: https://github.com/apache/iceberg/pull/2196#discussion_r569603755



##########
File path: spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java
##########
@@ -311,6 +320,76 @@ public void testRewriteLargeTableHasResiduals() {
     Assert.assertEquals("Rows must match", records, actualRecords);
   }
 
+  @Test
+  public void testRewriteDataFilesForLargeFile() throws IOException, AnalysisException {
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> options = Maps.newHashMap();
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+    Assert.assertNull("Table must be empty", table.currentSnapshot());
+
+    List<Record> excepted = Lists.newArrayList();
+    Record baseRecord = GenericRecord.create(SCHEMA);
+
+    GenericAppenderFactory genericAppenderFactory = new GenericAppenderFactory(SCHEMA);
+    int count = 0;
+    File file = temp.newFile();
+    try (FileAppender<Record> fileAppender =
+            genericAppenderFactory.newAppender(Files.localOutput(file), FileFormat.PARQUET)) {
+      int fileSize = 10000;
+      for (; fileAppender.length() < fileSize; count++) {
+        Record record = baseRecord.copy();
+        record.setField("c1", count);
+        record.setField("c2", "foo" + count);
+        record.setField("c3", "bar" + count);
+        fileAppender.add(record);
+        excepted.add(record);
+      }
+    }

Review comment:
       As I noted above, you may need a modified "writeRecords" which uses a single partition if you want to generate 1 file.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer edited a comment on pull request #2196: Core: Data loss after compaction #2195

Posted by GitBox <gi...@apache.org>.
RussellSpitzer edited a comment on pull request #2196:
URL: https://github.com/apache/iceberg/pull/2196#issuecomment-772584764


   My quick notes on this issue:
   
   ```
   Previously when computing the rewrite tasks for RewriteDataFiles the code
   would ignore scan tasks which referred to a single file. This is an issue because
   large files could be potentitally split into multiple read tasks. If one
   slice of a large file was combined with a slice from another file, that
   secition would be rewritten with the other file, but the other slices would be ignored.
   
   For example given 2 files
   File A - 100 Bytes
   File B - 10 Bytes
   
   If the target split size was 60 bytes we would end up with 3 tasks
   A : 1 - 60
   A : 61 - 100
   B : 0 - 10
   
   Which would be combined into
   
   (A : 1 - 60)
   (A : 61 -100, B : 0 -10)
   
   The first task would be discarded since it only refered to one file. The
   second task would be rewritten, which would end with deleting file A and B.
   
   I believe the original intent was to ignore single file scan tasks as it was assumed these would
   be unchanged files. But if a single file scan task only contains a partial scan of a file it must 
   be rewritten since it represents a new smaller file that needs to be rewritten.
   
   Normally this doesn't cause data loss since an ignored file won't be deleted, but if a split is
   combined with another file, then that triggers the delete of the large file, even though several
   splits of the large file will not have been written into new files.
   ```


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] zhangjun0x01 commented on pull request #2196: Core: Data loss after compaction #2195

Posted by GitBox <gi...@apache.org>.
zhangjun0x01 commented on pull request #2196:
URL: https://github.com/apache/iceberg/pull/2196#issuecomment-772129492


   I am sorry, this is a known bug,I had found the bug  when I did the Rewrite Action,and I had open a PR #1762 , just not merged ,the purpose of this rewrite Action is to compaction small files, so I think it is more reasonable to exclude data files which size > the target size during table scan.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2196: Core: Data loss after compaction #2195

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2196:
URL: https://github.com/apache/iceberg/pull/2196#discussion_r569602777



##########
File path: spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java
##########
@@ -311,6 +320,76 @@ public void testRewriteLargeTableHasResiduals() {
     Assert.assertEquals("Rows must match", records, actualRecords);
   }
 
+  @Test
+  public void testRewriteDataFilesForLargeFile() throws IOException, AnalysisException {
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> options = Maps.newHashMap();
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+    Assert.assertNull("Table must be empty", table.currentSnapshot());
+
+    List<Record> excepted = Lists.newArrayList();
+    Record baseRecord = GenericRecord.create(SCHEMA);
+
+    GenericAppenderFactory genericAppenderFactory = new GenericAppenderFactory(SCHEMA);
+    int count = 0;
+    File file = temp.newFile();
+    try (FileAppender<Record> fileAppender =
+            genericAppenderFactory.newAppender(Files.localOutput(file), FileFormat.PARQUET)) {
+      int fileSize = 10000;
+      for (; fileAppender.length() < fileSize; count++) {
+        Record record = baseRecord.copy();
+        record.setField("c1", count);
+        record.setField("c2", "foo" + count);
+        record.setField("c3", "bar" + count);
+        fileAppender.add(record);
+        excepted.add(record);
+      }
+    }
+
+    DataFile dataFile = DataFiles.builder(table.spec())
+        .withPath(file.getAbsolutePath())
+        .withFileSizeInBytes(file.length())
+        .withFormat(FileFormat.PARQUET)
+        .withRecordCount(count)
+        .build();
+
+    table.newAppend().appendFile(dataFile).commit();
+
+    List<ThreeColumnRecord> records1 = Lists.newArrayList(
+        new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB"),
+        new ThreeColumnRecord(1, "DDDDDDDDDD", "DDDD")
+    );
+    writeRecords(records1);
+
+    table.refresh();
+
+    CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
+    List<DataFile> dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Assert.assertEquals("Should have 3 scan tasks before rewrite", 3, dataFiles.size());

Review comment:
       Ah yeah I think I was mistaken here, I think we may be generating multiple files via the writeRecords method which could potentially parallelize the write resulting in multiple files. (This is for the 2 record file)
   
   Instead we would need to do a repartition(1) to insure a single file is written.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2196: Core: Data loss after compaction #2195

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2196:
URL: https://github.com/apache/iceberg/pull/2196#discussion_r569589356



##########
File path: spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java
##########
@@ -311,6 +320,76 @@ public void testRewriteLargeTableHasResiduals() {
     Assert.assertEquals("Rows must match", records, actualRecords);
   }
 
+  @Test
+  public void testRewriteDataFilesForLargeFile() throws IOException, AnalysisException {
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> options = Maps.newHashMap();
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+    Assert.assertNull("Table must be empty", table.currentSnapshot());
+
+    List<Record> excepted = Lists.newArrayList();
+    Record baseRecord = GenericRecord.create(SCHEMA);
+
+    GenericAppenderFactory genericAppenderFactory = new GenericAppenderFactory(SCHEMA);
+    int count = 0;
+    File file = temp.newFile();
+    try (FileAppender<Record> fileAppender =
+            genericAppenderFactory.newAppender(Files.localOutput(file), FileFormat.PARQUET)) {
+      int fileSize = 10000;
+      for (; fileAppender.length() < fileSize; count++) {
+        Record record = baseRecord.copy();
+        record.setField("c1", count);
+        record.setField("c2", "foo" + count);
+        record.setField("c3", "bar" + count);
+        fileAppender.add(record);
+        excepted.add(record);
+      }
+    }
+
+    DataFile dataFile = DataFiles.builder(table.spec())
+        .withPath(file.getAbsolutePath())
+        .withFileSizeInBytes(file.length())
+        .withFormat(FileFormat.PARQUET)
+        .withRecordCount(count)
+        .build();
+
+    table.newAppend().appendFile(dataFile).commit();
+
+    List<ThreeColumnRecord> records1 = Lists.newArrayList(
+        new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB"),
+        new ThreeColumnRecord(1, "DDDDDDDDDD", "DDDD")
+    );
+    writeRecords(records1);
+
+    table.refresh();
+
+    CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
+    List<DataFile> dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Assert.assertEquals("Should have 3 scan tasks before rewrite", 3, dataFiles.size());

Review comment:
       This appends one file directly (the big one, `dataFile`) and one using `writeRecords`. Why are there 3 files in the table at this point?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2196: Core: Data loss after compaction #2195

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2196:
URL: https://github.com/apache/iceberg/pull/2196#discussion_r569602777



##########
File path: spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java
##########
@@ -311,6 +320,76 @@ public void testRewriteLargeTableHasResiduals() {
     Assert.assertEquals("Rows must match", records, actualRecords);
   }
 
+  @Test
+  public void testRewriteDataFilesForLargeFile() throws IOException, AnalysisException {
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> options = Maps.newHashMap();
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+    Assert.assertNull("Table must be empty", table.currentSnapshot());
+
+    List<Record> excepted = Lists.newArrayList();
+    Record baseRecord = GenericRecord.create(SCHEMA);
+
+    GenericAppenderFactory genericAppenderFactory = new GenericAppenderFactory(SCHEMA);
+    int count = 0;
+    File file = temp.newFile();
+    try (FileAppender<Record> fileAppender =
+            genericAppenderFactory.newAppender(Files.localOutput(file), FileFormat.PARQUET)) {
+      int fileSize = 10000;
+      for (; fileAppender.length() < fileSize; count++) {
+        Record record = baseRecord.copy();
+        record.setField("c1", count);
+        record.setField("c2", "foo" + count);
+        record.setField("c3", "bar" + count);
+        fileAppender.add(record);
+        excepted.add(record);
+      }
+    }
+
+    DataFile dataFile = DataFiles.builder(table.spec())
+        .withPath(file.getAbsolutePath())
+        .withFileSizeInBytes(file.length())
+        .withFormat(FileFormat.PARQUET)
+        .withRecordCount(count)
+        .build();
+
+    table.newAppend().appendFile(dataFile).commit();
+
+    List<ThreeColumnRecord> records1 = Lists.newArrayList(
+        new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB"),
+        new ThreeColumnRecord(1, "DDDDDDDDDD", "DDDD")
+    );
+    writeRecords(records1);
+
+    table.refresh();
+
+    CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
+    List<DataFile> dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Assert.assertEquals("Should have 3 scan tasks before rewrite", 3, dataFiles.size());

Review comment:
       Ah yeah I think I was mistaken here, I think we may be generating multiple files via the writeRecords method which could potentially parallelize the write resulting in multiple files. (This is for the 2 record file)
   
   Instead we would need to do a repartition(1) to insure a single file is written.
   
   In my internal test I just did the repartition to make sure we had a single file writes




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] Stephen-Robin commented on a change in pull request #2196: Core: Data loss after compaction #2195

Posted by GitBox <gi...@apache.org>.
Stephen-Robin commented on a change in pull request #2196:
URL: https://github.com/apache/iceberg/pull/2196#discussion_r569641064



##########
File path: spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java
##########
@@ -311,6 +320,76 @@ public void testRewriteLargeTableHasResiduals() {
     Assert.assertEquals("Rows must match", records, actualRecords);
   }
 
+  @Test
+  public void testRewriteDataFilesForLargeFile() throws IOException, AnalysisException {
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> options = Maps.newHashMap();
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+    Assert.assertNull("Table must be empty", table.currentSnapshot());
+
+    List<Record> excepted = Lists.newArrayList();
+    Record baseRecord = GenericRecord.create(SCHEMA);
+
+    GenericAppenderFactory genericAppenderFactory = new GenericAppenderFactory(SCHEMA);
+    int count = 0;
+    File file = temp.newFile();
+    try (FileAppender<Record> fileAppender =
+            genericAppenderFactory.newAppender(Files.localOutput(file), FileFormat.PARQUET)) {
+      int fileSize = 10000;
+      for (; fileAppender.length() < fileSize; count++) {
+        Record record = baseRecord.copy();
+        record.setField("c1", count);
+        record.setField("c2", "foo" + count);
+        record.setField("c3", "bar" + count);
+        fileAppender.add(record);
+        excepted.add(record);
+      }
+    }
+
+    DataFile dataFile = DataFiles.builder(table.spec())
+        .withPath(file.getAbsolutePath())
+        .withFileSizeInBytes(file.length())
+        .withFormat(FileFormat.PARQUET)
+        .withRecordCount(count)
+        .build();
+
+    table.newAppend().appendFile(dataFile).commit();
+
+    List<ThreeColumnRecord> records1 = Lists.newArrayList(
+        new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB"),
+        new ThreeColumnRecord(1, "DDDDDDDDDD", "DDDD")
+    );
+    writeRecords(records1);
+
+    table.refresh();
+
+    CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
+    List<DataFile> dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Assert.assertEquals("Should have 3 scan tasks before rewrite", 3, dataFiles.size());

Review comment:
       yes. 1 maxSizeFile + 2 row data file




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2196: Core: Data loss after compaction #2195

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2196:
URL: https://github.com/apache/iceberg/pull/2196#discussion_r569588414



##########
File path: spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java
##########
@@ -311,6 +320,76 @@ public void testRewriteLargeTableHasResiduals() {
     Assert.assertEquals("Rows must match", records, actualRecords);
   }
 
+  @Test
+  public void testRewriteDataFilesForLargeFile() throws IOException, AnalysisException {
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> options = Maps.newHashMap();
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+    Assert.assertNull("Table must be empty", table.currentSnapshot());
+
+    List<Record> excepted = Lists.newArrayList();
+    Record baseRecord = GenericRecord.create(SCHEMA);
+
+    GenericAppenderFactory genericAppenderFactory = new GenericAppenderFactory(SCHEMA);
+    int count = 0;
+    File file = temp.newFile();
+    try (FileAppender<Record> fileAppender =
+            genericAppenderFactory.newAppender(Files.localOutput(file), FileFormat.PARQUET)) {
+      int fileSize = 10000;
+      for (; fileAppender.length() < fileSize; count++) {
+        Record record = baseRecord.copy();
+        record.setField("c1", count);
+        record.setField("c2", "foo" + count);
+        record.setField("c3", "bar" + count);
+        fileAppender.add(record);
+        excepted.add(record);
+      }
+    }
+
+    DataFile dataFile = DataFiles.builder(table.spec())
+        .withPath(file.getAbsolutePath())
+        .withFileSizeInBytes(file.length())
+        .withFormat(FileFormat.PARQUET)
+        .withRecordCount(count)
+        .build();
+
+    table.newAppend().appendFile(dataFile).commit();
+
+    List<ThreeColumnRecord> records1 = Lists.newArrayList(
+        new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB"),
+        new ThreeColumnRecord(1, "DDDDDDDDDD", "DDDD")
+    );
+    writeRecords(records1);
+
+    table.refresh();
+
+    CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
+    List<DataFile> dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Assert.assertEquals("Should have 3 scan tasks before rewrite", 3, dataFiles.size());
+
+    spark.read().format("iceberg").load(tableLocation).createTempView("rows");
+    long originalNumRecords = spark.read().format("iceberg").load(tableLocation).count();
+    List<Object[]> originalRecords = sql("SELECT * from rows sort by c2");
+
+    Actions actions = Actions.forTable(table);
+
+    long targetSizeInBytes = file.length() - 10;
+    RewriteDataFilesActionResult result = actions
+        .rewriteDataFiles()
+        .targetSizeInBytes(targetSizeInBytes)
+        .splitOpenFileCost(1)
+        .execute();
+
+    Assert.assertEquals("Action should delete 4 data files", 4, result.deletedDataFiles().size());
+    Assert.assertEquals("Action should add 2 data files", 2, result.addedDataFiles().size());
+
+    long postRewriteNumRecords = spark.read().format("iceberg").load(tableLocation).count();
+    List<Object[]> rewrittenRecords = sql("SELECT * from rows sort by c2");

Review comment:
       I'm not very comfortable with using the same view to load original and rewritten records. Can you create a separate view for the rewritten data? That way we avoid any weird caching behavior.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer edited a comment on pull request #2196: Core: Data loss after compaction #2195

Posted by GitBox <gi...@apache.org>.
RussellSpitzer edited a comment on pull request #2196:
URL: https://github.com/apache/iceberg/pull/2196#issuecomment-772584764


   My quick notes on this issue:
   
   ```
   Previously when computing the rewrite tasks for RewriteDataFiles the code
   would ignore scan tasks which referred to a single file. This is an issue because
   large files could be potentitally split into multiple read tasks. If one
   slice of a large file was combined with a slice from another file, that
   secition would be rewritten with the other file, but the other slices would be ignored.
   
   For example given 2 files
   File A - 100 Bytes
   File B - 10 Bytes
   
   If the target split size was 60 bytes we would end up with 3 tasks
   A : 1 - 60
   A : 61 - 100
   B : 0 - 10
   
   Which would be combined into
   
   (A : 1 - 60)
   (A : 61 -100, B : 0 -10)
   
   The first task would be discarded since it only refered to one file. The
   second task would be rewritten, which would end with deleting file A and B.
   
   I believe the original intent was to ignore single file scan tasks as it was assumed these would
   be unchanged files. But if a single file scan task only contains a partial scan of a file it must 
   be rewritten since it represents a new smaller file that needs to be rewritten.
   ```


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] Stephen-Robin edited a comment on pull request #2196: Core: Data loss after compaction #2195

Posted by GitBox <gi...@apache.org>.
Stephen-Robin edited a comment on pull request #2196:
URL: https://github.com/apache/iceberg/pull/2196#issuecomment-772200752


   > @Stephen-Robin Splitting a large file into multiple small files will make the Rewrite Action consume more resources, and too many small files are not friendly to hdfs. what do you think about ?
   
   @zhangjun0x01 
   I think it makes sense to split large files into smaller files. 
   1. the user’s goal is to get the expected file size.
   2. When the split target size is set to such as 256M, rewriting large files after splitting will not significantly increase the load of hdfs.
   3. After splitting into small files, it is convenient for  filtering by  file metadata in manifest.
   4. We can  also add an option to let users decide whether to split large files


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on pull request #2196: Core: Data loss after compaction #2195

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on pull request #2196:
URL: https://github.com/apache/iceberg/pull/2196#issuecomment-772581755


   I think this PR is pretty much ready to go other than a few nits


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] HeartSaVioR commented on pull request #2196: Core: Data loss after compaction #2195

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #2196:
URL: https://github.com/apache/iceberg/pull/2196#issuecomment-772188549


   You're welcome to subscribe dev@ mailing list and participate discussion, RC verification, etc. Github mention is easy to slip through, so once you find some urgent things like regression or correctness, dev@ mailing list (or Slack channel) is appropriate place to share.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on pull request #2196: Core: Data loss after compaction #2195

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on pull request #2196:
URL: https://github.com/apache/iceberg/pull/2196#issuecomment-772584764


   My quick notes on this issue:
   
   ```
   Previously when computing the rewrite tasks for RewriteDataFiles the code
   would ignore scan tasks which referred to a single file. This is an issue because
   large files could be potentitally split into multiple read tasks. If one
   slice of a large file was combined with a slice from another file, that
   secition would be rewritten with the other file, but the other slices would be ignored.
   
   For example given 2 files
   File A - 100 Bytes
   File B - 10 Bytes
   
   If the target split size was 60 bytes we would end up with 3 tasks
   A : 1 - 60
   A : 61 - 100
   B : 0 - 10
   
   Which would be combined into
   
   (A : 1 - 60)
   (A : 61 -100, B : 0 -10)
   
   The first task would be discarded since it only refered to one file. The
   second task would be rewritten, which would end with deleting file A and B.
   
   I believe the original intent was to ignore single file scan tasks as it was assumed these would
   be unchanged files. But if a single file scan task only contains a partial scan of a file it must 
   be rewritten since it represents a new smaller file that needs to be rewritten.
   ```


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] Stephen-Robin commented on a change in pull request #2196: Core: Data loss after compaction #2195

Posted by GitBox <gi...@apache.org>.
Stephen-Robin commented on a change in pull request #2196:
URL: https://github.com/apache/iceberg/pull/2196#discussion_r569606595



##########
File path: spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java
##########
@@ -311,6 +320,76 @@ public void testRewriteLargeTableHasResiduals() {
     Assert.assertEquals("Rows must match", records, actualRecords);
   }
 
+  @Test
+  public void testRewriteDataFilesForLargeFile() throws IOException, AnalysisException {
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> options = Maps.newHashMap();
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+    Assert.assertNull("Table must be empty", table.currentSnapshot());
+
+    List<Record> excepted = Lists.newArrayList();
+    Record baseRecord = GenericRecord.create(SCHEMA);
+
+    GenericAppenderFactory genericAppenderFactory = new GenericAppenderFactory(SCHEMA);
+    int count = 0;
+    File file = temp.newFile();
+    try (FileAppender<Record> fileAppender =
+            genericAppenderFactory.newAppender(Files.localOutput(file), FileFormat.PARQUET)) {
+      int fileSize = 10000;
+      for (; fileAppender.length() < fileSize; count++) {
+        Record record = baseRecord.copy();
+        record.setField("c1", count);
+        record.setField("c2", "foo" + count);
+        record.setField("c3", "bar" + count);
+        fileAppender.add(record);
+        excepted.add(record);
+      }
+    }

Review comment:
       Okay, thank you for your comments, I will make changes immediately




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org